signcontrol/netbrite.py

509 lines
13 KiB
Python

import select
from typing import Callable
from crc import Calculator, Crc16
from enum import Enum
from socket import SocketType
from struct import pack
import socket
import re
# TCP port used by NetBrite when the caller does not pass an explicit port.
DEFAULT_PORT = 700
class NetbriteConnectionException(Exception):
    """Raised when the socket to the sign cannot be created or connected."""
class NetbriteTransferException(Exception):
    """Raised when transmitting a packet over the sign's socket fails."""
class Colors(Enum):
    """Color codes sent to the sign (one byte each)."""

    RED = 0x01
    GREEN = 0x02
    YELLOW = 0x03
class ScrollSpeeds(Enum):
    """Scroll speed codes used in a zone definition (one byte each)."""

    SLOW = 0x00
    NORMAL = 0x01
class Priorities(Enum):
    """Message priority codes placed in the message body (one byte each)."""

    OVERRIDE = 0x01
    INTERRUPT = 0x02
    FOLLOW = 0x03
    YIELD = 0x04
    ROUNDROBIN = 0x0A
class Fonts(Enum):
    """Font codes sent to the sign (one byte each).

    Member names double as the argument of the ``{font NAME}`` message tag,
    which is looked up case-insensitively by name.
    """

    NORMAL_7 = 0x01
    NORMAL_5 = 0x02
    BOLD_7 = 0x05
    MONOSPACE = 0x07
# Shared CRC calculator configured with the Crc16.KERMIT parameters; used by
# checksum() below.
calculator = Calculator(Crc16.KERMIT.value)
def checksum(data: bytes) -> int:
    """Return the CRC of ``data`` computed by the module-level calculator."""
    crc = calculator.checksum(data)
    return int(crc)
def pkt_escape(pkt: bytes) -> bytes:
    """Return ``pkt`` with reserved bytes in its interior escaped.

    Bytes 0x10, 0x01, 0x04 and 0x17 are prefixed with 0x10 when they occur
    at an index strictly greater than 4 and strictly less than
    ``len(pkt) - 4``.  The first five and last four bytes are copied as-is.
    """
    upper_bound = len(pkt) - 4
    out = bytearray()
    for pos, value in enumerate(pkt):
        in_window = 4 < pos < upper_bound
        if in_window and value in (0x10, 0x01, 0x04, 0x17):
            out += pack("B", 0x10)
        out.append(value)
    return bytes(out)
def is_socket_alive(sock: SocketType):
    """Report whether ``select`` sees ``sock`` as readable and error-free.

    Waits up to one second.  Returns ``False`` when the socket is flagged as
    exceptional, otherwise ``True`` only if it was reported readable.
    """
    ready, _, errored = select.select([sock], [], [sock], 1)
    return sock not in errored and len(ready) > 0
