MyMidas/backend/app/services/report_service.py
megaproxy dd66b2d5fe Add Balance Sheet report
New first-tab report showing a full breakdown of where money sits:
- Three KPI cards: total assets, total liabilities, net worth
- Proportional stacked bars showing asset and liability composition
- Side-by-side account lists grouped by type (Cash, Savings, ISAs,
  Investments, Pension, Crypto vs Credit Cards, Loans, Mortgages)
- Backend endpoint GET /api/v1/reports/balance-sheet with typed schema
- Balance Sheet is now the default tab on the Reports page

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 15:33:34 +00:00

429 lines
14 KiB
Python

import uuid
from datetime import date, datetime, timezone
from decimal import Decimal
from dateutil.relativedelta import relativedelta
from sqlalchemy import and_, func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models.account import Account
from app.db.models.budget import Budget
from app.db.models.category import Category
from app.db.models.net_worth_snapshot import NetWorthSnapshot
from app.db.models.transaction import Transaction
from app.core.security import decrypt_field
from app.schemas.report import (
BalanceSheetAccount,
BalanceSheetGroup,
BalanceSheetReport,
BudgetVsActualItem,
BudgetVsActualReport,
CashFlowPoint,
CashFlowReport,
CategoryBreakdownItem,
CategoryBreakdownReport,
IncomeExpensePoint,
IncomeExpenseReport,
NetWorthPoint,
NetWorthReport,
SpendingTrendPoint,
SpendingTrendsReport,
)
# Account types that _current_net_worth adds to the liabilities total;
# every other account type is added to the assets total.
LIABILITY_TYPES = {"credit_card", "loan", "mortgage"}
async def _current_net_worth(db: AsyncSession, user_id: uuid.UUID) -> tuple[Decimal, Decimal]:
    """Total the user's current asset and liability balances.

    Only accounts that are active, not soft-deleted and flagged
    ``include_in_net_worth`` contribute. An account whose type is listed in
    ``LIABILITY_TYPES`` is added to liabilities; anything else is an asset.
    A missing balance counts as zero.

    Args:
        db: Async database session used to load the accounts.
        user_id: Owner of the accounts.

    Returns:
        A ``(assets, liabilities)`` pair of raw balance sums.
    """
    query = select(Account).where(
        Account.user_id == user_id,
        Account.include_in_net_worth == True,  # noqa: E712
        Account.is_active == True,  # noqa: E712
        Account.deleted_at.is_(None),
    )
    rows = (await db.execute(query)).scalars().all()

    asset_total = Decimal("0")
    liability_total = Decimal("0")
    for account in rows:
        balance = account.current_balance or Decimal("0")
        if account.type not in LIABILITY_TYPES:
            asset_total += balance
            continue
        liability_total += balance
    return asset_total, liability_total
async def get_net_worth_report(
    db: AsyncSession, user_id: uuid.UUID, base_currency: str, months: int = 12
) -> NetWorthReport:
    """Build the net-worth history report for a user.

    Args:
        db: Async database session.
        user_id: Owner of the snapshots and accounts.
        base_currency: Currency code echoed back on the report.
        months: How many months of snapshots to include in ``points``.

    Returns:
        A ``NetWorthReport`` holding the snapshot series (oldest first), the
        live net worth computed from current account balances, and the
        absolute / percentage change against a ~30-day-old baseline.
    """
    today = date.today()
    cutoff = today - relativedelta(months=months)
    result = await db.execute(
        select(NetWorthSnapshot)
        .where(NetWorthSnapshot.user_id == user_id, NetWorthSnapshot.date >= cutoff)
        .order_by(NetWorthSnapshot.date.asc())
    )
    snapshots = result.scalars().all()
    points = [
        NetWorthPoint(
            date=s.date,
            total_assets=s.total_assets,
            total_liabilities=s.total_liabilities,
            net_worth=s.net_worth,
            base_currency=s.base_currency,
        )
        for s in snapshots
    ]

    assets, liabilities = await _current_net_worth(db, user_id)
    current_nw = assets - liabilities

    # The 30-day change must be measured against a snapshot that is actually
    # about 30 days old. Using the first point of the series would compare
    # against the start of the whole `months` window (a year by default).
    baseline_result = await db.execute(
        select(NetWorthSnapshot.net_worth)
        .where(
            NetWorthSnapshot.user_id == user_id,
            NetWorthSnapshot.date <= today - relativedelta(days=30),
        )
        .order_by(NetWorthSnapshot.date.desc())
        .limit(1)
    )
    past_nw = baseline_result.scalar_one_or_none()
    if past_nw is None and points:
        # No snapshot is 30 days old yet: fall back to the oldest one in the
        # requested window so new users still see a change figure.
        past_nw = points[0].net_worth

    change_30d = Decimal("0")
    change_30d_pct = Decimal("0")
    if past_nw is not None:
        change_30d = current_nw - past_nw
        if past_nw != 0:
            # abs() keeps the sign of the percentage equal to the sign of the
            # change even when the baseline net worth is negative.
            change_30d_pct = (change_30d / abs(past_nw) * 100).quantize(Decimal("0.01"))

    return NetWorthReport(
        points=points,
        current_net_worth=current_nw,
        change_30d=change_30d,
        change_30d_pct=change_30d_pct,
        base_currency=base_currency,
    )
