328 lines
15 KiB
Python
328 lines
15 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
import aiomqtt
|
|
from sqlalchemy import text
|
|
|
|
from core.config import settings
|
|
from core.database import AsyncSessionLocal
|
|
from services.alarm_engine import check_and_update_alarms
|
|
from services.ws_manager import manager as ws_manager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_topic(topic: str) -> dict | None:
|
|
"""
|
|
Topic formats:
|
|
bms/{site_id}/{room_id}/{rack_id}/env — rack environment
|
|
bms/{site_id}/{room_id}/{rack_id}/power — rack PDU power
|
|
bms/{site_id}/cooling/{crac_id} — CRAC unit
|
|
bms/{site_id}/cooling/chiller/{chiller_id} — chiller plant
|
|
bms/{site_id}/power/{ups_id} — UPS unit
|
|
bms/{site_id}/power/ats/{ats_id} — ATS transfer switch
|
|
bms/{site_id}/generator/{gen_id} — diesel generator
|
|
bms/{site_id}/fire/{zone_id} — VESDA fire zone
|
|
bms/{site_id}/leak/{sensor_id} — water leak sensor
|
|
"""
|
|
parts = topic.split("/")
|
|
if len(parts) < 4 or parts[0] != "bms":
|
|
return None
|
|
|
|
site_id = parts[1]
|
|
|
|
# 5-part: rack env/power OR cooling/chiller/{id} OR power/ats/{id}
|
|
if len(parts) == 5:
|
|
if parts[4] in ("env", "power"):
|
|
return {
|
|
"site_id": site_id, "room_id": parts[2],
|
|
"rack_id": parts[3], "device_id": None, "msg_type": parts[4],
|
|
}
|
|
if parts[2] == "cooling" and parts[3] == "chiller":
|
|
return {
|
|
"site_id": site_id, "room_id": None, "rack_id": None,
|
|
"device_id": parts[4], "msg_type": "chiller",
|
|
}
|
|
if parts[2] == "power" and parts[3] == "ats":
|
|
return {
|
|
"site_id": site_id, "room_id": None, "rack_id": None,
|
|
"device_id": parts[4], "msg_type": "ats",
|
|
}
|
|
|
|
# 4-part: bms/{site_id}/{room_id}/particles
|
|
if len(parts) == 4 and parts[3] == "particles":
|
|
return {
|
|
"site_id": site_id, "room_id": parts[2], "rack_id": None,
|
|
"device_id": None, "msg_type": "particles",
|
|
}
|
|
|
|
# 4-part: known subsystem topics
|
|
if len(parts) == 4 and parts[2] in ("cooling", "power", "leak", "generator", "fire", "network"):
|
|
return {
|
|
"site_id": site_id, "room_id": None, "rack_id": None,
|
|
"device_id": parts[3], "msg_type": parts[2],
|
|
}
|
|
return None
|
|
|
|
|
|
# One stored measurement: (sensor_id, sensor_type, value, unit).
_Reading = tuple[str, str, float, str]
# One numeric payload field: (payload_key, sensor_type, unit).
_Field = tuple[str, str, str]

_RACK_ENV_FIELDS: tuple[_Field, ...] = (
    ("temperature", "temperature", "°C"),
    ("humidity", "humidity", "%"),
)

# Per-phase PDU data
_RACK_PDU_FIELDS: tuple[_Field, ...] = (
    ("phase_a_kw", "pdu_phase_a_kw", "kW"),
    ("phase_b_kw", "pdu_phase_b_kw", "kW"),
    ("phase_c_kw", "pdu_phase_c_kw", "kW"),
    ("phase_a_a", "pdu_phase_a_a", "A"),
    ("phase_b_a", "pdu_phase_b_a", "A"),
    ("phase_c_a", "pdu_phase_c_a", "A"),
    ("imbalance_pct", "pdu_imbalance", "%"),
)

