MyMidas/backend/app/core/middleware.py
megaproxy fe4e69b9ad Complete Phase 3, Phase 5 polish and hardening
Phase 3 — Investments:
- Multi-currency support: holdings track purchase currency, FX rates convert to base for totals
- Capital gains report using UK Section 104 pool method, grouped by tax year
- Capital Gains tab added to Reports page

Phase 5 — Polish & Hardening:
- Mobile-responsive layout: bottom nav, sidebar hidden on mobile, logo in TopBar, compact header buttons, hover-only actions now always visible on touch
- Backup system: encrypted GPG backups via backup.sh, nightly scheduler job, admin API (list/trigger/download/restore), Settings UI with drag-to-restore confirmation
- Docker entrypoint with gosu privilege drop to fix bind-mount ownership on fresh deployments
- OWASP fixes: refresh token now bound to its session (new refresh_token_hash column + migration), CSRF secure flag tied to environment, IP-level rate limiting on login, TOTPEnableRequest Pydantic schema replaces raw dict
- AES-256-GCM key rotation script (rotate_keys.py) with dry-run mode and atomic DB transaction
- CLAUDE.md added for AI-assisted development context
- README updated: correct reverse proxy port, accurate backup/restore commands, key rotation instructions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 14:59:11 +00:00

83 lines
3 KiB
Python

"""
Security middleware: headers, CSRF double-submit, request ID, RLS user context.
"""
import hmac
import uuid

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

from app.config import get_settings
# HTTP methods the CSRF middleware lets through without a token check.
SAFE_METHODS = {"GET", "HEAD", "OPTIONS"}

# Individual Content-Security-Policy directives; joined with "; " below to
# form the single header value.
_CSP_DIRECTIVES = (
    "default-src 'self'",
    "script-src 'self'",
    "style-src 'self' 'unsafe-inline'",
    "img-src 'self' data:",
    "connect-src 'self'",
    "form-action 'self'",
    "frame-ancestors 'none'",
)

# Static response headers, keyed by header name. Every entry is written onto
# each outgoing response by SecurityHeadersMiddleware.
SECURITY_HEADERS = {
    "X-Frame-Options": "DENY",
    "X-Content-Type-Options": "nosniff",
    "Referrer-Policy": "strict-origin-when-cross-origin",
    "Permissions-Policy": "camera=(), microphone=(), geolocation=()",
    "Cross-Origin-Opener-Policy": "same-origin",
    "Cross-Origin-Resource-Policy": "same-origin",
    "Strict-Transport-Security": "max-age=63072000; includeSubDomains",
    "Content-Security-Policy": "; ".join(_CSP_DIRECTIVES),
}
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Attach the static security headers and a request ID to each response."""

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler, then decorate its response.

        Every entry of ``SECURITY_HEADERS`` is written onto the response
        (replacing any value already present under the same name), followed
        by an ``X-Request-ID`` header holding a newly generated UUID4.
        """
        response: Response = await call_next(request)

        outgoing = response.headers
        for name in SECURITY_HEADERS:
            outgoing[name] = SECURITY_HEADERS[name]

        # The ID is minted here, after the handler has run, so it only
        # identifies the response; it is not read from the incoming request.
        request_id = uuid.uuid4()
        outgoing["X-Request-ID"] = str(request_id)
        return response
class CSRFMiddleware(BaseHTTPMiddleware):
    """Double-submit cookie CSRF protection for mutating requests.

    Requests using a method in ``SAFE_METHODS`` or targeting a path in
    ``EXEMPT_PATHS`` are passed through unchecked; if the client did not send
    a ``csrf_token`` cookie, one is issued on the response. Every other
    request must carry an ``X-CSRF-Token`` header whose value equals the
    ``csrf_token`` cookie, otherwise it is rejected with HTTP 403.
    """

    # Paths that skip token validation. The TOTP login step lives here with
    # the other auth endpoints so that it, too, receives a cookie when the
    # client has none yet.
    EXEMPT_PATHS = {
        "/api/v1/auth/login",
        "/api/v1/auth/login/totp",
        "/api/v1/auth/refresh",
        "/api/v1/auth/register",
        "/health",
    }

    _COOKIE_NAME = "csrf_token"
    _HEADER_NAME = "X-CSRF-Token"

    @staticmethod
    def _issue_csrf_cookie(response: Response) -> None:
        """Set a freshly generated ``csrf_token`` cookie on *response*."""
        response.set_cookie(
            CSRFMiddleware._COOKIE_NAME,
            str(uuid.uuid4()),
            httponly=False,  # must be readable by JS so it can be echoed in the header
            samesite="lax",
            # Secure flag is dropped only when settings report development.
            secure=not get_settings().is_development,
        )

    @staticmethod
    def _tokens_match(cookie_token, header_token) -> bool:
        """Return True when both tokens are present and equal.

        The comparison is constant-time. Values are encoded to bytes first
        because ``hmac.compare_digest`` raises ``TypeError`` for ``str``
        arguments containing non-ASCII characters, which a client-supplied
        header may include.
        """
        if not cookie_token or not header_token:
            return False
        return hmac.compare_digest(
            cookie_token.encode("utf-8"),
            header_token.encode("utf-8"),
        )

    async def dispatch(self, request: Request, call_next):
        """Validate the CSRF token pair, or bootstrap the cookie.

        Returns the downstream response, or a 403 ``JSONResponse`` with
        ``{"detail": "CSRF token missing or invalid"}`` when a non-exempt
        mutating request fails validation (the handler is not invoked).
        """
        cookie_token = request.cookies.get(self._COOKIE_NAME)

        unchecked = (
            request.method in SAFE_METHODS
            or request.url.path in self.EXEMPT_PATHS
        )
        if unchecked:
            response: Response = await call_next(request)
            if not cookie_token:
                self._issue_csrf_cookie(response)
            return response

        header_token = request.headers.get(self._HEADER_NAME)
        if not self._tokens_match(cookie_token, header_token):
            return JSONResponse(
                status_code=403,
                content={"detail": "CSRF token missing or invalid"},
            )
        return await call_next(request)