BMS/simulators/main.py
2026-03-19 11:32:17 +00:00

182 lines
6.4 KiB
Python

import asyncio
import json
import logging
import aiomqtt
from config import MQTT_HOST, MQTT_PORT, TOPOLOGY
from scenarios.compound import CompoundOrchestrator, COMPOUND_SCENARIOS
from bots.env_sensor import EnvSensorBot
from bots.pdu import PduBot
from bots.ups import UpsBot
from bots.crac import CracBot
from bots.water_leak import WaterLeakBot
from bots.generator import GeneratorBot
from bots.ats import AtsBot
from bots.chiller import ChillerBot
from bots.vesda import VesdaBot
from bots.network import NetworkBot
from bots.particles import ParticleBot
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
def build_bots() -> tuple[list, dict]:
"""Instantiate all bots. Returns (list, key→bot dict for scenario targeting).
Keys include individual device IDs AND room IDs (mapped to all bots in that room).
"""
bots = []
by_key: dict[str, object | list] = {}
site_id = TOPOLOGY["site_id"]
for room in TOPOLOGY["rooms"]:
room_id = room["room_id"]
room_bots: list = []
for rack_id in room["racks"]:
env = EnvSensorBot(site_id, room_id, rack_id)
pdu = PduBot(site_id, room_id, rack_id)
bots += [env, pdu]
by_key[rack_id] = env
by_key[f"{rack_id}/pdu"] = pdu
room_bots += [env, pdu]
crac = CracBot(site_id, room["crac_id"])
bots.append(crac)
by_key[room["crac_id"]] = crac
room_bots.append(crac)
# Room-level targeting: hall-a targets ALL bots; /env and /pdu target subsets
by_key[room_id] = room_bots
by_key[f"{room_id}/env"] = [b for b in room_bots if isinstance(b, EnvSensorBot)]
by_key[f"{room_id}/pdu"] = [b for b in room_bots if isinstance(b, PduBot)]
for ups_id in TOPOLOGY["ups_units"]:
ups = UpsBot(site_id, ups_id)
bots.append(ups)
by_key[ups_id] = ups
for gen_id in TOPOLOGY.get("generators", []):
gen = GeneratorBot(site_id, gen_id)
bots.append(gen)
by_key[gen_id] = gen
for ats_id in TOPOLOGY.get("ats_units", []):
ats = AtsBot(site_id, ats_id)
bots.append(ats)
by_key[ats_id] = ats
for chiller_id in TOPOLOGY.get("chillers", []):
chiller = ChillerBot(site_id, chiller_id)
bots.append(chiller)
by_key[chiller_id] = chiller
for zone in TOPOLOGY.get("vesda_zones", []):
vesda = VesdaBot(site_id, zone["zone_id"], zone["room_id"])
bots.append(vesda)
by_key[zone["zone_id"]] = vesda
for leak_cfg in TOPOLOGY.get("leak_sensors", []):
# Support both old string format and new dict format
if isinstance(leak_cfg, str):
leak = WaterLeakBot(site_id, leak_cfg)
by_key[leak_cfg] = leak
else:
leak = WaterLeakBot(
site_id,
leak_cfg["sensor_id"],
floor_zone=leak_cfg.get("floor_zone", "general"),
under_floor=leak_cfg.get("under_floor", False),
near_crac=leak_cfg.get("near_crac", False),
room_id=leak_cfg.get("room_id"),
)
by_key[leak_cfg["sensor_id"]] = leak
bots.append(leak)
for sw_cfg in TOPOLOGY.get("switches", []):
sw = NetworkBot(site_id, sw_cfg["switch_id"], port_count=sw_cfg.get("port_count", 48))
bots.append(sw)
by_key[sw_cfg["switch_id"]] = sw
for ps_cfg in TOPOLOGY.get("particle_sensors", []):
ps = ParticleBot(site_id, ps_cfg["room_id"])
bots.append(ps)
by_key[f"particles-{ps_cfg['room_id']}"] = ps
return bots, by_key
async def listen_scenarios(
client: aiomqtt.Client,
by_key: dict,
orchestrator: CompoundOrchestrator,
) -> None:
"""Listen for scenario control messages and apply them to bots."""
await client.subscribe("bms/control/scenario")
async for message in client.messages:
if "control/scenario" not in str(message.topic):
continue
try:
data = json.loads(message.payload.decode())
scenario = data.get("scenario")
target = data.get("target") # rack_id, crac_id, ups_id — or None for all
# Compound scenarios are handled by the orchestrator
if scenario in COMPOUND_SCENARIOS:
orchestrator.trigger(scenario)
continue
if scenario == "RESET":
orchestrator.reset()
continue
if target:
entry = by_key.get(target)
if entry is None:
logger.warning(f"Unknown target '{target}'")
elif isinstance(entry, list):
for bot in entry:
bot.set_scenario(scenario)
logger.info(f"Scenario '{scenario}' applied to {len(entry)} bots in {target}")
else:
entry.set_scenario(scenario)
logger.info(f"Scenario '{scenario}' applied to {target}")
else:
# Collect unique bot instances (by_key also contains room-level lists)
all_bots: set = set()
for v in by_key.values():
if isinstance(v, list):
all_bots.update(v)
else:
all_bots.add(v)
for bot in all_bots:
bot.set_scenario(scenario)
logger.info(f"Scenario '{scenario}' applied to {len(all_bots)} bots")
except Exception as e:
logger.error(f"Scenario message error: {e}")
async def main() -> None:
bots, by_key = build_bots()
orchestrator = CompoundOrchestrator(by_key)
logger.info(f"Built {len(bots)} simulator bots for site {TOPOLOGY['site_id']}")
while True:
try:
async with aiomqtt.Client(MQTT_HOST, port=MQTT_PORT) as client:
logger.info(f"Connected to MQTT at {MQTT_HOST}:{MQTT_PORT}")
tasks = [asyncio.create_task(bot.run(client)) for bot in bots]
tasks.append(asyncio.create_task(listen_scenarios(client, by_key, orchestrator)))
await asyncio.gather(*tasks)
except Exception as e:
logger.error(f"Connection lost: {e} — reconnecting in 5s")
await asyncio.sleep(5)
if __name__ == "__main__":
asyncio.run(main())