_CRAC_FIELDS: tuple[_Field, ...] = (
    ("supply_temp", "cooling_supply", "°C"),
    ("return_temp", "cooling_return", "°C"),
    ("fan_pct", "cooling_fan", "%"),
    ("supply_humidity", "cooling_supply_hum", "%"),
    ("return_humidity", "cooling_return_hum", "%"),
    ("airflow_cfm", "cooling_airflow", "CFM"),
    ("filter_dp_pa", "cooling_filter_dp", "Pa"),
    ("cooling_capacity_kw", "cooling_cap_kw", "kW"),
    ("cooling_capacity_pct", "cooling_cap_pct", "%"),
    ("cop", "cooling_cop", ""),
    ("sensible_heat_ratio", "cooling_shr", ""),
    ("compressor_state", "cooling_comp_state", ""),
    ("compressor_load_pct", "cooling_comp_load", "%"),
    ("compressor_power_kw", "cooling_comp_power", "kW"),
    ("compressor_run_hours", "cooling_comp_hours", "h"),
    ("high_pressure_bar", "cooling_high_press", "bar"),
    ("low_pressure_bar", "cooling_low_press", "bar"),
    ("discharge_superheat_c", "cooling_superheat", "°C"),
    ("liquid_subcooling_c", "cooling_subcooling", "°C"),
    ("fan_rpm", "cooling_fan_rpm", "RPM"),
    ("fan_power_kw", "cooling_fan_power", "kW"),
    ("fan_run_hours", "cooling_fan_hours", "h"),
    ("total_unit_power_kw", "cooling_unit_power", "kW"),
    ("input_voltage_v", "cooling_voltage", "V"),
    ("input_current_a", "cooling_current", "A"),
    ("power_factor", "cooling_pf", ""),
)

_UPS_FIELDS: tuple[_Field, ...] = (
    ("charge_pct", "ups_charge", "%"),
    ("load_pct", "ups_load", "%"),
    ("runtime_min", "ups_runtime", "min"),
    ("voltage", "ups_voltage", "V"),
)

_GENERATOR_FIELDS: tuple[_Field, ...] = (
    ("fuel_pct", "gen_fuel_pct", "%"),
    ("fuel_litres", "gen_fuel_l", "L"),
    ("fuel_rate_lph", "gen_fuel_rate", "L/h"),
    ("load_kw", "gen_load_kw", "kW"),
    ("load_pct", "gen_load_pct", "%"),
    ("run_hours", "gen_run_hours", "h"),
    ("voltage_v", "gen_voltage_v", "V"),
    ("frequency_hz", "gen_freq_hz", "Hz"),
    ("engine_rpm", "gen_rpm", "RPM"),
    ("oil_pressure_bar", "gen_oil_press", "bar"),
    ("coolant_temp_c", "gen_coolant_c", "°C"),
    ("exhaust_temp_c", "gen_exhaust_c", "°C"),
    ("alternator_temp_c", "gen_alt_temp_c", "°C"),
    ("power_factor", "gen_pf", ""),
    ("battery_v", "gen_batt_v", "V"),
)

_ATS_FIELDS: tuple[_Field, ...] = (
    ("transfer_count", "ats_xfer_count", ""),
    ("last_transfer_ms", "ats_xfer_ms", "ms"),
    ("utility_a_v", "ats_ua_v", "V"),
    ("utility_b_v", "ats_ub_v", "V"),
    ("generator_v", "ats_gen_v", "V"),
)

_CHILLER_FIELDS: tuple[_Field, ...] = (
    ("chw_supply_c", "chiller_chw_supply", "°C"),
    ("chw_return_c", "chiller_chw_return", "°C"),
    ("chw_delta_c", "chiller_chw_delta", "°C"),
    ("flow_gpm", "chiller_flow_gpm", "GPM"),
    ("cooling_load_kw", "chiller_load_kw", "kW"),
    ("cooling_load_pct", "chiller_load_pct", "%"),
    ("cop", "chiller_cop", ""),
    ("compressor_load_pct", "chiller_comp_load", "%"),
    ("condenser_pressure_bar", "chiller_cond_press", "bar"),
    ("evaporator_pressure_bar", "chiller_evap_press", "bar"),
    ("cw_supply_c", "chiller_cw_supply", "°C"),
    ("cw_return_c", "chiller_cw_return", "°C"),
    ("run_hours", "chiller_run_hours", "h"),
)

_NETWORK_FIELDS: tuple[_Field, ...] = (
    ("uptime_s", "net_uptime_s", "s"),
    ("active_ports", "net_active_ports", ""),
    ("bandwidth_in_mbps", "net_bw_in_mbps", "Mbps"),
    ("bandwidth_out_mbps", "net_bw_out_mbps", "Mbps"),
    ("cpu_pct", "net_cpu_pct", "%"),
    ("mem_pct", "net_mem_pct", "%"),
    ("temperature_c", "net_temp_c", "°C"),
    ("packet_loss_pct", "net_pkt_loss_pct", "%"),
)

# Boolean VESDA health flags: (payload_key, sensor_type).
_VESDA_FLAGS: tuple[tuple[str, str], ...] = (
    ("detector_1_ok", "vesda_det1"),
    ("detector_2_ok", "vesda_det2"),
    ("power_ok", "vesda_power"),
    ("flow_ok", "vesda_flow"),
)

