BMS/backend/api/routes/reports.py
2026-03-19 11:32:17 +00:00

356 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import io
from datetime import datetime, timezone, timedelta

from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import text
from sqlalchemy.exc import DBAPIError
from sqlalchemy.ext.asyncio import AsyncSession

from core.database import get_session
router = APIRouter()
TARIFF_SGD_KWH = 0.298
ROOMS = {
"sg-01": [
{"room_id": "hall-a", "racks": [f"SG1A01.{i:02d}" for i in range(1, 21)] + [f"SG1A02.{i:02d}" for i in range(1, 21)], "crac_id": "crac-01"},
{"room_id": "hall-b", "racks": [f"SG1B01.{i:02d}" for i in range(1, 21)] + [f"SG1B02.{i:02d}" for i in range(1, 21)], "crac_id": "crac-02"},
]
}
UPS_IDS = {"sg-01": ["ups-01", "ups-02"]}
@router.get("/energy")
async def energy_report(
    site_id: str = Query(...),
    days: int = Query(30, ge=1, le=90),
    session: AsyncSession = Depends(get_session),
):
    """kWh consumption, estimated cost, and a daily IT-load / PUE trend.

    Tries TimescaleDB's time_bucket() first; when the extension is missing,
    rolls the aborted transaction back and retries with plain date_trunc().
    """
    from_time = datetime.now(timezone.utc) - timedelta(days=days)
    # Total kWh over the period: each 5-min bucket's average kW covers 1/12 h,
    # so SUM(avg_kw) / 12 is the period's kWh.
    try:
        kwh_result = await session.execute(text("""
            SELECT ROUND((SUM(avg_kw) / 12.0)::numeric, 1) AS kwh_total
            FROM (
                SELECT
                    time_bucket('5 minutes', recorded_at) AS bucket,
                    AVG(value) AS avg_kw
                FROM readings
                WHERE site_id = :site_id
                  AND sensor_type = 'power_kw'
                  AND recorded_at > :from_time
                GROUP BY bucket
            ) bucketed
        """), {"site_id": site_id, "from_time": from_time})
    except DBAPIError:
        # time_bucket() unavailable (no TimescaleDB). The failed statement
        # leaves the transaction aborted, so roll back before retrying —
        # without this the fallback fails with InFailedSQLTransaction.
        # NOTE(review): date_trunc('minute') yields 1-min buckets but we still
        # divide by 12 — only equivalent if readings arrive every >= 5 minutes;
        # confirm against the ingest cadence.
        await session.rollback()
        kwh_result = await session.execute(text("""
            SELECT ROUND((SUM(avg_kw) / 12.0)::numeric, 1) AS kwh_total
            FROM (
                SELECT
                    date_trunc('minute', recorded_at) AS bucket,
                    AVG(value) AS avg_kw
                FROM readings
                WHERE site_id = :site_id
                  AND sensor_type = 'power_kw'
                  AND recorded_at > :from_time
                GROUP BY bucket
            ) bucketed
        """), {"site_id": site_id, "from_time": from_time})
    kwh_row = kwh_result.mappings().first()
    kwh_total = float(kwh_row["kwh_total"] or 0) if kwh_row else 0.0
    cost_sgd = round(kwh_total * TARIFF_SGD_KWH, 2)
    # True PUE needs a facility-side meter we don't have (typical DC PUE is
    # roughly 1.4-1.6), so report a day-by-day IT-load trend together with a
    # fixed estimated PUE instead.
    try:
        pue_result = await session.execute(text("""
            SELECT
                time_bucket('1 day', recorded_at) AS day,
                ROUND(AVG(value)::numeric, 2) AS avg_it_kw
            FROM readings
            WHERE site_id = :site_id
              AND sensor_type = 'power_kw'
              AND recorded_at > :from_time
            GROUP BY day
            ORDER BY day ASC
        """), {"site_id": site_id, "from_time": from_time})
    except DBAPIError:
        # Clear the aborted transaction before the plain-SQL fallback.
        await session.rollback()
        pue_result = await session.execute(text("""
            SELECT
                date_trunc('day', recorded_at) AS day,
                ROUND(AVG(value)::numeric, 2) AS avg_it_kw
            FROM readings
            WHERE site_id = :site_id
              AND sensor_type = 'power_kw'
              AND recorded_at > :from_time
            GROUP BY day
            ORDER BY day ASC
        """), {"site_id": site_id, "from_time": from_time})
    # Estimated PUE: assume ~40% overhead (cooling + lighting + UPS losses).
    OVERHEAD_FACTOR = 1.40
    pue_trend = [
        {
            "day": str(r["day"]),
            "avg_it_kw": float(r["avg_it_kw"]),
            # Constant by construction — kept per row so the response shape can
            # carry a real per-day PUE once a facility meter exists.
            "pue_est": round(OVERHEAD_FACTOR, 2),
        }
        for r in pue_result.mappings().all()
    ]
    return {
        "site_id": site_id,
        "period_days": days,
        "from_date": from_time.date().isoformat(),
        "to_date": datetime.now(timezone.utc).date().isoformat(),
        "kwh_total": kwh_total,
        "cost_sgd": cost_sgd,
        "tariff_sgd_kwh": TARIFF_SGD_KWH,
        "currency": "SGD",
        "pue_estimated": OVERHEAD_FACTOR,
        "pue_trend": pue_trend,
    }
