41 lines
1.2 KiB
Python
41 lines
1.2 KiB
Python
from bots.base import BaseBot
|
|
|
|
|
|
class WaterLeakBot(BaseBot):
    """Simulated water-leak sensor bot.

    The reported state is ``"clear"`` unless the ``LEAK_DETECTED`` scenario
    has been selected through :meth:`set_scenario`; selecting any other
    scenario (or ``None``) puts the sensor back to ``"clear"``.
    """

    # Not read inside this class; presumably the publish period used by
    # BaseBot (seconds?) -- confirm against the base class.
    interval = 120

    def __init__(
        self,
        site_id: str,
        sensor_id: str,
        floor_zone: str = "general",
        under_floor: bool = False,
        near_crac: bool = False,
        room_id: str | None = None,
    ) -> None:
        """Create a leak sensor that starts out in the clear state.

        Args:
            site_id: Site identifier; becomes part of the topic.
            sensor_id: Sensor identifier; becomes part of the topic.
            floor_zone: Zone label echoed in every payload.
            under_floor: Flag echoed in every payload.
            near_crac: Flag echoed in every payload.
            room_id: Optional room identifier echoed in every payload.
        """
        super().__init__()

        # Values that make up the topic.
        self.site_id, self.sensor_id = site_id, sensor_id

        # Descriptive metadata copied verbatim into the payload.
        self.floor_zone, self.under_floor, self.near_crac = (
            floor_zone,
            under_floor,
            near_crac,
        )
        self.room_id = room_id

        # Set after the base initializer so the bot always starts clear.
        self._leaked = False

    def get_topic(self) -> str:
        """Return the topic string ``bms/<site_id>/leak/<sensor_id>``."""
        return f"bms/{self.site_id}/leak/{self.sensor_id}"

    def set_scenario(self, name: str | None) -> None:
        """Forward the scenario to the base class and refresh the leak flag.

        Args:
            name: Scenario name, or ``None``. Only the exact string
                ``"LEAK_DETECTED"`` switches the sensor to the leaked state.
        """
        super().set_scenario(name)
        leak_requested = name == "LEAK_DETECTED"
        self._leaked = leak_requested

    def get_payload(self) -> dict:
        """Build the payload dict describing the current sensor state.

        Returns:
            A dict with ``state`` (``"detected"`` or ``"clear"``) followed by
            the ``floor_zone``, ``under_floor``, ``near_crac`` and ``room_id``
            values given at construction time.
        """
        if self._leaked:
            state = "detected"
        else:
            state = "clear"

        payload = {
            "state": state,
            "floor_zone": self.floor_zone,
            "under_floor": self.under_floor,
            "near_crac": self.near_crac,
            "room_id": self.room_id,
        }
        return payload
|