from typing import Callable from crc import Calculator, Crc16 from enum import Enum from socket import SocketType from struct import pack import socket import re DEFAULT_PORT = 700 class Colors(Enum): RED = 0x01 GREEN = 0x02 YELLOW = 0x03 class ScrollSpeeds(Enum): SLOW = 0x00 NORMAL = 0x01 class Priorities(Enum): OVERRIDE = 0x01 INTERRUPT = 0x02 FOLLOW = 0x03 YIELD = 0x04 ROUNDROBIN = 0x0A class Fonts(Enum): NORMAL_7 = 0x01 NORMAL_5 = 0x02 BOLD_7 = 0x05 MONOSPACE = 0x07 calculator = Calculator(Crc16.KERMIT.value) def checksum(data: bytes) -> int: return int(calculator.checksum(data)) def pkt_escape(pkt: bytes) -> bytes: esc = pack("B", 0x10) buf = bytearray() for i, byte in enumerate(pkt): if i > 4 and i < len(pkt) - 4 and byte in [0x10, 0x01, 0x04, 0x17]: buf.extend(esc) buf.append(byte) return bytes(buf) COLORS = [i.name.lower() for i in Colors] COLORS_PATTERH = rb"\{(" + "|".join(COLORS).encode("ascii") + rb")\}" class Message: activation_delay: int = 0 # Message activation delay display_delay: int = 0 # Message display delay display_repeat: int = 0 # Not sure priority: Priorities sound_alarm: bool = False # Beep when message is displayed text: str ttl: int = 0 # Self destruct timeout def __init__( self, text: str, activation_delay: int = 0, # Message activation delay display_delay: int = 0, # Message display delay display_repeat: int = 0, # Not sure priority: Priorities = Priorities.OVERRIDE, sound_alarm: bool = False, # Beep when message is displayed ttl: int = 0, # Self destruct timeout ): self.activation_delay = activation_delay self.display_delay = display_delay self.display_repeat = display_repeat self.priority = priority self.sound_alarm = sound_alarm self.text = text self.ttl = ttl def parse_msg(self) -> bytes: msg_bytes = self.text.strip().encode("ascii") replacements: list[ tuple[bytes, bytes] | tuple[bytes, Callable[[re.Match[bytes]], bytes]] ] = [ (rb"\{scrolloff\}", b"\x10\x14"), (rb"\{scrollon\}", b"\x10\x15"), (rb"\{blinkoff\}", b"\x10\x01"), (rb"\{blinkon\}", b"\x10\x00"), (rb"\{left\}", b"\x10\x27"), (rb"\{center\}", b"\x10\x29"), (rb"\{right\}", b"\x10\x28"), (rb"\{pause\}", b"\x10\x05"), (rb"\{erase\}", b"\x10\x03"), (rb"\{serial\}", b"\x10\x09"), (rb"\{bell\}", b"\x10\x05"), (rb"\{red\}", b"\x10\x0c" + pack("B", Colors.RED.value)), (rb"\{green\}", b"\x10\x0c" + pack("B", Colors.GREEN.value)), (rb"\{yellow\}", b"\x10\x0c" + pack("B", Colors.YELLOW.value)), ( COLORS_PATTERH, lambda m: b"\x10\x0c" + pack("B", Colors[m[1].decode("ascii")].value), ), ( rb"\{note\s+(\d+)\s+(\d+)\}", lambda m: b"\x10\x11" + pack(" 0xFF else "B", 1 << (z.id - 1)) zmlen = len(zmask) header = pack( f"<3B H H 3B 2B B{zmlen}s 2B", 0x16, 0x16, 0x01, # msg start len(body) + 1, self.seqno, 0x00, 0x01, 0x00, 0x03, maskbytes * 8, 0x00, zmask, 0x02, 0x00, # header end ) footer = pack(" str: # return str(num).rjust(2, "0") # # # while True: # now = datetime.now() # netbrt.message( # Message("{scrolloff}{left}" + f"{z(now.hour)}:{z(now.minute)}:{z(now.second)}"), # "1", # ) # # t = time() # sleep(1 - (t - int(t)))