import re
import select
import socket
from enum import Enum
from socket import SocketType
from struct import pack
from typing import Callable

from crc import Calculator, Crc16

DEFAULT_PORT = 700


class NetbriteConnectionException(Exception):
    """Connection-related error (raise sites live outside this section)."""


class NetbriteTransferException(Exception):
    """Transfer-related error (raise sites live outside this section)."""


class Colors(Enum):
    """Color codes; the values are packed as single bytes into messages."""

    RED = 0x01
    GREEN = 0x02
    YELLOW = 0x03


class ScrollSpeeds(Enum):
    """Scroll speed codes."""

    SLOW = 0x00
    NORMAL = 0x01


class Priorities(Enum):
    """Message priority codes (see ``Message.priority``)."""

    OVERRIDE = 0x01
    INTERRUPT = 0x02
    FOLLOW = 0x03
    YIELD = 0x04
    ROUNDROBIN = 0x0A


class Fonts(Enum):
    """Font codes."""

    NORMAL_7 = 0x01
    NORMAL_5 = 0x02
    BOLD_7 = 0x05
    MONOSPACE = 0x07


# Shared CRC calculator configured with the CRC-16/KERMIT parameters.
calculator = Calculator(Crc16.KERMIT.value)

# Byte inserted in front of every escaped byte, and the set of byte values
# that get escaped by ``pkt_escape``.
_ESCAPE_PREFIX = 0x10
_ESCAPED_BYTES = frozenset((0x10, 0x01, 0x04, 0x17))


def checksum(data: bytes) -> int:
    """Return the CRC-16/KERMIT checksum of *data* as a plain ``int``."""
    return int(calculator.checksum(data))


def pkt_escape(pkt: bytes) -> bytes:
    """Return *pkt* with special bytes in its middle section escaped.

    Every byte equal to 0x10, 0x01, 0x04 or 0x17 is prefixed with 0x10,
    except within the first five and the last four bytes of the packet,
    which are copied through unchanged.
    """
    # Indices strictly between 4 and len(pkt) - 4 are subject to escaping.
    upper_bound = len(pkt) - 4
    buf = bytearray()
    for i, byte in enumerate(pkt):
        if 4 < i < upper_bound and byte in _ESCAPED_BYTES:
            buf.append(_ESCAPE_PREFIX)
        buf.append(byte)
    return bytes(buf)


def is_socket_alive(sock: SocketType) -> bool:
    """Report whether *sock* becomes readable within one second.

    Returns ``False`` when the socket is flagged as exceptional, when it does
    not become readable before the timeout, or when it can no longer be
    polled at all (for example because it has already been closed).
    """
    # NOTE(review): an idle but healthy connection with nothing to read is
    # also reported as not alive -- confirm that callers expect this.
    try:
        readable, _, exceptional = select.select([sock], [], [sock], 1)
    except (OSError, ValueError):
        # A closed socket has file descriptor -1, which select() rejects;
        # for a liveness predicate that simply means "not alive".
        return False
    if sock in exceptional:
        return False
    return bool(readable)


# Lower-case color names, e.g. "red", in enum definition order.
COLORS = [color.name.lower() for color in Colors]
# Bytes regex matching "{<color>}" tags. The misspelled name is kept as-is
# because code further down the file refers to it.
COLORS_PATTERH = rb"\{(" + "|".join(COLORS).encode("ascii") + rb")\}"


class Message:
    """A text message together with its display options."""

    activation_delay: int = 0  # Message activation delay
    display_delay: int = 0  # Message display delay
    display_repeat: int = 0  # Meaning not confirmed
    priority: Priorities
    sound_alarm: bool = False  # Beep when message is displayed
    text: str
    ttl: int = 0  # Self destruct timeout

    def __init__(
        self,
        text: str,
        activation_delay: int = 0,  # Message activation delay
        display_delay: int = 0,  # Message display delay
        display_repeat: int = 0,  # Meaning not confirmed
        priority: Priorities = Priorities.OVERRIDE,
        sound_alarm: bool = False,  # Beep when message is displayed
        ttl: int = 0,  # Self destruct timeout
    ):
        """Store the message text and its options unchanged on the instance."""
        self.activation_delay = activation_delay
        self.display_delay = display_delay
        self.display_repeat = display_repeat
        self.priority = priority
        self.sound_alarm = sound_alarm
        self.text = text
        self.ttl = ttl