async def get_income_expense_report(
    db: AsyncSession, user_id: uuid.UUID, months: int = 12
) -> IncomeExpenseReport:
    """Summarise income and expenses per calendar month.

    The window starts on the first day of the month ``months - 1`` months
    before the current one. Void and soft-deleted transactions are excluded
    by the query.

    Args:
        db: Async database session.
        user_id: Owner of the transactions.
        months: Number of calendar months (including the current one) to cover.

    Returns:
        An ``IncomeExpenseReport`` with one point per month that has rows,
        overall totals, and averages taken over the number of returned months
        (or 1 when there are none).
    """
    window_start = date.today().replace(day=1) - relativedelta(months=months - 1)
    sql = text("""
SELECT
TO_CHAR(date, 'YYYY-MM') AS month,
SUM(CASE WHEN type = 'income' THEN amount ELSE 0 END) AS income,
SUM(CASE WHEN type = 'expense' THEN ABS(amount) ELSE 0 END) AS expenses
FROM transactions
WHERE user_id = CAST(:uid AS uuid)
AND status != 'void'
AND deleted_at IS NULL
AND date >= :cutoff
GROUP BY TO_CHAR(date, 'YYYY-MM')
ORDER BY month ASC
""").bindparams(uid=str(user_id), cutoff=window_start)
    db_rows = (await db.execute(sql)).fetchall()

    # Normalise each row to (month, income, expenses) with Decimal amounts;
    # going through str() avoids float artefacts if the driver returns floats.
    monthly = [
        (r.month, Decimal(str(r.income or 0)), Decimal(str(r.expenses or 0)))
        for r in db_rows
    ]
    points = [
        IncomeExpensePoint(month=month, income=income, expenses=expenses, net=income - expenses)
        for month, income, expenses in monthly
    ]
    total_income = sum((income for _, income, _ in monthly), Decimal("0"))
    total_expenses = sum((expenses for _, _, expenses in monthly), Decimal("0"))

    month_count = len(points) or 1  # avoid dividing by zero on an empty window
    pennies = Decimal("0.01")
    return IncomeExpenseReport(
        points=points,
        total_income=total_income,
        total_expenses=total_expenses,
        avg_monthly_income=(total_income / month_count).quantize(pennies),
        avg_monthly_expenses=(total_expenses / month_count).quantize(pennies),
        currency="GBP",
    )
async def get_cash_flow_report(
    db: AsyncSession, user_id: uuid.UUID, date_from: date, date_to: date
) -> CashFlowReport:
    """Report daily money in / money out between two dates (inclusive).

    Positive amounts are treated as inflow and negative amounts as outflow;
    only non-void, non-deleted transactions of type ``income`` or ``expense``
    are considered by the query.

    Args:
        db: Async database session.
        user_id: Owner of the transactions.
        date_from: First day of the range.
        date_to: Last day of the range.

    Returns:
        A ``CashFlowReport`` with one point per day that has transactions
        (carrying a cumulative running balance that starts from zero) and the
        overall inflow / outflow totals.
    """
    sql = text("""
SELECT
date,
SUM(CASE WHEN amount > 0 THEN amount ELSE 0 END) AS inflow,
SUM(CASE WHEN amount < 0 THEN ABS(amount) ELSE 0 END) AS outflow
FROM transactions
WHERE user_id = CAST(:uid AS uuid)
AND status != 'void'
AND deleted_at IS NULL
AND date BETWEEN :df AND :dt
AND type IN ('income', 'expense')
GROUP BY date
ORDER BY date ASC
""").bindparams(uid=str(user_id), df=date_from, dt=date_to)
    daily_rows = (await db.execute(sql)).fetchall()

    points = []
    balance = Decimal("0")
    inflow_sum = Decimal("0")
    outflow_sum = Decimal("0")
    for day in daily_rows:
        money_in = Decimal(str(day.inflow or 0))
        money_out = Decimal(str(day.outflow or 0))
        day_net = money_in - money_out

        balance += day_net
        inflow_sum += money_in
        outflow_sum += money_out
        points.append(
            CashFlowPoint(
                date=day.date,
                inflow=money_in,
                outflow=money_out,
                net=day_net,
                running_balance=balance,
            )
        )

    return CashFlowReport(
        points=points,
        total_inflow=inflow_sum,
        total_outflow=outflow_sum,
        currency="GBP",
    )
