from __future__ import annotations from datetime import date from dateutil.relativedelta import relativedelta import numpy as np import pandas as pd DEFAULT_MU = 0.07 / 12 # 7% annual expected return, monthly DEFAULT_SIGMA = 0.15 / (12 ** 0.5) # 15% annual vol, monthly DT = 1.0 / 12 def _project_months(from_date: date, n: int) -> list[str]: d = from_date.replace(day=1) return [(d + relativedelta(months=i + 1)).strftime("%Y-%m") for i in range(n)] def run_monte_carlo( prices_df: pd.DataFrame, holdings: list[dict], years: int = 5, n_sims: int = 1000, annual_contribution: float = 0.0, ) -> dict: """ prices_df: columns [symbol, month, close] holdings: [{"symbol": str, "quantity": float, "current_value": float}] Returns percentile paths and summary stats. """ n_months = years * 12 today = date.today() future_dates = _project_months(today, n_months) monthly_contribution = annual_contribution / 12.0 symbols = [h["symbol"] for h in holdings] current_values = np.array([float(h.get("current_value") or 0) for h in holdings]) total_value = float(current_values.sum()) if total_value <= 0: return { "dates": future_dates, "percentiles": {}, "current_value": 0.0, "expected_value": 0.0, "probability_of_gain": 0.5, "insufficient_data": True, } # Compute per-asset parameters from price history n_assets = len(symbols) mus = np.full(n_assets, DEFAULT_MU) sigmas = np.full(n_assets, DEFAULT_SIGMA) corr = np.eye(n_assets) if not prices_df.empty: for i, sym in enumerate(symbols): sym_prices = prices_df[prices_df["symbol"] == sym].sort_values("month") if len(sym_prices) >= 3: closes = sym_prices["close"].values.astype(float) log_rets = np.diff(np.log(closes[closes > 0])) if len(log_rets) >= 2: mus[i] = float(np.mean(log_rets)) sigmas[i] = float(np.std(log_rets)) # Build correlation matrix from overlapping return series if n_assets > 1: ret_series = {} for sym in symbols: sym_prices = prices_df[prices_df["symbol"] == sym].sort_values("month") if len(sym_prices) >= 3: closes = sym_prices["close"].values.astype(float) log_rets = np.diff(np.log(closes[closes > 0])) ret_series[sym] = log_rets if len(ret_series) == n_assets: min_len = min(len(v) for v in ret_series.values()) if min_len >= 3: matrix = np.array([v[-min_len:] for v in ret_series.values()]) corr = np.corrcoef(matrix) corr = np.clip(corr, -0.99, 0.99) np.fill_diagonal(corr, 1.0) # Covariance matrix and Cholesky decomposition cov = np.outer(sigmas, sigmas) * corr try: L = np.linalg.cholesky(cov) except np.linalg.LinAlgError: # Fall back to diagonal covariance L = np.diag(sigmas) # Portfolio weights weights = current_values / total_value # GBM simulation rng = np.random.default_rng(42) portfolio_paths = np.zeros((n_sims, n_months)) for sim in range(n_sims): asset_values = current_values.copy() for t in range(n_months): # Add monthly contribution before GBM step so it compounds if monthly_contribution > 0: asset_values = asset_values + weights * monthly_contribution Z = rng.standard_normal(n_assets) corr_Z = L @ Z asset_values = asset_values * np.exp( (mus - 0.5 * sigmas ** 2) * DT + sigmas * np.sqrt(DT) * corr_Z ) portfolio_paths[sim, t] = max(0.0, float(asset_values.sum())) # Compute percentile paths pcts = { "p10": np.percentile(portfolio_paths, 10, axis=0), "p25": np.percentile(portfolio_paths, 25, axis=0), "p50": np.percentile(portfolio_paths, 50, axis=0), "p75": np.percentile(portfolio_paths, 75, axis=0), "p90": np.percentile(portfolio_paths, 90, axis=0), } final_values = portfolio_paths[:, -1] prob_gain = float(np.mean(final_values > total_value)) expected_value = float(np.median(final_values)) # Year-by-year milestones (at each 12-month boundary) milestones = [] for yr in range(1, years + 1): idx = min(yr * 12 - 1, n_months - 1) milestones.append({ "year": yr, "date": future_dates[idx], "p10": round(float(np.percentile(portfolio_paths[:, idx], 10)), 2), "p50": round(float(np.percentile(portfolio_paths[:, idx], 50)), 2), "p90": round(float(np.percentile(portfolio_paths[:, idx], 90)), 2), }) return { "dates": future_dates, "percentiles": { k: [{"date": d, "value": round(float(v), 2)} for d, v in zip(future_dates, arr)] for k, arr in pcts.items() }, "current_value": round(total_value, 2), "expected_value": round(expected_value, 2), "probability_of_gain": round(prob_gain, 3), "milestones": milestones, "insufficient_data": False, }