import csv
import io
import logging
import re
from datetime import datetime, timezone, timedelta

from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import text
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import AsyncSession

from core.database import get_session

logger = logging.getLogger(__name__)

router = APIRouter()

# Electricity tariff used to convert kWh into cost (SGD per kWh).
TARIFF_SGD_KWH = 0.298

# Flat PUE estimate reported by /energy. Only IT load is read from the
# `readings` table here, so a real PUE cannot be computed; ~40% overhead
# (cooling + lighting + UPS losses) is assumed instead.
ESTIMATED_PUE = 1.40

ROOMS = {
    "sg-01": [
        {
            "room_id": "hall-a",
            "racks": [f"SG1A01.{i:02d}" for i in range(1, 21)]
            + [f"SG1A02.{i:02d}" for i in range(1, 21)],
            "crac_id": "crac-01",
        },
        {
            "room_id": "hall-b",
            "racks": [f"SG1B01.{i:02d}" for i in range(1, 21)]
            + [f"SG1B02.{i:02d}" for i in range(1, 21)],
            "crac_id": "crac-02",
        },
    ]
}
UPS_IDS = {"sg-01": ["ups-01", "ups-02"]}

# SQL bucket expressions. These are module constants (never user input), so
# interpolating them into the query templates below is safe; all request
# values are still passed as bound parameters.
_TIMESCALE_5MIN = "time_bucket('5 minutes', recorded_at)"
# Plain-PostgreSQL equivalent of a 5-minute bucket. The previous fallback used
# date_trunc('minute', ...), i.e. 1-minute buckets, which disagreed with the
# "/ 12" kWh conversion and the 288-buckets-per-day uptime denominator.
_PLAIN_5MIN = (
    "(date_trunc('hour', recorded_at)"
    " + CAST(floor(extract(minute FROM recorded_at) / 5) AS double precision)"
    " * INTERVAL '5 minutes')"
)
_TIMESCALE_DAY = "time_bucket('1 day', recorded_at)"
_PLAIN_DAY = "date_trunc('day', recorded_at)"

_KWH_SQL = """
    SELECT ROUND((SUM(avg_kw) / 12.0)::numeric, 1) AS kwh_total
    FROM (
        SELECT {bucket} AS bucket, AVG(value) AS avg_kw
        FROM readings
        WHERE site_id = :site_id
          AND sensor_type = 'power_kw'
          AND recorded_at > :from_time
        GROUP BY bucket
    ) bucketed
"""

_DAILY_IT_LOAD_SQL = """
    SELECT {bucket} AS day,
           ROUND(AVG(value)::numeric, 2) AS avg_it_kw
    FROM readings
    WHERE site_id = :site_id
      AND sensor_type = 'power_kw'
      AND recorded_at > :from_time
    GROUP BY day
    ORDER BY day ASC
"""

_UPTIME_SQL = """
    SELECT COUNT(DISTINCT {bucket}) AS buckets
    FROM readings
    WHERE site_id = :site_id
      AND sensor_id LIKE :pattern
      AND sensor_type = :sensor_type
      AND recorded_at > :from_time
"""

_POWER_EXPORT_SQL = """
    SELECT {bucket} AS bucket,
           room_id,
           ROUND(SUM(value)::numeric, 2) AS total_kw
    FROM readings
    WHERE site_id = :site_id
      AND sensor_type = 'power_kw'
      AND room_id IS NOT NULL
      AND recorded_at > :from_time
    GROUP BY bucket, room_id
    ORDER BY bucket ASC
"""

_TEMPERATURE_EXPORT_SQL = """
    SELECT {bucket} AS bucket,
           rack_id, room_id,
           ROUND(AVG(value)::numeric, 1) AS avg_temp
    FROM readings
    WHERE site_id = :site_id
      AND sensor_type = 'temperature'
      AND rack_id IS NOT NULL
      AND recorded_at > :from_time
    GROUP BY bucket, rack_id, room_id
    ORDER BY bucket ASC
"""


async def _execute_bucketed(
    session: AsyncSession,
    sql_template: str,
    params: dict,
    *,
    timescale_bucket: str = _TIMESCALE_5MIN,
    plain_bucket: str = _PLAIN_5MIN,
):
    """Run a bucketed query, preferring time_bucket() with a plain-SQL fallback.

    ``sql_template`` must contain a ``{bucket}`` placeholder, which is filled
    first with ``timescale_bucket`` and, if that statement fails at the
    database level, with ``plain_bucket``.

    The first attempt runs inside a SAVEPOINT: PostgreSQL aborts the enclosing
    transaction when a statement fails, so without the savepoint the fallback
    query on the same session would be rejected as well.

    Returns the SQLAlchemy result of whichever statement succeeded. Errors
    raised by the fallback statement propagate to the caller.
    """
    try:
        async with session.begin_nested():
            return await session.execute(
                text(sql_template.format(bucket=timescale_bucket)), params
            )
    except DBAPIError:
        logger.debug(
            "time_bucket query failed; retrying with plain SQL bucketing",
            exc_info=True,
        )
    return await session.execute(
        text(sql_template.format(bucket=plain_bucket)), params
    )


