- Browser tab title: MyMidas - Sidebar logo: Coins icon + MyMidas text - Login page header: MyMidas - TOTP issuer name: MyMidas (shows in authenticator apps) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
197 lines
5.8 KiB
Python
197 lines
5.8 KiB
Python
"""
|
|
Cryptographic primitives: Argon2id password hashing, RS256 JWT, AES-256-GCM field encryption, TOTP.
|
|
"""
|
|
import base64
|
|
import os
|
|
import secrets
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pyotp
|
|
import qrcode
|
|
import qrcode.image.svg
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError, VerificationError, InvalidHashError
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
from jose import JWTError, jwt
|
|
|
|
from app.config import get_settings
|
|
|
|
# Argon2id — OWASP recommended parameters
|
|
_ph = PasswordHasher(
|
|
time_cost=3,
|
|
memory_cost=65536,
|
|
parallelism=4,
|
|
hash_len=32,
|
|
salt_len=16,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Password hashing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def hash_password(password: str) -> str:
|
|
return _ph.hash(password)
|
|
|
|
|
|
def verify_password(password: str, hashed: str) -> bool:
|
|
try:
|
|
return _ph.verify(hashed, password)
|
|
except (VerifyMismatchError, VerificationError, InvalidHashError):
|
|
return False
|
|
|
|
|
|
def password_needs_rehash(hashed: str) -> bool:
|
|
return _ph.check_needs_rehash(hashed)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JWT (RS256)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_private_key() -> str:
|
|
settings = get_settings()
|
|
return Path(settings.jwt_private_key_file).read_text()
|
|
|
|
|
|
def _load_public_key() -> str:
|
|
settings = get_settings()
|
|
return Path(settings.jwt_public_key_file).read_text()
|
|
|
|
|
|
def create_access_token(subject: str, extra: dict[str, Any] | None = None) -> str:
|
|
settings = get_settings()
|
|
now = datetime.now(timezone.utc)
|
|
payload: dict[str, Any] = {
|
|
"sub": subject,
|
|
"iat": now,
|
|
"exp": now + timedelta(minutes=settings.access_token_expire_minutes),
|
|
"type": "access",
|
|
}
|
|
if extra:
|
|
payload.update(extra)
|
|
return jwt.encode(payload, _load_private_key(), algorithm=settings.jwt_algorithm)
|
|
|
|
|
|
def create_refresh_token(subject: str) -> str:
|
|
settings = get_settings()
|
|
now = datetime.now(timezone.utc)
|
|
payload: dict[str, Any] = {
|
|
"sub": subject,
|
|
"iat": now,
|
|
"exp": now + timedelta(days=settings.refresh_token_expire_days),
|
|
"type": "refresh",
|
|
"jti": secrets.token_hex(16),
|
|
}
|
|
return jwt.encode(payload, _load_private_key(), algorithm=settings.jwt_algorithm)
|
|
|
|
|
|
def decode_token(token: str, token_type: str = "access") -> dict[str, Any]:
|
|
settings = get_settings()
|
|
payload = jwt.decode(
|
|
token,
|
|
_load_public_key(),
|
|
algorithms=[settings.jwt_algorithm],
|
|
options={"verify_exp": True},
|
|
)
|
|
if payload.get("type") != token_type:
|
|
raise JWTError("Invalid token type")
|
|
return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AES-256-GCM field encryption
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_aes_key() -> bytes:
|
|
"""Derive 32-byte key from hex ENCRYPTION_KEY env var."""
|
|
settings = get_settings()
|
|
key_hex = settings.encryption_key
|
|
key = bytes.fromhex(key_hex)
|
|
if len(key) != 32:
|
|
raise ValueError("ENCRYPTION_KEY must be a 32-byte hex string (64 hex chars)")
|
|
return key
|
|
|
|
|
|
def encrypt_field(plaintext: str) -> bytes:
|
|
"""Encrypt a string field. Returns IV(12) || ciphertext || tag(16) as bytes."""
|
|
if not plaintext:
|
|
return b""
|
|
key = _get_aes_key()
|
|
iv = os.urandom(12)
|
|
aesgcm = AESGCM(key)
|
|
ciphertext_with_tag = aesgcm.encrypt(iv, plaintext.encode(), None)
|
|
return iv + ciphertext_with_tag
|
|
|
|
|
|
def decrypt_field(data: bytes) -> str:
|
|
"""Decrypt bytes produced by encrypt_field."""
|
|
if not data:
|
|
return ""
|
|
key = _get_aes_key()
|
|
iv = data[:12]
|
|
ciphertext_with_tag = data[12:]
|
|
aesgcm = AESGCM(key)
|
|
return aesgcm.decrypt(iv, ciphertext_with_tag, None).decode()
|
|
|
|
|
|
def encrypt_field_b64(plaintext: str) -> str:
|
|
"""Convenience: encrypt and return base64 string (for JSON/text contexts)."""
|
|
return base64.b64encode(encrypt_field(plaintext)).decode()
|
|
|
|
|
|
def decrypt_field_b64(data: str) -> str:
|
|
return decrypt_field(base64.b64decode(data))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TOTP (RFC 6238)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def generate_totp_secret() -> str:
|
|
return pyotp.random_base32()
|
|
|
|
|
|
def get_totp_uri(secret: str, email: str) -> str:
|
|
return pyotp.totp.TOTP(secret).provisioning_uri(
|
|
name=email, issuer_name="MyMidas"
|
|
)
|
|
|
|
|
|
def generate_totp_qr_png(secret: str, email: str) -> bytes:
|
|
uri = get_totp_uri(secret, email)
|
|
img = qrcode.make(uri)
|
|
from io import BytesIO
|
|
buf = BytesIO()
|
|
img.save(buf, format="PNG")
|
|
return buf.getvalue()
|
|
|
|
|
|
def verify_totp(secret: str, code: str) -> bool:
|
|
totp = pyotp.TOTP(secret)
|
|
return totp.verify(code, valid_window=1)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CSRF token
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def generate_csrf_token() -> str:
|
|
return secrets.token_hex(32)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Misc helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def generate_backup_codes(count: int = 8) -> list[str]:
|
|
"""Generate one-time backup codes."""
|
|
return [secrets.token_hex(4).upper() + "-" + secrets.token_hex(4).upper() for _ in range(count)]
|
|
|
|
|
|
def hash_token(token: str) -> str:
|
|
"""SHA-256 hash of a bearer token for DB storage."""
|
|
import hashlib
|
|
return hashlib.sha256(token.encode()).hexdigest()
|