MyMidas/backend/app/core/security.py
megaproxy c7b868e585 Rebrand to MyMidas
- Browser tab title: MyMidas
- Sidebar logo: Coins icon + MyMidas text
- Login page header: MyMidas
- TOTP issuer name: MyMidas (shows in authenticator apps)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 12:11:52 +00:00

197 lines
5.8 KiB
Python

"""
Cryptographic primitives: Argon2id password hashing, RS256 JWT, AES-256-GCM field encryption, TOTP.
"""
import base64
import os
import secrets
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import pyotp
import qrcode
import qrcode.image.svg
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from jose import JWTError, jwt
from app.config import get_settings
# Argon2id — OWASP recommended parameters
_ph = PasswordHasher(
time_cost=3,
memory_cost=65536,
parallelism=4,
hash_len=32,
salt_len=16,
)
# ---------------------------------------------------------------------------
# Password hashing
# ---------------------------------------------------------------------------
def hash_password(password: str) -> str:
return _ph.hash(password)
def verify_password(password: str, hashed: str) -> bool:
try:
return _ph.verify(hashed, password)
except (VerifyMismatchError, VerificationError, InvalidHashError):
return False
def password_needs_rehash(hashed: str) -> bool:
return _ph.check_needs_rehash(hashed)
# ---------------------------------------------------------------------------
# JWT (RS256)
# ---------------------------------------------------------------------------
def _load_private_key() -> str:
settings = get_settings()
return Path(settings.jwt_private_key_file).read_text()
def _load_public_key() -> str:
settings = get_settings()
return Path(settings.jwt_public_key_file).read_text()
def create_access_token(subject: str, extra: dict[str, Any] | None = None) -> str:
settings = get_settings()
now = datetime.now(timezone.utc)
payload: dict[str, Any] = {
"sub": subject,
"iat": now,
"exp": now + timedelta(minutes=settings.access_token_expire_minutes),
"type": "access",
}
if extra:
payload.update(extra)
return jwt.encode(payload, _load_private_key(), algorithm=settings.jwt_algorithm)
def create_refresh_token(subject: str) -> str:
settings = get_settings()
now = datetime.now(timezone.utc)
payload: dict[str, Any] = {
"sub": subject,
"iat": now,
"exp": now + timedelta(days=settings.refresh_token_expire_days),
"type": "refresh",
"jti": secrets.token_hex(16),
}
return jwt.encode(payload, _load_private_key(), algorithm=settings.jwt_algorithm)
def decode_token(token: str, token_type: str = "access") -> dict[str, Any]:
settings = get_settings()
payload = jwt.decode(
token,
_load_public_key(),
algorithms=[settings.jwt_algorithm],
options={"verify_exp": True},
)
if payload.get("type") != token_type:
raise JWTError("Invalid token type")
return payload
# ---------------------------------------------------------------------------
# AES-256-GCM field encryption
# ---------------------------------------------------------------------------
def _get_aes_key() -> bytes:
"""Derive 32-byte key from hex ENCRYPTION_KEY env var."""
settings = get_settings()
key_hex = settings.encryption_key
key = bytes.fromhex(key_hex)
if len(key) != 32:
raise ValueError("ENCRYPTION_KEY must be a 32-byte hex string (64 hex chars)")
return key
def encrypt_field(plaintext: str) -> bytes:
"""Encrypt a string field. Returns IV(12) || ciphertext || tag(16) as bytes."""
if not plaintext:
return b""
key = _get_aes_key()
iv = os.urandom(12)
aesgcm = AESGCM(key)
ciphertext_with_tag = aesgcm.encrypt(iv, plaintext.encode(), None)
return iv + ciphertext_with_tag
def decrypt_field(data: bytes) -> str:
"""Decrypt bytes produced by encrypt_field."""
if not data:
return ""
key = _get_aes_key()
iv = data[:12]
ciphertext_with_tag = data[12:]
aesgcm = AESGCM(key)
return aesgcm.decrypt(iv, ciphertext_with_tag, None).decode()
def encrypt_field_b64(plaintext: str) -> str:
"""Convenience: encrypt and return base64 string (for JSON/text contexts)."""
return base64.b64encode(encrypt_field(plaintext)).decode()
def decrypt_field_b64(data: str) -> str:
return decrypt_field(base64.b64decode(data))
# ---------------------------------------------------------------------------
# TOTP (RFC 6238)
# ---------------------------------------------------------------------------
def generate_totp_secret() -> str:
return pyotp.random_base32()
def get_totp_uri(secret: str, email: str) -> str:
return pyotp.totp.TOTP(secret).provisioning_uri(
name=email, issuer_name="MyMidas"
)
def generate_totp_qr_png(secret: str, email: str) -> bytes:
uri = get_totp_uri(secret, email)
img = qrcode.make(uri)
from io import BytesIO
buf = BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()
def verify_totp(secret: str, code: str) -> bool:
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=1)
# ---------------------------------------------------------------------------
# CSRF token
# ---------------------------------------------------------------------------
def generate_csrf_token() -> str:
return secrets.token_hex(32)
# ---------------------------------------------------------------------------
# Misc helpers
# ---------------------------------------------------------------------------
def generate_backup_codes(count: int = 8) -> list[str]:
"""Generate one-time backup codes."""
return [secrets.token_hex(4).upper() + "-" + secrets.token_hex(4).upper() for _ in range(count)]
def hash_token(token: str) -> str:
"""SHA-256 hash of a bearer token for DB storage."""
import hashlib
return hashlib.sha256(token.encode()).hexdigest()