""" Auth endpoints: register, login, TOTP, refresh, logout, sessions. """ from __future__ import annotations from fastapi import APIRouter, Depends, HTTPException, Request, Response, status from redis.asyncio import Redis from sqlalchemy.ext.asyncio import AsyncSession from app.core.audit import write_audit from app.core.rate_limiter import is_rate_limited from app.core.security import create_refresh_token, decode_token, generate_csrf_token, hash_token from app.schemas.auth import TOTPEnableRequest from app.dependencies import get_current_user, get_db, get_redis from app.schemas.auth import ( LoginRequest, RegisterRequest, SessionInfo, TOTPChallengeResponse, TOTPLoginRequest, TOTPSetupResponse, TokenResponse, ) from app.services.auth_service import ( AuthError, authenticate_user, complete_totp_login, create_totp_challenge_token, disable_totp, enable_totp, get_sessions, register_user, revoke_all_sessions, revoke_session, setup_totp, ) router = APIRouter() def _ip(request: Request) -> str | None: forwarded = request.headers.get("X-Forwarded-For") if forwarded: return forwarded.split(",")[0].strip() return request.client.host if request.client else None def _ua(request: Request) -> str | None: return request.headers.get("User-Agent") def _set_refresh_cookie(response: Response, token: str) -> None: response.set_cookie( "refresh_token", token, httponly=True, secure=True, samesite="strict", max_age=7 * 24 * 3600, path="/api/v1/auth", ) def _set_csrf_cookie(response: Response, token: str) -> None: response.set_cookie( "csrf_token", token, httponly=False, secure=True, samesite="strict", max_age=86400, ) @router.post("/register", status_code=201) async def register( body: RegisterRequest, request: Request, response: Response, db: AsyncSession = Depends(get_db), ): try: user = await register_user(db, body.email, body.password, body.display_name) await write_audit(db, user_id=user.id, action="register", ip_address=_ip(request)) await db.commit() except AuthError as e: raise HTTPException(status_code=e.status_code, detail=e.detail) return {"message": "Account created. Please log in."} @router.post("/login") async def login( body: LoginRequest, request: Request, response: Response, db: AsyncSession = Depends(get_db), redis: Redis = Depends(get_redis), ): ip = _ip(request) or "unknown" limited, _ = await is_rate_limited(redis, f"rate:login:{ip}", limit=20, window_seconds=60) if limited: raise HTTPException(status_code=429, detail="Too many login attempts — try again shortly") try: user, access_token, refresh_token = await authenticate_user( db, redis, body.email, body.password, _ip(request), _ua(request) ) except AuthError as e: raise HTTPException(status_code=e.status_code, detail=e.detail) if access_token is None: # TOTP required challenge_token = create_totp_challenge_token(user.id) await write_audit(db, user_id=user.id, action="login", ip_address=_ip(request), metadata={"totp_required": True}) await db.commit() return TOTPChallengeResponse(challenge_token=challenge_token) csrf = generate_csrf_token() _set_refresh_cookie(response, refresh_token) _set_csrf_cookie(response, csrf) await write_audit(db, user_id=user.id, action="login", ip_address=_ip(request)) await db.commit() settings_expire = 15 * 60 return TokenResponse(access_token=access_token, expires_in=settings_expire) @router.post("/login/totp") async def login_totp( body: TOTPLoginRequest, request: Request, response: Response, db: AsyncSession = Depends(get_db), redis: Redis = Depends(get_redis), ): ip = _ip(request) or "unknown" limited, _ = await is_rate_limited(redis, f"rate:totp:{ip}", limit=10, window_seconds=60) if limited: raise HTTPException(status_code=429, detail="Too many TOTP attempts — try again shortly") try: access_token, refresh_token = await complete_totp_login( db, body.challenge_token, body.totp_code, _ip(request), _ua(request) ) except AuthError as e: raise HTTPException(status_code=e.status_code, detail=e.detail) csrf = generate_csrf_token() _set_refresh_cookie(response, refresh_token) _set_csrf_cookie(response, csrf) await db.commit() return TokenResponse(access_token=access_token, expires_in=15 * 60) @router.post("/refresh") async def refresh_token( request: Request, response: Response, db: AsyncSession = Depends(get_db), ): token = request.cookies.get("refresh_token") if not token: raise HTTPException(status_code=401, detail="No refresh token") try: payload = decode_token(token, token_type="refresh") except Exception: raise HTTPException(status_code=401, detail="Invalid refresh token") import uuid from app.core.security import create_access_token from sqlalchemy import select from datetime import datetime, timezone from app.db.models.session import Session user_id = uuid.UUID(payload["sub"]) now = datetime.now(timezone.utc) refresh_hash = hash_token(token) # Find the specific session this refresh token was issued for result = await db.execute( select(Session).where( Session.user_id == user_id, Session.refresh_token_hash == refresh_hash, Session.revoked_at.is_(None), Session.expires_at > now, ) ) session = result.scalars().first() if not session: raise HTTPException(status_code=401, detail="Session not found or refresh token already used") new_access = create_access_token(str(user_id)) new_refresh = create_refresh_token(str(user_id)) # Rotate both token hashes — old refresh token is now invalid session.token_hash = hash_token(new_access) session.refresh_token_hash = hash_token(new_refresh) session.last_active_at = now await db.commit() csrf = generate_csrf_token() _set_refresh_cookie(response, new_refresh) _set_csrf_cookie(response, csrf) return TokenResponse(access_token=new_access, expires_in=15 * 60) @router.post("/logout") async def logout( request: Request, response: Response, db: AsyncSession = Depends(get_db), user=Depends(get_current_user), ): token = request.headers.get("Authorization", "")[7:] th = hash_token(token) await revoke_session_by_hash(db, th, user.id) await write_audit(db, user_id=user.id, action="logout", ip_address=_ip(request)) await db.commit() response.delete_cookie("refresh_token", path="/api/v1/auth") response.delete_cookie("csrf_token") return {"message": "Logged out"} async def revoke_session_by_hash(db, token_hash: str, user_id): from sqlalchemy import select, update from datetime import datetime, timezone from app.db.models.session import Session await db.execute( update(Session) .where(Session.user_id == user_id, Session.token_hash == token_hash) .values(revoked_at=datetime.now(timezone.utc)) ) @router.post("/logout-all") async def logout_all( request: Request, response: Response, db: AsyncSession = Depends(get_db), user=Depends(get_current_user), ): token = request.headers.get("Authorization", "")[7:] await revoke_all_sessions(db, user.id) await write_audit(db, user_id=user.id, action="logout_all", ip_address=_ip(request)) await db.commit() response.delete_cookie("refresh_token", path="/api/v1/auth") response.delete_cookie("csrf_token") return {"message": "All sessions revoked"} @router.get("/sessions", response_model=list[SessionInfo]) async def list_sessions( request: Request, db: AsyncSession = Depends(get_db), user=Depends(get_current_user), ): token = request.headers.get("Authorization", "")[7:] current_hash = hash_token(token) sessions = await get_sessions(db, user.id) result = [] for s in sessions: info = SessionInfo.model_validate(s) info.is_current = (s.token_hash == current_hash) result.append(info) return result @router.delete("/sessions/{session_id}", status_code=204) async def delete_session( session_id, request: Request, db: AsyncSession = Depends(get_db), user=Depends(get_current_user), ): import uuid try: sid = uuid.UUID(str(session_id)) except ValueError: raise HTTPException(status_code=422, detail="Invalid session ID") try: await revoke_session(db, sid, user.id) except AuthError as e: raise HTTPException(status_code=e.status_code, detail=e.detail) await write_audit(db, user_id=user.id, action="session_revoke", resource_type="session", resource_id=sid) await db.commit() @router.get("/totp/setup", response_model=TOTPSetupResponse) async def totp_setup( db: AsyncSession = Depends(get_db), user=Depends(get_current_user), ): secret, qr_b64, backup_codes = await setup_totp(user, db) return TOTPSetupResponse(secret=secret, qr_code_png_b64=qr_b64, backup_codes=backup_codes) @router.post("/totp/enable", status_code=200) async def totp_enable( body: TOTPEnableRequest, request: Request, db: AsyncSession = Depends(get_db), user=Depends(get_current_user), ): try: await enable_totp(user, db, body.secret, body.code) except AuthError as e: raise HTTPException(status_code=e.status_code, detail=e.detail) await write_audit(db, user_id=user.id, action="totp_enable", ip_address=_ip(request)) await db.commit() return {"message": "TOTP enabled"} @router.delete("/totp", status_code=200) async def totp_disable( body: dict, request: Request, db: AsyncSession = Depends(get_db), user=Depends(get_current_user), ): password = body.get("password") if not password: raise HTTPException(status_code=422, detail="password required") try: await disable_totp(user, db, password) except AuthError as e: raise HTTPException(status_code=e.status_code, detail=e.detail) await write_audit(db, user_id=user.id, action="totp_disable", ip_address=_ip(request)) await db.commit() return {"message": "TOTP disabled"}