MyMidas/backend/scripts/rotate_keys.py
megaproxy fe4e69b9ad Complete Phase 3, Phase 5 polish and hardening
Phase 3 — Investments:
- Multi-currency support: holdings track purchase currency, FX rates convert to base for totals
- Capital gains report using UK Section 104 pool method, grouped by tax year
- Capital Gains tab added to Reports page

Phase 5 — Polish & Hardening:
- Mobile-responsive layout: bottom nav, sidebar hidden on mobile, logo in TopBar, compact header buttons, hover-only actions now always visible on touch
- Backup system: encrypted GPG backups via backup.sh, nightly scheduler job, admin API (list/trigger/download/restore), Settings UI with drag-to-restore confirmation
- Docker entrypoint with gosu privilege drop to fix bind-mount ownership on fresh deployments
- OWASP fixes: refresh token now bound to its session (new refresh_token_hash column + migration), CSRF secure flag tied to environment, IP-level rate limiting on login, TOTPEnableRequest Pydantic schema replaces raw dict
- AES-256-GCM key rotation script (rotate_keys.py) with dry-run mode and atomic DB transaction
- CLAUDE.md added for AI-assisted development context
- README updated: correct reverse proxy port, accurate backup/restore commands, key rotation instructions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 14:59:11 +00:00

