from datetime import datetime, timezone, timedelta from fastapi import APIRouter, Depends, Query from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from core.database import get_session router = APIRouter() GENERATORS = {"sg-01": ["gen-01"]} GEN_FIELD_MAP = { "gen_fuel_pct": "fuel_pct", "gen_fuel_l": "fuel_litres", "gen_fuel_rate": "fuel_rate_lph", "gen_load_kw": "load_kw", "gen_load_pct": "load_pct", "gen_run_hours": "run_hours", "gen_voltage_v": "voltage_v", "gen_freq_hz": "frequency_hz", "gen_rpm": "engine_rpm", "gen_oil_press": "oil_pressure_bar", "gen_coolant_c": "coolant_temp_c", "gen_exhaust_c": "exhaust_temp_c", "gen_alt_temp_c": "alternator_temp_c", "gen_pf": "power_factor", "gen_batt_v": "battery_v", } STATE_MAP = {-1.0: "fault", 0.0: "standby", 1.0: "running", 2.0: "test"} @router.get("/status") async def generator_status( site_id: str = Query(...), session: AsyncSession = Depends(get_session), ): """Latest reading for each generator.""" types_sql = ", ".join(f"'{t}'" for t in [*GEN_FIELD_MAP.keys(), "gen_state"]) result = await session.execute(text(f""" SELECT DISTINCT ON (sensor_id) sensor_id, sensor_type, value FROM readings WHERE site_id = :site_id AND sensor_type IN ({types_sql}) AND recorded_at > NOW() - INTERVAL '5 minutes' ORDER BY sensor_id, recorded_at DESC """), {"site_id": site_id}) gen_data: dict[str, dict] = {} for row in result.mappings().all(): parts = row["sensor_id"].split("/") if len(parts) < 3: continue gen_id = parts[2] if gen_id not in gen_data: gen_data[gen_id] = {"gen_id": gen_id} field = GEN_FIELD_MAP.get(row["sensor_type"]) if field: gen_data[gen_id][field] = round(float(row["value"]), 2) elif row["sensor_type"] == "gen_state": v = round(float(row["value"])) gen_data[gen_id]["state"] = STATE_MAP.get(v, "standby") out = [] for gen_id in GENERATORS.get(site_id, []): d = gen_data.get(gen_id, {"gen_id": gen_id, "state": "unknown"}) if "state" not in d: d["state"] = "standby" out.append(d) return out HISTORY_METRICS = ( "gen_load_pct", "gen_fuel_pct", "gen_coolant_c", "gen_exhaust_c", "gen_freq_hz", "gen_alt_temp_c", ) @router.get("/history") async def generator_history( site_id: str = Query(...), gen_id: str = Query(...), hours: int = Query(6, ge=1, le=24), session: AsyncSession = Depends(get_session), ): """5-minute bucketed time-series for a single generator.""" from_time = datetime.now(timezone.utc) - timedelta(hours=hours) types_sql = ", ".join(f"'{t}'" for t in HISTORY_METRICS) try: result = await session.execute(text(f""" SELECT time_bucket('5 minutes', recorded_at) AS bucket, sensor_type, ROUND(AVG(value)::numeric, 2) AS avg_val FROM readings WHERE site_id = :site_id AND sensor_id LIKE :pattern AND sensor_type IN ({types_sql}) AND recorded_at > :from_time GROUP BY bucket, sensor_type ORDER BY bucket ASC """), {"site_id": site_id, "pattern": f"{site_id}/generator/{gen_id}/%", "from_time": from_time}) except Exception: result = await session.execute(text(f""" SELECT date_trunc('minute', recorded_at) AS bucket, sensor_type, ROUND(AVG(value)::numeric, 2) AS avg_val FROM readings WHERE site_id = :site_id AND sensor_id LIKE :pattern AND sensor_type IN ({types_sql}) AND recorded_at > :from_time GROUP BY bucket, sensor_type ORDER BY bucket ASC """), {"site_id": site_id, "pattern": f"{site_id}/generator/{gen_id}/%", "from_time": from_time}) # Pivot: bucket → {metric: value} buckets: dict[str, dict] = {} for row in result.mappings().all(): b = row["bucket"].isoformat() buckets.setdefault(b, {"bucket": b}) key_map = { "gen_load_pct": "load_pct", "gen_fuel_pct": "fuel_pct", "gen_coolant_c": "coolant_temp_c", "gen_exhaust_c": "exhaust_temp_c", "gen_freq_hz": "frequency_hz", "gen_alt_temp_c":"alternator_temp_c", } field = key_map.get(row["sensor_type"]) if field: buckets[b][field] = float(row["avg_val"]) return list(buckets.values())