"""Transaction API routes: CRUD, attachments, receipt parsing and CSV import."""

import asyncio
import csv
import io
import json
import logging
import mimetypes
import os
import re
import uuid
from datetime import date, datetime
from pathlib import Path
from typing import Annotated

from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
from fastapi.responses import FileResponse, Response
from sqlalchemy import select
from sqlalchemy import update as sql_update
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
from app.core.audit import write_audit
from app.dependencies import get_current_user, get_db
from app.schemas.transaction import TransactionCreate, TransactionFilter, TransactionUpdate
from app.services.transaction_service import (
    TransactionError,
    _to_response,
    create_transaction,
    delete_transaction,
    get_transaction,
    import_csv,
    list_transactions,
    update_transaction,
)

logger = logging.getLogger(__name__)

MAX_IMPORT_FILE_BYTES = 10 * 1024 * 1024  # 10 MB
MAX_IMPORT_ROWS = 50_000

ALLOWED_MIME_TYPES = {
    "image/jpeg",
    "image/png",
    "image/webp",
    "application/pdf",
}
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".pdf"}

router = APIRouter()


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------


def _too_large_detail(limit_bytes: int) -> str:
    """Return the HTTP 413 message for an upload larger than *limit_bytes*.

    Derived from the actual limit so the message cannot drift from the
    configured setting (10 MiB renders as "max 10 MB").
    """
    return f"File too large (max {round(limit_bytes / (1024 * 1024), 1):g} MB)"


def _parse_iso_date(value: str | None, param: str) -> date | None:
    """Parse an optional ``YYYY-MM-DD`` query value.

    Empty/missing values yield ``None``. Malformed values raise HTTP 422
    instead of surfacing as an unhandled ``ValueError`` (HTTP 500).
    """
    if not value:
        return None
    try:
        return date.fromisoformat(value)
    except ValueError:
        raise HTTPException(status_code=422, detail=f"Invalid {param}: expected YYYY-MM-DD") from None


async def _get_owned_txn_row(db: AsyncSession, txn_id: uuid.UUID, user_id, *, for_update: bool = False):
    """Load the Transaction ORM row *txn_id* owned by *user_id*, or raise HTTP 404.

    With ``for_update=True`` the SELECT carries ``FOR UPDATE`` (on backends
    that support it) and refreshes any already-loaded instance, so that a
    read-modify-write of ``attachment_refs`` cannot silently drop a
    concurrent request's change.
    """
    # Local import kept as in the original handlers (model module loaded lazily).
    from app.db.models.transaction import Transaction as TxnModel

    stmt = select(TxnModel).where(TxnModel.id == txn_id, TxnModel.user_id == user_id)
    if for_update:
        stmt = stmt.with_for_update().execution_options(populate_existing=True)
    txn_row = (await db.execute(stmt)).scalar_one_or_none()
    if txn_row is None:
        raise HTTPException(status_code=404, detail="Transaction not found")
    return txn_row


async def _set_attachment_refs(db: AsyncSession, txn_id: uuid.UUID, refs: list) -> None:
    """Overwrite the ``attachment_refs`` column of transaction *txn_id* with *refs*."""
    from app.db.models.transaction import Transaction as TxnModel

    await db.execute(
        sql_update(TxnModel)
        .where(TxnModel.id == txn_id)
        .values(attachment_refs=refs)
    )


def _find_attachment_ref(refs: list | None, attachment_id: str) -> dict | None:
    """Return the ref dict in *refs* whose ``"id"`` equals *attachment_id*, else ``None``."""
    return next((r for r in (refs or []) if r.get("id") == attachment_id), None)


def _attachment_path(upload_dir, user_id, stored_name: str) -> Path:
    """Build the on-disk path of a stored attachment for *user_id*.

    ``stored_name`` is generated server-side; taking ``.name`` is defence in
    depth so a tampered value cannot escape the user's upload directory.
    """
    return Path(upload_dir) / str(user_id) / Path(stored_name).name


def _remove_file_quietly(path: Path) -> None:
    """Delete *path* if it exists; log rather than raise on filesystem errors."""
    try:
        path.unlink(missing_ok=True)
    except OSError as e:
        logger.warning("Could not delete attachment file %s: %s", path, e)


