import math
import random
from datetime import datetime, timezone

from bots.base import BaseBot

VOLTAGE_LINE = 230.0  # line-to-neutral (V)


def _business_hours_factor(hour: int) -> float:
    """Return the load multiplier for the given hour of day (0-23).

    Between 08:00 and 20:00 inclusive the factor follows half a sine wave:
    1.0 at hours 8 and 20, peaking at 1.25 at hour 14. Outside that window
    it is a flat 0.85.
    """
    if 8 <= hour <= 20:
        return 1.0 + 0.25 * math.sin(math.pi * (hour - 8) / 12)
    return 0.85


def _kw_to_amps(kw: float, pf: float) -> float:
    """Convert single-phase real power in kW to current in A (I = P / (V * pf))."""
    return kw * 1000 / (VOLTAGE_LINE * pf)


def _imbalance_pct(currents: list[float]) -> float:
    """Return (max - min) / mean * 100 for the phase currents, or 0.0 if the mean is not positive."""
    avg_a = sum(currents) / len(currents)
    if avg_a > 0:
        return (max(currents) - min(currents)) / avg_a * 100
    return 0.0


class PduBot(BaseBot):
    """PDU power monitor for a single rack — rack total + per-phase breakdown.

    Each payload is a simulated reading: a rack load in kW derived from a
    per-rack base load, a time-of-day factor and Gaussian noise, split across
    three phases, with per-phase currents and an imbalance percentage.

    Scenarios recognised here (by name): ``POWER_SPIKE``, ``RACK_OVERLOAD``
    and ``PHASE_IMBALANCE``. ``self._scenario`` and ``self._scenario_step``
    are read but not defined in this class — presumably maintained by
    ``BaseBot``; confirm against the base class.
    """

    interval = 60  # NOTE(review): presumably the publish period in seconds — confirm in BaseBot

    def __init__(self, site_id: str, room_id: str, rack_id: str) -> None:
        """Create a bot for one rack.

        Args:
            site_id: Site segment of the topic path.
            room_id: Room segment of the topic path.
            rack_id: Rack segment of the topic path. If its last two
                characters are decimal digits they seed the rack's base
                load; otherwise rack number 1 is assumed.
        """
        super().__init__()
        self.site_id = site_id
        self.room_id = room_id
        self.rack_id = rack_id

        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as superscript digits, which int() rejects with ValueError.
        suffix = rack_id[-2:]
        rack_num = int(suffix) if suffix.isdecimal() else 1

        # Base load varies per rack: one of 2.0, 2.8, 3.6, 4.4 or 5.2 kW.
        self._base_load = 2.0 + (rack_num % 5) * 0.8
        # Phase split offsets — each rack has a slightly different natural imbalance.
        self._phase_bias = [1.0 + random.gauss(0, 0.04) for _ in range(3)]
        self._imbalanced = False

    def get_topic(self) -> str:
        """Return the topic string ``bms/<site>/<room>/<rack>/power``."""
        return f"bms/{self.site_id}/{self.room_id}/{self.rack_id}/power"

    def set_scenario(self, name: str | None) -> None:
        """Forward the scenario to the base class and latch the phase-imbalance flag."""
        super().set_scenario(name)
        self._imbalanced = (name == "PHASE_IMBALANCE")

    def get_payload(self) -> dict:
        """Build one simulated power reading.

        Returns:
            A dict with ``load_kw``, per-phase ``phase_{a,b,c}_kw`` and
            ``phase_{a,b,c}_a``, ``imbalance_pct``, ``power_factor`` and
            ``voltage_v``. Per-phase kW values are rounded independently, so
            they may not sum exactly to ``load_kw``.
        """
        # ── Rack total ────────────────────────────────────────────────
        hour = datetime.now(timezone.utc).hour
        load = self._base_load * _business_hours_factor(hour) + random.gauss(0, 0.1)

        if self._scenario == "POWER_SPIKE":
            # Ramp up 8 % per scenario step, capped at +50 %.
            load *= 1.0 + min(self._scenario_step * 0.08, 0.5)
        elif self._scenario == "RACK_OVERLOAD":
            # Overload ignores the base load entirely.
            load = 8.5 + random.gauss(0, 0.3)

        # Never report less than 0.5 kW, even if noise pushes the value down.
        load = round(max(0.5, load), 2)

        # ── Per-phase breakdown ───────────────────────────────────────
        if self._imbalanced:
            step = self._scenario_step
            bias = [
                1.0 + min(step * 0.06, 0.50),              # phase A surges (up to +50 %)
                1.0 + random.gauss(0, 0.02),               # phase B stable
                max(0.3, 1.0 - min(step * 0.04, 0.40)),    # phase C drops (down to -40 %)
            ]
        else:
            # Natural per-rack bias plus a little fresh jitter each reading.
            bias = [b + random.gauss(0, 0.015) for b in self._phase_bias]

        # Normalise so the three phase loads sum to the rack load.
        bias_total = sum(bias)
        phase_a_kw, phase_b_kw, phase_c_kw = (load * (b / bias_total) for b in bias)

        # ── Currents and imbalance ────────────────────────────────────
        pf = 0.93 + random.gauss(0, 0.01)
        phase_a_a = _kw_to_amps(phase_a_kw, pf)
        phase_b_a = _kw_to_amps(phase_b_kw, pf)
        phase_c_a = _kw_to_amps(phase_c_kw, pf)
        imbalance = _imbalance_pct([phase_a_a, phase_b_a, phase_c_a])

        return {
            "load_kw": load,
            "phase_a_kw": round(phase_a_kw, 2),
            "phase_b_kw": round(phase_b_kw, 2),
            "phase_c_kw": round(phase_c_kw, 2),
            "phase_a_a": round(phase_a_a, 1),
            "phase_b_a": round(phase_b_a, 1),
            "phase_c_a": round(phase_c_a, 1),
            "imbalance_pct": round(imbalance, 1),
            "power_factor": round(pf, 3),
            # Reported voltage carries its own noise; currents above use the nominal value.
            "voltage_v": round(VOLTAGE_LINE + random.gauss(0, 0.5), 1),
        }