async def get_category_breakdown(
    db: AsyncSession,
    user_id: uuid.UUID,
    date_from: date,
    date_to: date,
    txn_type: str = "expense",
) -> CategoryBreakdownReport:
    """Break transaction totals down by category for a date range.

    Args:
        db: Async database session.
        user_id: Owner of the transactions.
        date_from: First day of the range (inclusive).
        date_to: Last day of the range (inclusive).
        txn_type: Transaction type to aggregate.

    Returns:
        A ``CategoryBreakdownReport`` whose items are ordered by descending
        absolute amount; each item carries its share of the grand total as a
        percentage rounded to two decimals. Transactions without a category,
        or whose category row cannot be found, are labelled "Uncategorised".
    """
    result = await db.execute(
        select(
            Transaction.category_id,
            func.sum(func.abs(Transaction.amount)).label("total"),
            func.count(Transaction.id).label("cnt"),
        )
        .where(
            Transaction.user_id == user_id,
            Transaction.type == txn_type,
            Transaction.status != "void",
            Transaction.date >= date_from,
            Transaction.date <= date_to,
            Transaction.deleted_at.is_(None),
        )
        .group_by(Transaction.category_id)
        .order_by(func.sum(func.abs(Transaction.amount)).desc())
    )
    rows = result.fetchall()

    # Resolve every category name with a single query instead of issuing one
    # SELECT per result row.
    category_ids = [row.category_id for row in rows if row.category_id is not None]
    names_by_id = {}
    if category_ids:
        cat_result = await db.execute(select(Category).where(Category.id.in_(category_ids)))
        names_by_id = {cat.id: cat.name for cat in cat_result.scalars().all()}

    totals = [(row, Decimal(str(row.total or 0))) for row in rows]
    grand_total = sum((amt for _, amt in totals), Decimal("0"))

    def share(amount: Decimal) -> Decimal:
        """Return ``amount`` as a percentage of the grand total (0 if empty)."""
        if grand_total > 0:
            return (amount / grand_total * 100).quantize(Decimal("0.01"))
        return Decimal("0")

    items = [
        CategoryBreakdownItem(
            category_id=str(row.category_id) if row.category_id is not None else None,
            category_name=names_by_id.get(row.category_id, "Uncategorised"),
            amount=amt,
            percent=share(amt),
            transaction_count=row.cnt,
        )
        for row, amt in totals
    ]
    return CategoryBreakdownReport(
        items=items,
        total=grand_total,
        currency="GBP",
        date_from=date_from,
        date_to=date_to,
    )