async def _uptime_pct(
    session: AsyncSession,
    site_id: str,
    sensor_type: str,
    pattern: str,
    from_time: datetime,
    total_buckets: int,
) -> float:
    """Return the share (0-100, one decimal) of 5-minute buckets with data.

    Counts distinct 5-minute buckets since ``from_time`` that contain at least
    one ``sensor_type`` reading whose ``sensor_id`` matches the LIKE
    ``pattern``, and expresses that count as a percentage of
    ``total_buckets``, capped at 100.
    """
    result = await _execute_bucketed(
        session,
        _UPTIME_SQL,
        {
            "site_id": site_id,
            "pattern": pattern,
            "sensor_type": sensor_type,
            "from_time": from_time,
        },
    )
    row = result.mappings().first()
    buckets = int(row["buckets"]) if row and row["buckets"] else 0
    # The window can touch one more bucket than total_buckets (partial buckets
    # at both ends), hence the cap.
    return round(min(100.0, buckets / total_buckets * 100), 1)


def _safe_filename_part(value: str) -> str:
    """Reduce ``value`` to characters that are safe in a header filename.

    Anything other than ASCII letters, digits, ``.``, ``_`` and ``-`` becomes
    ``_`` so a caller-supplied site id cannot break or inject into the
    Content-Disposition header.
    """
    return re.sub(r"[^A-Za-z0-9._-]", "_", value)


def _csv_response(header, rows, filename: str) -> StreamingResponse:
    """Serialize ``header`` plus ``rows`` to CSV and return it as a download."""
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(header)
    writer.writerows(rows)
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )


@router.get("/energy")
async def energy_report(
    site_id: str = Query(...),
    days: int = Query(30, ge=1, le=90),
    session: AsyncSession = Depends(get_session),
):
    """Return kWh consumption, cost, and a daily IT-load trend for a site.

    Args:
        site_id: Site whose ``power_kw`` readings are aggregated.
        days: Length of the reporting window in days (1-90, default 30).
        session: Database session injected by FastAPI.

    Returns:
        A dict with the period bounds, ``kwh_total``, ``cost_sgd`` (kWh times
        ``TARIFF_SGD_KWH``), the constant PUE estimate, and ``pue_trend``: one
        entry per day with the average IT load and the same constant estimate.
    """
    # A single timestamp keeps from_date/to_date consistent with each other.
    now = datetime.now(timezone.utc)
    from_time = now - timedelta(days=days)
    params = {"site_id": site_id, "from_time": from_time}

    # Total kWh over the period: each 5-minute bucket contributes kW / 12.
    # NOTE(review): AVG(value) averages over every power_kw reading in the
    # bucket; if several sensors report per site this is the mean sensor load,
    # not the site total — confirm against the readings schema.
    kwh_result = await _execute_bucketed(session, _KWH_SQL, params)
    kwh_row = kwh_result.mappings().first()
    kwh_total = float(kwh_row["kwh_total"] or 0) if kwh_row else 0.0
    cost_sgd = round(kwh_total * TARIFF_SGD_KWH, 2)

    # A proper PUE needs a facility meter; report the day-by-day IT load and
    # attach the flat estimate to every day instead.
    trend_result = await _execute_bucketed(
        session,
        _DAILY_IT_LOAD_SQL,
        params,
        timescale_bucket=_TIMESCALE_DAY,
        plain_bucket=_PLAIN_DAY,
    )
    pue_trend = [
        {
            "day": str(r["day"]),
            # AVG() is NULL when every value in the day is NULL.
            "avg_it_kw": float(r["avg_it_kw"]) if r["avg_it_kw"] is not None else None,
            "pue_est": round(ESTIMATED_PUE, 2),
        }
        for r in trend_result.mappings().all()
    ]

    return {
        "site_id": site_id,
        "period_days": days,
        "from_date": from_time.date().isoformat(),
        "to_date": now.date().isoformat(),
        "kwh_total": kwh_total,
        "cost_sgd": cost_sgd,
        "tariff_sgd_kwh": TARIFF_SGD_KWH,
        "currency": "SGD",
        "pue_estimated": ESTIMATED_PUE,
        "pue_trend": pue_trend,
    }


