MyMidas/backend/app/core/key_rotation.py
megaproxy 61a7884ee5 Initial commit: MyMidas personal finance tracker
Full-stack self-hosted finance app with FastAPI backend and React frontend.

Features:
- Accounts, transactions, budgets, investments with GBP base currency
- CSV import with auto-detection for 10 UK bank formats
- ML predictions: spending forecast, net worth projection, Monte Carlo
- 7 selectable themes (Obsidian, Arctic, Midnight, Vault, Terminal, Synthwave, Ledger)
- Receipt/document attachments on transactions (JPEG, PNG, WebP, PDF)
- AES-256-GCM field encryption, RS256 JWT, TOTP 2FA, RLS, audit log
- Encrypted nightly backups + key rotation script
- Mobile-responsive layout with bottom nav

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 11:56:10 +00:00

144 lines
5 KiB
Python

"""
AES-256-GCM key rotation: decrypt all encrypted fields with OLD key, re-encrypt with NEW key.
Run while the application is STOPPED:
docker compose exec \
-e ENCRYPTION_KEY="$OLD_ENCRYPTION_KEY" \
-e NEW_ENCRYPTION_KEY="$NEW_ENCRYPTION_KEY" \
backend python -m app.core.key_rotation
On success, update ENCRYPTION_KEY in .env to the new value and restart.
"""
import os
import sys
import logging
from typing import Callable
import psycopg2
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
logging.basicConfig(level=logging.INFO, format="[rotate] %(message)s")
log = logging.getLogger(__name__)
def _make_cipher(key_hex: str) -> AESGCM:
key = bytes.fromhex(key_hex)
if len(key) != 32:
raise ValueError("Key must be 32 bytes (64 hex chars)")
return AESGCM(key)
def _decrypt(cipher: AESGCM, data: bytes) -> bytes:
"""Return plaintext bytes given IV(12)||ciphertext+tag."""
if not data:
return b""
return cipher.decrypt(data[:12], data[12:], None)
def _encrypt(cipher: AESGCM, plaintext: bytes) -> bytes:
"""Encrypt plaintext bytes → IV(12)||ciphertext+tag."""
if not plaintext:
return b""
iv = os.urandom(12)
return iv + cipher.encrypt(iv, plaintext, None)
def _reencrypt(old: AESGCM, new: AESGCM, data: bytes | None) -> bytes | None:
if not data:
return data
plaintext = _decrypt(old, data)
return _encrypt(new, plaintext)
def _reencrypt_hex(old: AESGCM, new: AESGCM, hex_str: str | None) -> str | None:
"""For fields stored as hex strings (e.g. totp_secret_enc)."""
if not hex_str:
return hex_str
data = bytes.fromhex(hex_str)
plaintext = _decrypt(old, data)
return _encrypt(new, plaintext).hex()
def rotate(db_url: str, old_key_hex: str, new_key_hex: str) -> None:
old = _make_cipher(old_key_hex)
new = _make_cipher(new_key_hex)
conn = psycopg2.connect(db_url)
conn.autocommit = False
cur = conn.cursor()
try:
# ------------------------------------------------------------------ accounts
cur.execute("SELECT id, name, institution, notes FROM accounts WHERE deleted_at IS NULL")
rows = cur.fetchall()
log.info(f"Rotating {len(rows)} account row(s)…")
for row_id, name, institution, notes in rows:
cur.execute(
"UPDATE accounts SET name=%s, institution=%s, notes=%s WHERE id=%s",
(
_reencrypt(old, new, bytes(name) if name else None),
_reencrypt(old, new, bytes(institution) if institution else None),
_reencrypt(old, new, bytes(notes) if notes else None),
row_id,
),
)
# -------------------------------------------------------------- transactions
cur.execute(
"SELECT id, description, merchant, notes FROM transactions WHERE deleted_at IS NULL"
)
rows = cur.fetchall()
log.info(f"Rotating {len(rows)} transaction row(s)…")
for row_id, description, merchant, notes in rows:
cur.execute(
"UPDATE transactions SET description=%s, merchant=%s, notes=%s WHERE id=%s",
(
_reencrypt(old, new, bytes(description) if description else None),
_reencrypt(old, new, bytes(merchant) if merchant else None),
_reencrypt(old, new, bytes(notes) if notes else None),
row_id,
),
)
# -------------------------------------------------------------------- users
cur.execute("SELECT id, totp_secret FROM users WHERE deleted_at IS NULL")
rows = cur.fetchall()
log.info(f"Rotating {len(rows)} user row(s)…")
for row_id, totp_secret in rows:
cur.execute(
"UPDATE users SET totp_secret=%s WHERE id=%s",
(_reencrypt_hex(old, new, totp_secret), row_id),
)
conn.commit()
log.info("Key rotation complete — all fields re-encrypted.")
log.info("Now update ENCRYPTION_KEY in .env and restart the application.")
except Exception:
conn.rollback()
log.exception("Rotation FAILED — rolled back, no data changed.")
sys.exit(1)
finally:
cur.close()
conn.close()
if __name__ == "__main__":
old_key = os.environ.get("ENCRYPTION_KEY", "")
new_key = os.environ.get("NEW_ENCRYPTION_KEY", "")
db_url = os.environ.get("DATABASE_URL", "").replace("postgresql+asyncpg://", "postgresql://")
if not old_key:
log.error("ENCRYPTION_KEY (current/old key) is not set")
sys.exit(1)
if not new_key:
log.error("NEW_ENCRYPTION_KEY is not set")
sys.exit(1)
if not db_url:
log.error("DATABASE_URL is not set")
sys.exit(1)
if old_key == new_key:
log.error("NEW_ENCRYPTION_KEY is the same as ENCRYPTION_KEY — nothing to do")
sys.exit(1)
rotate(db_url, old_key, new_key)