import asyncio import json import logging from datetime import datetime, timezone import aiomqtt from sqlalchemy import text from core.config import settings from core.database import AsyncSessionLocal from services.alarm_engine import check_and_update_alarms from services.ws_manager import manager as ws_manager logger = logging.getLogger(__name__) def parse_topic(topic: str) -> dict | None: """ Topic formats: bms/{site_id}/{room_id}/{rack_id}/env — rack environment bms/{site_id}/{room_id}/{rack_id}/power — rack PDU power bms/{site_id}/cooling/{crac_id} — CRAC unit bms/{site_id}/cooling/chiller/{chiller_id} — chiller plant bms/{site_id}/power/{ups_id} — UPS unit bms/{site_id}/power/ats/{ats_id} — ATS transfer switch bms/{site_id}/generator/{gen_id} — diesel generator bms/{site_id}/fire/{zone_id} — VESDA fire zone bms/{site_id}/leak/{sensor_id} — water leak sensor """ parts = topic.split("/") if len(parts) < 4 or parts[0] != "bms": return None site_id = parts[1] # 5-part: rack env/power OR cooling/chiller/{id} OR power/ats/{id} if len(parts) == 5: if parts[4] in ("env", "power"): return { "site_id": site_id, "room_id": parts[2], "rack_id": parts[3], "device_id": None, "msg_type": parts[4], } if parts[2] == "cooling" and parts[3] == "chiller": return { "site_id": site_id, "room_id": None, "rack_id": None, "device_id": parts[4], "msg_type": "chiller", } if parts[2] == "power" and parts[3] == "ats": return { "site_id": site_id, "room_id": None, "rack_id": None, "device_id": parts[4], "msg_type": "ats", } # 4-part: bms/{site_id}/{room_id}/particles if len(parts) == 4 and parts[3] == "particles": return { "site_id": site_id, "room_id": parts[2], "rack_id": None, "device_id": None, "msg_type": "particles", } # 4-part: known subsystem topics if len(parts) == 4 and parts[2] in ("cooling", "power", "leak", "generator", "fire", "network"): return { "site_id": site_id, "room_id": None, "rack_id": None, "device_id": parts[3], "msg_type": parts[2], } return None async def process_message(topic: str, payload: dict) -> None: meta = parse_topic(topic) if not meta: return site_id = meta["site_id"] room_id = meta["room_id"] rack_id = meta["rack_id"] device_id = meta["device_id"] msg_type = meta["msg_type"] now = datetime.now(timezone.utc) # Build list of (sensor_id, sensor_type, value, unit) tuples readings: list[tuple[str, str, float, str]] = [] if msg_type == "env" and rack_id: base = f"{site_id}/{room_id}/{rack_id}" if "temperature" in payload: readings.append((f"{base}/temperature", "temperature", float(payload["temperature"]), "°C")) if "humidity" in payload: readings.append((f"{base}/humidity", "humidity", float(payload["humidity"]), "%")) elif msg_type == "power" and rack_id: base = f"{site_id}/{room_id}/{rack_id}" if "load_kw" in payload: readings.append((f"{base}/power_kw", "power_kw", float(payload["load_kw"]), "kW")) # Per-phase PDU data for key, s_type, unit in [ ("phase_a_kw", "pdu_phase_a_kw", "kW"), ("phase_b_kw", "pdu_phase_b_kw", "kW"), ("phase_c_kw", "pdu_phase_c_kw", "kW"), ("phase_a_a", "pdu_phase_a_a", "A"), ("phase_b_a", "pdu_phase_b_a", "A"), ("phase_c_a", "pdu_phase_c_a", "A"), ("imbalance_pct", "pdu_imbalance", "%"), ]: if payload.get(key) is not None: readings.append((f"{base}/{key}", s_type, float(payload[key]), unit)) elif msg_type == "cooling" and device_id: base = f"{site_id}/cooling/{device_id}" crac_fields = [ # (payload_key, sensor_type, unit) ("supply_temp", "cooling_supply", "°C"), ("return_temp", "cooling_return", "°C"), ("fan_pct", "cooling_fan", "%"), ("supply_humidity", "cooling_supply_hum", "%"), ("return_humidity", "cooling_return_hum", "%"), ("airflow_cfm", "cooling_airflow", "CFM"), ("filter_dp_pa", "cooling_filter_dp", "Pa"), ("cooling_capacity_kw", "cooling_cap_kw", "kW"), ("cooling_capacity_pct", "cooling_cap_pct", "%"), ("cop", "cooling_cop", ""), ("sensible_heat_ratio", "cooling_shr", ""), ("compressor_state", "cooling_comp_state", ""), ("compressor_load_pct", "cooling_comp_load", "%"), ("compressor_power_kw", "cooling_comp_power", "kW"), ("compressor_run_hours", "cooling_comp_hours", "h"), ("high_pressure_bar", "cooling_high_press", "bar"), ("low_pressure_bar", "cooling_low_press", "bar"), ("discharge_superheat_c", "cooling_superheat", "°C"), ("liquid_subcooling_c", "cooling_subcooling", "°C"), ("fan_rpm", "cooling_fan_rpm", "RPM"), ("fan_power_kw", "cooling_fan_power", "kW"), ("fan_run_hours", "cooling_fan_hours", "h"), ("total_unit_power_kw", "cooling_unit_power", "kW"), ("input_voltage_v", "cooling_voltage", "V"), ("input_current_a", "cooling_current", "A"), ("power_factor", "cooling_pf", ""), ] for key, s_type, unit in crac_fields: if payload.get(key) is not None: readings.append((f"{base}/{key}", s_type, float(payload[key]), unit)) elif msg_type == "power" and device_id: base = f"{site_id}/power/{device_id}" for key, s_type, unit in [ ("charge_pct", "ups_charge", "%"), ("load_pct", "ups_load", "%"), ("runtime_min", "ups_runtime", "min"), ("voltage", "ups_voltage", "V"), ]: if key in payload: readings.append((f"{base}/{key}", s_type, float(payload[key]), unit)) # Store state explicitly: 0.0 = online, 1.0 = on_battery, 2.0 = overload if "state" in payload: state_val = {"online": 0.0, "on_battery": 1.0, "overload": 2.0}.get(payload["state"], 0.0) readings.append((f"{base}/state", "ups_state", state_val, "")) elif msg_type == "generator" and device_id: base = f"{site_id}/generator/{device_id}" state_map = {"standby": 0.0, "running": 1.0, "test": 2.0, "fault": -1.0} for key, s_type, unit in [ ("fuel_pct", "gen_fuel_pct", "%"), ("fuel_litres", "gen_fuel_l", "L"), ("fuel_rate_lph", "gen_fuel_rate", "L/h"), ("load_kw", "gen_load_kw", "kW"), ("load_pct", "gen_load_pct", "%"), ("run_hours", "gen_run_hours", "h"), ("voltage_v", "gen_voltage_v", "V"), ("frequency_hz", "gen_freq_hz", "Hz"), ("engine_rpm", "gen_rpm", "RPM"), ("oil_pressure_bar", "gen_oil_press", "bar"), ("coolant_temp_c", "gen_coolant_c", "°C"), ("exhaust_temp_c", "gen_exhaust_c", "°C"), ("alternator_temp_c", "gen_alt_temp_c", "°C"), ("power_factor", "gen_pf", ""), ("battery_v", "gen_batt_v", "V"), ]: if payload.get(key) is not None: readings.append((f"{base}/{key}", s_type, float(payload[key]), unit)) if "state" in payload: readings.append((f"{base}/state", "gen_state", state_map.get(payload["state"], 0.0), "")) elif msg_type == "ats" and device_id: base = f"{site_id}/power/ats/{device_id}" feed_map = {"utility-a": 0.0, "utility-b": 1.0, "generator": 2.0} for key, s_type, unit in [ ("transfer_count", "ats_xfer_count", ""), ("last_transfer_ms", "ats_xfer_ms", "ms"), ("utility_a_v", "ats_ua_v", "V"), ("utility_b_v", "ats_ub_v", "V"), ("generator_v", "ats_gen_v", "V"), ]: if payload.get(key) is not None: readings.append((f"{base}/{key}", s_type, float(payload[key]), unit)) if "active_feed" in payload: readings.append((f"{base}/active_feed", "ats_active", feed_map.get(payload["active_feed"], 0.0), "")) if "state" in payload: readings.append((f"{base}/state", "ats_state", 1.0 if payload["state"] == "transferring" else 0.0, "")) elif msg_type == "chiller" and device_id: base = f"{site_id}/cooling/chiller/{device_id}" for key, s_type, unit in [ ("chw_supply_c", "chiller_chw_supply", "°C"), ("chw_return_c", "chiller_chw_return", "°C"), ("chw_delta_c", "chiller_chw_delta", "°C"), ("flow_gpm", "chiller_flow_gpm", "GPM"), ("cooling_load_kw", "chiller_load_kw", "kW"), ("cooling_load_pct", "chiller_load_pct", "%"), ("cop", "chiller_cop", ""), ("compressor_load_pct", "chiller_comp_load", "%"), ("condenser_pressure_bar", "chiller_cond_press", "bar"), ("evaporator_pressure_bar", "chiller_evap_press", "bar"), ("cw_supply_c", "chiller_cw_supply", "°C"), ("cw_return_c", "chiller_cw_return", "°C"), ("run_hours", "chiller_run_hours", "h"), ]: if payload.get(key) is not None: readings.append((f"{base}/{key}", s_type, float(payload[key]), unit)) if "state" in payload: readings.append((f"{base}/state", "chiller_state", 1.0 if payload["state"] == "online" else 0.0, "")) elif msg_type == "fire" and device_id: base = f"{site_id}/fire/{device_id}" level_map = {"normal": 0.0, "alert": 1.0, "action": 2.0, "fire": 3.0} if "level" in payload: readings.append((f"{base}/level", "vesda_level", level_map.get(payload["level"], 0.0), "")) if "obscuration_pct_m" in payload: readings.append((f"{base}/obscuration", "vesda_obscuration", float(payload["obscuration_pct_m"]), "%/m")) for key, s_type in [ ("detector_1_ok", "vesda_det1"), ("detector_2_ok", "vesda_det2"), ("power_ok", "vesda_power"), ("flow_ok", "vesda_flow"), ]: if key in payload: readings.append((f"{base}/{key}", s_type, 1.0 if payload[key] else 0.0, "")) elif msg_type == "network" and device_id: base = f"{site_id}/network/{device_id}" state_map = {"up": 0.0, "degraded": 1.0, "down": 2.0} for key, s_type, unit in [ ("uptime_s", "net_uptime_s", "s"), ("active_ports", "net_active_ports", ""), ("bandwidth_in_mbps", "net_bw_in_mbps", "Mbps"), ("bandwidth_out_mbps","net_bw_out_mbps", "Mbps"), ("cpu_pct", "net_cpu_pct", "%"), ("mem_pct", "net_mem_pct", "%"), ("temperature_c", "net_temp_c", "°C"), ("packet_loss_pct", "net_pkt_loss_pct", "%"), ]: if payload.get(key) is not None: readings.append((f"{base}/{key}", s_type, float(payload[key]), unit)) if "state" in payload: readings.append((f"{base}/state", "net_state", state_map.get(payload["state"], 0.0), "")) elif msg_type == "leak" and device_id: state = payload.get("state", "clear") readings.append(( f"{site_id}/leak/{device_id}", "leak", 1.0 if state == "detected" else 0.0, "", )) elif msg_type == "particles": base = f"{site_id}/{room_id}/particles" if "particles_0_5um" in payload: readings.append((f"{base}/0_5um", "particles_0_5um", float(payload["particles_0_5um"]), "/m³")) if "particles_5um" in payload: readings.append((f"{base}/5um", "particles_5um", float(payload["particles_5um"]), "/m³")) if not readings: return async with AsyncSessionLocal() as session: for sensor_id, sensor_type, value, unit in readings: await session.execute(text(""" INSERT INTO readings (recorded_at, sensor_id, sensor_type, site_id, room_id, rack_id, value, unit) VALUES (:ts, :sid, :stype, :site, :room, :rack, :val, :unit) """), { "ts": now, "sid": sensor_id, "stype": sensor_type, "site": site_id, "room": room_id, "rack": rack_id, "val": value, "unit": unit, }) await check_and_update_alarms( session, sensor_id, sensor_type, site_id, room_id, rack_id, value ) await session.commit() # Push to any connected WebSocket clients await ws_manager.broadcast({ "topic": topic, "site_id": site_id, "room_id": room_id, "rack_id": rack_id, "readings": [ {"sensor_id": s, "type": t, "value": v, "unit": u} for s, t, v, u in readings ], "timestamp": now.isoformat(), }) async def run_subscriber() -> None: """Runs forever, reconnecting on any failure.""" while True: try: logger.info(f"Connecting to MQTT at {settings.MQTT_HOST}:{settings.MQTT_PORT}") async with aiomqtt.Client(settings.MQTT_HOST, port=settings.MQTT_PORT) as client: logger.info("MQTT connected — subscribing to bms/#") await client.subscribe("bms/#") async for message in client.messages: try: payload = json.loads(message.payload.decode()) await process_message(str(message.topic), payload) except Exception as e: logger.error(f"Error processing message on {message.topic}: {e}") except Exception as e: logger.error(f"MQTT connection failed: {e} — retrying in 5s") await asyncio.sleep(5)