BMS/backend/services/ws_manager.py
2026-03-19 11:32:17 +00:00

35 lines
1 KiB
Python

import json
import logging
from fastapi import WebSocket
logger = logging.getLogger(__name__)
class ConnectionManager:
    """Registry of accepted WebSocket clients with JSON broadcast to all of them."""

    def __init__(self) -> None:
        # Every socket that has been accepted and not yet removed.
        self._connections: set[WebSocket] = set()

    async def connect(self, ws: WebSocket) -> None:
        """Accept ``ws`` and start tracking it for broadcasts."""
        await ws.accept()
        self._connections.add(ws)
        logger.info("WS client connected. Total: %d", len(self._connections))

    def disconnect(self, ws: WebSocket) -> None:
        """Stop tracking ``ws``; a socket that is not tracked is ignored."""
        self._connections.discard(ws)
        logger.info("WS client disconnected. Total: %d", len(self._connections))

    async def broadcast(self, data: dict) -> None:
        """Serialize ``data`` to JSON and send it to every tracked client.

        Values that ``json`` cannot encode natively are converted with
        ``str``. Clients whose send raises are removed from the registry;
        the failure is logged and never propagated to the caller.
        """
        if not self._connections:
            return

        message = json.dumps(data, default=str)
        dead: set[WebSocket] = set()

        # Iterate over a snapshot: every ``await`` below hands control back
        # to the event loop, where connect()/disconnect() may mutate the
        # live set. Iterating the set itself would then raise
        # "RuntimeError: Set changed size during iteration".
        for ws in tuple(self._connections):
            try:
                await ws.send_text(message)
            except Exception:
                # Best-effort delivery: any send failure marks the client
                # as dead instead of aborting the whole broadcast.
                logger.debug("WS send failed; dropping client", exc_info=True)
                dead.add(ws)

        if dead:
            self._connections -= dead
            logger.info(
                "Dropped %d dead WS client(s). Total: %d",
                len(dead),
                len(self._connections),
            )
# Singleton — imported by both the MQTT subscriber and the WS route.
# Instantiated once at module import, so every importer of this module
# shares the same ConnectionManager (and therefore the same connection set).
manager = ConnectionManager()