Full-stack self-hosted finance app with FastAPI backend and React frontend. Features: - Accounts, transactions, budgets, investments with GBP base currency - CSV import with auto-detection for 10 UK bank formats - ML predictions: spending forecast, net worth projection, Monte Carlo - 7 selectable themes (Obsidian, Arctic, Midnight, Vault, Terminal, Synthwave, Ledger) - Receipt/document attachments on transactions (JPEG, PNG, WebP, PDF) - AES-256-GCM field encryption, RS256 JWT, TOTP 2FA, RLS, audit log - Encrypted nightly backups + key rotation script - Mobile-responsive layout with bottom nav Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
144 lines
5 KiB
Python
144 lines
5 KiB
Python
"""
|
|
AES-256-GCM key rotation: decrypt all encrypted fields with OLD key, re-encrypt with NEW key.
|
|
|
|
Run while the application is STOPPED:
|
|
docker compose exec \
|
|
-e ENCRYPTION_KEY="$OLD_ENCRYPTION_KEY" \
|
|
-e NEW_ENCRYPTION_KEY="$NEW_ENCRYPTION_KEY" \
|
|
backend python -m app.core.key_rotation
|
|
|
|
On success, update ENCRYPTION_KEY in .env to the new value and restart.
|
|
"""
|
|
import os
|
|
import sys
|
|
import logging
|
|
from typing import Callable
|
|
|
|
import psycopg2
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
|
|
# Configure root logging once at import so every message emitted by this
# script carries the "[rotate]" prefix.
logging.basicConfig(
    format="[rotate] %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)
|
|
|
|
|
|
def _make_cipher(key_hex: str) -> AESGCM:
    """Build an AES-GCM cipher from a hex-encoded key.

    Args:
        key_hex: The key as a hex string; it must decode to exactly 32 bytes.

    Returns:
        An ``AESGCM`` instance wrapping the decoded key.

    Raises:
        ValueError: If the string is not valid hex (raised by
            ``bytes.fromhex``) or the decoded key is not 32 bytes long.
    """
    raw_key = bytes.fromhex(key_hex)
    if len(raw_key) == 32:
        return AESGCM(raw_key)
    raise ValueError("Key must be 32 bytes (64 hex chars)")
|
|
|
|
|
|
def _decrypt(cipher: AESGCM, data: bytes) -> bytes:
    """Recover the plaintext from a stored ``IV(12) || ciphertext+tag`` blob.

    The first 12 bytes of ``data`` are used as the nonce and the remainder is
    handed to ``cipher.decrypt`` with no associated data. Empty input yields
    ``b""`` without calling the cipher.
    """
    if data:
        nonce, sealed = data[:12], data[12:]
        return cipher.decrypt(nonce, sealed, None)
    return b""
|
|
|
|
|
|
def _encrypt(cipher: AESGCM, plaintext: bytes) -> bytes:
    """Seal ``plaintext`` and return it as ``IV(12) || ciphertext+tag``.

    A fresh 12-byte nonce from ``os.urandom`` is generated per call and
    prepended to the cipher output. Empty plaintext is not encrypted; it is
    returned as ``b""``.
    """
    if plaintext:
        nonce = os.urandom(12)
        sealed = cipher.encrypt(nonce, plaintext, None)
        return nonce + sealed
    return b""
|
|
|
|
|
|
def _reencrypt(old: AESGCM, new: AESGCM, data: bytes | None) -> bytes | None:
    """Decrypt ``data`` with ``old`` and encrypt the result with ``new``.

    ``None`` and empty values are returned unchanged, so absent fields stay
    absent after rotation.
    """
    if data:
        return _encrypt(new, _decrypt(old, data))
    return data
|
|
|
|
|
|
def _reencrypt_hex(old: AESGCM, new: AESGCM, hex_str: str | None) -> str | None:
    """Rotate a field whose ciphertext is stored as a hex string.

    The value is hex-decoded, decrypted with ``old``, encrypted with ``new``
    and hex-encoded again. ``None`` and empty strings are returned unchanged.
    """
    if hex_str:
        rotated = _encrypt(new, _decrypt(old, bytes.fromhex(hex_str)))
        return rotated.hex()
    return hex_str
|
|
|
|
|
|
def _rotate_table(
    cur,
    label: str,
    select_sql: str,
    update_sql: str,
    reencrypt_value: Callable[[object], object],
) -> None:
    """Re-encrypt the encrypted columns of every row returned by ``select_sql``.

    Args:
        cur: Open database cursor; all statements run in its transaction.
        label: Singular table label used in the progress log line.
        select_sql: Query returning ``id`` first, then the encrypted columns.
        update_sql: Statement taking the re-encrypted columns in the same
            order as ``select_sql`` returned them, followed by the row id.
        reencrypt_value: Maps one stored column value to its rotated form.
    """
    cur.execute(select_sql)
    rows = cur.fetchall()
    log.info("Rotating %d %s row(s)…", len(rows), label)
    for row_id, *values in rows:
        cur.execute(update_sql, (*(reencrypt_value(v) for v in values), row_id))


def rotate(db_url: str, old_key_hex: str, new_key_hex: str) -> None:
    """Re-encrypt every encrypted field in the database with a new key.

    Rotates ``accounts`` (name, institution, notes), ``transactions``
    (description, merchant, notes) and ``users`` (totp_secret) inside a single
    transaction, so either every row is rotated or none is.

    Args:
        db_url: Connection string passed to ``psycopg2.connect``.
        old_key_hex: Hex-encoded 32-byte key the data is currently encrypted with.
        new_key_hex: Hex-encoded 32-byte key to re-encrypt the data with.

    Raises:
        ValueError: If either key is not valid hex or not 32 bytes long.
        SystemExit: With status 1 if anything fails after the connection is
            opened; the transaction is rolled back first.
    """
    old = _make_cipher(old_key_hex)
    new = _make_cipher(new_key_hex)

    def rotate_bytea(value):
        # psycopg2 hands bytea back as a buffer object (memoryview on
        # Python 3). An empty one is falsy, so compare against None explicitly;
        # a truthiness test would turn an empty value into NULL on write-back.
        return _reencrypt(old, new, None if value is None else bytes(value))

    def rotate_hex(value):
        return _reencrypt_hex(old, new, value)

    conn = psycopg2.connect(db_url)
    conn.autocommit = False
    cur = conn.cursor()

    try:
        # The SELECTs deliberately have no deleted_at filter: soft-deleted rows
        # still hold ciphertext under the old key and would become unreadable
        # once that key is retired.
        _rotate_table(
            cur,
            "account",
            "SELECT id, name, institution, notes FROM accounts",
            "UPDATE accounts SET name=%s, institution=%s, notes=%s WHERE id=%s",
            rotate_bytea,
        )
        _rotate_table(
            cur,
            "transaction",
            "SELECT id, description, merchant, notes FROM transactions",
            "UPDATE transactions SET description=%s, merchant=%s, notes=%s WHERE id=%s",
            rotate_bytea,
        )
        _rotate_table(
            cur,
            "user",
            "SELECT id, totp_secret FROM users",
            "UPDATE users SET totp_secret=%s WHERE id=%s",
            rotate_hex,
        )

        conn.commit()
        log.info("Key rotation complete — all fields re-encrypted.")
        log.info("Now update ENCRYPTION_KEY in .env and restart the application.")

    except Exception:
        conn.rollback()
        log.exception("Rotation FAILED — rolled back, no data changed.")
        sys.exit(1)
    finally:
        cur.close()
        conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: read the keys and database URL from the environment,
    # validate them, then run the rotation.
    env = os.environ
    old_key = env.get("ENCRYPTION_KEY", "")
    new_key = env.get("NEW_ENCRYPTION_KEY", "")
    # The application URL may name the asyncpg driver; psycopg2 needs the
    # plain postgresql:// scheme.
    db_url = env.get("DATABASE_URL", "").replace(
        "postgresql+asyncpg://", "postgresql://"
    )

    # Checked in order; the first failing condition is logged and aborts
    # with exit status 1.
    preflight = (
        (not old_key, "ENCRYPTION_KEY (current/old key) is not set"),
        (not new_key, "NEW_ENCRYPTION_KEY is not set"),
        (not db_url, "DATABASE_URL is not set"),
        (
            old_key == new_key,
            "NEW_ENCRYPTION_KEY is the same as ENCRYPTION_KEY — nothing to do",
        ),
    )
    for failed, message in preflight:
        if failed:
            log.error(message)
            sys.exit(1)

    rotate(db_url, old_key, new_key)
|