async def _uptime_pct(
    session: AsyncSession,
    site_id: str,
    pattern: str,
    sensor_type: str,
    from_time: datetime,
    total_buckets: int,
) -> float:
    """Percent of 5-min buckets since *from_time* containing at least one
    reading whose sensor_id matches *pattern* — a presence-based uptime proxy.
    """
    try:
        r = await session.execute(text("""
            SELECT COUNT(DISTINCT time_bucket('5 minutes', recorded_at)) AS buckets
            FROM readings
            WHERE site_id = :site_id
              AND sensor_id LIKE :pattern
              AND sensor_type = :sensor_type
              AND recorded_at > :from_time
        """), {"site_id": site_id, "pattern": pattern,
               "sensor_type": sensor_type, "from_time": from_time})
    except DBAPIError:
        # time_bucket() unavailable — the failed statement aborts the
        # transaction, so roll back before the date_trunc() fallback.
        await session.rollback()
        r = await session.execute(text("""
            SELECT COUNT(DISTINCT date_trunc('minute', recorded_at)) AS buckets
            FROM readings
            WHERE site_id = :site_id
              AND sensor_id LIKE :pattern
              AND sensor_type = :sensor_type
              AND recorded_at > :from_time
        """), {"site_id": site_id, "pattern": pattern,
               "sensor_type": sensor_type, "from_time": from_time})
    row = r.mappings().first()
    buckets = int(row["buckets"]) if row and row["buckets"] else 0
    # The fallback counts 1-min buckets, which can exceed total_buckets — clamp.
    return round(min(100.0, buckets / total_buckets * 100), 1)


@router.get("/summary")
async def site_summary(
    site_id: str = Query(...),
    session: AsyncSession = Depends(get_session),
):
    """Site-level summary: KPIs, alarm stats, CRAC uptime%, UPS uptime%."""
    # KPIs: latest reading per sensor within the last 10 minutes, then
    # summed power / averaged temperature across those latest values.
    kpi_res = await session.execute(text("""
        SELECT
            ROUND(SUM(CASE WHEN sensor_type = 'power_kw' THEN value END)::numeric, 2) AS total_power_kw,
            ROUND(AVG(CASE WHEN sensor_type = 'temperature' THEN value END)::numeric, 1) AS avg_temperature
        FROM (
            SELECT DISTINCT ON (sensor_id) sensor_id, sensor_type, value
            FROM readings
            WHERE site_id = :site_id
              AND sensor_type IN ('power_kw', 'temperature')
              AND recorded_at > NOW() - INTERVAL '10 minutes'
            ORDER BY sensor_id, recorded_at DESC
        ) latest
    """), {"site_id": site_id})
    kpi_row = kpi_res.mappings().first() or {}
    # Alarm stats: all-time counts bucketed by state and by severity.
    alarm_res = await session.execute(text("""
        SELECT state, severity, COUNT(*) AS cnt
        FROM alarms
        WHERE site_id = :site_id
        GROUP BY state, severity
    """), {"site_id": site_id})
    alarm_stats: dict = {"active": 0, "acknowledged": 0, "resolved": 0, "critical": 0, "warning": 0}
    for row in alarm_res.mappings().all():
        if row["state"] in alarm_stats:
            alarm_stats[row["state"]] += int(row["cnt"])
        if row["severity"] in ("critical", "warning"):
            alarm_stats[row["severity"]] += int(row["cnt"])
    # Uptime over the last 24h: 288 possible 5-min buckets.
    from_24h = datetime.now(timezone.utc) - timedelta(hours=24)
    total_buckets = 24 * 12
    cracs = []
    for room in ROOMS.get(site_id, []):
        crac_id = room["crac_id"]
        cracs.append({
            "crac_id": crac_id,
            "room_id": room["room_id"],
            "uptime_pct": await _uptime_pct(
                session, site_id, f"{site_id}/cooling/{crac_id}/%",
                "cooling_supply", from_24h, total_buckets,
            ),
        })
    ups_units = []
    for ups_id in UPS_IDS.get(site_id, []):
        ups_units.append({
            "ups_id": ups_id,
            "uptime_pct": await _uptime_pct(
                session, site_id, f"{site_id}/ups/{ups_id}/%",
                "ups_charge", from_24h, total_buckets,
            ),
        })
    return {
        "site_id": site_id,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "kpis": {
            "total_power_kw": float(kpi_row.get("total_power_kw") or 0),
            "avg_temperature": float(kpi_row.get("avg_temperature") or 0),
        },
        "alarm_stats": alarm_stats,
        "crac_uptime": cracs,
        "ups_uptime": ups_units,
    }
