BMS/backend/api/routes/leak.py
2026-03-19 11:32:17 +00:00

57 lines
2 KiB
Python

from fastapi import APIRouter, Depends, Query
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from core.database import get_session
router = APIRouter()
# Static topology metadata — mirrors simulator config
LEAK_SENSORS = {
"sg-01": [
{"sensor_id": "leak-01", "floor_zone": "crac-zone-a", "under_floor": True, "near_crac": True, "room_id": "hall-a"},
{"sensor_id": "leak-02", "floor_zone": "server-row-b1", "under_floor": True, "near_crac": False, "room_id": "hall-b"},
{"sensor_id": "leak-03", "floor_zone": "ups-room", "under_floor": False, "near_crac": False, "room_id": None},
]
}
@router.get("/status")
async def leak_status(
site_id: str = Query(...),
session: AsyncSession = Depends(get_session),
):
"""Latest state for all leak sensors, enriched with location metadata."""
result = await session.execute(text("""
SELECT DISTINCT ON (sensor_id)
sensor_id, value, recorded_at
FROM readings
WHERE site_id = :site_id
AND sensor_type = 'leak'
AND recorded_at > NOW() - INTERVAL '5 minutes'
ORDER BY sensor_id, recorded_at DESC
"""), {"site_id": site_id})
# sensor_id format: {site_id}/leak/{sensor_id}
state_map: dict[str, dict] = {}
for row in result.mappings().all():
parts = row["sensor_id"].split("/")
if len(parts) < 3:
continue
sid = parts[2]
state_map[sid] = {
"state": "detected" if float(row["value"]) > 0.5 else "clear",
"recorded_at": str(row["recorded_at"]),
}
out = []
for cfg in LEAK_SENSORS.get(site_id, []):
sid = cfg["sensor_id"]
entry = {**cfg}
if sid in state_map:
entry["state"] = state_map[sid]["state"]
entry["recorded_at"] = state_map[sid]["recorded_at"]
else:
entry["state"] = "unknown"
entry["recorded_at"] = None
out.append(entry)
return out