@router.get("/summary")
async def site_summary(
    site_id: str = Query(...),
    session: AsyncSession = Depends(get_session),
):
    """Return a site-level summary: KPIs, alarm stats, CRAC and UPS uptime %.

    Args:
        site_id: Site to summarize. CRAC and UPS units are looked up in
            ``ROOMS`` / ``UPS_IDS``; unknown sites yield empty uptime lists.
        session: Database session injected by FastAPI.

    Returns:
        A dict with ``kpis`` (from each sensor's latest reading in the last
        10 minutes), all-time ``alarm_stats`` by state and severity, and
        per-unit 24-hour uptime percentages.
    """
    # KPIs: take each sensor's most recent reading from the last 10 minutes.
    kpi_res = await session.execute(text("""
        SELECT
            ROUND(SUM(CASE WHEN sensor_type = 'power_kw' THEN value END)::numeric, 2) AS total_power_kw,
            ROUND(AVG(CASE WHEN sensor_type = 'temperature' THEN value END)::numeric, 1) AS avg_temperature
        FROM (
            SELECT DISTINCT ON (sensor_id) sensor_id, sensor_type, value
            FROM readings
            WHERE site_id = :site_id
              AND sensor_type IN ('power_kw', 'temperature')
              AND recorded_at > NOW() - INTERVAL '10 minutes'
            ORDER BY sensor_id, recorded_at DESC
        ) latest
    """), {"site_id": site_id})
    kpi_row = kpi_res.mappings().first() or {}

    # Alarm stats (all-time, tallied independently by state and by severity).
    alarm_res = await session.execute(text("""
        SELECT state, severity, COUNT(*) AS cnt
        FROM alarms
        WHERE site_id = :site_id
        GROUP BY state, severity
    """), {"site_id": site_id})
    alarm_stats: dict = {"active": 0, "acknowledged": 0, "resolved": 0, "critical": 0, "warning": 0}
    for row in alarm_res.mappings().all():
        count = int(row["cnt"])
        if row["state"] in alarm_stats:
            alarm_stats[row["state"]] += count
        if row["severity"] in ("critical", "warning"):
            alarm_stats[row["severity"]] += count

    # Uptime over the last 24h = share of 5-minute buckets that have readings.
    from_24h = datetime.now(timezone.utc) - timedelta(hours=24)
    total_buckets = 24 * 12  # one 5-min bucket per 5 minutes

    cracs = []
    for room in ROOMS.get(site_id, []):
        crac_id = room["crac_id"]
        cracs.append({
            "crac_id": crac_id,
            "room_id": room["room_id"],
            "uptime_pct": await _uptime_pct(
                session,
                site_id,
                "cooling_supply",
                f"{site_id}/cooling/{crac_id}/%",
                from_24h,
                total_buckets,
            ),
        })

    ups_units = []
    for ups_id in UPS_IDS.get(site_id, []):
        ups_units.append({
            "ups_id": ups_id,
            "uptime_pct": await _uptime_pct(
                session,
                site_id,
                "ups_charge",
                f"{site_id}/ups/{ups_id}/%",
                from_24h,
                total_buckets,
            ),
        })

    return {
        "site_id": site_id,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "kpis": {
            "total_power_kw": float(kpi_row.get("total_power_kw") or 0),
            "avg_temperature": float(kpi_row.get("avg_temperature") or 0),
        },
        "alarm_stats": alarm_stats,
        "crac_uptime": cracs,
        "ups_uptime": ups_units,
    }


@router.get("/export/power")
async def export_power(
    site_id: str = Query(...),
    hours: int = Query(24, ge=1, le=168),
    session: AsyncSession = Depends(get_session),
):
    """Download power history as CSV.

    Emits one row per 5-minute bucket and room (rooms with a NULL ``room_id``
    are excluded) covering the last ``hours`` hours (1-168, default 24), with
    columns ``timestamp``, ``room_id``, ``total_kw``.
    """
    from_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    result = await _execute_bucketed(
        session, _POWER_EXPORT_SQL, {"site_id": site_id, "from_time": from_time}
    )
    return _csv_response(
        ["timestamp", "room_id", "total_kw"],
        (
            [row["bucket"], row["room_id"], row["total_kw"]]
            for row in result.mappings().all()
        ),
        f"power_{_safe_filename_part(site_id)}_{hours}h.csv",
    )


@router.get("/export/temperature")
async def export_temperature(
    site_id: str = Query(...),
    hours: int = Query(24, ge=1, le=168),
    session: AsyncSession = Depends(get_session),
):
    """Download temperature history per rack as CSV.

    Emits one row per 5-minute bucket, rack and room (readings with a NULL
    ``rack_id`` are excluded) covering the last ``hours`` hours (1-168,
    default 24), with columns ``timestamp``, ``room_id``, ``rack_id``,
    ``avg_temp_c``.
    """
    from_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    result = await _execute_bucketed(
        session, _TEMPERATURE_EXPORT_SQL, {"site_id": site_id, "from_time": from_time}
    )
    return _csv_response(
        ["timestamp", "room_id", "rack_id", "avg_temp_c"],
        (
            [row["bucket"], row["room_id"], row["rack_id"], row["avg_temp"]]
            for row in result.mappings().all()
        ),
        f"temperature_{_safe_filename_part(site_id)}_{hours}h.csv",
    )


@router.get("/export/alarms")
async def export_alarms(
    site_id: str = Query(...),
    session: AsyncSession = Depends(get_session),
):
    """Download the full alarm log for a site as CSV, newest alarm first."""
    result = await session.execute(text("""
        SELECT id, severity, message, state, room_id, rack_id, triggered_at
        FROM alarms
        WHERE site_id = :site_id
        ORDER BY triggered_at DESC
    """), {"site_id": site_id})

    columns = ["id", "severity", "message", "state", "room_id", "rack_id", "triggered_at"]
    return _csv_response(
        columns,
        ([row[col] for col in columns] for row in result.mappings().all()),
        f"alarms_{_safe_filename_part(site_id)}.csv",
    )