async def get_budget_vs_actual(db: AsyncSession, user_id: uuid.UUID) -> BudgetVsActualReport:
    """Compare each active budget with what was actually spent in its current period.

    Args:
        db: Async database session.
        user_id: Owner of the budgets and transactions.

    Returns:
        A ``BudgetVsActualReport`` with one item per active budget (budgeted
        amount, actual expense total for the budget's current period, the
        remaining variance and the percentage used) plus overall totals.
    """
    # Imported inside the function, as in the original code
    # (presumably to avoid a circular import — confirm before hoisting).
    from app.services.budget_service import list_budgets, _period_bounds

    today = date.today()
    budgets = list(await list_budgets(db, user_id, active_only=True))

    # Fetch all category names up front in one query rather than one query
    # per budget inside the loop.
    category_ids = {b.category_id for b in budgets if b.category_id is not None}
    category_names = {}
    if category_ids:
        cat_result = await db.execute(
            select(Category).where(Category.id.in_(list(category_ids)))
        )
        category_names = {cat.id: cat.name for cat in cat_result.scalars().all()}

    items = []
    total_budgeted = Decimal("0")
    total_actual = Decimal("0")
    for budget in budgets:
        # Each budget has its own period, so the spend query stays per-budget.
        period_start, period_end = _period_bounds(budget.period, today)
        spent_result = await db.execute(
            select(func.coalesce(func.sum(func.abs(Transaction.amount)), Decimal("0")))
            .where(
                and_(
                    Transaction.user_id == user_id,
                    Transaction.category_id == budget.category_id,
                    Transaction.type == "expense",
                    Transaction.status != "void",
                    Transaction.date >= period_start,
                    Transaction.date <= period_end,
                    Transaction.deleted_at.is_(None),
                )
            )
        )
        actual = Decimal(str(spent_result.scalar() or 0))
        if budget.amount > 0:
            pct = (actual / budget.amount * 100).quantize(Decimal("0.01"))
        else:
            # A zero (or negative) budget has no meaningful usage percentage.
            pct = Decimal("0")

        items.append(
            BudgetVsActualItem(
                budget_id=str(budget.id),
                budget_name=budget.name,
                category_name=category_names.get(budget.category_id, "Unknown"),
                budgeted=budget.amount,
                actual=actual,
                variance=budget.amount - actual,
                percent_used=pct,
            )
        )
        total_budgeted += budget.amount
        total_actual += actual

    return BudgetVsActualReport(
        items=items,
        total_budgeted=total_budgeted,
        total_actual=total_actual,
        currency="GBP",
    )
async def get_spending_trends(
    db: AsyncSession, user_id: uuid.UUID, months: int = 6
) -> SpendingTrendsReport:
    """Report monthly expense totals per category.

    The window starts on the first day of the month ``months - 1`` months
    before the current one. Only non-void, non-deleted expense transactions
    are aggregated; those without a category are labelled "Uncategorised".

    Args:
        db: Async database session.
        user_id: Owner of the transactions.
        months: Number of calendar months (including the current one) to cover.

    Returns:
        A ``SpendingTrendsReport`` with one point per (month, category) and
        the distinct category names in order of first appearance.
    """
    window_start = date.today().replace(day=1) - relativedelta(months=months - 1)
    sql = text("""
SELECT
TO_CHAR(t.date, 'YYYY-MM') AS month,
COALESCE(c.name, 'Uncategorised') AS category_name,
SUM(ABS(t.amount)) AS amount
FROM transactions t
LEFT JOIN categories c ON c.id = t.category_id
WHERE t.user_id = CAST(:uid AS uuid)
AND t.type = 'expense'
AND t.status != 'void'
AND t.deleted_at IS NULL
AND t.date >= :cutoff
GROUP BY TO_CHAR(t.date, 'YYYY-MM'), c.name
ORDER BY month ASC, amount DESC
""").bindparams(uid=str(user_id), cutoff=window_start)
    db_rows = (await db.execute(sql)).fetchall()

    points = []
    for r in db_rows:
        spent = Decimal(str(r.amount or 0))
        points.append(
            SpendingTrendPoint(month=r.month, category_name=r.category_name, amount=spent)
        )

    # Distinct category names, keeping the order in which they first occur.
    categories = []
    seen = set()
    for point in points:
        if point.category_name not in seen:
            seen.add(point.category_name)
            categories.append(point.category_name)

    return SpendingTrendsReport(points=points, categories=categories, currency="GBP")
