BMS/simulators/bots/pdu.py
2026-03-19 11:32:17 +00:00

87 lines
3.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import math
import random
from datetime import datetime, timezone
from bots.base import BaseBot
VOLTAGE_LINE = 230.0 # line-to-neutral (V)
class PduBot(BaseBot):
"""PDU power monitor for a single rack — rack total + per-phase breakdown."""
interval = 60
def __init__(self, site_id: str, room_id: str, rack_id: str) -> None:
super().__init__()
self.site_id = site_id
self.room_id = room_id
self.rack_id = rack_id
rack_num = int(rack_id[-2:]) if rack_id[-2:].isdigit() else 1
# Base load varies per rack (2 6 kW)
self._base_load = 2.0 + (rack_num % 5) * 0.8
# Phase split offsets — each rack has a slightly different natural imbalance
self._phase_bias = [1.0 + random.gauss(0, 0.04) for _ in range(3)]
self._imbalanced = False
def get_topic(self) -> str:
return f"bms/{self.site_id}/{self.room_id}/{self.rack_id}/power"
def set_scenario(self, name: str | None) -> None:
super().set_scenario(name)
self._imbalanced = (name == "PHASE_IMBALANCE")
def get_payload(self) -> dict:
hour = datetime.now(timezone.utc).hour
# Higher load during business hours (820)
biz_factor = 1.0 + 0.25 * math.sin(math.pi * max(0, hour - 8) / 12) if 8 <= hour <= 20 else 0.85
load = self._base_load * biz_factor + random.gauss(0, 0.1)
if self._scenario == "POWER_SPIKE":
load *= 1.0 + min(self._scenario_step * 0.08, 0.5)
elif self._scenario == "RACK_OVERLOAD":
load = 8.5 + random.gauss(0, 0.3)
load = round(max(0.5, load), 2)
# ── Per-phase breakdown ───────────────────────────────────────
if self._imbalanced:
# Drive phase A much higher, C drops
bias = [
1.0 + min(self._scenario_step * 0.06, 0.50), # phase A surges
1.0 + random.gauss(0, 0.02), # phase B stable
max(0.3, 1.0 - min(self._scenario_step * 0.04, 0.40)), # phase C drops
]
else:
bias = [b + random.gauss(0, 0.015) for b in self._phase_bias]
bias_total = sum(bias)
phase_kw = [load * (b / bias_total) for b in bias]
phase_a_kw, phase_b_kw, phase_c_kw = phase_kw
# Current per phase (I = P / V, single-phase)
pf = 0.93 + random.gauss(0, 0.01)
def amps(kw: float) -> float:
return kw * 1000 / (VOLTAGE_LINE * pf)
phase_a_a = amps(phase_a_kw)
phase_b_a = amps(phase_b_kw)
phase_c_a = amps(phase_c_kw)
# Phase imbalance % = (max - min) / avg * 100
currents = [phase_a_a, phase_b_a, phase_c_a]
avg_a = sum(currents) / 3
imbalance = (max(currents) - min(currents)) / avg_a * 100 if avg_a > 0 else 0.0
return {
"load_kw": load,
"phase_a_kw": round(phase_a_kw, 2),
"phase_b_kw": round(phase_b_kw, 2),
"phase_c_kw": round(phase_c_kw, 2),
"phase_a_a": round(phase_a_a, 1),
"phase_b_a": round(phase_b_a, 1),
"phase_c_a": round(phase_c_a, 1),
"imbalance_pct": round(imbalance, 1),
"power_factor": round(pf, 3),
"voltage_v": round(VOLTAGE_LINE + random.gauss(0, 0.5), 1),
}