def parse_msg(self) -> bytes: msg_bytes = self.text.strip().encode("ascii") replacements: list[ tuple[bytes, bytes] | tuple[bytes, Callable[[re.Match[bytes]], bytes]] ] = [ (rb"\{scrolloff\}", b"\x10\x14"), (rb"\{scrollon\}", b"\x10\x15"), (rb"\{blinkoff\}", b"\x10\x01"), (rb"\{blinkon\}", b"\x10\x00"), (rb"\{left\}", b"\x10\x27"), (rb"\{center\}", b"\x10\x29"), (rb"\{right\}", b"\x10\x28"), (rb"\{pause\}", b"\x10\x05"), (rb"\{erase\}", b"\x10\x03"), (rb"\{serialnum\}", b"\x10\x09"), (rb"\{bell\}", b"\x10\x05"), (rb"\{red\}", b"\x10\x0c" + pack("B", Colors.RED.value)), (rb"\{green\}", b"\x10\x0c" + pack("B", Colors.GREEN.value)), (rb"\{yellow\}", b"\x10\x0c" + pack("B", Colors.YELLOW.value)), ( COLORS_PATTERH, lambda m: b"\x10\x0c" + pack("B", Colors[m[1].decode("ascii")].value), ), ( rb"\{note\s+(\d+)\s+(\d+)\}", lambda m: b"\x10\x11" + pack(" 0xFF else "B", 1 << (z.id - 1)) zmlen = len(zmask) header = pack( f"<3B H H 3B 2B B{zmlen}s 2B", 0x16, 0x16, 0x01, # msg start len(body) + 1, self.seqno, 0x00, 0x01, 0x00, 0x03, maskbytes * 8, 0x00, zmask, 0x02, 0x00, # header end ) footer = pack(" 254: rect[0] = 254 if rect[1] > 254: rect[1] = 254 if rect[0] >= rect[2]: rect[2] = rect[0] + 1 if rect[1] >= rect[3]: rect[3] = rect[1] + 1 z.rect = tuple(rect) # pyright: ignore[reportAttributeAccessIssue] body = pack( f"<4B B4B 3B BH 8B B 4B 4B H5B 10B 3B 20BH3B11B{zlen}s B", 0x0F, # Body start 0x00, 0x0E, 0x02, zid, # zone def *z.rect, 0x0D, # Scroll speed z.scroll_speed.value, 0x00, 0x0C, # Pause duration z.pause_duration, 0x0B, 0xFE, 0x0A, 0xE8, 0x03, 0x09, 0x0E, 0x08, # msg def params z.volume, # Volume, 0x07, # Font z.default_font.value, 0x06, # Color z.default_color.value, 0x05, # Font footer 0x00, 0x00, 0x04, 2012, # yyyy, mo, d, h, min, sec? 
#TODO: try out new dates and see if it changes shit 2, 10, 19, 21, 33, 0x00, 0x03, 0x00, 0x00, 0x2F, 0x02, 0xFF, 0x10, 0x3A, 0x01, # Def message hdr zid, # ID 0x00, 0x03, 0x02, # Magic 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFE, 0x7E, 0x00, 0x02, 0x00, zlen, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFE, 0x7E, 0x00, ztext, 0x17, ) header = pack( "<3B H H 3B 2B 4B 2B 2B", 0x16, # Msg start 0x16, 0x01, len(body), self.seqno, 0x00, 0x01, 0x00, 0x01, # Type = init 0x01, 0x00, # Sign id 0xC8, 0x01, 0x00, 0x00, # Session packet count self.sessno, 0x04, # Header end 0x00, ) crc = checksum(header + body) footer = pack(" str: # return str(num).rjust(2, "0") # # # while True: # now = datetime.now() # netbrt.message( # Message("{scrolloff}{left}" + f"{z(now.hour)}:{z(now.minute)}:{z(now.second)}"), # "1", # ) # # t = time() # sleep(1 - (t - int(t)))