@router.get("/export/power")
async def export_power(
    site_id: str = Query(...),
    hours: int = Query(24, ge=1, le=168),
    session: AsyncSession = Depends(get_session),
):
    """Download per-room power history (5-min SUM of kW) as a CSV attachment."""
    from_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    try:
        result = await session.execute(text("""
            SELECT
                time_bucket('5 minutes', recorded_at) AS bucket,
                room_id,
                ROUND(SUM(value)::numeric, 2) AS total_kw
            FROM readings
            WHERE site_id = :site_id
              AND sensor_type = 'power_kw'
              AND room_id IS NOT NULL
              AND recorded_at > :from_time
            GROUP BY bucket, room_id
            ORDER BY bucket ASC
        """), {"site_id": site_id, "from_time": from_time})
    except DBAPIError:
        # time_bucket() unavailable (no TimescaleDB). Roll back the aborted
        # transaction before retrying, otherwise the fallback also fails.
        await session.rollback()
        result = await session.execute(text("""
            SELECT
                date_trunc('minute', recorded_at) AS bucket,
                room_id,
                ROUND(SUM(value)::numeric, 2) AS total_kw
            FROM readings
            WHERE site_id = :site_id
              AND sensor_type = 'power_kw'
              AND room_id IS NOT NULL
              AND recorded_at > :from_time
            GROUP BY bucket, room_id
            ORDER BY bucket ASC
        """), {"site_id": site_id, "from_time": from_time})
    # Build the CSV fully in memory (bounded: at most 168h of 5-min rows per room).
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["timestamp", "room_id", "total_kw"])
    for row in result.mappings().all():
        writer.writerow([row["bucket"], row["room_id"], row["total_kw"]])
    output.seek(0)
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=power_{site_id}_{hours}h.csv"},
    )
@router.get("/export/temperature")
async def export_temperature(
    site_id: str = Query(...),
    hours: int = Query(24, ge=1, le=168),
    session: AsyncSession = Depends(get_session),
):
    """Download per-rack temperature history (5-min AVG) as a CSV attachment."""
    from_time = datetime.now(timezone.utc) - timedelta(hours=hours)
    try:
        result = await session.execute(text("""
            SELECT
                time_bucket('5 minutes', recorded_at) AS bucket,
                rack_id, room_id,
                ROUND(AVG(value)::numeric, 1) AS avg_temp
            FROM readings
            WHERE site_id = :site_id
              AND sensor_type = 'temperature'
              AND rack_id IS NOT NULL
              AND recorded_at > :from_time
            GROUP BY bucket, rack_id, room_id
            ORDER BY bucket ASC
        """), {"site_id": site_id, "from_time": from_time})
    except DBAPIError:
        # time_bucket() unavailable (no TimescaleDB). Roll back the aborted
        # transaction before retrying, otherwise the fallback also fails.
        await session.rollback()
        result = await session.execute(text("""
            SELECT
                date_trunc('minute', recorded_at) AS bucket,
                rack_id, room_id,
                ROUND(AVG(value)::numeric, 1) AS avg_temp
            FROM readings
            WHERE site_id = :site_id
              AND sensor_type = 'temperature'
              AND rack_id IS NOT NULL
              AND recorded_at > :from_time
            GROUP BY bucket, rack_id, room_id
            ORDER BY bucket ASC
        """), {"site_id": site_id, "from_time": from_time})
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["timestamp", "room_id", "rack_id", "avg_temp_c"])
    for row in result.mappings().all():
        writer.writerow([row["bucket"], row["room_id"], row["rack_id"], row["avg_temp"]])
    output.seek(0)
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=temperature_{site_id}_{hours}h.csv"},
    )
@router.get("/export/alarms")
async def export_alarms(
    site_id: str = Query(...),
    session: AsyncSession = Depends(get_session),
):
    """Download full alarm log as CSV."""
    # Newest alarms first, entire history for the site.
    result = await session.execute(text("""
        SELECT id, severity, message, state, room_id, rack_id, triggered_at
        FROM alarms
        WHERE site_id = :site_id
        ORDER BY triggered_at DESC
    """), {"site_id": site_id})
    # Single column list drives both the header and the per-row field order.
    columns = ["id", "severity", "message", "state", "room_id", "rack_id", "triggered_at"]
    buf = io.StringIO()
    csv_out = csv.writer(buf)
    csv_out.writerow(columns)
    csv_out.writerows(
        [record[col] for col in columns]
        for record in result.mappings().all()
    )
    buf.seek(0)
    return StreamingResponse(
        iter([buf.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=alarms_{site_id}.csv"},
    )