from __future__ import annotations import warnings from datetime import date from dateutil.relativedelta import relativedelta import numpy as np import pandas as pd warnings.filterwarnings("ignore") MIN_POINTS = 3 FORECAST_MONTHS = 6 def _next_month_starts(from_date: date, n: int) -> list[str]: months = [] d = from_date.replace(day=1) + relativedelta(months=1) for _ in range(n): months.append(d.strftime("%Y-%m-%d")) d += relativedelta(months=1) return months def _fit_sarima(values: list[float], n: int) -> tuple[list[float], list[float], list[float], list[float], list[float], str]: """ Primary algorithm. Uses SARIMAX with seasonal component when enough data exists, plain ARIMA otherwise. Returns (forecast, lower_80, upper_80, lower_95, upper_95, algorithm). """ from statsmodels.tsa.statespace.sarimax import SARIMAX series = np.array(values, dtype=float) algo = "sarima" try: if len(series) >= 12: # Seasonal ARIMA with annual period model = SARIMAX(series, order=(1, 1, 1), seasonal_order=(1, 0, 1, 12), enforce_stationarity=False, enforce_invertibility=False) else: model = SARIMAX(series, order=(1, 1, 1), enforce_stationarity=False, enforce_invertibility=False) fit = model.fit(disp=False, maxiter=200) forecast_obj = fit.get_forecast(steps=n) mean = forecast_obj.predicted_mean ci_80 = forecast_obj.conf_int(alpha=0.20) # 80% interval ci_95 = forecast_obj.conf_int(alpha=0.05) # 95% interval lower_80 = np.maximum(0, ci_80.iloc[:, 0].values).tolist() upper_80 = ci_80.iloc[:, 1].values.tolist() lower_95 = np.maximum(0, ci_95.iloc[:, 0].values).tolist() upper_95 = ci_95.iloc[:, 1].values.tolist() return mean.tolist(), lower_80, upper_80, lower_95, upper_95, algo except Exception: return _fit_holt(values, n) def _fit_holt(values: list[float], n: int) -> tuple[list[float], list[float], list[float], list[float], list[float], str]: """Holt-Winters fallback.""" from statsmodels.tsa.holtwinters import ExponentialSmoothing try: if len(values) >= 12: model = ExponentialSmoothing(values, trend="add", seasonal="add", seasonal_periods=12) elif len(values) >= 4: model = ExponentialSmoothing(values, trend="add", seasonal=None) else: model = ExponentialSmoothing(values, trend=None, seasonal=None) fit = model.fit(optimized=True, disp=False) forecast = fit.forecast(n) sigma = float(np.std(fit.resid)) if len(fit.resid) > 1 else float(np.mean(values) * 0.15) lower_80 = np.maximum(0, forecast - 1.28 * sigma).tolist() upper_80 = (forecast + 1.28 * sigma).tolist() lower_95 = np.maximum(0, forecast - 1.96 * sigma).tolist() upper_95 = (forecast + 1.96 * sigma).tolist() return forecast.tolist(), lower_80, upper_80, lower_95, upper_95, "holt_winters" except Exception: avg = float(np.mean(values)) sigma = float(np.std(values)) if len(values) > 1 else avg * 0.15 fcast = [avg] * n lower_80 = [max(0.0, avg - 1.28 * sigma)] * n upper_80 = [(avg + 1.28 * sigma)] * n lower_95 = [max(0.0, avg - 1.96 * sigma)] * n upper_95 = [(avg + 1.96 * sigma)] * n return fcast, lower_80, upper_80, lower_95, upper_95, "average" def forecast_spending(df: pd.DataFrame) -> list[dict]: """ df columns: category_id, category_name, ds (monthly), y (amount) Returns list of category forecast dicts. """ if df.empty: return [] today = date.today() future_dates = _next_month_starts(today, FORECAST_MONTHS) results = [] for (cat_id, cat_name), group in df.groupby(["category_id", "category_name"]): group = group.sort_values("ds") values = group["y"].tolist() actuals = [ {"date": row["ds"].strftime("%Y-%m-%d"), "amount": round(float(row["y"]), 2)} for _, row in group.iterrows() ] if len(values) < MIN_POINTS: avg = float(np.mean(values)) sigma = avg * 0.15 forecast_pts = [ { "date": d, "amount": round(avg, 2), "lower": round(max(0.0, avg - 1.28 * sigma), 2), "upper": round(avg + 1.28 * sigma, 2), "lower_95": round(max(0.0, avg - 1.96 * sigma), 2), "upper_95": round(avg + 1.96 * sigma, 2), } for d in future_dates ] algo = "average" else: fcast, lower_80, upper_80, lower_95, upper_95, algo = _fit_sarima(values, FORECAST_MONTHS) forecast_pts = [ { "date": d, "amount": round(max(0.0, f), 2), "lower": round(l80, 2), "upper": round(u80, 2), "lower_95": round(l95, 2), "upper_95": round(u95, 2), } for d, f, l80, u80, l95, u95 in zip(future_dates, fcast, lower_80, upper_80, lower_95, upper_95) ] results.append({ "category_id": str(cat_id), "category_name": cat_name, "monthly_avg": round(float(np.mean(values)), 2), "algorithm": algo, "actuals": actuals[-12:], # last 12 months for display "forecast": forecast_pts, }) results.sort(key=lambda x: x["monthly_avg"], reverse=True) return results