# Textual states are stored as numeric codes. Every lookup below falls back
# to 0.0 when the payload carries a value that is not in the map.
_UPS_STATES = {"online": 0.0, "on_battery": 1.0, "overload": 2.0}
_GENERATOR_STATES = {"standby": 0.0, "running": 1.0, "test": 2.0, "fault": -1.0}
_ATS_FEEDS = {"utility-a": 0.0, "utility-b": 1.0, "generator": 2.0}
_VESDA_LEVELS = {"normal": 0.0, "alert": 1.0, "action": 2.0, "fire": 3.0}
_NETWORK_STATES = {"up": 0.0, "degraded": 1.0, "down": 2.0}

_INSERT_READING_SQL = """
    INSERT INTO readings
        (recorded_at, sensor_id, sensor_type, site_id, room_id, rack_id, value, unit)
    VALUES
        (:ts, :sid, :stype, :site, :room, :rack, :val, :unit)
"""


def _numeric_readings(base: str, payload: dict, fields) -> list[_Reading]:
    """Turn each field of *fields* that is present and non-null into a reading.

    The sensor id is ``{base}/{payload_key}``. Absent keys and JSON ``null``
    values are skipped; any other value goes through ``float()``, which
    raises ``ValueError``/``TypeError`` for values it cannot convert.
    """
    found: list[_Reading] = []
    for key, sensor_type, unit in fields:
        raw = payload.get(key)
        if raw is not None:
            found.append((f"{base}/{key}", sensor_type, float(raw), unit))
    return found


def _build_readings(meta: dict, payload: dict) -> list[_Reading]:
    """Extract the readings carried by *payload* for the topic described by *meta*.

    *meta* is the dict produced by ``parse_topic``. Returns the readings in
    the order they will be stored and broadcast; the list is empty when the
    message type is not handled or no known field is present.
    """
    site_id = meta["site_id"]
    room_id = meta["room_id"]
    rack_id = meta["rack_id"]
    device_id = meta["device_id"]
    msg_type = meta["msg_type"]

    readings: list[_Reading] = []

    if msg_type == "env" and rack_id:
        base = f"{site_id}/{room_id}/{rack_id}"
        readings += _numeric_readings(base, payload, _RACK_ENV_FIELDS)

    elif msg_type == "power" and rack_id:
        base = f"{site_id}/{room_id}/{rack_id}"
        # The total rack load arrives as "load_kw" but is stored as "power_kw".
        load_kw = payload.get("load_kw")
        if load_kw is not None:
            readings.append((f"{base}/power_kw", "power_kw", float(load_kw), "kW"))
        readings += _numeric_readings(base, payload, _RACK_PDU_FIELDS)

    elif msg_type == "cooling" and device_id:
        base = f"{site_id}/cooling/{device_id}"
        readings += _numeric_readings(base, payload, _CRAC_FIELDS)

    elif msg_type == "power" and device_id:
        base = f"{site_id}/power/{device_id}"
        readings += _numeric_readings(base, payload, _UPS_FIELDS)
        # Store state explicitly: 0.0 = online, 1.0 = on_battery, 2.0 = overload
        if "state" in payload:
            readings.append(
                (f"{base}/state", "ups_state", _UPS_STATES.get(payload["state"], 0.0), "")
            )

    elif msg_type == "generator" and device_id:
        base = f"{site_id}/generator/{device_id}"
        readings += _numeric_readings(base, payload, _GENERATOR_FIELDS)
        if "state" in payload:
            readings.append(
                (f"{base}/state", "gen_state", _GENERATOR_STATES.get(payload["state"], 0.0), "")
            )

    elif msg_type == "ats" and device_id:
        base = f"{site_id}/power/ats/{device_id}"
        readings += _numeric_readings(base, payload, _ATS_FIELDS)
        if "active_feed" in payload:
            readings.append(
                (f"{base}/active_feed", "ats_active",
                 _ATS_FEEDS.get(payload["active_feed"], 0.0), "")
            )
        if "state" in payload:
            readings.append(
                (f"{base}/state", "ats_state",
                 1.0 if payload["state"] == "transferring" else 0.0, "")
            )

    elif msg_type == "chiller" and device_id:
        base = f"{site_id}/cooling/chiller/{device_id}"
        readings += _numeric_readings(base, payload, _CHILLER_FIELDS)
        if "state" in payload:
            readings.append(
                (f"{base}/state", "chiller_state",
                 1.0 if payload["state"] == "online" else 0.0, "")
            )

    elif msg_type == "fire" and device_id:
        base = f"{site_id}/fire/{device_id}"
        if "level" in payload:
            readings.append(
                (f"{base}/level", "vesda_level",
                 _VESDA_LEVELS.get(payload["level"], 0.0), "")
            )
        # Stored under ".../obscuration", not under the payload key.
        obscuration = payload.get("obscuration_pct_m")
        if obscuration is not None:
            readings.append(
                (f"{base}/obscuration", "vesda_obscuration", float(obscuration), "%/m")
            )
        for key, sensor_type in _VESDA_FLAGS:
            # Flags are recorded whenever the key is present; any falsy
            # value (including null) is stored as 0.0.
            if key in payload:
                readings.append(
                    (f"{base}/{key}", sensor_type, 1.0 if payload[key] else 0.0, "")
                )

    elif msg_type == "network" and device_id:
        base = f"{site_id}/network/{device_id}"
        readings += _numeric_readings(base, payload, _NETWORK_FIELDS)
        if "state" in payload:
            readings.append(
                (f"{base}/state", "net_state",
                 _NETWORK_STATES.get(payload["state"], 0.0), "")
            )

    elif msg_type == "leak" and device_id:
        # A leak message always yields exactly one reading; a missing
        # "state" is treated as "clear".
        state = payload.get("state", "clear")
        readings.append(
            (f"{site_id}/leak/{device_id}", "leak",
             1.0 if state == "detected" else 0.0, "")
        )

    elif msg_type == "particles":
        base = f"{site_id}/{room_id}/particles"
        # Sensor ids drop the "particles_" prefix of the payload keys.
        for key, suffix in (("particles_0_5um", "0_5um"), ("particles_5um", "5um")):
            count = payload.get(key)
            if count is not None:
                readings.append((f"{base}/{suffix}", key, float(count), "/m³"))

    return readings


