from __future__ import annotations from contextlib import asynccontextmanager from time import sleep from typing import Annotated from fastapi import Depends, FastAPI, HTTPException from sqlmodel import SQLModel, Session, create_engine, select import netbrite as nb from db import ( MessageDB, MessageUpdate, NetBriteBase, NetBriteDB, NetBritePublic, NetBriteUpdate, ZoneBase, ZoneDB, ZonePublic, ZoneUpdate, ) DB_URL = "sqlite:///devices.db" engine = create_engine(DB_URL, connect_args={"check_same_thread": False}) def get_session(): with Session(engine) as session: yield session SessionDep = Annotated[Session, Depends(get_session)] @asynccontextmanager async def lifespan(_: FastAPI): SQLModel.metadata.create_all(engine) load_devices_from_db() yield app = FastAPI(lifespan=lifespan) active_devices: dict[int, nb.NetBrite] = {} # ---------- helper ---------- def load_devices_from_db() -> None: with Session(engine) as session: for device in session.exec(select(NetBriteDB)).all(): load_device(device) def load_device(device: NetBriteDB): id = device.id or 0 try: active_devices[id] = nb.NetBrite(device.address, device.port) load_zones(device.zones, active_devices[id]) except nb.NetbriteConnectionException as exc: print(f"Could not connect to {device.address}:{device.port} — {exc}") def load_zones_id(session: Session, device_id: int, net_dev: nb.NetBrite): statement = select(ZoneDB).where(ZoneDB.netbrite_id == device_id) zones = list(session.exec(statement)) load_zones(zones, net_dev) def load_zones(zones_in: list[ZoneDB], net_dev: nb.NetBrite) -> None: zones: dict[str, nb.Zone] = {} messages: dict[str, nb.Message] = {} for zone in zones_in: msg = zone.default_message default_msg = ( nb.Message( activation_delay=msg.activation_delay, display_delay=msg.display_delay, priority=msg.priority, text=msg.text, ttl=msg.ttl, ) if msg else nb.Message(f"Zone {zone.name}") ) zones[zone.name] = nb.Zone( x=zone.x, y=zone.y, width=zone.width, height=zone.height, scroll_speed=zone.scroll_speed, pause_duration=zone.pause_duration, volume=zone.volume, default_font=zone.default_font, default_color=zone.default_color, initial_text=default_msg, ) messages[zone.name] = default_msg # print(f"{zone.name}: {default_msg.text}") if zones: net_dev.zones(zones) sleep(0.2) for zone, message in messages.items(): net_dev.message(message, zone) sleep(0.2) def create_default_zone(session: Session, device_id: int) -> None: zone = ZoneDB( name="0", netbrite_id=device_id, ) msg = MessageDB( text="{erase}Welcome", ) session.add(msg) session.add(zone) session.commit() session.refresh(zone) session.refresh(msg) zone.default_message_id = msg.id session.commit() # ---------- routes ---------- @app.post("/api/devices", response_model=NetBritePublic) def create_device(device: NetBriteBase, session: SessionDep): if session.exec( select(NetBriteDB).where(NetBriteDB.address == device.address) ).first(): raise HTTPException(400, "Device already exists") db_device = NetBriteDB.model_validate(device) session.add(db_device) session.commit() session.refresh(db_device) create_default_zone(session, db_device.id or 0) load_device(db_device) return db_device @app.get("/api/devices", response_model=list[NetBritePublic]) def get_devices(session: SessionDep): devices: list[NetBritePublic] = [] for device in session.exec(select(NetBriteDB)).all(): device = NetBritePublic.model_validate( device, update={"active": (device.id or 0) in active_devices} ) devices.append(device) return devices @app.post( "/api/devices/{device_id}", response_model=NetBritePublic, description="**NOTE**: this **WILL** disconnect the device. You'll have to manually reconnect it.", ) def edit_device(device_id: int, updated_device: NetBriteUpdate, session: SessionDep): db_dev = session.get(NetBriteDB, device_id) if not db_dev: raise HTTPException(404, "Device not found") if device_id in active_devices: try: active_devices[device_id].sock.close() except OSError: print("Failed to close socket.") del active_devices[device_id] dev_data = updated_device.model_dump(exclude_unset=True) _ = db_dev.sqlmodel_update(dev_data) session.add(db_dev) session.commit() session.refresh(db_dev) return db_dev @app.delete("/api/devices/{device_id}") def delete_device(device_id: int, session: SessionDep): db_dev = session.get(NetBriteDB, device_id) if not db_dev: raise HTTPException(404, "Device not found") delete: list[MessageDB | ZoneDB | NetBriteDB] = [db_dev] for zone in db_dev.zones: if zone.default_message != None: delete.append(zone.default_message) delete.append(zone) if device_id in active_devices: try: active_devices[device_id].sock.close() except OSError: print("Failed to close socket.") del active_devices[device_id] for i in delete: session.delete(i) session.commit() return 200 @app.post("/api/devices/{device_id}/reconnect") def reconnect_device(device_id: int, session: SessionDep): db_dev = session.get(NetBriteDB, device_id) if not db_dev: raise HTTPException(404, "Device not found") try: new_netbrite = nb.NetBrite(db_dev.address, db_dev.port) active_devices[device_id] = new_netbrite load_zones_id(session, device_id, active_devices[device_id]) return 200 except nb.NetbriteConnectionException as exc: raise HTTPException(400, str(exc)) @app.post( "/api/devices/{device_id}/sync", description="**NOTE**: This will recreate the zones on the device.", ) def sync_device( device_id: int, session: SessionDep, ): if device_id not in active_devices: raise HTTPException(500, "Device not active, try reconnecting") try: load_zones_id(session, device_id, active_devices[device_id]) except nb.NetbriteTransferException: del active_devices[device_id] raise HTTPException(500, "Failed to send zones. Device inactive now.") return 200 @app.post( "/api/devices/{device_id}/restart", ) def restart_device( device_id: int, ): if device_id not in active_devices: raise HTTPException(500, "Device not active, try reconnecting") try: active_devices[device_id].reboot() del active_devices[device_id] except nb.NetbriteTransferException: raise HTTPException(500, "Failed to send reboot command. Device inactive now.") return 200 @app.post( "/api/devices/{device_id}/zones", response_model=ZonePublic, description="**NOTE**: this does not update the device.", ) def create_zone(device_id: int, new_zone: ZoneBase, session: SessionDep): device = session.get(NetBriteDB, device_id) if not device: raise HTTPException(404, "Device not found") new_zone_data = new_zone.model_dump(exclude_unset=True) extra_data = {"netbrite_id": device_id} zone = ZoneDB.model_validate(new_zone_data, update=extra_data) msg = MessageDB( text="{erase}Welcome", ) session.add(zone) session.add(msg) session.commit() session.refresh(zone) session.refresh(msg) zone.default_message_id = msg.id session.commit() return zone @app.get("/api/devices/{device_id}/zones", response_model=list[ZonePublic]) def get_zones(device_id: int, session: SessionDep): device = session.get(NetBriteDB, device_id) if not device: raise HTTPException(404, "Device not found") return device.zones @app.delete( "/api/zone/{zone_id}", description="**NOTE**: this does not update the device.", ) def delete_zone( zone_id: int, session: SessionDep, ): zone = session.get(ZoneDB, zone_id) if not zone: raise HTTPException(404, "Zone not found") message = zone.default_message if message: session.delete(message) session.delete(zone) session.commit() return 200 @app.post( "/api/zone/{zone_id}", response_model=ZonePublic, description="**NOTE**: this does not update the device.", ) def edit_zone(zone_id: int, zone: ZoneUpdate, session: SessionDep): zone_db = session.get(ZoneDB, zone_id) if not zone_db: raise HTTPException(404, "Zone not found") zone_update_data = zone.model_dump(exclude_unset=True) print(zone_update_data) _ = zone_db.sqlmodel_update(zone_update_data) session.add(zone_db) session.commit() session.refresh(zone_db) return zone_db @app.post( "/api/zone/{zone_id}/message", response_model=ZonePublic, description="**NOTE**: this does not update the device.", ) def edit_message(zone_id: int, message: MessageUpdate, session: SessionDep): zone_db = session.get(ZoneDB, zone_id) if not zone_db: raise HTTPException(404, "Zone not found") if not zone_db.default_message: db_message = MessageDB.model_validate(message) session.add(db_message) session.commit() session.refresh(db_message) zone_db.default_message_id = db_message.id session.add(zone_db) session.commit() session.refresh(zone_db) else: db_message = zone_db.default_message data = message.model_dump( exclude_unset=True, ) _ = db_message.sqlmodel_update(data, update={"id": db_message.id}) session.add(db_message) session.commit() session.refresh(zone_db) return zone_db @app.post( "/api/zone/{zone_id}/adhoc_message", description="**NOTE**: this updates the device temporarily. Edit the zone message for permanent changes.", ) def adhoc_message(zone_id: int, message: MessageUpdate, session: SessionDep): zone_db = session.get(ZoneDB, zone_id) if not zone_db: raise HTTPException(404, "Zone not found") device_id = zone_db.netbrite_id if not device_id in active_devices: raise HTTPException(500, "Device inactive") nb_message = nb.Message( text=message.text or "", activation_delay=message.activation_delay or 0, display_delay=message.display_delay or 0, display_repeat=message.display_repeat or 0, priority=message.priority or nb.Priorities.OVERRIDE, sound_alarm=message.sound_alarm or False, ttl=message.ttl or 0, ) try: active_devices[device_id].message(nb_message, zone_db.name) except nb.NetbriteTransferException: del active_devices[device_id] raise HTTPException(500, "Failed to send message. Device inactive now.") return 200