# ---------------------------------------------------------------------------
# CRUD
# ---------------------------------------------------------------------------


@router.get("")
async def get_transactions(
    account_id: uuid.UUID | None = None,
    category_id: uuid.UUID | None = None,
    type: str | None = None,  # name is part of the public query API; shadows builtin locally only
    status: str | None = None,
    date_from: str | None = None,
    date_to: str | None = None,
    search: str | None = None,
    is_recurring: bool | None = None,
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=50, ge=1, le=200),
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """List the current user's transactions with optional filters and pagination.

    ``date_from``/``date_to`` must be ``YYYY-MM-DD``; malformed values yield HTTP 422.
    """
    filters = TransactionFilter(
        account_id=account_id,
        category_id=category_id,
        type=type,
        status=status,
        date_from=_parse_iso_date(date_from, "date_from"),
        date_to=_parse_iso_date(date_to, "date_to"),
        search=search,
        is_recurring=is_recurring,
        page=page,
        page_size=page_size,
    )
    return await list_transactions(db, user.id, filters)


@router.post("", status_code=201)
async def create(
    body: TransactionCreate,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Create a transaction for the current user, audit it, and commit."""
    try:
        result = await create_transaction(db, user.id, body, user.base_currency)
        await write_audit(db, user_id=user.id, action="transaction_create")
        await db.commit()
        return result
    except TransactionError as e:
        raise HTTPException(status_code=e.status_code, detail=e.detail)


@router.get("/{txn_id}")
async def get_one(
    txn_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Return a single transaction owned by the current user."""
    try:
        txn = await get_transaction(db, txn_id, user.id)
        return _to_response(txn)
    except TransactionError as e:
        raise HTTPException(status_code=e.status_code, detail=e.detail)


@router.put("/{txn_id}")
async def update(
    txn_id: uuid.UUID,
    body: TransactionUpdate,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Update a transaction owned by the current user, audit it, and commit."""
    try:
        result = await update_transaction(db, txn_id, user.id, body, user.base_currency)
        await write_audit(
            db,
            user_id=user.id,
            action="transaction_update",
            resource_type="transaction",
            resource_id=txn_id,
        )
        await db.commit()
        return result
    except TransactionError as e:
        raise HTTPException(status_code=e.status_code, detail=e.detail)


@router.delete("/{txn_id}", status_code=204)
async def delete(
    txn_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Delete a transaction owned by the current user, audit it, and commit."""
    try:
        await delete_transaction(db, txn_id, user.id)
        await write_audit(
            db,
            user_id=user.id,
            action="transaction_delete",
            resource_type="transaction",
            resource_id=txn_id,
        )
        await db.commit()
    except TransactionError as e:
        raise HTTPException(status_code=e.status_code, detail=e.detail)


# ---------------------------------------------------------------------------
# Attachments
# ---------------------------------------------------------------------------


@router.post("/{txn_id}/attachments")
async def upload_attachment(
    txn_id: uuid.UUID,
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Store an image/PDF file and append a reference to the transaction.

    Returns the new reference dict (``id``, ``filename``, ``mime_type``,
    ``size``, ``stored_name``). Raises 400 on a disallowed type or when the
    per-transaction attachment limit is reached, 404 if the transaction is
    not the user's, and 413 when the file exceeds the configured size.
    """
    settings = get_settings()

    # Cheap extension check first; the authoritative check is the content sniff below.
    filename = file.filename or "upload"
    ext = Path(filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(status_code=400, detail="Unsupported file type. Allowed: JPG, PNG, WebP, PDF")

    # Verify transaction ownership through the service layer (keeps its error codes).
    try:
        await get_transaction(db, txn_id, user.id)
    except TransactionError as e:
        raise HTTPException(status_code=e.status_code, detail=e.detail)

    # Lock the row: attachment_refs is rewritten wholesale further down.
    txn_row = await _get_owned_txn_row(db, txn_id, user.id, for_update=True)
    current_refs = list(txn_row.attachment_refs or [])
    if len(current_refs) >= settings.max_attachments_per_txn:
        raise HTTPException(
            status_code=400,
            detail=f"Maximum {settings.max_attachments_per_txn} attachments per transaction",
        )

    # Read one byte past the limit so oversize uploads are detected without reading them fully.
    content = await file.read(settings.max_attachment_bytes + 1)
    if len(content) > settings.max_attachment_bytes:
        raise HTTPException(status_code=413, detail=_too_large_detail(settings.max_attachment_bytes))

    # Sniff MIME from content rather than trusting the client-supplied name/type.
    import magic  # python-magic

    detected_mime = magic.from_buffer(content[:2048], mime=True)
    if detected_mime not in ALLOWED_MIME_TYPES:
        raise HTTPException(
            status_code=400,
            detail="File content does not match an allowed type (JPEG, PNG, WebP, PDF)",
        )

    # Store under a server-generated name inside the user's own directory.
    attachment_id = str(uuid.uuid4())
    user_upload_dir = Path(settings.upload_dir) / str(user.id)
    user_upload_dir.mkdir(parents=True, exist_ok=True)
    stored_name = f"{attachment_id}{ext}"
    stored_path = user_upload_dir / stored_name
    stored_path.write_bytes(content)

    ref = {
        "id": attachment_id,
        "filename": filename,
        "mime_type": detected_mime,
        "size": len(content),
        "stored_name": stored_name,
    }

    committed = False
    try:
        await _set_attachment_refs(db, txn_id, [*current_refs, ref])
        await write_audit(
            db,
            user_id=user.id,
            action="transaction_update",
            resource_type="transaction",
            resource_id=txn_id,
            metadata={"attachment_added": attachment_id},
        )
        await db.commit()
        committed = True
    finally:
        # Never leave an unreferenced file on disk if the DB write did not land.
        if not committed:
            _remove_file_quietly(stored_path)
    return ref


@router.get("/{txn_id}/attachments/{attachment_id}")
async def download_attachment(
    txn_id: uuid.UUID,
    attachment_id: str,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Stream a stored attachment back with its recorded MIME type and filename."""
    settings = get_settings()
    txn_row = await _get_owned_txn_row(db, txn_id, user.id)
    ref = _find_attachment_ref(txn_row.attachment_refs, attachment_id)
    if ref is None:
        raise HTTPException(status_code=404, detail="Attachment not found")

    path = _attachment_path(settings.upload_dir, user.id, ref["stored_name"])
    if not path.exists():
        raise HTTPException(status_code=404, detail="Attachment file missing")

    return FileResponse(
        path=str(path),
        media_type=ref["mime_type"],
        filename=ref["filename"],
    )


@router.delete("/{txn_id}/attachments/{attachment_id}", status_code=204)
async def delete_attachment(
    txn_id: uuid.UUID,
    attachment_id: str,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Remove an attachment reference from the transaction, then delete its file."""
    settings = get_settings()

    # Lock the row: attachment_refs is rewritten wholesale below.
    txn_row = await _get_owned_txn_row(db, txn_id, user.id, for_update=True)
    refs = list(txn_row.attachment_refs or [])
    ref = _find_attachment_ref(refs, attachment_id)
    if ref is None:
        raise HTTPException(status_code=404, detail="Attachment not found")

    new_refs = [r for r in refs if r.get("id") != attachment_id]
    await _set_attachment_refs(db, txn_id, new_refs)
    await write_audit(
        db,
        user_id=user.id,
        action="transaction_update",
        resource_type="transaction",
        resource_id=txn_id,
        metadata={"attachment_deleted": attachment_id},
    )
    await db.commit()

    # Delete the file only after the DB stops referencing it, so a failed
    # commit cannot leave a reference to a file that no longer exists.
    _remove_file_quietly(_attachment_path(settings.upload_dir, user.id, ref["stored_name"]))


# ---------------------------------------------------------------------------
# Receipt parsing (OCR -> optional AI -> rule-based fallback)
# ---------------------------------------------------------------------------


_RECEIPT_TEXT_PROMPT = (
    "You are a receipt parser. Below is the raw text extracted from a receipt via OCR.\n\n"
    "Receipt text:\n{ocr_text}\n\n"
    "Extract the information and return ONLY a JSON object with exactly these keys "
    "(use null for any field you cannot determine):\n"
    '{{"merchant": "store name", "amount": 0.00, "currency": "GBP", '
    '"date": "YYYY-MM-DD", "description": "brief description", '
    '"category": "one of: Food & Drink, Transport, Shopping, Entertainment, Health, Travel, Bills & Utilities, Other"}}\n'
    "Return ONLY the JSON object. No markdown, no explanation, no code fences."
)

_EMPTY_RESULT: dict = {
    "merchant": None,
    "amount": None,
    "currency": None,
    "date": None,
    "description": None,
    "category": None,
    "raw": None,
    "ocr_text": None,
}

# Fields copied from the AI's JSON object into the response, in response order.
_AI_RESULT_KEYS = ("merchant", "amount", "currency", "date", "description", "category")

# Merchant heuristics: skip phone-number-like lines and "<number> <word>" address lines.
_PHONE_LINE_RE = re.compile(r"^[\d\s\-\+\(\)]+$")
_ADDRESS_LINE_RE = re.compile(r"^\d+\s+\w+")

# A total keyword followed (possibly after non-digit text) by a money amount.
# "sub[\s-]*total" is listed explicitly so subtotals are recognised as such.
_TOTAL_LINE_RE = re.compile(
    r"(?P<kw>total|amount\s*due|grand\s*total|balance\s*due|sub[\s-]*total|net\s*total)"
    r"[^\d£$€]*(?P<amt>[£$€]?\s*\d{1,6}[.,]\d{2})\b",
    re.IGNORECASE,
)
_ANY_AMOUNT_RE = re.compile(r"[£$€]?\s*(\d{1,6}[.,]\d{2})\b")

# (pattern, strptime formats). Matches are normalised (periods/commas removed)
# before parsing, so formats contain no punctuation besides - and /.
_DATE_PATTERNS = (
    (re.compile(r"\b(\d{4}[-/]\d{2}[-/]\d{2})\b", re.IGNORECASE), ("%Y-%m-%d", "%Y/%m/%d")),
    (re.compile(r"\b(\d{2}[-/]\d{2}[-/]\d{4})\b", re.IGNORECASE), ("%d-%m-%Y", "%d/%m/%Y", "%m/%d/%Y")),
    (
        re.compile(
            r"\b(\d{1,2}\s+(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\.?\s+\d{4})\b",
            re.IGNORECASE,
        ),
        ("%d %B %Y", "%d %b %Y"),
    ),
    (
        re.compile(
            r"\b((?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\.?\s+\d{1,2},?\s+\d{4})\b",
            re.IGNORECASE,
        ),
        ("%B %d %Y", "%b %d %Y"),
    ),
)


def _extract_ocr_text(file_bytes: bytes, mime_type: str) -> str:
    """Extract text from an image or PDF. Returns empty string on failure.

    PDFs: use the embedded text layer of the first 4 pages (pdfplumber); if
    that yields nothing, rasterise page 1 at 200 dpi and OCR it
    (pdf2image + pytesseract). Other types: open with Pillow and OCR.
    A missing optional OCR library is treated like any other extraction
    failure: logged, then "" is returned.
    """
    if mime_type == "application/pdf":
        try:
            import pdfplumber

            with pdfplumber.open(io.BytesIO(file_bytes)) as pdf:
                pages_text = [page.extract_text() or "" for page in pdf.pages[:4]]
            text = "\n".join(pages_text).strip()
            if text:
                return text
        except Exception as e:  # best-effort: fall through to OCR
            logger.warning("pdfplumber text extraction failed: %s", e)

        # Scanned PDF — convert first page to image then OCR
        try:
            import pytesseract
            from pdf2image import convert_from_bytes

            images = convert_from_bytes(file_bytes, first_page=1, last_page=1, dpi=200)
            if images:
                return pytesseract.image_to_string(images[0])
        except Exception as e:  # best-effort: caller handles empty text
            logger.warning("pdf2image/tesseract OCR failed: %s", e)
        return ""

    try:
        import pytesseract
        from PIL import Image

        with Image.open(io.BytesIO(file_bytes)) as img:
            return pytesseract.image_to_string(img)
    except Exception as e:  # best-effort: caller handles empty text
        logger.warning("Image OCR failed: %s", e)
        return ""


def _to_amount(text: str) -> float | None:
    """Convert a matched money string like "£12,50" to a float, or None if unparsable."""
    cleaned = re.sub(r"[£$€\s]", "", text).replace(",", ".")
    try:
        return float(cleaned)
    except ValueError:
        return None


def _parse_receipt_date(ocr_text: str) -> str | None:
    """Return the first recognisable date in *ocr_text* as ``YYYY-MM-DD``, else None.

    Patterns are tried in priority order; for each, only its first match is
    considered. Periods and commas are removed before ``strptime`` so forms
    such as "Jan. 15, 2024" and "Jan 15 2024" both parse.
    """
    for pattern, fmts in _DATE_PATTERNS:
        m = pattern.search(ocr_text)
        if not m:
            continue
        raw_date = " ".join(re.sub(r"[.,]", "", m.group(1)).split())
        for fmt in fmts:
            try:
                return datetime.strptime(raw_date, fmt).strftime("%Y-%m-%d")
            except ValueError:
                pass
    return None


def _rule_based_parse(ocr_text: str) -> dict:
    """Extract receipt fields from OCR text using regex. Best-effort.

    - merchant: first of the first 5 non-empty lines that is longer than 2
      chars and does not look like a phone number or street address.
    - currency: from the first of £, €, $ found (GBP, EUR, USD).
    - amount: first amount on a total/amount-due line, preferring real
      totals over subtotals; otherwise the largest amount anywhere.
    - date: see ``_parse_receipt_date``.
    - description defaults to the merchant; category is always None.
    """
    lines = [ln.strip() for ln in ocr_text.splitlines() if ln.strip()]

    merchant = next(
        (
            ln
            for ln in lines[:5]
            if len(ln) > 2 and not _PHONE_LINE_RE.match(ln) and not _ADDRESS_LINE_RE.match(ln)
        ),
        None,
    )

    currency = None
    if "£" in ocr_text:
        currency = "GBP"
    elif "€" in ocr_text:
        currency = "EUR"
    elif "$" in ocr_text:
        currency = "USD"

    # Receipts usually list "Subtotal" before "Total"; taking the first keyword
    # match would report the pre-tax subtotal, so subtotals are only a fallback.
    amount = None
    subtotal = None
    for m in _TOTAL_LINE_RE.finditer(ocr_text):
        value = _to_amount(m.group("amt"))
        if value is None:
            continue
        if m.group("kw").lower().startswith("sub"):
            if subtotal is None:
                subtotal = value
            continue
        amount = value
        break
    if amount is None:
        amount = subtotal
    if amount is None:
        candidates = [v for v in (_to_amount(m.group(1)) for m in _ANY_AMOUNT_RE.finditer(ocr_text)) if v is not None]
        if candidates:
            amount = max(candidates)

    return {
        "merchant": merchant,
        "amount": amount,
        "currency": currency,
        "date": _parse_receipt_date(ocr_text),
        "description": merchant,  # simple default
        "category": None,
        "raw": None,
        "ocr_text": ocr_text,
    }


def _strip_code_fence(text: str) -> str:
    """Strip a surrounding Markdown code fence and a leading ``json`` tag from *text*."""
    if text.startswith("```"):
        parts = text.split("```")
        text = parts[1] if len(parts) > 1 else text
    if text[:4].lower() == "json":
        text = text[4:]
    return text.strip()


def _parse_ai_json(raw: str) -> dict | None:
    """Decode the model reply as a JSON object; return None (and log) if it is not one."""
    try:
        parsed = json.loads(_strip_code_fence(raw))
    except json.JSONDecodeError:
        logger.warning("AI returned non-JSON response, falling back to rule-based parser")
        return None
    if not isinstance(parsed, dict):
        logger.warning("AI returned JSON that is not an object, falling back to rule-based parser")
        return None
    return parsed


async def _request_ai_completion(user_row, api_key: str, prompt: str) -> str | None:
    """Send *prompt* to the user's configured provider and return the reply text.

    Supports ``anthropic`` and ``openai`` providers; returns None for any other
    provider. The user may override the base URL and model.
    NOTE(review): the base URL is user-controlled, so the server will issue
    requests to arbitrary hosts — confirm this is acceptable for the deployment.

    Raises httpx errors for transport/HTTP failures and KeyError/IndexError/
    TypeError/AttributeError/ValueError for unexpected response bodies.
    """
    import httpx

    custom_base_url = (user_row.ai_base_url or "").rstrip("/")
    custom_model = (user_row.ai_model or "").strip()

    if user_row.ai_provider == "anthropic":
        base_url = custom_base_url or "https://api.anthropic.com"
        model = custom_model or "claude-haiku-4-5-20251001"
        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(
                f"{base_url}/v1/messages",
                headers={"x-api-key": api_key, "anthropic-version": "2023-06-01", "content-type": "application/json"},
                json={"model": model, "max_tokens": 512, "messages": [{"role": "user", "content": prompt}]},
            )
            resp.raise_for_status()
        return resp.json()["content"][0]["text"].strip()

    if user_row.ai_provider == "openai":
        base_url = custom_base_url or "https://api.openai.com"
        model = custom_model or "gpt-4o-mini"
        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(
                f"{base_url}/v1/chat/completions",
                headers={"Authorization": f"Bearer {api_key}", "content-type": "application/json"},
                json={"model": model, "max_tokens": 512, "messages": [{"role": "user", "content": prompt}]},
            )
            resp.raise_for_status()
        return resp.json()["choices"][0]["message"]["content"].strip()

    return None


async def _call_ai_parse(file_bytes: bytes, mime_type: str, user_row) -> dict:
    """
    Parse a receipt: OCR text extraction → AI (text prompt) → rule-based fallback.
    AI is optional; rules always run as fallback if AI is unconfigured or fails.

    Returns a dict with keys merchant, amount, currency, date, description,
    category, raw (model reply or None) and ocr_text.
    Raises HTTP 400 when no text at all could be extracted.
    """
    import httpx

    from app.core.security import decrypt_field

    # Step 1: OCR / PDF text layer. Synchronous and CPU-heavy, so run it in a
    # worker thread instead of blocking the event loop.
    ocr_text = await asyncio.to_thread(_extract_ocr_text, file_bytes, mime_type)
    has_ai = bool(user_row and user_row.ai_provider and user_row.ai_api_key_enc)

    # Step 2: attempt AI parse if configured
    if has_ai and ocr_text.strip():
        api_key = decrypt_field(user_row.ai_api_key_enc)
        prompt = _RECEIPT_TEXT_PROMPT.format(ocr_text=ocr_text)
        raw = None
        try:
            raw = await _request_ai_completion(user_row, api_key, prompt)
        except (httpx.HTTPStatusError, httpx.RequestError) as e:
            logger.warning("AI API request failed (%s), falling back to rule-based parser", type(e).__name__)
        except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
            # Provider answered 2xx but with a body we cannot interpret.
            logger.warning("AI API returned an unexpected response (%s), falling back to rule-based parser", type(e).__name__)

        if raw:
            parsed = _parse_ai_json(raw)
            if parsed is not None:
                result = {key: parsed.get(key) for key in _AI_RESULT_KEYS}
                result["raw"] = raw
                result["ocr_text"] = ocr_text
                return result

    # Step 3: rule-based fallback (also used when AI is not configured)
    if ocr_text.strip():
        return _rule_based_parse(ocr_text)

    # Nothing worked
    if has_ai:
        raise HTTPException(status_code=400, detail="Could not extract any text from the file. Try a clearer image.")
    raise HTTPException(
        status_code=400,
        detail="No AI configured and OCR extracted no text. Add an API key in Settings → AI or try a clearer image.",
    )


@router.post("/parse-receipt")
async def parse_receipt_upload(
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Upload a receipt image and parse it with AI — no existing transaction required."""
    from app.db.models.user import User as UserModel

    settings = get_settings()

    filename = file.filename or "upload"
    ext = Path(filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(status_code=400, detail="Unsupported file type. Allowed: JPG, PNG, WebP, PDF")

    content = await file.read(settings.max_attachment_bytes + 1)
    if len(content) > settings.max_attachment_bytes:
        raise HTTPException(status_code=413, detail=_too_large_detail(settings.max_attachment_bytes))

    import magic  # python-magic

    mime_type = magic.from_buffer(content[:2048], mime=True)
    if mime_type not in ALLOWED_MIME_TYPES:
        raise HTTPException(status_code=400, detail="File content does not match an allowed type")

    user_row = await db.get(UserModel, user.id)
    return await _call_ai_parse(content, mime_type, user_row)


@router.post("/{txn_id}/attachments/{attachment_id}/parse")
async def parse_attachment(
    txn_id: uuid.UUID,
    attachment_id: str,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Parse an already-uploaded attachment with AI."""
    from app.db.models.user import User as UserModel

    settings = get_settings()
    user_row = await db.get(UserModel, user.id)

    txn_row = await _get_owned_txn_row(db, txn_id, user.id)
    ref = _find_attachment_ref(txn_row.attachment_refs, attachment_id)
    if ref is None:
        raise HTTPException(status_code=404, detail="Attachment not found")

    path = _attachment_path(settings.upload_dir, user.id, ref["stored_name"])
    if not path.exists():
        raise HTTPException(status_code=404, detail="Attachment file missing")

    return await _call_ai_parse(path.read_bytes(), ref["mime_type"], user_row)


# ---------------------------------------------------------------------------
# CSV import
# ---------------------------------------------------------------------------


def _resolve_csv_column(headers: list, wanted: str) -> str | None:
    """Find the CSV header matching *wanted*.

    Exact match first, then a case- and surrounding-whitespace-insensitive
    match (so the default "date" also finds "Date" or " DATE "). Returns
    None when no header matches.
    """
    if wanted in headers:
        return wanted
    key = wanted.strip().lower()
    return next((h for h in headers if isinstance(h, str) and h.strip().lower() == key), None)


@router.post("/import")
async def import_transactions(
    file: UploadFile = File(...),
    account_id: uuid.UUID = Form(...),
    date_col: str = Form(default="date"),
    description_col: str = Form(default="description"),
    amount_col: str = Form(default="amount"),
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user),
):
    """Import transactions from a CSV upload into *account_id*.

    Rows need non-empty date and amount cells; description defaults to
    "Imported transaction". Raises 400 for non-CSV names, malformed CSV,
    too many rows, or no usable rows; 413 for files over the size limit.
    """
    if not file.filename or not file.filename.lower().endswith(".csv"):
        raise HTTPException(status_code=400, detail="Only CSV files are supported")

    content = await file.read(MAX_IMPORT_FILE_BYTES + 1)
    if len(content) > MAX_IMPORT_FILE_BYTES:
        raise HTTPException(status_code=413, detail=_too_large_detail(MAX_IMPORT_FILE_BYTES))

    try:
        text = content.decode("utf-8-sig")  # handle BOM
    except UnicodeDecodeError:
        text = content.decode("latin-1")

    reader = csv.DictReader(io.StringIO(text))
    rows: list[dict[str, str]] = []
    try:
        headers = list(reader.fieldnames or [])
        # Resolve each logical field to an actual header once, not per row.
        columns = {
            key: _resolve_csv_column(headers, col)
            for key, col in (("date", date_col), ("description", description_col), ("amount", amount_col))
        }
        for row in reader:
            mapped = {}
            for key, header in columns.items():
                value = row.get(header) if header is not None else None
                # Blank cells count as missing so they neither pass the
                # required-field check nor override the description default.
                if isinstance(value, str) and value.strip():
                    mapped[key] = value.strip()
            if "date" in mapped and "amount" in mapped:
                mapped.setdefault("description", "Imported transaction")
                rows.append(mapped)
                if len(rows) > MAX_IMPORT_ROWS:
                    raise HTTPException(
                        status_code=400,
                        detail=f"File contains too many rows (max {MAX_IMPORT_ROWS:,})",
                    )
    except csv.Error as e:
        raise HTTPException(status_code=400, detail=f"Could not parse CSV: {e}") from None

    if not rows:
        raise HTTPException(status_code=400, detail="No valid rows found. Check column names.")

    try:
        result = await import_csv(db, user.id, account_id, rows, user.base_currency)
    except TransactionError as e:
        raise HTTPException(status_code=e.status_code, detail=e.detail)
    await write_audit(db, user_id=user.id, action="import_data", metadata=result)
    await db.commit()
    return result


@router.get("/import/template")
async def import_template():
    """Return a sample CSV file showing the expected import columns."""
    csv_content = (
        "date,description,amount,merchant,notes\n"
        "2026-01-15,Tesco Groceries,-45.67,Tesco,\n"
        "2026-01-14,Salary,2500.00,Employer,January salary\n"
    )
    return Response(
        content=csv_content,
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=import_template.csv"},
    )