""" AES-256-GCM key rotation: decrypt all encrypted fields with OLD key, re-encrypt with NEW key. Run while the application is STOPPED: docker compose exec \ -e ENCRYPTION_KEY="$OLD_ENCRYPTION_KEY" \ -e NEW_ENCRYPTION_KEY="$NEW_ENCRYPTION_KEY" \ backend python -m app.core.key_rotation On success, update ENCRYPTION_KEY in .env to the new value and restart. """ import os import sys import logging from typing import Callable import psycopg2 from cryptography.hazmat.primitives.ciphers.aead import AESGCM logging.basicConfig(level=logging.INFO, format="[rotate] %(message)s") log = logging.getLogger(__name__) def _make_cipher(key_hex: str) -> AESGCM: key = bytes.fromhex(key_hex) if len(key) != 32: raise ValueError("Key must be 32 bytes (64 hex chars)") return AESGCM(key) def _decrypt(cipher: AESGCM, data: bytes) -> bytes: """Return plaintext bytes given IV(12)||ciphertext+tag.""" if not data: return b"" return cipher.decrypt(data[:12], data[12:], None) def _encrypt(cipher: AESGCM, plaintext: bytes) -> bytes: """Encrypt plaintext bytes → IV(12)||ciphertext+tag.""" if not plaintext: return b"" iv = os.urandom(12) return iv + cipher.encrypt(iv, plaintext, None) def _reencrypt(old: AESGCM, new: AESGCM, data: bytes | None) -> bytes | None: if not data: return data plaintext = _decrypt(old, data) return _encrypt(new, plaintext) def _reencrypt_hex(old: AESGCM, new: AESGCM, hex_str: str | None) -> str | None: """For fields stored as hex strings (e.g. totp_secret_enc).""" if not hex_str: return hex_str data = bytes.fromhex(hex_str) plaintext = _decrypt(old, data) return _encrypt(new, plaintext).hex() def rotate(db_url: str, old_key_hex: str, new_key_hex: str) -> None: old = _make_cipher(old_key_hex) new = _make_cipher(new_key_hex) conn = psycopg2.connect(db_url) conn.autocommit = False cur = conn.cursor() try: # ------------------------------------------------------------------ accounts cur.execute("SELECT id, name, institution, notes FROM accounts WHERE deleted_at IS NULL") rows = cur.fetchall() log.info(f"Rotating {len(rows)} account row(s)…") for row_id, name, institution, notes in rows: cur.execute( "UPDATE accounts SET name=%s, institution=%s, notes=%s WHERE id=%s", ( _reencrypt(old, new, bytes(name) if name else None), _reencrypt(old, new, bytes(institution) if institution else None), _reencrypt(old, new, bytes(notes) if notes else None), row_id, ), ) # -------------------------------------------------------------- transactions cur.execute( "SELECT id, description, merchant, notes FROM transactions WHERE deleted_at IS NULL" ) rows = cur.fetchall() log.info(f"Rotating {len(rows)} transaction row(s)…") for row_id, description, merchant, notes in rows: cur.execute( "UPDATE transactions SET description=%s, merchant=%s, notes=%s WHERE id=%s", ( _reencrypt(old, new, bytes(description) if description else None), _reencrypt(old, new, bytes(merchant) if merchant else None), _reencrypt(old, new, bytes(notes) if notes else None), row_id, ), ) # -------------------------------------------------------------------- users cur.execute("SELECT id, totp_secret FROM users WHERE deleted_at IS NULL") rows = cur.fetchall() log.info(f"Rotating {len(rows)} user row(s)…") for row_id, totp_secret in rows: cur.execute( "UPDATE users SET totp_secret=%s WHERE id=%s", (_reencrypt_hex(old, new, totp_secret), row_id), ) conn.commit() log.info("Key rotation complete — all fields re-encrypted.") log.info("Now update ENCRYPTION_KEY in .env and restart the application.") except Exception: conn.rollback() log.exception("Rotation FAILED — rolled back, no data changed.") sys.exit(1) finally: cur.close() conn.close() if __name__ == "__main__": old_key = os.environ.get("ENCRYPTION_KEY", "") new_key = os.environ.get("NEW_ENCRYPTION_KEY", "") db_url = os.environ.get("DATABASE_URL", "").replace("postgresql+asyncpg://", "postgresql://") if not old_key: log.error("ENCRYPTION_KEY (current/old key) is not set") sys.exit(1) if not new_key: log.error("NEW_ENCRYPTION_KEY is not set") sys.exit(1) if not db_url: log.error("DATABASE_URL is not set") sys.exit(1) if old_key == new_key: log.error("NEW_ENCRYPTION_KEY is the same as ENCRYPTION_KEY — nothing to do") sys.exit(1) rotate(db_url, old_key, new_key)