Migration to python, still a test script on bottom of script. Everything should be kind of implemented (execept some message colors and stuff)

This commit is contained in:
Jurn Wubben 2025-08-06 16:10:57 +02:00
parent b7c7a2e04c
commit 4e2aeffa8b
9 changed files with 427 additions and 948 deletions

423
netbrite.py Normal file
View file

@ -0,0 +1,423 @@
from typing import Callable
from crc import Calculator, Crc16
from enum import Enum
from socket import SocketType
from struct import pack
import socket
import re
DEFAULT_PORT = 700
class Colors(Enum):
RED = 0x01
GREEN = 0x02
YELLOW = 0x03
class ScrollSpeeds(Enum):
SLOW = 0x00
NORMAL = 0x01
class Priorities(Enum):
OVERRIDE = 0x01
INTERRUPT = 0x02
FOLLOW = 0x03
YIELD = 0x04
ROUNDROBIN = 0x0A
class Fonts(Enum):
NORMAL_7 = 0x01
NORMAL_5 = 0x02
BOLD_7 = 0x05
MONOSPACE = 0x07
calculator = Calculator(Crc16.KERMIT.value)
def checksum(data: bytes) -> int:
return int(calculator.checksum(data))
def pkt_escape(pkt: bytes) -> bytes:
esc = pack("B", 0x10)
buf = bytearray()
for i, byte in enumerate(pkt):
if i > 4 and i < len(pkt) - 4 and byte in [0x10, 0x01, 0x04, 0x17]:
buf.extend(esc)
buf.append(byte)
return bytes(buf)
class Message:
activation_delay: int = 0 # Message activation delay
display_delay: int = 0 # Message display delay
display_repeat: int = 0 # Not sure
priority: Priorities
sound_alarm: bool = False # Beep when message is displayed
text: str
ttl: int = 0 # Self destruct timeout
def __init__(
self,
text: str,
activation_delay: int = 0, # Message activation delay
display_delay: int = 0, # Message display delay
display_repeat: int = 0, # Not sure
priority: Priorities = Priorities.OVERRIDE,
sound_alarm: bool = False, # Beep when message is displayed
ttl: int = 0, # Self destruct timeout
):
self.activation_delay = activation_delay
self.display_delay = display_delay
self.display_repeat = display_repeat
self.priority = priority
self.sound_alarm = sound_alarm
self.text = text
self.ttl = ttl
def parse_msg(self) -> bytes:
msg_bytes = self.text.strip().encode("ascii")
replacements: list[
tuple[bytes, bytes] | tuple[bytes, Callable[[re.Match[bytes]], bytes]]
] = [
(rb"\{scrolloff\}", b"\x10\x14"),
(rb"\{scrollon\}", b"\x10\x15"),
(rb"\{blinkoff\}", b"\x10\x01"),
(rb"\{blinkon\}", b"\x10\x00"),
(rb"\{left\}", b"\x10\x27"),
(rb"\{center\}", b"\x10\x29"),
(rb"\{right\}", b"\x10\x28"),
(rb"\{pause\}", b"\x10\x05"),
(rb"\{erase\}", b"\x10\x03"),
(rb"\{serial\}", b"\x10\x09"),
(rb"\{bell\}", b"\x10\x05"),
(rb"\{red\}", b"\x10\x05" + pack("B", Colors.RED.value)),
(rb"\{green\}", b"\x10\x05" + pack("B", Colors.GREEN.value)),
(rb"\{yellow\}", b"\x10\x05" + pack("B", Colors.YELLOW.value)),
# (
# rb"\{(red|green|yellow)\}",
# lambda m: b"\x10\x0c" + pack("B", Colors[m[1].decode("ascii")].value),
# ),
# (
# rb"\{note\s+(\d+)\s+(\d+)\}",
# lambda m: b"\x10\x11" + pack("<HH", int(m[1]), int(m[2])),
# ),
# (
# rb"\{tune\s+([1-9])(\s+repeat)?\}",
# lambda m: bytes([0x10, 0x0A if m[2] else 0x0B, int(m[1])]),
# ),
# (
# rb"\{font\s+(\S+)\}",
# lambda m: bytes(
# [0x10, 0x0D, Fonts[m[1].decode("ascii").upper()].value]
# ),
# ),
]
for pat, repl in replacements:
msg_bytes = re.sub(pat, repl, msg_bytes, flags=re.IGNORECASE)
return msg_bytes
DEFAULT_TEXT = Message("zone")
class Zone:
id: int
rect: tuple[int, int, int, int]
scroll_speed: ScrollSpeeds
pause_duration: int
volume: int
default_font: Fonts
default_color: Colors
initial_text: Message
def __init__(
self,
x: int,
y: int,
width: int,
height: int,
scroll_speed: ScrollSpeeds = ScrollSpeeds.NORMAL,
pause_duration: int = 1000,
volume: int = 4,
default_font: Fonts = Fonts.NORMAL_7,
default_color: Colors = Colors.RED,
initial_text: Message = DEFAULT_TEXT,
):
self.id = 0
self.rect = (x, y, x + width - 1, y + height - 1)
self.scroll_speed = scroll_speed
self.pause_duration = pause_duration
self.volume = volume
self.default_font = default_font
self.default_color = default_color
self.initial_text = initial_text
class NetBrite:
address: str
port: int
seqno: int = 1
sessno: int = 1
sock: SocketType
zones_list: dict[str, Zone]
def __init__(self, address: str, port: int = DEFAULT_PORT):
self.address = address
self.port = port
self.zones_list = {}
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect()
except OSError as e:
raise ConnectionError(f"Error while opening network socket. {e}")
def connect(self):
self.sock.connect((self.address, self.port))
def tx(self, pkt: bytes):
_ = self.sock.send(pkt)
def message(self, msg: Message, zoneName: str):
z = self.zones_list.get(zoneName)
if z == None:
print("Zone doesn't exist, skipping message.")
return
ztext = msg.parse_msg()
zlen = len(ztext)
if zlen == 4 or zlen == 5:
ztext = pack("2B", 0x10, 0x15) + ztext
zlen += 2
body = pack(
f"<L B H H H H B 2B {zlen}s B",
zlen,
msg.priority.value,
msg.activation_delay,
msg.display_delay,
msg.display_repeat,
msg.ttl,
0xFF if msg.sound_alarm else 0xFE,
0x00,
0x00,
ztext,
0x17,
)
oMaskbytes = z.id / 8
maskbytes = int(oMaskbytes)
if maskbytes != oMaskbytes:
maskbytes += 1
zmask = pack("<H" if z.id > 0xFF else "B", 1 << (z.id - 1))
zmlen = len(zmask)
header = pack(
f"<3B H H 3B 2B B{zmlen}s 2B",
0x16,
0x16,
0x01, # msg start
len(body) + 1,
self.seqno,
0x00,
0x01,
0x00,
0x03,
maskbytes * 8,
0x00,
zmask,
0x02,
0x00, # header end
)
footer = pack("<HB", checksum(header + body), 0x04)
self.tx(pkt_escape(header + body + footer))
def zones(self, zones: dict[str, Zone] | None = None):
if zones != None:
self.zones_list = zones
for zid, zname in enumerate(sorted(self.zones_list.keys())):
zid += 1
self.zones_list[zname].id = zid
z = self.zones_list[zname]
ztext = z.initial_text.parse_msg() # FIXME: parse_msg once implemented
zlen = len(ztext)
print(z.scroll_speed.value)
body = pack(
f"<4B B4B 3B BH 8B B 4B 4B H5B 10B 3B 20BH3B11B{zlen}s B",
0x0F, # Body start
0x00,
0x0E,
0x02,
zid, # zone def
*z.rect,
0x0D, # Scroll speed
z.scroll_speed.value,
0x00,
0x0C, # Pause duration
z.pause_duration,
0x0B,
0xFE,
0x0A,
0xE8,
0x03,
0x09,
0x0E,
0x08, # msg def params
z.volume, # Volume,
0x07, # Font
z.default_font.value,
0x06, # Color
z.default_color.value,
0x05, # Font footer
0x00,
0x00,
0x04,
2012, # yyyy, mo, d, h, min, sec? #TODO: try out new dates and see if it changes shit
2,
10,
19,
21,
33,
0x00,
0x03,
0x00,
0x00,
0x2F,
0x02,
0xFF,
0x10,
0x3A,
0x01, # Def message hdr
zid, # ID
0x00,
0x03,
0x02, # Magic
0x00,
0x00,
0x00,
0x00,
0x00,
0x03,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0xFE,
0x7E,
0x00,
0x02,
0x00,
zlen,
0x00,
0x00,
0x04,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0xFE,
0x7E,
0x00,
ztext,
0x17,
)
header = pack(
"<3B H H 3B 2B 4B 2B 2B",
0x16, # Msg start
0x16,
0x01,
len(body),
self.seqno,
0x00,
0x01,
0x00,
0x01, # Type = init
0x01,
0x00, # Sign id
0xC8,
0x01,
0x00,
0x00, # Session packet count
self.sessno,
0x04, # Header end
0x00,
)
crc = checksum(header + body)
footer = pack("<HB", crc, 0x04)
self.tx(pkt_escape(header + body + footer))
print(f"Sent zone {zname}")
self.seqno += 1
self.sessno += 1
_ = None
netbrt = NetBrite("10.65.37.244")
zones = [
# Zone(
# 1,
# 0,
# 59,
# 7,
# initial_text=Message("{erase}{scrolloff}{left}Welcome to:"),
# default_color=Colors.RED,
# ),
# Zone(
# 73,
# 1,
# 35,
# 7,
# initial_text=Message("{erase}{scrolloff}ING R&D"),
# default_color=Colors.YELLOW,
# ),
# Zone(
# 110,
# 1,
# 10,
# 7,
# initial_text=Message("{erase}{scrolloff}HQ"),
# default_color=Colors.GREEN,
# ),
Zone(0, 0, 150, 7, initial_text=Message("{scrolloff}."))
]
netbrt.zones({str(k): v for k, v in enumerate(zones)})
from time import sleep, time
from datetime import datetime
while True:
now = datetime.now()
netbrt.message(
Message("{scrolloff}" + f"{now.hour}:{now.minute}:{now.second}"),
"0",
)
sleep(1)