#!/usr/bin/env python3 """ AES-256-GCM encryption key rotation. Re-encrypts all PII fields with a new key in a single atomic transaction. If anything fails the database is left unchanged. Usage (from inside the backend container): python /app/scripts/rotate_keys.py --old-key <64-hex> --new-key <64-hex> --dry-run Decrypt and re-encrypt in memory only, do not write to DB. After the script reports success: 1. Update ENCRYPTION_KEY in your .env file to the new key. 2. Restart the backend: docker compose up -d backend """ import argparse import asyncio import os import sys from os import urandom from cryptography.hazmat.primitives.ciphers.aead import AESGCM # --------------------------------------------------------------------------- # Crypto helpers (standalone, no dependency on app code) # --------------------------------------------------------------------------- def _make_decrypt(key_hex: str): key = bytes.fromhex(key_hex) if len(key) != 32: sys.exit("ERROR: key must be 32 bytes (64 hex characters)") aesgcm = AESGCM(key) def decrypt(data: bytes) -> str: if not data: return "" iv, ct = data[:12], data[12:] return aesgcm.decrypt(iv, ct, None).decode() return decrypt def _make_encrypt(key_hex: str): key = bytes.fromhex(key_hex) if len(key) != 32: sys.exit("ERROR: key must be 32 bytes (64 hex characters)") aesgcm = AESGCM(key) def encrypt(plaintext: str) -> bytes: if not plaintext: return b"" iv = urandom(12) return iv + aesgcm.encrypt(iv, plaintext.encode(), None) return encrypt def _rotate_bytes(data: bytes | None, decrypt, encrypt) -> bytes | None: """Decrypt with old key, re-encrypt with new key. None/empty passes through.""" if not data: return data return encrypt(decrypt(data)) def _rotate_hex(hex_str: str | None, decrypt, encrypt) -> str | None: """Same as _rotate_bytes but field is stored as hex text (TOTP secret).""" if not hex_str: return hex_str return encrypt(decrypt(bytes.fromhex(hex_str))).hex() # --------------------------------------------------------------------------- # Main rotation logic # --------------------------------------------------------------------------- async def rotate(old_key: str, new_key: str, dry_run: bool) -> None: import asyncpg decrypt = _make_decrypt(old_key) encrypt = _make_encrypt(new_key) db_url = os.environ.get("DATABASE_URL", "") if not db_url: sys.exit("ERROR: DATABASE_URL environment variable not set") pg_url = db_url.replace("postgresql+asyncpg://", "postgresql://") print(f"Connecting to database...") conn = await asyncpg.connect(pg_url) try: async with conn.transaction(): # ── accounts ────────────────────────────────────────────────── print("Rotating accounts (name, institution, notes)...") rows = await conn.fetch( "SELECT id, name, institution, notes FROM accounts" ) updated = 0 for row in rows: new_name = _rotate_bytes(row["name"], decrypt, encrypt) new_inst = _rotate_bytes(row["institution"], decrypt, encrypt) new_notes = _rotate_bytes(row["notes"], decrypt, encrypt) if not dry_run: await conn.execute( "UPDATE accounts SET name=$1, institution=$2, notes=$3 WHERE id=$4", new_name, new_inst, new_notes, row["id"], ) updated += 1 print(f" {'(dry-run) would rotate' if dry_run else 'rotated'} {updated} accounts") # ── transactions ────────────────────────────────────────────── print("Rotating transactions (description, merchant, notes)...") rows = await conn.fetch( "SELECT id, description, merchant, notes FROM transactions" ) updated = 0 for row in rows: new_desc = _rotate_bytes(row["description"], decrypt, encrypt) new_merch = _rotate_bytes(row["merchant"], decrypt, encrypt) new_notes = _rotate_bytes(row["notes"], decrypt, encrypt) if not dry_run: await conn.execute( "UPDATE transactions SET description=$1, merchant=$2, notes=$3 WHERE id=$4", new_desc, new_merch, new_notes, row["id"], ) updated += 1 print(f" {'(dry-run) would rotate' if dry_run else 'rotated'} {updated} transactions") # ── investment transactions ──────────────────────────────────── print("Rotating investment transaction notes...") rows = await conn.fetch( "SELECT id, notes FROM investment_transactions WHERE notes IS NOT NULL" ) updated = 0 for row in rows: new_notes = _rotate_bytes(row["notes"], decrypt, encrypt) if not dry_run: await conn.execute( "UPDATE investment_transactions SET notes=$1 WHERE id=$2", new_notes, row["id"], ) updated += 1 print(f" {'(dry-run) would rotate' if dry_run else 'rotated'} {updated} investment transaction notes") # ── users — TOTP secret (hex-encoded) ───────────────────────── print("Rotating user TOTP secrets...") rows = await conn.fetch( "SELECT id, totp_secret FROM users WHERE totp_secret IS NOT NULL" ) updated = 0 for row in rows: new_secret = _rotate_hex(row["totp_secret"], decrypt, encrypt) if not dry_run: await conn.execute( "UPDATE users SET totp_secret=$1 WHERE id=$2", new_secret, row["id"], ) updated += 1 print(f" {'(dry-run) would rotate' if dry_run else 'rotated'} {updated} TOTP secrets") if dry_run: raise _DryRunAbort() except _DryRunAbort: print("\nDry-run complete — no changes written.") return finally: await conn.close() print("\n✓ Key rotation complete.") print("\nNext steps:") print(" 1. Update ENCRYPTION_KEY in your .env to the new key.") print(" 2. Restart the backend: docker compose up -d backend") class _DryRunAbort(Exception): """Raised inside the transaction to trigger an asyncpg rollback in dry-run mode.""" def main(): parser = argparse.ArgumentParser(description="Rotate AES-256-GCM encryption key") parser.add_argument("--old-key", required=True, help="Current key as 64-char hex") parser.add_argument("--new-key", required=True, help="New key as 64-char hex") parser.add_argument("--dry-run", action="store_true", help="Validate only, do not write") args = parser.parse_args() if args.old_key == args.new_key: sys.exit("ERROR: old key and new key are identical — nothing to do") print(f"{'DRY RUN — ' if args.dry_run else ''}AES key rotation starting...") asyncio.run(rotate(args.old_key, args.new_key, args.dry_run)) if __name__ == "__main__": main()