# Lower-case names of every Colors member ("red", "green", "yellow").
COLORS = [i.name.lower() for i in Colors]
# Bytes regex matching a brace-wrapped color tag such as {red}; group 1 is the
# color name.  NOTE(review): the identifier looks like a typo of
# COLORS_PATTERN; it is left unchanged because Message.parse_msg refers to it.
COLORS_PATTERH = rb"\{(" + "|".join(COLORS).encode("ascii") + rb")\}"
class Message:
    """Text to show in a zone together with its delivery options.

    ``text`` may contain brace tags such as ``{red}``, ``{scrollon}``,
    ``{left}``, ``{font bold_7}``, ``{note 440 100}`` or ``{tune 3 repeat}``;
    :meth:`parse_msg` turns them into the sign's binary control sequences.
    """

    activation_delay: int = 0  # Message activation delay
    display_delay: int = 0  # Message display delay
    display_repeat: int = 0  # Not sure
    priority: Priorities
    sound_alarm: bool = False  # Beep when message is displayed
    text: str
    ttl: int = 0  # Self destruct timeout

    def __init__(
        self,
        text: str,
        activation_delay: int = 0,  # Message activation delay
        display_delay: int = 0,  # Message display delay
        display_repeat: int = 0,  # Not sure
        priority: Priorities = Priorities.OVERRIDE,
        sound_alarm: bool = False,  # Beep when message is displayed
        ttl: int = 0,  # Self destruct timeout
    ):
        """Store the message text and its delivery options unchanged."""
        self.activation_delay = activation_delay
        self.display_delay = display_delay
        self.display_repeat = display_repeat
        self.priority = priority
        self.sound_alarm = sound_alarm
        self.text = text
        self.ttl = ttl

    def parse_msg(self) -> bytes:
        """Encode ``text`` as ASCII and expand brace tags into control bytes.

        Leading and trailing whitespace is stripped first.  Tags are matched
        case-insensitively; unrecognised tags are left in the output as-is.

        Returns:
            The encoded message bytes.

        Raises:
            UnicodeEncodeError: if ``text`` contains non-ASCII characters.
            KeyError: if a ``{font NAME}`` tag names a font not in ``Fonts``.
            struct.error: if a ``{note A B}`` argument does not fit in an
                unsigned 16-bit field.
        """
        msg_bytes = self.text.strip().encode("ascii")
        replacements: list[
            tuple[bytes, bytes] | tuple[bytes, Callable[[re.Match[bytes]], bytes]]
        ] = [
            (rb"\{scrolloff\}", b"\x10\x14"),
            (rb"\{scrollon\}", b"\x10\x15"),
            (rb"\{blinkoff\}", b"\x10\x01"),
            (rb"\{blinkon\}", b"\x10\x00"),
            (rb"\{left\}", b"\x10\x27"),
            (rb"\{center\}", b"\x10\x29"),
            (rb"\{right\}", b"\x10\x28"),
            # NOTE(review): {pause} and {bell} expand to the same sequence;
            # confirm against the protocol whether that is intended.
            (rb"\{pause\}", b"\x10\x05"),
            (rb"\{erase\}", b"\x10\x03"),
            (rb"\{serialnum\}", b"\x10\x09"),
            (rb"\{bell\}", b"\x10\x05"),
            (
                # One pattern covers every Colors member.  Enum names are
                # upper-case while the tag may be in any case, so normalise
                # before the lookup.
                COLORS_PATTERH,
                lambda m: b"\x10\x0c"
                + pack("B", Colors[m[1].decode("ascii").upper()].value),
            ),
            (
                rb"\{note\s+(\d+)\s+(\d+)\}",
                lambda m: b"\x10\x11" + pack("<HH", int(m[1]), int(m[2])),
            ),
            (
                rb"\{tune\s+([1-9])(\s+repeat)?\}",
                lambda m: bytes([0x10, 0x0A if m[2] else 0x0B, int(m[1])]),
            ),
            (
                # The name must stop at the first "}" so that a later brace
                # in the same word is not swallowed into the font name.
                rb"\{font\s+([^\s}]+)\}",
                lambda m: bytes(
                    [0x10, 0x0D, Fonts[m[1].decode("ascii").upper()].value]
                ),
            ),
        ]
        for pat, repl in replacements:
            msg_bytes = re.sub(pat, repl, msg_bytes, flags=re.IGNORECASE)
        return msg_bytes
# Placeholder message used as the default ``initial_text`` of a Zone.  This is
# a single shared instance, so it should not be mutated.
DEFAULT_TEXT = Message("zone")
class Zone:
    """Rectangular display area of the sign and its default rendering options.

    ``rect`` holds ``(x1, y1, x2, y2)`` with inclusive corners derived from
    the ``x``, ``y``, ``width`` and ``height`` constructor arguments.  ``id``
    starts at 0; ``NetBrite.zones`` assigns the real id when the zone is sent.
    """

    id: int
    rect: tuple[int, int, int, int]
    scroll_speed: ScrollSpeeds
    pause_duration: int
    volume: int
    default_font: Fonts
    default_color: Colors
    initial_text: Message

    def __init__(
        self,
        x: int,
        y: int,
        width: int,
        height: int,
        scroll_speed: ScrollSpeeds = ScrollSpeeds.NORMAL,
        pause_duration: int = 1000,
        volume: int = 4,
        default_font: Fonts = Fonts.NORMAL_7,
        default_color: Colors = Colors.RED,
        initial_text: Message = DEFAULT_TEXT,
    ):
        """Record the zone geometry and defaults; no I/O happens here."""
        # Convert origin + size into inclusive corner coordinates.
        right = x + width - 1
        bottom = y + height - 1
        self.rect = (x, y, right, bottom)

        # Not yet registered with a sign.
        self.id = 0

        self.initial_text = initial_text
        self.default_color = default_color
        self.default_font = default_font
        self.volume = volume
        self.pause_duration = pause_duration
        self.scroll_speed = scroll_speed