195 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""
AES-256-GCM encryption key rotation.
Re-encrypts all PII fields with a new key in a single atomic transaction.
If anything fails the database is left unchanged.
Usage (from inside the backend container):
python /app/scripts/rotate_keys.py --old-key <64-hex> --new-key <64-hex>
--dry-run Decrypt and re-encrypt in memory only, do not write to DB.
After the script reports success:
1. Update ENCRYPTION_KEY in your .env file to the new key.
2. Restart the backend: docker compose up -d backend
"""
import argparse
import asyncio
import os
import sys
from os import urandom
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# ---------------------------------------------------------------------------
# Crypto helpers (standalone, no dependency on app code)
# ---------------------------------------------------------------------------
def _make_decrypt(key_hex: str):
key = bytes.fromhex(key_hex)
if len(key) != 32:
sys.exit("ERROR: key must be 32 bytes (64 hex characters)")
aesgcm = AESGCM(key)
def decrypt(data: bytes) -> str:
if not data:
return ""
iv, ct = data[:12], data[12:]
return aesgcm.decrypt(iv, ct, None).decode()
return decrypt
def _make_encrypt(key_hex: str):
key = bytes.fromhex(key_hex)
if len(key) != 32:
sys.exit("ERROR: key must be 32 bytes (64 hex characters)")
aesgcm = AESGCM(key)
def encrypt(plaintext: str) -> bytes:
if not plaintext:
return b""
iv = urandom(12)
return iv + aesgcm.encrypt(iv, plaintext.encode(), None)
return encrypt
def _rotate_bytes(data: bytes | None, decrypt, encrypt) -> bytes | None:
"""Decrypt with old key, re-encrypt with new key. None/empty passes through."""
if not data:
return data
return encrypt(decrypt(data))
def _rotate_hex(hex_str: str | None, decrypt, encrypt) -> str | None:
"""Same as _rotate_bytes but field is stored as hex text (TOTP secret)."""
if not hex_str:
return hex_str
return encrypt(decrypt(bytes.fromhex(hex_str))).hex()
# ---------------------------------------------------------------------------
# Main rotation logic
# ---------------------------------------------------------------------------
async def rotate(old_key: str, new_key: str, dry_run: bool) -> None:
import asyncpg
decrypt = _make_decrypt(old_key)
encrypt = _make_encrypt(new_key)
db_url = os.environ.get("DATABASE_URL", "")
if not db_url:
sys.exit("ERROR: DATABASE_URL environment variable not set")
pg_url = db_url.replace("postgresql+asyncpg://", "postgresql://")
print(f"Connecting to database...")
conn = await asyncpg.connect(pg_url)
try:
async with conn.transaction():
# ── accounts ──────────────────────────────────────────────────
print("Rotating accounts (name, institution, notes)...")
rows = await conn.fetch(
"SELECT id, name, institution, notes FROM accounts"
)
updated = 0
for row in rows:
new_name = _rotate_bytes(row["name"], decrypt, encrypt)
new_inst = _rotate_bytes(row["institution"], decrypt, encrypt)
new_notes = _rotate_bytes(row["notes"], decrypt, encrypt)
if not dry_run:
await conn.execute(
"UPDATE accounts SET name=$1, institution=$2, notes=$3 WHERE id=$4",
new_name, new_inst, new_notes, row["id"],
)
updated += 1
print(f" {'(dry-run) would rotate' if dry_run else 'rotated'} {updated} accounts")
# ── transactions ──────────────────────────────────────────────
print("Rotating transactions (description, merchant, notes)...")
rows = await conn.fetch(
"SELECT id, description, merchant, notes FROM transactions"
)
updated = 0
for row in rows:
new_desc = _rotate_bytes(row["description"], decrypt, encrypt)
new_merch = _rotate_bytes(row["merchant"], decrypt, encrypt)
new_notes = _rotate_bytes(row["notes"], decrypt, encrypt)
if not dry_run:
await conn.execute(
"UPDATE transactions SET description=$1, merchant=$2, notes=$3 WHERE id=$4",
new_desc, new_merch, new_notes, row["id"],
)
updated += 1
print(f" {'(dry-run) would rotate' if dry_run else 'rotated'} {updated} transactions")
# ── investment transactions ────────────────────────────────────
print("Rotating investment transaction notes...")
rows = await conn.fetch(
"SELECT id, notes FROM investment_transactions WHERE notes IS NOT NULL"
)
updated = 0
for row in rows:
new_notes = _rotate_bytes(row["notes"], decrypt, encrypt)
if not dry_run:
await conn.execute(
"UPDATE investment_transactions SET notes=$1 WHERE id=$2",
new_notes, row["id"],
)
updated += 1
print(f" {'(dry-run) would rotate' if dry_run else 'rotated'} {updated} investment transaction notes")
# ── users — TOTP secret (hex-encoded) ─────────────────────────
print("Rotating user TOTP secrets...")
rows = await conn.fetch(
"SELECT id, totp_secret FROM users WHERE totp_secret IS NOT NULL"
)
updated = 0
for row in rows:
new_secret = _rotate_hex(row["totp_secret"], decrypt, encrypt)
if not dry_run:
await conn.execute(
"UPDATE users SET totp_secret=$1 WHERE id=$2",
new_secret, row["id"],
)
updated += 1
print(f" {'(dry-run) would rotate' if dry_run else 'rotated'} {updated} TOTP secrets")
if dry_run:
raise _DryRunAbort()
except _DryRunAbort:
print("\nDry-run complete — no changes written.")
return
finally:
await conn.close()
print("\n✓ Key rotation complete.")
print("\nNext steps:")
print(" 1. Update ENCRYPTION_KEY in your .env to the new key.")
print(" 2. Restart the backend: docker compose up -d backend")
class _DryRunAbort(Exception):
"""Raised inside the transaction to trigger an asyncpg rollback in dry-run mode."""
def main():
parser = argparse.ArgumentParser(description="Rotate AES-256-GCM encryption key")
parser.add_argument("--old-key", required=True, help="Current key as 64-char hex")
parser.add_argument("--new-key", required=True, help="New key as 64-char hex")
parser.add_argument("--dry-run", action="store_true", help="Validate only, do not write")
args = parser.parse_args()
if args.old_key == args.new_key:
sys.exit("ERROR: old key and new key are identical — nothing to do")
print(f"{'DRY RUN — ' if args.dry_run else ''}AES key rotation starting...")
asyncio.run(rotate(args.old_key, args.new_key, args.dry_run))
if __name__ == "__main__":
main()