MyMidas/backend/app/ml/feature_engineering.py
megaproxy a7e58d6aa1 Fix cashflow endpoint: pass date object instead of integer to SQL
asyncpg infers :days as an integer, causing a 'date >= integer' type
error in PostgreSQL. Compute the cutoff date in Python and bind it as
a date parameter instead.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-28 10:45:57 +00:00

122 lines
4.7 KiB
Python

from __future__ import annotations
from datetime import date, timedelta
import pandas as pd
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def get_monthly_category_spending(db: AsyncSession, user_id: str) -> pd.DataFrame:
    """Return the user's expense totals per category and calendar month.

    Only transactions with ``type = 'expense'`` that are not soft-deleted
    (``deleted_at IS NULL``) and whose status is not ``'void'`` are counted.
    Transactions without a category are reported under the id
    ``'uncategorised'`` and the name ``'Uncategorised'``.

    Args:
        db: Async SQLAlchemy session used to run the query.
        user_id: User identifier; it is stringified and cast to ``uuid`` in SQL.

    Returns:
        DataFrame ordered by ``ds`` with columns ``category_id`` (str),
        ``category_name`` (str), ``ds`` (datetime64, first day of the month)
        and ``y`` (float64, sum of absolute amounts). When no rows match, the
        frame is empty but keeps the same column dtypes.
    """
    result = await db.execute(text("""
SELECT
COALESCE(t.category_id::text, 'uncategorised') AS category_id,
COALESCE(c.name, 'Uncategorised') AS category_name,
DATE_TRUNC('month', t.date)::date AS ds,
SUM(ABS(t.amount))::float AS y
FROM transactions t
LEFT JOIN categories c ON c.id = t.category_id
WHERE t.user_id = CAST(:uid AS uuid)
AND t.type = 'expense'
AND t.deleted_at IS NULL
AND t.status != 'void'
GROUP BY t.category_id, c.name, DATE_TRUNC('month', t.date)
ORDER BY ds ASC
"""), {"uid": str(user_id)})
    # Build the frame even when nothing came back: running the same dtype
    # coercions on an empty frame gives callers datetime64/float64 columns
    # instead of the object-dtype columns of a bare ``DataFrame(columns=...)``.
    df = pd.DataFrame(
        result.fetchall(),
        columns=["category_id", "category_name", "ds", "y"],
    )
    df["ds"] = pd.to_datetime(df["ds"])
    df["y"] = df["y"].astype(float)
    return df
async def get_monthly_net_worth(db: AsyncSession, user_id: str) -> pd.DataFrame:
    """Return the user's net worth as one value per calendar month.

    Reads every row of ``net_worth_snapshots`` for the user, oldest first,
    and keeps the last snapshot value of each month, labelled with the
    month-end date. Months without a usable value are dropped, not filled.

    Args:
        db: Async SQLAlchemy session used to run the query.
        user_id: User identifier; it is stringified and cast to ``uuid`` in SQL.

    Returns:
        DataFrame with columns ``ds`` (datetime64, month end) and ``y``
        (float64). When the user has no snapshots, the frame is empty but
        keeps the same column dtypes.
    """
    result = await db.execute(text("""
SELECT date::text AS ds, net_worth::float AS y
FROM net_worth_snapshots
WHERE user_id = CAST(:uid AS uuid)
ORDER BY date ASC
"""), {"uid": str(user_id)})
    df = pd.DataFrame(result.fetchall(), columns=["ds", "y"])
    # The query returns dates as text, so parse them before any time-based work.
    df["ds"] = pd.to_datetime(df["ds"])
    df["y"] = df["y"].astype(float)
    if df.empty:
        # Nothing to resample; the coercions above already produced typed
        # columns, so the empty frame has the same schema as a populated one.
        return df
    # Resample to month end, keeping the last value seen in each month.
    df = df.set_index("ds").resample("ME").last().dropna().reset_index()
    df.columns = ["ds", "y"]
    return df
async def get_current_month_spending(db: AsyncSession, user_id: str) -> pd.DataFrame:
    """Return the user's expense total per category for the current month.

    "Current month" is decided by the database (``CURRENT_DATE``), not by the
    application host. Only transactions with ``type = 'expense'`` that are not
    soft-deleted and whose status is not ``'void'`` are counted. Transactions
    without a category are reported under the id ``'uncategorised'`` and the
    name ``'Uncategorised'``.

    Args:
        db: Async SQLAlchemy session used to run the query.
        user_id: User identifier; it is stringified and cast to ``uuid`` in SQL.

    Returns:
        DataFrame with columns ``category_id`` (str), ``category_name`` (str)
        and ``spent`` (float64, sum of absolute amounts). When no rows match,
        the frame is empty but ``spent`` is still float64.
    """
    result = await db.execute(text("""
SELECT
COALESCE(t.category_id::text, 'uncategorised') AS category_id,
COALESCE(c.name, 'Uncategorised') AS category_name,
SUM(ABS(t.amount))::float AS spent
FROM transactions t
LEFT JOIN categories c ON c.id = t.category_id
WHERE t.user_id = CAST(:uid AS uuid)
AND t.type = 'expense'
AND t.deleted_at IS NULL
AND t.status != 'void'
AND DATE_TRUNC('month', t.date) = DATE_TRUNC('month', CURRENT_DATE)
GROUP BY t.category_id, c.name
"""), {"uid": str(user_id)})
    # Always go through the same construction and coercion so an empty result
    # has a float64 ``spent`` column rather than an object-dtype one.
    df = pd.DataFrame(
        result.fetchall(),
        columns=["category_id", "category_name", "spent"],
    )
    df["spent"] = df["spent"].astype(float)
    return df
