ML predictions Phase 1 & 2: Budget Forecast tab and Cash Flow upgrades

Phase 1 — Budget Forecast: adds a dedicated first-class tab showing all
monthly budgets with velocity (£/day), forecast month-end total, dual
progress bars, and colour-coded overspend probability badges. Summary
bar shows budgets tracked / at-risk count / total forecast overspend.
Removes the old BudgetAlerts widget embedded in the Spending tab.

Phase 2 — Cash Flow: incorporates known recurring transactions into the
30-day projection (outflows hit on their predicted dates rather than
being smeared as averages), adds sqrt-of-time confidence bands to the
chart, and shows an upcoming recurring payments list with at-risk
highlighting for payments falling on or after the first negative-balance day.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
megaproxy 2026-04-28 10:21:57 +00:00
parent 1a2c8efd01
commit 3b4787d8b9
3 changed files with 281 additions and 58 deletions

View file

@ -178,7 +178,9 @@ async def cashflow_forecast(
):
await _check_prediction_rate(redis, str(user.id))
from datetime import timedelta
import math
import numpy as np
from app.core.security import decrypt_field
# Historical daily cash flows (last 90 days)
hist_df = await get_daily_cash_flow(db, user.id, days=90)
@ -203,28 +205,95 @@ async def cashflow_forecast(
avg_inflow = float(hist_df["inflow"].mean())
avg_outflow = float(hist_df["outflow"].mean())
std_net = float((hist_df["inflow"] - hist_df["outflow"]).std())
if math.isnan(std_net):
std_net = 0.0
else:
avg_inflow = 0.0
avg_outflow = 0.0
std_net = 0.0
# Project 30 days forward
# Fetch recurring transactions (expenses)
rec_result = await db.execute(text("""
SELECT description_enc, amount::float, recurring_rule, date
FROM transactions
WHERE user_id = CAST(:uid AS uuid)
AND is_recurring = TRUE
AND type = 'expense'
AND deleted_at IS NULL
AND status != 'void'
ORDER BY date DESC
"""), {"uid": str(user.id)})
rec_rows = rec_result.fetchall()
_FREQ_DAYS = {"daily": 1, "weekly": 7, "fortnightly": 14, "monthly": 30, "quarterly": 91, "yearly": 365}
# Build a map of date_str -> extra outflow from recurring payments
today = date.today()
horizon = today + timedelta(days=30)
recurring_by_date: dict[str, float] = {}
upcoming_payments: list[dict] = []
# Track which (name, freq) pairs we've already scheduled to avoid duplicates
seen_recurring: set[tuple[str, str]] = set()
for desc_enc, amount, rule, last_date in rec_rows:
rule = rule or {}
freq = rule.get("frequency", "monthly")
interval_days = _FREQ_DAYS.get(freq, 30)
try:
name = decrypt_field(desc_enc) or "Recurring payment"
except Exception:
name = "Recurring payment"
dedup_key = (name.lower()[:30], freq)
if dedup_key in seen_recurring:
continue
seen_recurring.add(dedup_key)
# Walk forward from last_date until past the horizon
next_d = last_date + timedelta(days=interval_days)
while next_d <= horizon:
if next_d > today:
ds = next_d.strftime("%Y-%m-%d")
recurring_by_date[ds] = recurring_by_date.get(ds, 0.0) + abs(amount)
upcoming_payments.append({
"name": name,
"date": ds,
"amount": round(abs(amount), 2),
})
next_d += timedelta(days=interval_days)
upcoming_payments.sort(key=lambda x: x["date"])
# Project 30 days forward: baseline (avg) + known recurring hits + confidence bands
daily = []
running_balance = current_balance
for i in range(1, 31):
d = today + timedelta(days=i)
net = avg_inflow - avg_outflow
running_balance += net
ds = d.strftime("%Y-%m-%d")
base_net = avg_inflow - avg_outflow
known_outflow = recurring_by_date.get(ds, 0.0)
running_balance += base_net - known_outflow
# Confidence band widens with sqrt of days elapsed
band = std_net * math.sqrt(i) if std_net > 0 else 0.0
daily.append({
"date": d.strftime("%Y-%m-%d"),
"date": ds,
"balance": round(running_balance, 2),
"upper": round(running_balance + band, 2),
"lower": round(running_balance - band, 2),
"avg_inflow": round(avg_inflow, 2),
"avg_outflow": round(avg_outflow, 2),
"avg_outflow": round(avg_outflow + known_outflow, 2),
"negative_risk": running_balance < 0,
})
negative_days = [d["date"] for d in daily if d["negative_risk"]]
negative_day_set = set(negative_days)
# Flag upcoming payments that fall on or after the first negative-risk day
first_negative = min(negative_day_set) if negative_day_set else None
for p in upcoming_payments:
p["at_risk"] = first_negative is not None and p["date"] >= first_negative
return {
"current_balance": round(current_balance, 2),
@ -232,5 +301,6 @@ async def cashflow_forecast(
"avg_daily_outflow": round(avg_outflow, 2),
"forecast": daily,
"negative_risk_days": negative_days,
"upcoming_payments": upcoming_payments,
"history_days": len(hist_df),
}