async def process_message(topic: str, payload: dict) -> None:
    """Store the readings of one MQTT message and broadcast them.

    The topic is resolved with ``parse_topic``; messages with an
    unrecognised topic, or with no known non-null field, are ignored.
    Each reading is inserted into the ``readings`` table and handed to
    ``check_and_update_alarms`` on the same session. The session is
    committed once after all readings, and the batch is then pushed to
    WebSocket clients via ``ws_manager.broadcast``.

    Args:
        topic: Full MQTT topic string of the message.
        payload: Decoded JSON body of the message (expected to be a dict).

    Raises:
        ValueError, TypeError: if a numeric field holds a value ``float()``
            cannot convert. Null fields are skipped rather than raising.
    """
    meta = parse_topic(topic)
    if not meta:
        return

    # One timestamp is shared by every reading of the message.
    now = datetime.now(timezone.utc)

    readings = _build_readings(meta, payload)
    if not readings:
        return

    site_id = meta["site_id"]
    room_id = meta["room_id"]
    rack_id = meta["rack_id"]

    # Built once per message instead of once per reading.
    insert_stmt = text(_INSERT_READING_SQL)

    async with AsyncSessionLocal() as session:
        for sensor_id, sensor_type, value, unit in readings:
            await session.execute(insert_stmt, {
                "ts": now, "sid": sensor_id, "stype": sensor_type,
                "site": site_id, "room": room_id, "rack": rack_id,
                "val": value, "unit": unit,
            })
            await check_and_update_alarms(
                session, sensor_id, sensor_type, site_id, room_id, rack_id, value
            )
        await session.commit()

    # Push to any connected WebSocket clients
    await ws_manager.broadcast({
        "topic": topic,
        "site_id": site_id,
        "room_id": room_id,
        "rack_id": rack_id,
        "readings": [
            {"sensor_id": s, "type": t, "value": v, "unit": u}
            for s, t, v, u in readings
        ],
        "timestamp": now.isoformat(),
    })
|
|
|
|
|
|
async def run_subscriber() -> None:
    """Consume ``bms/#`` from the MQTT broker forever, reconnecting on failure.

    Each message payload is decoded as JSON and passed to
    ``process_message`` together with its topic. A failure while handling
    one message is logged with its traceback and does not stop the stream.
    A failure of the connection itself is logged, and a new connection is
    attempted after a 5 second pause.
    """
    while True:
        try:
            logger.info(
                "Connecting to MQTT at %s:%s", settings.MQTT_HOST, settings.MQTT_PORT
            )
            async with aiomqtt.Client(settings.MQTT_HOST, port=settings.MQTT_PORT) as client:
                logger.info("MQTT connected — subscribing to bms/#")
                await client.subscribe("bms/#")
                async for message in client.messages:
                    try:
                        payload = json.loads(message.payload.decode())
                        await process_message(str(message.topic), payload)
                    except Exception as e:
                        # Per-message boundary: one bad message must not
                        # take down the subscriber. logger.exception keeps
                        # the traceback so the failing code is identifiable.
                        logger.exception(
                            "Error processing message on %s: %s", message.topic, e
                        )
        except Exception as e:
            logger.error("MQTT connection failed: %s — retrying in 5s", e)
        # Pause before every reconnect attempt, so the loop cannot spin
        # even if the message stream ends without raising.
        await asyncio.sleep(5)
|