"""Async database engine/session setup and idempotent schema bootstrap."""

import logging
from collections.abc import AsyncIterator

from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
    AsyncConnection,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

from core.config import settings

logger = logging.getLogger(__name__)

# Shared connection pool: 10 persistent connections plus up to 20 overflow.
engine = create_async_engine(
    settings.DATABASE_URL,
    echo=False,
    pool_size=10,
    max_overflow=20,
)

# expire_on_commit=False keeps loaded attributes readable after commit.
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)


async def _ensure_readings_hypertable(conn: AsyncConnection) -> None:
    """Best-effort conversion of ``readings`` into a TimescaleDB hypertable.

    Tries the ``by_range(...)`` call form first and falls back to the
    positional column-name form. Each attempt runs inside its own SAVEPOINT:
    on PostgreSQL a failed statement aborts the enclosing transaction, so
    without the savepoint the fallback (and every later DDL statement in the
    same transaction) would be rejected with "current transaction is aborted".

    If both attempts fail, a warning is logged and the function returns
    normally; the caller's transaction stays usable.
    """
    attempts = (
        "SELECT create_hypertable('readings', by_range('recorded_at'), if_not_exists => TRUE)",
        "SELECT create_hypertable('readings', 'recorded_at', if_not_exists => TRUE)",
    )
    last_error: Exception | None = None
    for statement in attempts:
        try:
            # begin_nested() emits SAVEPOINT; leaving the block on an error
            # rolls back to it, so only this attempt is undone.
            async with conn.begin_nested():
                await conn.execute(text(statement))
        except Exception as exc:  # deliberate best-effort: never fail init here
            last_error = exc
        else:
            return
    logger.warning(
        "Hypertable setup skipped (table still works): %s", last_error
    )


async def init_db() -> None:
    """Create the TimescaleDB extension, tables and indexes if missing.

    All statements run in a single transaction opened with ``engine.begin()``
    and use ``IF NOT EXISTS``, so calling this repeatedly is safe. Hypertable
    conversion is best-effort (see ``_ensure_readings_hypertable``); any other
    failing statement propagates and rolls the transaction back.
    """
    async with engine.begin() as conn:
        # Enable TimescaleDB
        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE"))

        # Sensor readings — core time-series table
        await conn.execute(text("""
            CREATE TABLE IF NOT EXISTS readings (
                recorded_at TIMESTAMPTZ NOT NULL,
                sensor_id VARCHAR(120) NOT NULL,
                sensor_type VARCHAR(50) NOT NULL,
                site_id VARCHAR(50) NOT NULL,
                room_id VARCHAR(50),
                rack_id VARCHAR(50),
                value DOUBLE PRECISION NOT NULL,
                unit VARCHAR(20)
            )
        """))

        # Convert to hypertable — no-op if already one
        await _ensure_readings_hypertable(conn)

        await conn.execute(text("""
            CREATE INDEX IF NOT EXISTS idx_readings_sensor_time
            ON readings (sensor_id, recorded_at DESC)
        """))

        # Alarms table
        await conn.execute(text("""
            CREATE TABLE IF NOT EXISTS alarms (
                id BIGSERIAL PRIMARY KEY,
                sensor_id VARCHAR(120),
                site_id VARCHAR(50),
                room_id VARCHAR(50),
                rack_id VARCHAR(50),
                severity VARCHAR(20) NOT NULL,
                message TEXT NOT NULL,
                state VARCHAR(20) NOT NULL DEFAULT 'active',
                triggered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                acknowledged_at TIMESTAMPTZ,
                resolved_at TIMESTAMPTZ
            )
        """))

        # Site config — generic key/value JSON store (used for floor layout etc.)
        await conn.execute(text("""
            CREATE TABLE IF NOT EXISTS site_config (
                site_id VARCHAR(50) NOT NULL,
                key VARCHAR(100) NOT NULL,
                value JSONB NOT NULL,
                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                PRIMARY KEY (site_id, key)
            )
        """))

        # Sensor device registry
        await conn.execute(text("""
            CREATE TABLE IF NOT EXISTS sensors (
                id SERIAL PRIMARY KEY,
                site_id VARCHAR(50) NOT NULL,
                device_id VARCHAR(100) NOT NULL,
                name VARCHAR(200) NOT NULL,
                device_type VARCHAR(50) NOT NULL,
                room_id VARCHAR(50),
                rack_id VARCHAR(50),
                protocol VARCHAR(30) NOT NULL DEFAULT 'mqtt',
                protocol_config JSONB NOT NULL DEFAULT '{}',
                enabled BOOLEAN NOT NULL DEFAULT true,
                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                UNIQUE(site_id, device_id)
            )
        """))

        # Configurable alarm thresholds (replaces hard-coded list at runtime)
        await conn.execute(text("""
            CREATE TABLE IF NOT EXISTS alarm_thresholds (
                id SERIAL PRIMARY KEY,
                site_id VARCHAR(50) NOT NULL,
                sensor_type VARCHAR(50) NOT NULL,
                threshold_value FLOAT NOT NULL,
                direction VARCHAR(10) NOT NULL,
                severity VARCHAR(20) NOT NULL,
                message_template TEXT NOT NULL,
                enabled BOOLEAN NOT NULL DEFAULT true,
                locked BOOLEAN NOT NULL DEFAULT false,
                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        """))

        # Site-level settings (profile, notifications, integrations, page prefs)
        await conn.execute(text("""
            CREATE TABLE IF NOT EXISTS site_settings (
                site_id VARCHAR(50) NOT NULL,
                category VARCHAR(50) NOT NULL,
                key VARCHAR(100) NOT NULL,
                value JSONB NOT NULL,
                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                PRIMARY KEY (site_id, category, key)
            )
        """))

    logger.info("Database initialised")


async def get_session() -> AsyncIterator[AsyncSession]:
    """Yield a new ``AsyncSession``; it is closed when the generator exits."""
    async with AsyncSessionLocal() as session:
        yield session