MyMidas/backend/app/api/v1/users.py
megaproxy 61a7884ee5 Initial commit: MyMidas personal finance tracker
Full-stack self-hosted finance app with FastAPI backend and React frontend.

Features:
- Accounts, transactions, budgets, investments with GBP base currency
- CSV import with auto-detection for 10 UK bank formats
- ML predictions: spending forecast, net worth projection, Monte Carlo
- 7 selectable themes (Obsidian, Arctic, Midnight, Vault, Terminal, Synthwave, Ledger)
- Receipt/document attachments on transactions (JPEG, PNG, WebP, PDF)
- AES-256-GCM field encryption, RS256 JWT, TOTP 2FA, RLS, audit log
- Encrypted nightly backups + key rotation script
- Mobile-responsive layout with bottom nav

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 11:56:10 +00:00

126 lines
4 KiB
Python

import csv
import io
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.audit import write_audit
from app.core.security import hash_password, verify_password
from app.dependencies import get_current_user, get_db
# Router on which this module's `/me` endpoints are registered.
router = APIRouter()
@router.get("/me")
async def get_me(user=Depends(get_current_user)):
    """Return the authenticated user's profile fields as a dict.

    ``id`` is converted to a string; every other field is copied from the
    user object unchanged.
    """
    # Attributes copied verbatim, in the order they appear in the response.
    passthrough_fields = (
        "email",
        "display_name",
        "base_currency",
        "theme",
        "locale",
        "totp_enabled",
        "last_login_at",
        "created_at",
    )
    profile = {"id": str(user.id)}
    for field_name in passthrough_fields:
        profile[field_name] = getattr(user, field_name)
    return profile
class PasswordChangeRequest(BaseModel):
    """Request body for changing the current user's password."""

    # The password the user currently has; required.
    current_password: str
    # The replacement password; required, at least 10 characters long.
    new_password: str = Field(min_length=10)
@router.post("/me/password", status_code=200)
async def change_password(
    body: PasswordChangeRequest,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Replace the authenticated user's password.

    Verifies ``body.current_password`` against the stored hash, stores the
    hash of ``body.new_password``, refreshes ``updated_at``, writes a
    ``password_change`` audit entry and commits.

    Raises:
        HTTPException: 400 when the current password does not verify.
    """
    current_ok = verify_password(body.current_password, user.password_hash)
    if not current_ok:
        raise HTTPException(
            status_code=400,
            detail="Current password is incorrect",
        )

    user.password_hash = hash_password(body.new_password)
    user.updated_at = datetime.now(timezone.utc)

    # The audit entry is written before the commit so both are committed together.
    await write_audit(db, user_id=user.id, action="password_change")
    await db.commit()

    return {"message": "Password updated successfully"}
class ProfileUpdateRequest(BaseModel):
    """Request body for a partial profile update; omitted fields default to ``None``."""

    # New display name, at most 100 characters.
    display_name: str | None = Field(None, max_length=100)
    # New base currency code, between 3 and 10 characters.
    base_currency: str | None = Field(None, min_length=3, max_length=10)
@router.put("/me", status_code=200)
async def update_profile(
    body: ProfileUpdateRequest,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Apply the supplied profile fields to the authenticated user.

    Only fields that are not ``None`` are written. ``base_currency`` is
    upper-cased before being stored. ``updated_at`` is refreshed on every
    call, and the change is committed.
    """
    new_name = body.display_name
    new_currency = body.base_currency

    if new_name is not None:
        user.display_name = new_name
    if new_currency is not None:
        user.base_currency = new_currency.upper()

    user.updated_at = datetime.now(timezone.utc)
    await db.commit()

    return {"message": "Profile updated"}
# Leading characters that spreadsheet applications interpret as the start of a
# formula when a CSV cell begins with them.
_FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r")


def _neutralize_formula(cell):
    """Return *cell* prefixed with an apostrophe if it would open as a formula."""
    if isinstance(cell, str) and cell.startswith(_FORMULA_PREFIXES):
        return "'" + cell
    return cell


@router.get("/me/export")
async def export_data(
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Export the user's non-deleted transactions as a CSV attachment.

    Each transaction is joined to its account and, when one is set, its
    category. Rows are ordered by transaction date, newest first. Encrypted
    columns are decrypted with ``decrypt_field`` before being written.

    Free-text cells (description, merchant, category, account, notes, tags)
    are passed through ``_neutralize_formula`` so that a value beginning with
    ``=``, ``+``, ``-``, ``@``, tab or carriage return is not evaluated as a
    formula when the file is opened in a spreadsheet. The ``amount`` column is
    left untouched so negative amounts remain plain numbers.

    A ``data_export`` audit entry is written and committed before the
    response is returned.
    """
    # Imported inside the function, as in the original code.
    from app.db.models.transaction import Transaction
    from app.db.models.account import Account
    from app.db.models.category import Category
    from app.core.security import decrypt_field

    query = (
        select(Transaction, Account, Category)
        .join(Account, Account.id == Transaction.account_id)
        .outerjoin(Category, Category.id == Transaction.category_id)
        .where(
            Transaction.user_id == user.id,
            Transaction.deleted_at.is_(None),
        )
        .order_by(Transaction.date.desc())
    )
    result = await db.execute(query)
    rows = result.all()

    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([
        "date", "description", "merchant", "amount", "currency",
        "type", "status", "category", "account", "notes", "tags",
    ])

    for txn, account, category in rows:
        description = decrypt_field(txn.description_enc) or ""
        merchant = decrypt_field(txn.merchant_enc) if txn.merchant_enc else ""
        category_name = category.name if category else ""
        account_name = decrypt_field(account.name_enc) or ""
        notes = decrypt_field(txn.notes_enc) if txn.notes_enc else ""
        tags = ",".join(txn.tags or [])

        writer.writerow([
            txn.date.isoformat(),
            _neutralize_formula(description),
            _neutralize_formula(merchant),
            str(txn.amount),
            txn.currency,
            txn.type,
            txn.status,
            _neutralize_formula(category_name),
            _neutralize_formula(account_name),
            _neutralize_formula(notes),
            _neutralize_formula(tags),
        ])

    filename = f"transactions_{datetime.now(timezone.utc).strftime('%Y%m%d')}.csv"

    await write_audit(db, user_id=user.id, action="data_export")
    await db.commit()

    # getvalue() returns the whole buffer regardless of the stream position,
    # so no seek is needed before building the response.
    return StreamingResponse(
        iter([buffer.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )