import json import logging from typing import Any from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from core.database import get_session from services.alarm_engine import invalidate_threshold_cache from services.seed import THRESHOLD_SEED_DATA, DEFAULT_SETTINGS, SITE_ID as DEFAULT_SITE router = APIRouter() logger = logging.getLogger(__name__) # ── Pydantic models ──────────────────────────────────────────────────────────── class SensorCreate(BaseModel): device_id: str name: str device_type: str room_id: str | None = None rack_id: str | None = None protocol: str = "mqtt" protocol_config: dict[str, Any] = {} enabled: bool = True class SensorUpdate(BaseModel): name: str | None = None device_type: str | None = None room_id: str | None = None rack_id: str | None = None protocol: str | None = None protocol_config: dict[str, Any] | None = None enabled: bool | None = None class ThresholdUpdate(BaseModel): threshold_value: float | None = None severity: str | None = None enabled: bool | None = None class ThresholdCreate(BaseModel): sensor_type: str threshold_value: float direction: str severity: str message_template: str class SettingsUpdate(BaseModel): value: dict[str, Any] # ── Sensors ──────────────────────────────────────────────────────────────────── @router.get("/sensors") async def list_sensors( site_id: str = Query(DEFAULT_SITE), device_type: str | None = Query(None), room_id: str | None = Query(None), protocol: str | None = Query(None), session: AsyncSession = Depends(get_session), ): """List all sensor devices, with optional filters.""" conditions = ["site_id = :site_id"] params: dict = {"site_id": site_id} if device_type: conditions.append("device_type = :device_type") params["device_type"] = device_type if room_id: conditions.append("room_id = :room_id") params["room_id"] = room_id if protocol: conditions.append("protocol = :protocol") params["protocol"] = protocol where = " AND ".join(conditions) result = await session.execute(text(f""" SELECT id, site_id, device_id, name, device_type, room_id, rack_id, protocol, protocol_config, enabled, created_at, updated_at FROM sensors WHERE {where} ORDER BY device_type, room_id NULLS LAST, device_id """), params) return [dict(r) for r in result.mappings().all()] @router.post("/sensors", status_code=201) async def create_sensor( body: SensorCreate, site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): """Register a new sensor device.""" result = await session.execute(text(""" INSERT INTO sensors (site_id, device_id, name, device_type, room_id, rack_id, protocol, protocol_config, enabled) VALUES (:site_id, :device_id, :name, :device_type, :room_id, :rack_id, :protocol, :protocol_config, :enabled) RETURNING id, site_id, device_id, name, device_type, room_id, rack_id, protocol, protocol_config, enabled, created_at, updated_at """), { "site_id": site_id, "device_id": body.device_id, "name": body.name, "device_type": body.device_type, "room_id": body.room_id, "rack_id": body.rack_id, "protocol": body.protocol, "protocol_config": json.dumps(body.protocol_config), "enabled": body.enabled, }) await session.commit() return dict(result.mappings().first()) @router.get("/sensors/{sensor_id}") async def get_sensor( sensor_id: int, session: AsyncSession = Depends(get_session), ): """Get a single sensor device plus its most recent readings.""" result = await session.execute(text(""" SELECT id, site_id, device_id, name, device_type, room_id, rack_id, protocol, protocol_config, enabled, created_at, updated_at FROM sensors WHERE id = :id """), {"id": sensor_id}) row = result.mappings().first() if not row: raise HTTPException(status_code=404, detail="Sensor not found") sensor = dict(row) # Fetch latest readings for this device readings_result = await session.execute(text(""" SELECT DISTINCT ON (sensor_type) sensor_type, value, unit, recorded_at FROM readings WHERE site_id = :site_id AND sensor_id LIKE :pattern AND recorded_at > NOW() - INTERVAL '10 minutes' ORDER BY sensor_type, recorded_at DESC """), { "site_id": sensor["site_id"], "pattern": f"{sensor['site_id']}%{sensor['device_id']}%", }) sensor["recent_readings"] = [dict(r) for r in readings_result.mappings().all()] return sensor @router.put("/sensors/{sensor_id}") async def update_sensor( sensor_id: int, body: SensorUpdate, session: AsyncSession = Depends(get_session), ): """Update a sensor device's config or toggle enabled.""" updates = [] params: dict = {"id": sensor_id} if body.name is not None: updates.append("name = :name") params["name"] = body.name if body.device_type is not None: updates.append("device_type = :device_type") params["device_type"] = body.device_type if body.room_id is not None: updates.append("room_id = :room_id") params["room_id"] = body.room_id if body.rack_id is not None: updates.append("rack_id = :rack_id") params["rack_id"] = body.rack_id if body.protocol is not None: updates.append("protocol = :protocol") params["protocol"] = body.protocol if body.protocol_config is not None: updates.append("protocol_config = :protocol_config") params["protocol_config"] = json.dumps(body.protocol_config) if body.enabled is not None: updates.append("enabled = :enabled") params["enabled"] = body.enabled if not updates: raise HTTPException(status_code=400, detail="No fields to update") updates.append("updated_at = NOW()") set_clause = ", ".join(updates) result = await session.execute(text(f""" UPDATE sensors SET {set_clause} WHERE id = :id RETURNING id, site_id, device_id, name, device_type, room_id, rack_id, protocol, protocol_config, enabled, created_at, updated_at """), params) row = result.mappings().first() if not row: raise HTTPException(status_code=404, detail="Sensor not found") await session.commit() return dict(row) @router.delete("/sensors/{sensor_id}", status_code=204) async def delete_sensor( sensor_id: int, session: AsyncSession = Depends(get_session), ): """Remove a sensor device from the registry.""" result = await session.execute( text("DELETE FROM sensors WHERE id = :id RETURNING id"), {"id": sensor_id}, ) if not result.fetchone(): raise HTTPException(status_code=404, detail="Sensor not found") await session.commit() # ── Alarm thresholds ─────────────────────────────────────────────────────────── @router.get("/thresholds") async def list_thresholds( site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): """Return all user-editable threshold rules (locked=false).""" result = await session.execute(text(""" SELECT id, site_id, sensor_type, threshold_value, direction, severity, message_template, enabled, locked, created_at, updated_at FROM alarm_thresholds WHERE site_id = :site_id AND locked = false ORDER BY id """), {"site_id": site_id}) return [dict(r) for r in result.mappings().all()] @router.put("/thresholds/{threshold_id}") async def update_threshold( threshold_id: int, body: ThresholdUpdate, session: AsyncSession = Depends(get_session), ): """Update a threshold value, severity, or enabled state.""" # Refuse to update locked rules locked_result = await session.execute( text("SELECT locked, site_id FROM alarm_thresholds WHERE id = :id"), {"id": threshold_id}, ) row = locked_result.mappings().first() if not row: raise HTTPException(status_code=404, detail="Threshold not found") if row["locked"]: raise HTTPException(status_code=403, detail="Cannot modify locked threshold") updates = [] params: dict = {"id": threshold_id} if body.threshold_value is not None: updates.append("threshold_value = :threshold_value") params["threshold_value"] = body.threshold_value if body.severity is not None: if body.severity not in ("warning", "critical"): raise HTTPException(status_code=400, detail="severity must be warning or critical") updates.append("severity = :severity") params["severity"] = body.severity if body.enabled is not None: updates.append("enabled = :enabled") params["enabled"] = body.enabled if not updates: raise HTTPException(status_code=400, detail="No fields to update") updates.append("updated_at = NOW()") set_clause = ", ".join(updates) result = await session.execute(text(f""" UPDATE alarm_thresholds SET {set_clause} WHERE id = :id RETURNING id, site_id, sensor_type, threshold_value, direction, severity, message_template, enabled, locked, updated_at """), params) await session.commit() invalidate_threshold_cache(row["site_id"]) return dict(result.mappings().first()) @router.post("/thresholds", status_code=201) async def create_threshold( body: ThresholdCreate, site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): """Add a custom threshold rule.""" if body.direction not in ("above", "below"): raise HTTPException(status_code=400, detail="direction must be above or below") if body.severity not in ("warning", "critical"): raise HTTPException(status_code=400, detail="severity must be warning or critical") result = await session.execute(text(""" INSERT INTO alarm_thresholds (site_id, sensor_type, threshold_value, direction, severity, message_template, enabled, locked) VALUES (:site_id, :sensor_type, :threshold_value, :direction, :severity, :message_template, true, false) RETURNING id, site_id, sensor_type, threshold_value, direction, severity, message_template, enabled, locked, created_at, updated_at """), { "site_id": site_id, "sensor_type": body.sensor_type, "threshold_value": body.threshold_value, "direction": body.direction, "severity": body.severity, "message_template": body.message_template, }) await session.commit() invalidate_threshold_cache(site_id) return dict(result.mappings().first()) @router.delete("/thresholds/{threshold_id}", status_code=204) async def delete_threshold( threshold_id: int, session: AsyncSession = Depends(get_session), ): """Delete a custom (non-locked) threshold rule.""" locked_result = await session.execute( text("SELECT locked, site_id FROM alarm_thresholds WHERE id = :id"), {"id": threshold_id}, ) row = locked_result.mappings().first() if not row: raise HTTPException(status_code=404, detail="Threshold not found") if row["locked"]: raise HTTPException(status_code=403, detail="Cannot delete locked threshold") await session.execute( text("DELETE FROM alarm_thresholds WHERE id = :id"), {"id": threshold_id}, ) await session.commit() invalidate_threshold_cache(row["site_id"]) @router.post("/thresholds/reset") async def reset_thresholds( site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): """Delete all thresholds for a site and re-seed from defaults.""" await session.execute( text("DELETE FROM alarm_thresholds WHERE site_id = :site_id"), {"site_id": site_id}, ) for st, tv, direction, severity, msg, locked in THRESHOLD_SEED_DATA: await session.execute(text(""" INSERT INTO alarm_thresholds (site_id, sensor_type, threshold_value, direction, severity, message_template, enabled, locked) VALUES (:site_id, :sensor_type, :threshold_value, :direction, :severity, :message_template, true, :locked) """), { "site_id": site_id, "sensor_type": st, "threshold_value": tv, "direction": direction, "severity": severity, "message_template": msg, "locked": locked, }) await session.commit() invalidate_threshold_cache(site_id) logger.info(f"Alarm thresholds reset to defaults for {site_id}") return {"ok": True, "count": len(THRESHOLD_SEED_DATA)} # ── Generic settings (site / notifications / integrations / page_prefs) ──────── async def _get_settings(session: AsyncSession, site_id: str, category: str) -> dict: result = await session.execute(text(""" SELECT value FROM site_settings WHERE site_id = :site_id AND category = :category AND key = 'config' """), {"site_id": site_id, "category": category}) row = result.mappings().first() if row: return row["value"] if isinstance(row["value"], dict) else json.loads(row["value"]) return DEFAULT_SETTINGS.get(category, {}) async def _put_settings( session: AsyncSession, site_id: str, category: str, updates: dict ) -> dict: current = await _get_settings(session, site_id, category) merged = {**current, **updates} await session.execute(text(""" INSERT INTO site_settings (site_id, category, key, value, updated_at) VALUES (:site_id, :category, 'config', :value, NOW()) ON CONFLICT (site_id, category, key) DO UPDATE SET value = :value, updated_at = NOW() """), {"site_id": site_id, "category": category, "value": json.dumps(merged)}) await session.commit() return merged @router.get("/site") async def get_site_settings( site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): return await _get_settings(session, site_id, "site") @router.put("/site") async def update_site_settings( body: SettingsUpdate, site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): return await _put_settings(session, site_id, "site", body.value) @router.get("/notifications") async def get_notifications( site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): return await _get_settings(session, site_id, "notifications") @router.put("/notifications") async def update_notifications( body: SettingsUpdate, site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): return await _put_settings(session, site_id, "notifications", body.value) @router.get("/integrations") async def get_integrations( site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): return await _get_settings(session, site_id, "integrations") @router.put("/integrations") async def update_integrations( body: SettingsUpdate, site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): return await _put_settings(session, site_id, "integrations", body.value) @router.get("/page-prefs") async def get_page_prefs( site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): return await _get_settings(session, site_id, "page_prefs") @router.put("/page-prefs") async def update_page_prefs( body: SettingsUpdate, site_id: str = Query(DEFAULT_SITE), session: AsyncSession = Depends(get_session), ): return await _put_settings(session, site_id, "page_prefs", body.value)