from __future__ import annotations from datetime import date import calendar from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel from redis.asyncio import Redis from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from app.core.rate_limiter import is_rate_limited from app.dependencies import get_current_user, get_db, get_redis from app.ml.feature_engineering import ( get_monthly_category_spending, get_monthly_net_worth, get_current_month_spending, get_portfolio_monthly_returns, get_daily_cash_flow, ) from app.ml.spending_forecast import forecast_spending from app.ml.net_worth_projection import project_net_worth from app.ml.monte_carlo import run_monte_carlo router = APIRouter(prefix="/predictions", tags=["predictions"]) async def _check_prediction_rate(redis: Redis, user_id: str) -> None: limited, _ = await is_rate_limited(redis, f"rate:pred:{user_id}", limit=20, window_seconds=60) if limited: raise HTTPException(status_code=429, detail="Too many prediction requests — try again shortly") @router.get("/spending") async def spending_forecast( request: Request, db: AsyncSession = Depends(get_db), redis: Redis = Depends(get_redis), user=Depends(get_current_user), ): await _check_prediction_rate(redis, str(user.id)) df = await get_monthly_category_spending(db, user.id) categories = forecast_spending(df) return {"categories": categories} @router.get("/net-worth") async def net_worth_projection( years: int = 5, db: AsyncSession = Depends(get_db), redis: Redis = Depends(get_redis), user=Depends(get_current_user), ): await _check_prediction_rate(redis, str(user.id)) years = max(1, min(10, years)) df = await get_monthly_net_worth(db, user.id) result = project_net_worth(df, years=years) return result class MonteCarloRequest(BaseModel): years: int = 5 n_simulations: int = 1000 annual_contribution: float = 0.0 @router.post("/monte-carlo") async def monte_carlo( body: MonteCarloRequest, db: AsyncSession = Depends(get_db), redis: Redis = Depends(get_redis), user=Depends(get_current_user), ): await _check_prediction_rate(redis, str(user.id)) years = max(1, min(10, body.years)) n_sims = max(100, min(5000, body.n_simulations)) # Get portfolio holdings result = await db.execute(text(""" SELECT h.id, a.symbol, h.quantity::float, a.last_price::float, (h.quantity * COALESCE(a.last_price, h.avg_cost_basis))::float AS current_value, h.currency FROM investment_holdings h JOIN assets a ON a.id = h.asset_id WHERE h.user_id = CAST(:uid AS uuid) AND h.deleted_at IS NULL AND h.quantity > 0 """), {"uid": str(user.id)}) holdings = [ {"symbol": r[1], "quantity": r[2], "last_price": r[3], "current_value": r[4]} for r in result.fetchall() ] prices_df = await get_portfolio_monthly_returns(db, user.id) mc = run_monte_carlo( prices_df=prices_df, holdings=holdings, years=years, n_sims=n_sims, annual_contribution=body.annual_contribution, ) return mc @router.get("/budget-forecast") async def budget_forecast( db: AsyncSession = Depends(get_db), redis: Redis = Depends(get_redis), user=Depends(get_current_user), ): await _check_prediction_rate(redis, str(user.id)) today = date.today() days_in_month = calendar.monthrange(today.year, today.month)[1] day_of_month = today.day days_remaining = days_in_month - day_of_month # Get budgets bgt_result = await db.execute(text(""" SELECT b.id::text, COALESCE(c.name, 'Uncategorised') AS cat_name, b.category_id::text, b.amount::float FROM budgets b LEFT JOIN categories c ON c.id = b.category_id WHERE b.user_id = CAST(:uid AS uuid) AND b.period = 'monthly' AND (b.end_date IS NULL OR b.end_date >= CURRENT_DATE) """), {"uid": str(user.id)}) budgets = {r[2]: {"budget_id": r[0], "category_name": r[1], "amount": r[3]} for r in bgt_result.fetchall()} if not budgets: return {"forecasts": [], "message": "No monthly budgets set"} # Get current month spending per category spent_df = await get_current_month_spending(db, user.id) spent_map = {row["category_id"]: row["spent"] for _, row in spent_df.iterrows()} forecasts = [] for cat_id, bgt in budgets.items(): spent = spent_map.get(cat_id, 0.0) budget_amt = bgt["amount"] # Daily velocity velocity = spent / day_of_month if day_of_month > 0 else 0.0 forecast_total = spent + velocity * days_remaining # Probability of overspend using a rough normal distribution # Assume uncertainty grows with days remaining import math sigma = velocity * math.sqrt(days_remaining) * 0.3 if velocity > 0 else 1.0 if sigma > 0: z = (budget_amt - forecast_total) / sigma # CDF of normal import scipy.stats prob_overspend = float(1 - scipy.stats.norm.cdf(z)) else: prob_overspend = 1.0 if forecast_total > budget_amt else 0.0 forecasts.append({ "category_id": cat_id, "category_name": bgt["category_name"], "budget_amount": round(budget_amt, 2), "spent_so_far": round(spent, 2), "forecast_month_total": round(max(spent, forecast_total), 2), "daily_velocity": round(velocity, 2), "probability_overspend": round(prob_overspend, 3), "days_remaining": days_remaining, }) forecasts.sort(key=lambda x: x["probability_overspend"], reverse=True) return {"forecasts": forecasts} @router.get("/cashflow") async def cashflow_forecast( db: AsyncSession = Depends(get_db), redis: Redis = Depends(get_redis), user=Depends(get_current_user), ): await _check_prediction_rate(redis, str(user.id)) from datetime import timedelta import numpy as np # Historical daily cash flows (last 90 days) hist_df = await get_daily_cash_flow(db, user.id, days=90) # Get current account balances acct_result = await db.execute(text(""" SELECT SUM( CASE WHEN type IN ('credit_card','loan','mortgage') THEN -ABS(current_balance) ELSE current_balance END )::float AS total_balance FROM accounts WHERE user_id = CAST(:uid AS uuid) AND is_active = TRUE AND include_in_net_worth = TRUE AND deleted_at IS NULL """), {"uid": str(user.id)}) row = acct_result.fetchone() current_balance = float(row[0] or 0.0) # Compute average daily inflow / outflow from history if not hist_df.empty: avg_inflow = float(hist_df["inflow"].mean()) avg_outflow = float(hist_df["outflow"].mean()) std_net = float((hist_df["inflow"] - hist_df["outflow"]).std()) else: avg_inflow = 0.0 avg_outflow = 0.0 std_net = 0.0 # Project 30 days forward today = date.today() daily = [] running_balance = current_balance for i in range(1, 31): d = today + timedelta(days=i) net = avg_inflow - avg_outflow running_balance += net daily.append({ "date": d.strftime("%Y-%m-%d"), "balance": round(running_balance, 2), "avg_inflow": round(avg_inflow, 2), "avg_outflow": round(avg_outflow, 2), "negative_risk": running_balance < 0, }) negative_days = [d["date"] for d in daily if d["negative_risk"]] return { "current_balance": round(current_balance, 2), "avg_daily_inflow": round(avg_inflow, 2), "avg_daily_outflow": round(avg_outflow, 2), "forecast": daily, "negative_risk_days": negative_days, "history_days": len(hist_df), }