class NetBrite:
    """Client that talks to a sign over a TCP socket.

    The constructor connects immediately.  Zones must be defined with
    :meth:`zones` before :meth:`message` can address them.  The instance can
    be used as a context manager to make sure the socket is closed.
    """

    address: str
    port: int
    seqno: int = 1
    sessno: int = 1
    sock: SocketType
    zones_list: dict[str, Zone]

    def __init__(self, address: str, port: int = DEFAULT_PORT):
        """Create the socket (2 second timeout) and connect to the sign.

        Raises:
            NetbriteConnectionException: if the socket cannot be created or
                the connection attempt fails.
        """
        self.address = address
        self.port = port
        self.zones_list = {}
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.settimeout(2)
        except OSError as e:
            raise NetbriteConnectionException(
                f"Error while opening network socket. {e}"
            ) from e
        try:
            self.connect()
        except NetbriteConnectionException:
            # Do not leak the descriptor when the sign is unreachable.
            self.sock.close()
            raise

    def __enter__(self) -> "NetBrite":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying socket."""
        self.sock.close()

    def connect(self):
        """Connect the socket to ``(address, port)``.

        Raises:
            NetbriteConnectionException: if the connection attempt fails.
        """
        # No readability pre-check here: a socket that has not been connected
        # yet is not reliably reported as readable by select(), and connect()
        # itself raises OSError when the socket is unusable.
        try:
            self.sock.connect((self.address, self.port))
        except OSError as e:
            raise NetbriteConnectionException(
                f"Error while opening network socket. {e}"
            ) from e

    def tx(self, pkt: bytes):
        """Escape ``pkt`` with ``pkt_escape`` and send it in full.

        Raises:
            NetbriteTransferException: if sending fails.
        """
        try:
            self.sock.sendall(pkt_escape(pkt))
        except OSError as e:
            raise NetbriteTransferException(
                f"Error while sending packet. {e}"
            ) from e

    def reboot(self):
        """Send the reset packet to the sign."""
        pkt = pack(
            "<3B H H 3B 2B 4B 4B 1B",
            0x16,
            0x16,
            0x01,  # msg start
            2,  # body length
            self.seqno & 0xFFFF,  # packet count (16-bit field)
            0x00,
            0x01,
            0x00,
            0x01,
            0x01,  # msg type: reset
            0x00,
            0xC8,
            0x01,
            0x00,  # sign id
            0x00,
            0x01,
            0x0F,
            0x00,  # reset msg
            0x17,  # crc follows
        )
        pkt += pack("<HB", checksum(pkt), 0x04)
        self.tx(pkt)

    def message(self, msg: Message, zoneName: str):
        """Send ``msg`` to the zone registered under ``zoneName``.

        If the zone name is unknown a notice is printed and nothing is sent.

        Raises:
            NetbriteTransferException: if sending fails.
        """
        z = self.zones_list.get(zoneName)
        if z is None:
            print("Zone doesn't exist, skipping message.")
            return

        ztext = msg.parse_msg()
        zlen = len(ztext)
        # NOTE(review): payloads of exactly 4 or 5 bytes get a {scrollon}
        # sequence prepended; presumably a device quirk workaround - confirm.
        if zlen in (4, 5):
            ztext = pack("2B", 0x10, 0x15) + ztext
            zlen += 2

        body = pack(
            f"<L B H H H H B 2B {zlen}s B",
            zlen,
            msg.priority.value,
            msg.activation_delay,
            msg.display_delay,
            msg.display_repeat,
            msg.ttl,
            0xFF if msg.sound_alarm else 0xFE,
            0x00,
            0x00,
            ztext,
            0x17,
        )

        # One bit per zone id, little-endian, padded to whole bytes.  The mask
        # must be exactly ``maskbytes`` long because the header announces
        # ``maskbytes * 8`` bits; a single "B" overflows from zone id 9 on.
        maskbytes = (z.id + 7) // 8
        zmask = (1 << (z.id - 1)).to_bytes(maskbytes, "little")
        zmlen = len(zmask)

        header = pack(
            f"<3B H H 3B 2B B{zmlen}s 2B",
            0x16,
            0x16,
            0x01,  # msg start
            len(body) + 1,
            self.seqno & 0xFFFF,  # 16-bit field
            0x00,
            0x01,
            0x00,
            0x03,
            maskbytes * 8,
            0x00,
            zmask,
            0x02,
            0x00,  # header end
        )
        footer = pack("<HB", checksum(header + body), 0x04)
        self.tx(header + body + footer)

    def zones(self, zones: dict[str, Zone] | None = None):
        """Define zones on the sign, one packet per zone.

        When ``zones`` is given it replaces the stored zone mapping.  Zones
        are processed in sorted key order and receive ids 1, 2, 3, ...; each
        zone's ``id`` and (clamped) ``rect`` are updated in place.  ``seqno``
        and ``sessno`` are incremented after every zone sent.

        Raises:
            NetbriteTransferException: if sending fails.
        """
        if zones is not None:
            self.zones_list = zones

        for zid, zname in enumerate(sorted(self.zones_list), start=1):
            z = self.zones_list[zname]
            z.id = zid
            ztext = z.initial_text.parse_msg()
            zlen = len(ztext)

            # Every coordinate is packed as one unsigned byte: keep the
            # origin at or below 254, force a non-empty rectangle, and cap
            # the far corner at 255.
            rect: list[int] = list(z.rect)
            rect[0] = min(rect[0], 254)
            rect[1] = min(rect[1], 254)
            if rect[0] >= rect[2]:
                rect[2] = rect[0] + 1
            if rect[1] >= rect[3]:
                rect[3] = rect[1] + 1
            rect[2] = min(rect[2], 255)
            rect[3] = min(rect[3], 255)
            z.rect = tuple(rect)  # pyright: ignore[reportAttributeAccessIssue]

            body = pack(
                f"<4B B4B 3B BH 8B B 4B 4B H5B 10B 3B 20BH3B11B{zlen}s B",
                # 4B: body start
                0x0F, 0x00, 0x0E, 0x02,
                # B: zone def
                zid,
                # 4B: x1, y1, x2, y2
                *z.rect,
                # 3B: scroll speed
                0x0D, z.scroll_speed.value, 0x00,
                # BH: pause duration
                0x0C, z.pause_duration,
                # 8B: msg def params
                0x0B, 0xFE, 0x0A, 0xE8, 0x03, 0x09, 0x0E, 0x08,
                # B: volume
                z.volume,
                # 4B: font, color
                0x07, z.default_font.value, 0x06, z.default_color.value,
                # 4B: font footer
                0x05, 0x00, 0x00, 0x04,
                # H5B: yyyy, mo, d, h, min, sec?
                # TODO: try other dates and see whether the sign behaves
                # differently.
                2012, 2, 10, 19, 21, 33,
                # 10B: default message header
                0x00, 0x03, 0x00, 0x00, 0x2F, 0x02, 0xFF, 0x10, 0x3A, 0x01,
                # 3B: ID
                zid, 0x00, 0x03,
                # 20B: magic
                0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00,
                0x00, 0x00, 0x00, 0x00, 0x00, 0xFE, 0x7E, 0x00, 0x02, 0x00,
                # H: text length
                zlen,
                # 3B
                0x00, 0x00, 0x04,
                # 11B
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFE, 0x7E, 0x00,
                # text, then end-of-body marker
                ztext,
                0x17,
            )
            header = pack(
                "<3B H H 3B 2B 4B 2B 2B",
                0x16,  # Msg start
                0x16,
                0x01,
                len(body),
                self.seqno & 0xFFFF,  # 16-bit field
                0x00,
                0x01,
                0x00,
                0x01,  # Type = init
                0x01,
                0x00,  # Sign id
                0xC8,
                0x01,
                0x00,
                0x00,  # Session packet count
                self.sessno & 0xFF,  # 8-bit field
                0x04,  # Header end
                0x00,
            )
            crc = checksum(header + body)
            footer = pack("<HB", crc, 0x04)
            self.tx(header + body + footer)
            print(f"Sent zone {zname}")
            self.seqno += 1
            self.sessno += 1
# _ = None
# netbrt = NetBrite("10.65.37.244")
# zones = [
# Zone(
# 1,
# 0,
# 50,
# 7,
# initial_text=Message("{erase}{scrollon}{left}ING R&D"),
# default_color=Colors.YELLOW,
# default_font=Fonts.BOLD_7,
# scroll_speed=ScrollSpeeds.SLOW,
# ),
# # Zone(
# # 47,
# # 0,
# # 15,
# # 7,
# # initial_text=Message("{erase}{scrolloff}HQ"),
# # default_color=Colors.GREEN,
# # default_font=Fonts.BOLD_7,
# # ),
# Zone(
# 80,
# 1,
# 59,
# 6,
# initial_text=Message("{erase}{scrolloff}{left}Loading..."),
# default_color=Colors.RED,
# default_font=Fonts.NORMAL_5,
# ),
# ]
# netbrt.zones({str(k): v for k, v in enumerate(zones)})
#
# from time import sleep, time
# from datetime import datetime
#
#
# def z(num: int) -> str:
# return str(num).rjust(2, "0")
#
#
# while True:
# now = datetime.now()
# netbrt.message(
# Message("{scrolloff}{left}" + f"{z(now.hour)}:{z(now.minute)}:{z(now.second)}"),
# "1",
# )
#
# t = time()
# sleep(1 - (t - int(t)))