""" Cryptographic primitives: Argon2id password hashing, RS256 JWT, AES-256-GCM field encryption, TOTP. """ import base64 import os import secrets from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any import pyotp import qrcode import qrcode.image.svg from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError from cryptography.hazmat.primitives.ciphers.aead import AESGCM from jose import JWTError, jwt from app.config import get_settings # Argon2id — OWASP recommended parameters _ph = PasswordHasher( time_cost=3, memory_cost=65536, parallelism=4, hash_len=32, salt_len=16, ) # --------------------------------------------------------------------------- # Password hashing # --------------------------------------------------------------------------- def hash_password(password: str) -> str: return _ph.hash(password) def verify_password(password: str, hashed: str) -> bool: try: return _ph.verify(hashed, password) except (VerifyMismatchError, VerificationError, InvalidHashError): return False def password_needs_rehash(hashed: str) -> bool: return _ph.check_needs_rehash(hashed) # --------------------------------------------------------------------------- # JWT (RS256) # --------------------------------------------------------------------------- def _load_private_key() -> str: settings = get_settings() return Path(settings.jwt_private_key_file).read_text() def _load_public_key() -> str: settings = get_settings() return Path(settings.jwt_public_key_file).read_text() def create_access_token(subject: str, extra: dict[str, Any] | None = None) -> str: settings = get_settings() now = datetime.now(timezone.utc) payload: dict[str, Any] = { "sub": subject, "iat": now, "exp": now + timedelta(minutes=settings.access_token_expire_minutes), "type": "access", } if extra: payload.update(extra) return jwt.encode(payload, _load_private_key(), algorithm=settings.jwt_algorithm) def create_refresh_token(subject: str) -> str: settings = get_settings() now = datetime.now(timezone.utc) payload: dict[str, Any] = { "sub": subject, "iat": now, "exp": now + timedelta(days=settings.refresh_token_expire_days), "type": "refresh", "jti": secrets.token_hex(16), } return jwt.encode(payload, _load_private_key(), algorithm=settings.jwt_algorithm) def decode_token(token: str, token_type: str = "access") -> dict[str, Any]: settings = get_settings() payload = jwt.decode( token, _load_public_key(), algorithms=[settings.jwt_algorithm], options={"verify_exp": True}, ) if payload.get("type") != token_type: raise JWTError("Invalid token type") return payload # --------------------------------------------------------------------------- # AES-256-GCM field encryption # --------------------------------------------------------------------------- def _get_aes_key() -> bytes: """Derive 32-byte key from hex ENCRYPTION_KEY env var.""" settings = get_settings() key_hex = settings.encryption_key key = bytes.fromhex(key_hex) if len(key) != 32: raise ValueError("ENCRYPTION_KEY must be a 32-byte hex string (64 hex chars)") return key def encrypt_field(plaintext: str) -> bytes: """Encrypt a string field. Returns IV(12) || ciphertext || tag(16) as bytes.""" if not plaintext: return b"" key = _get_aes_key() iv = os.urandom(12) aesgcm = AESGCM(key) ciphertext_with_tag = aesgcm.encrypt(iv, plaintext.encode(), None) return iv + ciphertext_with_tag def decrypt_field(data: bytes) -> str: """Decrypt bytes produced by encrypt_field.""" if not data: return "" key = _get_aes_key() iv = data[:12] ciphertext_with_tag = data[12:] aesgcm = AESGCM(key) return aesgcm.decrypt(iv, ciphertext_with_tag, None).decode() def encrypt_field_b64(plaintext: str) -> str: """Convenience: encrypt and return base64 string (for JSON/text contexts).""" return base64.b64encode(encrypt_field(plaintext)).decode() def decrypt_field_b64(data: str) -> str: return decrypt_field(base64.b64decode(data)) # --------------------------------------------------------------------------- # TOTP (RFC 6238) # --------------------------------------------------------------------------- def generate_totp_secret() -> str: return pyotp.random_base32() def get_totp_uri(secret: str, email: str) -> str: return pyotp.totp.TOTP(secret).provisioning_uri( name=email, issuer_name="Finance Tracker" ) def generate_totp_qr_png(secret: str, email: str) -> bytes: uri = get_totp_uri(secret, email) img = qrcode.make(uri) from io import BytesIO buf = BytesIO() img.save(buf, format="PNG") return buf.getvalue() def verify_totp(secret: str, code: str) -> bool: totp = pyotp.TOTP(secret) return totp.verify(code, valid_window=1) # --------------------------------------------------------------------------- # CSRF token # --------------------------------------------------------------------------- def generate_csrf_token() -> str: return secrets.token_hex(32) # --------------------------------------------------------------------------- # Misc helpers # --------------------------------------------------------------------------- def generate_backup_codes(count: int = 8) -> list[str]: """Generate one-time backup codes.""" return [secrets.token_hex(4).upper() + "-" + secrets.token_hex(4).upper() for _ in range(count)] def hash_token(token: str) -> str: """SHA-256 hash of a bearer token for DB storage.""" import hashlib return hashlib.sha256(token.encode()).hexdigest()