MyMidas/backend/app/api/v1/predictions.py
megaproxy 61a7884ee5 Initial commit: MyMidas personal finance tracker
Full-stack self-hosted finance app with FastAPI backend and React frontend.

Features:
- Accounts, transactions, budgets, investments with GBP base currency
- CSV import with auto-detection for 10 UK bank formats
- ML predictions: spending forecast, net worth projection, Monte Carlo
- 7 selectable themes (Obsidian, Arctic, Midnight, Vault, Terminal, Synthwave, Ledger)
- Receipt/document attachments on transactions (JPEG, PNG, WebP, PDF)
- AES-256-GCM field encryption, RS256 JWT, TOTP 2FA, RLS, audit log
- Encrypted nightly backups + key rotation script
- Mobile-responsive layout with bottom nav

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 11:56:10 +00:00

236 lines
7.9 KiB
Python

from __future__ import annotations
from datetime import date
import calendar
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
from redis.asyncio import Redis
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.rate_limiter import is_rate_limited
from app.dependencies import get_current_user, get_db, get_redis
from app.ml.feature_engineering import (
get_monthly_category_spending,
get_monthly_net_worth,
get_current_month_spending,
get_portfolio_monthly_returns,
get_daily_cash_flow,
)
from app.ml.spending_forecast import forecast_spending
from app.ml.net_worth_projection import project_net_worth
from app.ml.monte_carlo import run_monte_carlo
router = APIRouter(prefix="/predictions", tags=["predictions"])
async def _check_prediction_rate(redis: Redis, user_id: str) -> None:
limited, _ = await is_rate_limited(redis, f"rate:pred:{user_id}", limit=20, window_seconds=60)
if limited:
raise HTTPException(status_code=429, detail="Too many prediction requests — try again shortly")
@router.get("/spending")
async def spending_forecast(
request: Request,
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
user=Depends(get_current_user),
):
await _check_prediction_rate(redis, str(user.id))
df = await get_monthly_category_spending(db, user.id)
categories = forecast_spending(df)
return {"categories": categories}
@router.get("/net-worth")
async def net_worth_projection(
years: int = 5,
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
user=Depends(get_current_user),
):
await _check_prediction_rate(redis, str(user.id))
years = max(1, min(10, years))
df = await get_monthly_net_worth(db, user.id)
result = project_net_worth(df, years=years)
return result
class MonteCarloRequest(BaseModel):
years: int = 5
n_simulations: int = 1000
annual_contribution: float = 0.0
@router.post("/monte-carlo")
async def monte_carlo(
body: MonteCarloRequest,
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
user=Depends(get_current_user),
):
await _check_prediction_rate(redis, str(user.id))
years = max(1, min(10, body.years))
n_sims = max(100, min(5000, body.n_simulations))
# Get portfolio holdings
result = await db.execute(text("""
SELECT h.id, a.symbol, h.quantity::float, a.last_price::float,
(h.quantity * COALESCE(a.last_price, h.avg_cost_basis))::float AS current_value,
h.currency
FROM investment_holdings h
JOIN assets a ON a.id = h.asset_id
WHERE h.user_id = CAST(:uid AS uuid)
AND h.deleted_at IS NULL
AND h.quantity > 0
"""), {"uid": str(user.id)})
holdings = [
{"symbol": r[1], "quantity": r[2], "last_price": r[3], "current_value": r[4]}
for r in result.fetchall()
]
prices_df = await get_portfolio_monthly_returns(db, user.id)
mc = run_monte_carlo(
prices_df=prices_df,
holdings=holdings,
years=years,
n_sims=n_sims,
annual_contribution=body.annual_contribution,
)
return mc
@router.get("/budget-forecast")
async def budget_forecast(
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
user=Depends(get_current_user),
):
await _check_prediction_rate(redis, str(user.id))
today = date.today()
days_in_month = calendar.monthrange(today.year, today.month)[1]
day_of_month = today.day
days_remaining = days_in_month - day_of_month
# Get budgets
bgt_result = await db.execute(text("""
SELECT b.id::text, COALESCE(c.name, 'Uncategorised') AS cat_name,
b.category_id::text, b.amount::float
FROM budgets b
LEFT JOIN categories c ON c.id = b.category_id
WHERE b.user_id = CAST(:uid AS uuid)
AND b.period = 'monthly'
AND (b.end_date IS NULL OR b.end_date >= CURRENT_DATE)
"""), {"uid": str(user.id)})
budgets = {r[2]: {"budget_id": r[0], "category_name": r[1], "amount": r[3]} for r in bgt_result.fetchall()}
if not budgets:
return {"forecasts": [], "message": "No monthly budgets set"}
# Get current month spending per category
spent_df = await get_current_month_spending(db, user.id)
spent_map = {row["category_id"]: row["spent"] for _, row in spent_df.iterrows()}
forecasts = []
for cat_id, bgt in budgets.items():
spent = spent_map.get(cat_id, 0.0)
budget_amt = bgt["amount"]
# Daily velocity
velocity = spent / day_of_month if day_of_month > 0 else 0.0
forecast_total = spent + velocity * days_remaining
# Probability of overspend using a rough normal distribution
# Assume uncertainty grows with days remaining
import math
sigma = velocity * math.sqrt(days_remaining) * 0.3 if velocity > 0 else 1.0
if sigma > 0:
z = (budget_amt - forecast_total) / sigma
# CDF of normal
import scipy.stats
prob_overspend = float(1 - scipy.stats.norm.cdf(z))
else:
prob_overspend = 1.0 if forecast_total > budget_amt else 0.0
forecasts.append({
"category_id": cat_id,
"category_name": bgt["category_name"],
"budget_amount": round(budget_amt, 2),
"spent_so_far": round(spent, 2),
"forecast_month_total": round(max(spent, forecast_total), 2),
"daily_velocity": round(velocity, 2),
"probability_overspend": round(prob_overspend, 3),
"days_remaining": days_remaining,
})
forecasts.sort(key=lambda x: x["probability_overspend"], reverse=True)
return {"forecasts": forecasts}
@router.get("/cashflow")
async def cashflow_forecast(
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
user=Depends(get_current_user),
):
await _check_prediction_rate(redis, str(user.id))
from datetime import timedelta
import numpy as np
# Historical daily cash flows (last 90 days)
hist_df = await get_daily_cash_flow(db, user.id, days=90)
# Get current account balances
acct_result = await db.execute(text("""
SELECT SUM(
CASE WHEN type IN ('credit_card','loan','mortgage') THEN -ABS(current_balance)
ELSE current_balance END
)::float AS total_balance
FROM accounts
WHERE user_id = CAST(:uid AS uuid)
AND is_active = TRUE
AND include_in_net_worth = TRUE
AND deleted_at IS NULL
"""), {"uid": str(user.id)})
row = acct_result.fetchone()
current_balance = float(row[0] or 0.0)
# Compute average daily inflow / outflow from history
if not hist_df.empty:
avg_inflow = float(hist_df["inflow"].mean())
avg_outflow = float(hist_df["outflow"].mean())
std_net = float((hist_df["inflow"] - hist_df["outflow"]).std())
else:
avg_inflow = 0.0
avg_outflow = 0.0
std_net = 0.0
# Project 30 days forward
today = date.today()
daily = []
running_balance = current_balance
for i in range(1, 31):
d = today + timedelta(days=i)
net = avg_inflow - avg_outflow
running_balance += net
daily.append({
"date": d.strftime("%Y-%m-%d"),
"balance": round(running_balance, 2),
"avg_inflow": round(avg_inflow, 2),
"avg_outflow": round(avg_outflow, 2),
"negative_risk": running_balance < 0,
})
negative_days = [d["date"] for d in daily if d["negative_risk"]]
return {
"current_balance": round(current_balance, 2),
"avg_daily_inflow": round(avg_inflow, 2),
"avg_daily_outflow": round(avg_outflow, 2),
"forecast": daily,
"negative_risk_days": negative_days,
"history_days": len(hist_df),
}