async def get_balance_sheet(
    db: AsyncSession, user_id: uuid.UUID, base_currency: str
) -> BalanceSheetReport:
    """Build the balance sheet: accounts grouped by kind, with subtotals and net worth.

    All active, non-deleted accounts of the user are loaded and bucketed
    into the asset and liability groups defined below. Groups with no
    accounts are omitted. Accounts whose type is not listed in any group are
    placed in "Other Assets" so that they are not silently left out of the
    totals (``_current_net_worth`` likewise counts every non-liability type
    as an asset).

    Args:
        db: Async database session.
        user_id: Owner of the accounts.
        base_currency: Report currency; also used for accounts that have no
            currency of their own.

    Returns:
        A ``BalanceSheetReport`` with asset groups, liability groups, both
        totals and ``net_worth = total_assets - total_liabilities``.
    """
    result = await db.execute(
        select(Account).where(
            Account.user_id == user_id,
            Account.is_active == True,  # noqa: E712
            Account.deleted_at.is_(None),
        )
    )
    accounts = result.scalars().all()

    ASSET_GROUPS = [
        ("Cash & Current Accounts", ["checking", "cash"]),
        ("Savings", ["savings"]),
        ("ISAs", ["cash_isa", "stocks_shares_isa"]),
        ("Investments & Pension", ["investment", "pension"]),
        ("Crypto", ["crypto_wallet"]),
        ("Other Assets", ["other"]),
    ]
    LIABILITY_GROUPS = [
        ("Credit Cards", ["credit_card"]),
        ("Loans", ["loan"]),
        ("Mortgages", ["mortgage"]),
    ]
    CATCH_ALL_LABEL = "Other Assets"

    # Every account type that has an explicit home in one of the groups.
    known_types = {
        key for _, keys in ASSET_GROUPS + LIABILITY_GROUPS for key in keys
    }
    zero = Decimal("0")

    def to_item(account) -> BalanceSheetAccount:
        """Convert an account row into its balance-sheet representation."""
        name = decrypt_field(bytes(account.name_enc)) if account.name_enc else "Account"
        # Balances are reported as magnitudes.
        # NOTE(review): abs() also turns a negative asset balance (e.g. an
        # overdrawn account) into a positive one — confirm this is intended.
        return BalanceSheetAccount(
            id=str(account.id),
            name=name,
            type=account.type,
            balance=abs(account.current_balance or zero),
            currency=account.currency or base_currency,
        )

    def build_groups(group_defs: list, catch_all_label: str | None = None) -> list[BalanceSheetGroup]:
        """Materialise the non-empty groups described by ``group_defs``.

        The group named ``catch_all_label`` (if any) additionally receives
        every account whose type is not listed in any group.
        """
        groups = []
        for label, type_keys in group_defs:
            is_catch_all = label == catch_all_label
            members = [
                a
                for a in accounts
                if a.type in type_keys or (is_catch_all and a.type not in known_types)
            ]
            if not members:
                continue
            # Report the unlisted types too, so type_keys matches the members.
            extra_types = dict.fromkeys(a.type for a in members if a.type not in type_keys)
            acct_items = [to_item(a) for a in members]
            groups.append(BalanceSheetGroup(
                label=label,
                type_keys=[*type_keys, *extra_types],
                accounts=acct_items,
                subtotal=sum((i.balance for i in acct_items), zero),
            ))
        return groups

    asset_groups = build_groups(ASSET_GROUPS, catch_all_label=CATCH_ALL_LABEL)
    liability_groups = build_groups(LIABILITY_GROUPS)
    # Start from a Decimal so empty group lists give Decimal("0"), not int 0.
    total_assets = sum((g.subtotal for g in asset_groups), zero)
    total_liabilities = sum((g.subtotal for g in liability_groups), zero)

    return BalanceSheetReport(
        asset_groups=asset_groups,
        liability_groups=liability_groups,
        total_assets=total_assets,
        total_liabilities=total_liabilities,
        net_worth=total_assets - total_liabilities,
        currency=base_currency,
    )
async def take_net_worth_snapshot(db: AsyncSession, user_id: uuid.UUID, base_currency: str) -> None:
    """Record today's net worth for the user unless a snapshot already exists.

    The snapshot is added to the session and flushed; committing is left to
    the caller.

    Args:
        db: Async database session.
        user_id: User to snapshot.
        base_currency: Currency code stored on the snapshot.
    """
    snapshot_date = date.today()
    lookup = select(NetWorthSnapshot).where(
        NetWorthSnapshot.user_id == user_id,
        NetWorthSnapshot.date == snapshot_date,
    )
    already_taken = (await db.execute(lookup)).scalar_one_or_none()
    if already_taken:
        # One snapshot per user per day is enough.
        return

    asset_total, liability_total = await _current_net_worth(db, user_id)
    db.add(
        NetWorthSnapshot(
            id=uuid.uuid4(),
            user_id=user_id,
            date=snapshot_date,
            total_assets=asset_total,
            total_liabilities=liability_total,
            net_worth=asset_total - liability_total,
            base_currency=base_currency,
            breakdown={},
            created_at=datetime.now(timezone.utc),
        )
    )
    await db.flush()