""" Security middleware: headers, CSRF double-submit, request ID, RLS user context. """ import uuid from fastapi import Request, Response from starlette.middleware.base import BaseHTTPMiddleware from starlette.responses import JSONResponse SAFE_METHODS = {"GET", "HEAD", "OPTIONS"} SECURITY_HEADERS = { "X-Frame-Options": "DENY", "X-Content-Type-Options": "nosniff", "Referrer-Policy": "strict-origin-when-cross-origin", "Permissions-Policy": "camera=(), microphone=(), geolocation=()", "Cross-Origin-Opener-Policy": "same-origin", "Cross-Origin-Resource-Policy": "same-origin", "Strict-Transport-Security": "max-age=63072000; includeSubDomains", "Content-Security-Policy": ( "default-src 'self'; " "script-src 'self'; " "style-src 'self' 'unsafe-inline'; " "img-src 'self' data:; " "connect-src 'self'; " "form-action 'self'; " "frame-ancestors 'none'" ), } class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): response: Response = await call_next(request) for header, value in SECURITY_HEADERS.items(): response.headers[header] = value response.headers["X-Request-ID"] = str(uuid.uuid4()) return response class CSRFMiddleware(BaseHTTPMiddleware): """Double-submit cookie CSRF protection for mutating requests.""" EXEMPT_PATHS = {"/api/v1/auth/login", "/api/v1/auth/refresh", "/api/v1/auth/register", "/health"} async def dispatch(self, request: Request, call_next): # Always set the csrf_token cookie if it doesn't exist yet existing_csrf = request.cookies.get("csrf_token") if request.method in SAFE_METHODS: response: Response = await call_next(request) if not existing_csrf: token = str(uuid.uuid4()) response.set_cookie( "csrf_token", token, httponly=False, # must be readable by JS samesite="lax", secure=False, # CSRF token is public by design; Secure would break HTTP deployments max_age=604800, # 7 days — survive browser restarts ) return response if request.url.path in self.EXEMPT_PATHS: response = await call_next(request) if not existing_csrf: token = str(uuid.uuid4()) response.set_cookie("csrf_token", token, httponly=False, samesite="lax", secure=False, max_age=604800) return response if request.url.path in {"/api/v1/auth/login", "/api/v1/auth/login/totp"}: return await call_next(request) cookie_token = existing_csrf header_token = request.headers.get("X-CSRF-Token") if not cookie_token or not header_token or cookie_token != header_token: return JSONResponse( status_code=403, content={"detail": "CSRF token missing or invalid"}, ) return await call_next(request)