async def get_portfolio_monthly_returns(db: AsyncSession, user_id: str) -> pd.DataFrame:
    """Return monthly close prices for each asset in the user's portfolio.

    Despite the function name, no returns are computed here: each row holds
    the close of the latest priced day within that month for one symbol.
    Only assets referenced by the user's non-deleted ``investment_holdings``
    rows are included.

    Args:
        db: Async SQLAlchemy session used to run the query.
        user_id: User identifier; it is stringified and cast to ``uuid`` in SQL.

    Returns:
        DataFrame ordered by symbol then month with columns ``symbol`` (str),
        ``month`` (datetime64, first day of the month) and ``close``
        (float64). When no rows match, the frame is empty but keeps the same
        column dtypes.
    """
    result = await db.execute(text("""
SELECT
a.symbol,
DATE_TRUNC('month', ap.date)::date AS month,
(ARRAY_AGG(ap.close ORDER BY ap.date DESC))[1]::float AS close
FROM investment_holdings h
JOIN assets a ON a.id = h.asset_id
JOIN asset_prices ap ON ap.asset_id = h.asset_id
WHERE h.user_id = CAST(:uid AS uuid)
AND h.deleted_at IS NULL
GROUP BY a.symbol, DATE_TRUNC('month', ap.date)
ORDER BY a.symbol, month ASC
"""), {"uid": str(user_id)})
    # Build and coerce unconditionally so an empty portfolio still yields
    # datetime64/float64 columns instead of object-dtype placeholders.
    df = pd.DataFrame(result.fetchall(), columns=["symbol", "month", "close"])
    df["month"] = pd.to_datetime(df["month"])
    df["close"] = df["close"].astype(float)
    return df
async def get_daily_cash_flow(db: AsyncSession, user_id: str, days: int = 90) -> pd.DataFrame:
    """Return the user's daily inflow and outflow over a trailing window.

    Considers transactions of type ``'income'`` or ``'expense'`` that are not
    soft-deleted, whose status is not ``'void'``, and dated on or after
    ``today - days``. Within each day, positive amounts are summed into
    ``inflow`` and the absolute value of negative amounts into ``outflow``.

    Args:
        db: Async SQLAlchemy session used to run the query.
        user_id: User identifier; it is stringified and cast to ``uuid`` in SQL.
        days: Size of the look-back window in days, counted back from today.

    Returns:
        DataFrame ordered by date with columns ``ds`` (datetime64),
        ``inflow`` (float64) and ``outflow`` (float64). Days without matching
        transactions are absent. When no rows match, the frame is empty but
        keeps the same column dtypes.
    """
    # The cutoff is computed here and bound as a ``date`` object so the SQL
    # comparison is date-vs-date rather than date-vs-integer.
    # NOTE(review): ``date.today()`` is the application host's local date,
    # which may differ from the database's CURRENT_DATE near midnight - confirm
    # this is acceptable.
    cutoff = date.today() - timedelta(days=days)
    # NOTE(review): inflow/outflow are split by the sign of ``amount``, not by
    # ``type``. Sibling queries wrap expense amounts in ABS(), so confirm that
    # expenses are actually stored as negative values.
    result = await db.execute(text("""
SELECT
t.date::date AS ds,
SUM(CASE WHEN t.amount > 0 THEN t.amount ELSE 0 END)::float AS inflow,
SUM(CASE WHEN t.amount < 0 THEN ABS(t.amount) ELSE 0 END)::float AS outflow
FROM transactions t
WHERE t.user_id = CAST(:uid AS uuid)
AND t.deleted_at IS NULL
AND t.status != 'void'
AND t.type IN ('income', 'expense')
AND t.date >= :cutoff
GROUP BY t.date
ORDER BY t.date ASC
"""), {"uid": str(user_id), "cutoff": cutoff})
    # Build and coerce unconditionally: an empty result then carries
    # datetime64/float64 columns, and the numeric columns are explicitly
    # float like those of the other loaders in this module.
    df = pd.DataFrame(result.fetchall(), columns=["ds", "inflow", "outflow"])
    df["ds"] = pd.to_datetime(df["ds"])
    df["inflow"] = df["inflow"].astype(float)
    df["outflow"] = df["outflow"].astype(float)
    return df