Migration to python, still a test script on bottom of script. Everything should be kind of implemented (execept some message colors and stuff)

This commit is contained in:
Jurn Wubben 2025-08-06 16:10:57 +02:00
parent b7c7a2e04c
commit 6291836bf8
9 changed files with 427 additions and 948 deletions

21
.gitignore vendored
View file

@ -1,22 +1 @@
*.bak
*.old
*.tmp
*.tar.gz
*.rej
*.orig
*~
/Build
/Build.bat
/Makefile
/_build
/blib
/cover_db
/pm_to_blib
/PM_to_blib
/META.*
/MYMETA.*
/*.tar.gz
*/Module-Build-*
/LICENSE
/README
.direnv

View file

@ -1,10 +0,0 @@
use Module::Build;
Module::Build->new(
module_name => 'Net::Symon::NetBrite',
license => 'perl',
requires => {
perl => '5.6.1',
'Digest::CRC' => '0.18',
'IO::Socket::INET' => '1.31',
},
)->create_build_script();

View file

@ -1,15 +0,0 @@
use strict;
use warnings;
use ExtUtils::MakeMaker;
WriteMakefile(
NAME => 'SignControl',
VERSION => '1.0',
EXE_FILES => ['signcontrol'],
PREREQ_PM => {
perl => '5.6.1',
'Digest::CRC' => '0.18',
'IO::Socket::INET' => '1.31',
},
INSTALLDIRS => 'site',
);

143
README.md
View file

@ -1,143 +0,0 @@
# SignControl
SignControl is a cli utility that controls Symon NetBrite LED signs. Huge thanks
to [kevinbosak](https://github.com/kevinbosak/Net-Symon-Netbrite).
## Installation Instructions
1. **Set Up Network:** Ensure your device is on the same subnet as the sign. For
example, add an IP address to your Ethernet interface:
```sh
sudo ip addr add 10.164.3.86/24 dev enp0s25 # Replace with your interface
```
This configures your device to communicate with the sign.
### Debian
I'm not completely sure about this one, I don't use debian.
2. **Install Perl Dependencies:** On a Debian-based system, install the required
packages:
```sh
sudo apt install libmodule-build-perl libdigest-crc-perl
```
3. **Build and Install the Module:**
```sh
perl Build.PL ./Build installdeps # Installs any additional dependencies
./Build manifest # Generates the MANIFEST file
```
These steps prepare the module for use. If you're building from source,
ensure you have Perl 5.10 or later.
### NixOS
You should know this one.
```sh
nix run github:jsw08/signcontrol -- parameters
```
## Usage
To send messages to the sign, edit and run the provided script (e.g.,
`signboard`). Example:
```sh
signboard --address 10.164.3.87 --zone zone1="x=0,y=0,width=20,height=5" --zone
zone2="x=10,y=10,width=10,height=10" --message zone1="Hello {scrolloff}"
--message zone2="World {red}"
```
- Replace `10.164.3.87` with your sign's IP address.
- Define zones with `--zone zonename="x=0,y=0,width=20,height=5"`.
- Provide messages with `--message zonename="Text with formatting"`.
- I reccommend adding `{erase}` to the beginning of your first message. This
will prevent glitches when changing the text.
### Flags
- `--zone zonename=""`: See the heading zones below.
- `--message zonename=""`: string with formatting options (see heading below).
- `--address`: IP Address
- `--priority`: The message priority determines how the new message will replace
an existing one. The default is PRI_OVERRIDE, but can also be PRI_FOLLOW,
PRI_INTERRUPT, PRI_YIELD or PRI_ROUNDROBIN.
- `--"activation_delay`: Message activation delay in milliseconds. Default is 0.
- `--display_delay`: Message display delay in milliseconds. Default is 0.
- `--display_repeat`: Not really sure. The default is 0.
- `--ttl`: The message will self-destruct in x seconds. Default is 0.
- `--sound_alarm`: If true, the sign will beep when the message is displayed.
- `--reset`: resets the device before sending the message.
### Message Formatting
Messages support inline markup for dynamic effects. Include these tags directly
in your message text:
- **Scrolling**: `{scrolloff}` (turns off), `{scrollon}` (turns on).
- **Blinking**: `{blinkon}` (turns on), `{blinkoff}` (turns off).
- **Color**: `{red}`, `{green}`, `{yellow}`.
- **Alignment**: `{left}`, `{center}`, `{right}`.
- **Pause**: `{pause}` (pauses display).
- **Erase**: `{erase}` (clears content).
- **Serial Number**: `{serial}` (inserts sign's MAC address).
- **Beep**: `{bell}` (It _should_ beep, but when I tried it it paused the text
at the end).
- **Note**: `{note [pitch] [duration]}` (e.g., `{note 100 500}` for a 100 Hz
tone lasting 500 ms).
- **Tune**: `{tune [1-9] ["repeat"]}` (e.g., `{tune 9}` for Charge!; add
"repeat" for looping).
- **Font**: `{font [font_name]}` (switches font; see below).
### Available Fonts
The following fonts can be used with `{font [font_name]}`:
- monospace_7
- monospace_16
- monospace_24
- proportional_7
- proportional_5
- proportional_9
- proportional_11
- bold_proportional_7
- bold_proportional_11
- script_16
- picture_24
Note: Ensure the font fits within the zone's height to avoid issues.
### Zone Parameters
To create a zone, add the `--zone` flag with the parameters in the following
format:
```
--zone name="param1=0,param2=0"
```
- `x`: The X coordinate to start the zone, required.
- `y`: The Y coordinate to start the zone, required.
- `width`: The zone width, required.
- `height`: The zone height, required.
- `scroll_speed`: The speed of scrolling text in the zone. The default is
`SCROLL_MED`, but can also be `SCROLL_SLOW` or `SCROLL_FAST`.
- `pause_duration`: The duration in milliseconds of any pause sequences in the
message text.
- `volume`: The volume of beeps, notes, alarms, and other noises. Valid range is
0 (off) to 8 (deadly). Default is 4.
- `default_font`: The default font. See 'Available Fonts'
- `default_color`: The default color. Can be `COLOR_RED`, `COLOR_GREEN`, or
`COLOR_YELLOW`. The default is red.
- `initial_text`: The text initially displayed in the zone. This is just "." by
default.

View file

@ -34,6 +34,10 @@
x.DigestCRC
x.ModuleBuild
]))
pkgs.entr
(pkgs.python3.withPackages (x: [x.crc]))
];
};
};

View file

@ -1,537 +0,0 @@
package Net::Symon::NetBrite;
our $VERSION = '0.01';
=head1 NAME
Net::Symon::NetBrite - Talk to Symon NetBrite LED signs
=head1 SYNOPSIS
use Net::Symon::NetBrite qw(:constants);
use Net::Symon::NetBrite::Zone;
my $sign = new Net::Symon::NetBrite(
address => '192.168.34.56',
);
my $zone = new Net::Symon::NetBrite::Zone(
rect => [0, 0, 200, 24],
default_font => 'monospace_16',
);
$sign->zones(
myzone => $zone,
);
$sign->message('myzone', '{green}west philadelphia {red}born and raised');
$sign->reboot();
=head1 DESCRIPTION
Do you have a bunch of Symon NetBrite II signs laying around from a
company you acquired that had more money than sense? So do we!
=cut
use IO::Socket::INET;
use Digest::CRC;
use Carp;
require Exporter;
use constant {
COLOR_RED => 0x01,
COLOR_GREEN => 0x02,
COLOR_YELLOW => 0x03,
SCROLL_SLOW => 0x01,
SCROLL_MED => 0x02,
SCROLL_FAST => 0x03,
PRI_OVERRIDE => 0x01,
PRI_INTERRUPT => 0x02,
PRI_FOLLOW => 0x03,
PRI_YIELD => 0x04,
PRI_ROUNDROBIN => 0x0a,
};
our %fonts = (
monospace_16 => 0x00,
proportional_7 => 0x01,
proportional_5 => 0x02,
proportional_11 => 0x03,
monospace_24 => 0x04,
bold_proportional_7 => 0x05,
bold_proportional_11 => 0x06,
monospace_7 => 0x07,
script_16 => 0x08,
proportional_9 => 0x09,
picture_24 => 0x0a,
);
our @ISA = qw(Exporter);
my @consts = qw( COLOR_RED COLOR_GREEN COLOR_YELLOW
SCROLL_SLOW SCROLL_MED SCROLL_FAST
PRI_OVERRIDE PRI_INTERRUPT PRI_FOLLOW PRI_YIELD PRI_ROUNDROBIN);
our @EXPORT_OK = @consts;
our %EXPORT_TAGS = ( constants => \@consts );
=head1 METHODS
=head2 new()
Creates a new instance, which handles a single sign. The following
parameters are accepted:
=over
=item address
The address of the sign.
=item port
(optional) The destination port. Defaults to 700.
=item callback
If supplied, no socket will be created. Instead, the supplied coderef
will be called with a single argument whenever data needs to be sent.
This is intended for use with an external framework like L<POE|POE>.
=back
=cut
sub new {
my ($class, %data) = @_;
my $self = {};
if ($data{callback}) {
$self->{callback} = $data{callback};
} elsif ($data{address}) {
$self->{addr} = $data{address};
$self->{port} = $data{port} || 700;
} else {
croak 'Either address or callback must be supplied';
}
$self->{seqno} = 0;
$self->{sessno} = 0;
bless($self, $class);
return $self;
}
sub pkt_escape {
my $pkt = shift;
my $esc = pack('C', 0x10);
my $buf;
for (my $i = 0; $i < length $pkt; $i++) {
my $byte = unpack("x[$i]C", $pkt);
if ($i > 4 && $i < length($pkt) - 4 && ($byte == 0x10 || $byte == 0x01 || $byte == 0x04 || $byte == 0x17)) {
$buf .= $esc;
}
$buf .= pack('C', $byte);
}
return $buf;
}
sub crc { return Digest::CRC::crc(shift, 16, 0x0000, 0x0000, 1, 0x1021, 1, 0) }
sub tx {
my ($self, $pkt) = @_;
if (defined $self->{callback}) {
$self->{callback}->($pkt);
} else {
$self->{sock}->send($pkt);
$self->{sock}->flush();
}
}
sub connect {
my $self = shift;
$self->{sock} = IO::Socket::INET->new(
PeerAddr => $self->{addr},
PeerPort => 700,
Proto => 'tcp',
);
unless (defined $self->{sock}) {
croak "Socket: $!";
}
$self->{sessno} = 0;
#$self->zones();
}
=head2 zones()
Updates the list of zones associated with the sign. Any existing
zones are replaced. The zones will be sent to the sign immediately
and upon every successful reconnection.
The only parameter is a hash, in which the keys are zone names and the
values are L<Net::Symon::NetBrite::Zone|Net::Symon::NetBrite::Zone>
objects.
If called without a list of zones, the last provided zones will be
sent to the sign again.
=cut
sub zones {
my ($self, %zones) = @_;
if (%zones) {
$self->{zones} = \%zones;
} elsif (!defined $self->{zones}) {
return undef;
}
if (!defined $self->{sock}) {
$self->connect();
}
my $zid = 1;
foreach my $zname (sort { $a cmp $b } keys %{$self->{zones}}) {
my $z = $self->{zones}->{$zname};
$z->id($zid);
my $ztext = &parse_msg($z->get('initial_text'));
my $zlen = length $ztext;
my $body = pack("C4 CC4 C3 Cv C8 C C4 C4 vC5 C10 C3 C20vC3C11A[$zlen] C",
0x0f, 0x00, 0x0e, 0x02, # body start
$zid, @{$z->get('rect')}, # zone def
0x0d, $z->get('scroll_speed'), 0x00, # scroll rate
0x0c, $z->get('pause_duration'), # pause duration
0x0b, 0xfe, 0x0a, 0xe8, 0x03, 0x09, 0x0e, 0x08, # msg def params
$z->get('volume'), # volume (0-8)
0x07, $fonts{$z->get('default_font')}, 0x06, $z->get('default_color'), # default font
0x05, 0x00, 0x00, 0x04, # font footer
2012, 2, 10, 19, 21, 33, # timestamp: yyyy, mo, d, h, min, sec?
# tfmt
0x00, 0x03, 0x00, 0x00, 0x2f, 0x02, 0xff, 0x10, 0x3a, 0x01, # def message hdr
$zid, 0x00, 0x03,
# it's magic, i ain't gotta explain shit
#0x0e, 0x00,
0x02, 0x00, 0x00, 0x00,
0x00, 0x00, 0x03, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0xfe,
0x7e, 0x00, 0x02, 0x00,
$zlen, 0x00, 0x00, 0x04,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0xfe, 0x7e, 0x00, $ztext,
0x17, # body end
);
my $header = pack('C3 v v C3 C2 C4 C2 C2',
0x16, 0x16, 0x01, # msg start
length($body),
++$self->{seqno},
0x00, 0x01, 0x00,
0x01, 0x01, # type = init
0x00, 0xc8, 0x01, 0x00, # sign id
0x00, ++$self->{sessno}, # session packet count
0x04, 0x00, # header end
);
my $footer = pack('vC',
crc($header.$body),
0x04, # msg end
);
$self->tx(&pkt_escape($header.$body.$footer));
print "sent zone $zid with seqno ", $self->{seqno}, " sessno ", $self->{sessno}, "\n";
$zid++;
}
}
=head2 message()
Sends a message to a zone. Accepts the zone name, message text and an
optional hashref containing any of the following parameters:
=over
=item priority
The message priority determines how the new message will replace an
existing one. The default is C<PRI_FOLLOW>, but can also be
C<PRI_OVERRIDE>, C<PRI_INTERRUPT>, C<PRI_YIELD> or C<PRI_ROUNDROBIN>.
=item activation_delay
Message activation delay in milliseconds. Default is 0.
=item display_deplay
Message display delay in milliseconds. Default is 0.
=item display_repeat
Not really sure. The default is 0.
=item ttl
The message will self-destruct in C<ttl> seconds. Default is 0.
=item sound_alarm
If true, the sign will beep when the message is displayed.
=back
=cut
sub message {
my ($self, $zname, $text, $param) = @_;
my $z = $self->{zones}->{$zname};
unless ($z) {
return undef;
}
print "sending to zone $zname with id ", $z->id, "\n";
my $ztext = &parse_msg($text);
my $zlen = length $ztext;
# uck
if ($zlen == 4 || $zlen == 5) {
$ztext = pack('C2', 0x10, 0x15).$ztext;
$zlen += 2;
}
my $body = pack("V C v v v v C C2 A[$zlen] C",
$zlen,
$param->{priority} || PRI_OVERRIDE,
$param->{activation_delay} || 0,
$param->{display_delay} || 0,
$param->{display_repeat} || 0,
$param->{ttl} || 0,
($param->{sound_alarm} ? 0xff : 0xfe),
0x00, 0x00, # msg slot
$ztext,
0x17, # body end
);
my $maskbytes = $z->id / 8;
if (int($maskbytes) != $maskbytes) { $maskbytes = int($maskbytes) + 1 }
my $zmask = pack($z->id > 0xff ? 'v' : 'C', 1 << ($z->id - 1));
my $zmlen = length $zmask;
printf("zmask: %s bytes: %d\n", unpack('H*', $zmask), $maskbytes);
my $header = pack("C3 v v C3 C2 Ca[$zmlen] C2",
0x16, 0x16, 0x01, # msg start
length($body) + 1,
++$self->{seqno},
0x00, 0x01, 0x00,
0x03, $maskbytes * 8,
0x00, $zmask,
0x02, 0x00, # header end
);
my $footer = pack('vC',
crc($header.$body),
0x04, # msg end
);
$self->tx(&pkt_escape($header.$body.$footer));
}
=head2 reboot()
Instructs the sign to reboot.
=cut
sub reboot {
my $self = shift;
my $pkt = pack('C3vvC4C*',
0x16, 0x16, 0x01, # msg start
2, # body length
++$self->{seqno}, # packet count
0x00, 0x01, 0x00,
0x01, 0x01, # msg type: reset
0x00, 0xc8, 0x01, 0x00, # sign id
0x00, 0x01, 0x0f, 0x00, # reset msg
0x17, # crc follows
);
$pkt .= pack('vC',
crc($pkt),
0x04, # msg end
);
$self->tx(&pkt_escape($pkt));
}
=head1 Message Formatting
The NetBrite signs have a few formatting switches that can be applied
in-line to messages. This is implemented as a kind of markup.
=over
=item C<{scrolloff}>, C<{scrollon}>
Turns scrolling on or off. This works in the middle of a message, but
seems to have a tendency to mess things up.
=item C<{blinkon}>, C<{blinkoff}>
Turns blinking on or off.
=item C<{red}>, C<{green}>, C<{yellow}>
Changes the text color.
=item C<{left}>, C<{center}>, C<{right}>
Changes the text's alignment within its zone.
=item C<{pause}>
Briefly pauses the display, probably for the amount of time specified
in the zone definition.
=item C<{erase}>
Erases.
=item C<{serial}>
Inserts the sign's serial number, which seems to always be its
Ethernet MAC address.
=item C<{bell}>
Beeps.
=item C<{note [pitch] [duration]}>
Beeps at C<pitch> for C<duration>. The pitch is a positive integer, possibly
0-254 and the duration is in milliseconds.
=item C<{tune [1-9] ["repeat"]}>
Plays one of nine predefined tunes. #9 is Charge!
If I<repeat> is specified, the tune will play every time the message is
displayed. This is extremely annoying.
=item C<{font [font_name]}>
Switches to C<font_name>. See L</Available Fonts>.
Note that the sign won't change to a font that's taller than its
containing zone.
=back
=head1 Available Fonts
The following fonts are available:
=over
=item monospace_7
=item monospace_16
=item monospace_24
=item proportional_7
=item proportional_5
=item proportional_9
=item proportional_11
=item bold_proportional_7
=item bold_proportional_11
=item script_16
=item picture_24
=back
=cut
sub parse_msg {
my $msg = shift;
$msg =~ s!\{scrolloff\}!pack('C*', 0x10, 0x14)!ieg;
$msg =~ s!\{scrollon\}!pack('C*', 0x10, 0x15)!ieg;
$msg =~ s!\{blinkoff\}!pack('C*', 0x10, 0x01)!ieg;
$msg =~ s!\{blinkon\}!pack('C*', 0x10, 0x00)!ieg;
$msg =~ s!\{red\}!pack('C*', 0x10, 0x0c, COLOR_RED)!ieg;
$msg =~ s!\{green\}!pack('C*', 0x10, 0x0c, COLOR_GREEN)!ieg;
$msg =~ s!\{yellow\}!pack('C*', 0x10, 0x0c, COLOR_YELLOW)!ieg;
$msg =~ s!\{left\}!pack('C*', 0x10, 0x27)!ieg;
$msg =~ s!\{center\}!pack('C*', 0x10, 0x29)!ieg;
$msg =~ s!\{right\}!pack('C*', 0x10, 0x28)!ieg;
$msg =~ s!\{pause\}!pack('C*', 0x10, 0x05)!ieg;
$msg =~ s!\{erase\}!pack('C*', 0x10, 0x03)!ieg;
$msg =~ s!\{serial\}!pack('C*', 0x10, 0x09)!ieg;
$msg =~ s!\{bell\}!pack('C*', 0x10, 0x05)!ieg;
$msg =~ s!\{note\s+(\d+)\s+(\d+)\}!pack('C2Cv', 0x10, 0x11, $1, $2)!ieg;
$msg =~ s!\{tune\s+([1-9])(\s+repeat)?\}!pack('C2C', 0x10, ($2 ? 0x0a : 0x0b), $1)!ieg;
$msg =~ s!\{font\s+(\S+)\}!pack('C2C', 0x10, 0x0d, $fonts{$1})!ieg;
return $msg;
}
=head1 BUGS
There is no error checking of any kind.
Handling of sequence numbers should probably be better.
We don't bother to set the time on the sign, or do any of the
time/date formatting stuff. Sorry, I don't use it; send patches.
No support for message slots.
Socket handling stuff is embarrassing.
=head1 AUTHOR
Ben Wilber <ben@desync.com>
Most of the credit goes to the author of
L<NetPenguin Server|http://www.thepenguinmaster.com/>, who did the
hard work of figuring out the protocol. Consider supporting that
project if you find this useful.
=head1 LICENSE
This library is free software and may be distributed under the same
terms as Perl itself.
=cut
1;

View file

@ -1,119 +0,0 @@
package Net::Symon::NetBrite::Zone;
require Carp;
=head1 NAME
Net::Symon::NetBrite::Zone - Define a NetBrite zone
=head1 SYNOPSIS
my $zone = new Net::Symon::NetBrite::Zone(
rect => [0, 0, 200, 24],
);
=head1 CREATING A ZONE
To create a zone, call C<new()>. The only required parameter is C<rect>; the
rest are optional.
=over
=item rect
Accepts an arrayref that defines the position and size of the zone in LEDs.
The first two parameters are the upper left bound of the rectangle. The last
two are the lower right bound. For example,
rect => [10, 10, 20, 20]
would create a 10x10 area 10 LEDs from the top left corner.
B<ACHTUNG:> Don't create zones that overlap, are bigger than your sign,
zero/negative size or other stupidity. This can crash or hang your sign.
=item scroll_speed
The speed of scrolling text in the zone. The default is C<SCROLL_MED>, but can
also be C<SCROLL_SLOW> or C<SCROLL_FAST>.
=item pause_duration
The duration in milliseconds of any pause sequences in the message text. I
think.
=item volume
The volume of beeps, notes, alarms and other noises. Valid range is 0 (off) to
8 (deadly). Default is 4.
=item default_font
The default font. See L<Net::Symon::NetBrite/available_fonts>.
=item default_color
The default color. Can be C<COLOR_RED>, C<COLOR_GREEN> or C<COLOR_YELLOW>.
The default is red.
=item initial_text
The text initially displayed in the zone. This is just "." by default.
=back
=cut
sub new {
my ($class, %data) = @_;
my $self = {};
if (defined $data{rect}) {
$self->{rect} = $data{rect};
} else {
croak("Must supply rect");
}
$self->{scroll_speed} = $data{scroll_speed} || Net::Symon::NetBrite::SCROLL_MED;
$self->{pause_duration} = $data{pause_duration} || 1000;
$self->{volume} = $data{volume} || 4;
$self->{default_font} = $data{default_font} || 'proportional_5';
$self->{default_color} = $data{default_color} || Net::Symon::NetBrite::COLOR_RED;
$self->{initial_text} = $data{initial_text} || '.';
bless($self, $class);
return $self;
}
sub id {
my ($self, $newid) = @_;
if ($newid) {
$self->{id} = $newid;
}
return $self->{id};
}
sub get {
my ($self, $k) = @_;
unless (defined $self->{$k}) { warn "$k undefined" }
return $self->{$k};
}
=head1 AUTHOR
Ben Wilber <ben@desync.com>
=head1 LICENSE
This library is free software and may be distributed under the same
terms as Perl itself.
=cut
1;

423
netbrite.py Normal file
View file

@ -0,0 +1,423 @@
from typing import Callable
from crc import Calculator, Crc16
from enum import Enum
from socket import SocketType
from struct import pack
import socket
import re
DEFAULT_PORT = 700
class Colors(Enum):
RED = 0x01
GREEN = 0x02
YELLOW = 0x03
class ScrollSpeeds(Enum):
SLOW = 0x00
NORMAL = 0x01
class Priorities(Enum):
OVERRIDE = 0x01
INTERRUPT = 0x02
FOLLOW = 0x03
YIELD = 0x04
ROUNDROBIN = 0x0A
class Fonts(Enum):
NORMAL_7 = 0x01
NORMAL_5 = 0x02
BOLD_7 = 0x05
MONOSPACE = 0x07
calculator = Calculator(Crc16.KERMIT.value)
def checksum(data: bytes) -> int:
return int(calculator.checksum(data))
def pkt_escape(pkt: bytes) -> bytes:
esc = pack("B", 0x10)
buf = bytearray()
for i, byte in enumerate(pkt):
if i > 4 and i < len(pkt) - 4 and byte in [0x10, 0x01, 0x04, 0x17]:
buf.extend(esc)
buf.append(byte)
return bytes(buf)
class Message:
activation_delay: int = 0 # Message activation delay
display_delay: int = 0 # Message display delay
display_repeat: int = 0 # Not sure
priority: Priorities
sound_alarm: bool = False # Beep when message is displayed
text: str
ttl: int = 0 # Self destruct timeout
def __init__(
self,
text: str,
activation_delay: int = 0, # Message activation delay
display_delay: int = 0, # Message display delay
display_repeat: int = 0, # Not sure
priority: Priorities = Priorities.OVERRIDE,
sound_alarm: bool = False, # Beep when message is displayed
ttl: int = 0, # Self destruct timeout
):
self.activation_delay = activation_delay
self.display_delay = display_delay
self.display_repeat = display_repeat
self.priority = priority
self.sound_alarm = sound_alarm
self.text = text
self.ttl = ttl
def parse_msg(self) -> bytes:
msg_bytes = self.text.strip().encode("ascii")
replacements: list[
tuple[bytes, bytes] | tuple[bytes, Callable[[re.Match[bytes]], bytes]]
] = [
(rb"\{scrolloff\}", b"\x10\x14"),
(rb"\{scrollon\}", b"\x10\x15"),
(rb"\{blinkoff\}", b"\x10\x01"),
(rb"\{blinkon\}", b"\x10\x00"),
(rb"\{left\}", b"\x10\x27"),
(rb"\{center\}", b"\x10\x29"),
(rb"\{right\}", b"\x10\x28"),
(rb"\{pause\}", b"\x10\x05"),
(rb"\{erase\}", b"\x10\x03"),
(rb"\{serial\}", b"\x10\x09"),
(rb"\{bell\}", b"\x10\x05"),
(rb"\{red\}", b"\x10\x05" + pack("B", Colors.RED.value)),
(rb"\{green\}", b"\x10\x05" + pack("B", Colors.GREEN.value)),
(rb"\{yellow\}", b"\x10\x05" + pack("B", Colors.YELLOW.value)),
# (
# rb"\{(red|green|yellow)\}",
# lambda m: b"\x10\x0c" + pack("B", Colors[m[1].decode("ascii")].value),
# ),
# (
# rb"\{note\s+(\d+)\s+(\d+)\}",
# lambda m: b"\x10\x11" + pack("<HH", int(m[1]), int(m[2])),
# ),
# (
# rb"\{tune\s+([1-9])(\s+repeat)?\}",
# lambda m: bytes([0x10, 0x0A if m[2] else 0x0B, int(m[1])]),
# ),
# (
# rb"\{font\s+(\S+)\}",
# lambda m: bytes(
# [0x10, 0x0D, Fonts[m[1].decode("ascii").upper()].value]
# ),
# ),
]
for pat, repl in replacements:
msg_bytes = re.sub(pat, repl, msg_bytes, flags=re.IGNORECASE)
return msg_bytes
DEFAULT_TEXT = Message("zone")
class Zone:
id: int
rect: tuple[int, int, int, int]
scroll_speed: ScrollSpeeds
pause_duration: int
volume: int
default_font: Fonts
default_color: Colors
initial_text: Message
def __init__(
self,
x: int,
y: int,
width: int,
height: int,
scroll_speed: ScrollSpeeds = ScrollSpeeds.NORMAL,
pause_duration: int = 1000,
volume: int = 4,
default_font: Fonts = Fonts.NORMAL_7,
default_color: Colors = Colors.RED,
initial_text: Message = DEFAULT_TEXT,
):
self.id = 0
self.rect = (x, y, x + width - 1, y + height - 1)
self.scroll_speed = scroll_speed
self.pause_duration = pause_duration
self.volume = volume
self.default_font = default_font
self.default_color = default_color
self.initial_text = initial_text
class NetBrite:
address: str
port: int
seqno: int = 1
sessno: int = 1
sock: SocketType
zones_list: dict[str, Zone]
def __init__(self, address: str, port: int = DEFAULT_PORT):
self.address = address
self.port = port
self.zones_list = {}
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect()
except OSError as e:
raise ConnectionError(f"Error while opening network socket. {e}")
def connect(self):
self.sock.connect((self.address, self.port))
def tx(self, pkt: bytes):
_ = self.sock.send(pkt)
def message(self, msg: Message, zoneName: str):
z = self.zones_list.get(zoneName)
if z == None:
print("Zone doesn't exist, skipping message.")
return
ztext = msg.parse_msg()
zlen = len(ztext)
if zlen == 4 or zlen == 5:
ztext = pack("2B", 0x10, 0x15) + ztext
zlen += 2
body = pack(
f"<L B H H H H B 2B {zlen}s B",
zlen,
msg.priority.value,
msg.activation_delay,
msg.display_delay,
msg.display_repeat,
msg.ttl,
0xFF if msg.sound_alarm else 0xFE,
0x00,
0x00,
ztext,
0x17,
)
oMaskbytes = z.id / 8
maskbytes = int(oMaskbytes)
if maskbytes != oMaskbytes:
maskbytes += 1
zmask = pack("<H" if z.id > 0xFF else "B", 1 << (z.id - 1))
zmlen = len(zmask)
header = pack(
f"<3B H H 3B 2B B{zmlen}s 2B",
0x16,
0x16,
0x01, # msg start
len(body) + 1,
self.seqno,
0x00,
0x01,
0x00,
0x03,
maskbytes * 8,
0x00,
zmask,
0x02,
0x00, # header end
)
footer = pack("<HB", checksum(header + body), 0x04)
self.tx(pkt_escape(header + body + footer))
def zones(self, zones: dict[str, Zone] | None = None):
if zones != None:
self.zones_list = zones
for zid, zname in enumerate(sorted(self.zones_list.keys())):
zid += 1
self.zones_list[zname].id = zid
z = self.zones_list[zname]
ztext = z.initial_text.parse_msg() # FIXME: parse_msg once implemented
zlen = len(ztext)
print(z.scroll_speed.value)
body = pack(
f"<4B B4B 3B BH 8B B 4B 4B H5B 10B 3B 20BH3B11B{zlen}s B",
0x0F, # Body start
0x00,
0x0E,
0x02,
zid, # zone def
*z.rect,
0x0D, # Scroll speed
z.scroll_speed.value,
0x00,
0x0C, # Pause duration
z.pause_duration,
0x0B,
0xFE,
0x0A,
0xE8,
0x03,
0x09,
0x0E,
0x08, # msg def params
z.volume, # Volume,
0x07, # Font
z.default_font.value,
0x06, # Color
z.default_color.value,
0x05, # Font footer
0x00,
0x00,
0x04,
2012, # yyyy, mo, d, h, min, sec? #TODO: try out new dates and see if it changes shit
2,
10,
19,
21,
33,
0x00,
0x03,
0x00,
0x00,
0x2F,
0x02,
0xFF,
0x10,
0x3A,
0x01, # Def message hdr
zid, # ID
0x00,
0x03,
0x02, # Magic
0x00,
0x00,
0x00,
0x00,
0x00,
0x03,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0xFE,
0x7E,
0x00,
0x02,
0x00,
zlen,
0x00,
0x00,
0x04,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0xFE,
0x7E,
0x00,
ztext,
0x17,
)
header = pack(
"<3B H H 3B 2B 4B 2B 2B",
0x16, # Msg start
0x16,
0x01,
len(body),
self.seqno,
0x00,
0x01,
0x00,
0x01, # Type = init
0x01,
0x00, # Sign id
0xC8,
0x01,
0x00,
0x00, # Session packet count
self.sessno,
0x04, # Header end
0x00,
)
crc = checksum(header + body)
footer = pack("<HB", crc, 0x04)
self.tx(pkt_escape(header + body + footer))
print(f"Sent zone {zname}")
self.seqno += 1
self.sessno += 1
_ = None
netbrt = NetBrite("10.65.37.244")
zones = [
# Zone(
# 1,
# 0,
# 59,
# 7,
# initial_text=Message("{erase}{scrolloff}{left}Welcome to:"),
# default_color=Colors.RED,
# ),
# Zone(
# 73,
# 1,
# 35,
# 7,
# initial_text=Message("{erase}{scrolloff}ING R&D"),
# default_color=Colors.YELLOW,
# ),
# Zone(
# 110,
# 1,
# 10,
# 7,
# initial_text=Message("{erase}{scrolloff}HQ"),
# default_color=Colors.GREEN,
# ),
Zone(0, 0, 150, 7, initial_text=Message("{scrolloff}."))
]
netbrt.zones({str(k): v for k, v in enumerate(zones)})
from time import sleep, time
from datetime import datetime
while True:
now = datetime.now()
netbrt.message(
Message(f"{"{scrolloff}"}{now.hour}:{now.minute}:{now.second}"),
"0",
)
sleep(1)

View file

@ -1,103 +0,0 @@
#!/usr/bin/perl
use strict;
use warnings;
use lib 'lib';
use Net::Symon::NetBrite qw(:constants);
use Net::Symon::NetBrite::Zone;
use Getopt::Long;
use Net::Symon::NetBrite;
my $zone_array_ref;
my $message_array_ref;
my $address;
my $priority;
my $activation_delay;
my $display_delay;
my $display_repeat;
my $ttl;
my $sound_alarm;
my $reset; # New flag for resetting the sign
GetOptions(
"zone=s@" => \$zone_array_ref,
"message=s@" => \$message_array_ref,
"address=s" => \$address,
"priority=s" => \$priority,
"activation_delay=i"=> \$activation_delay,
"display_delay=i" => \$display_delay,
"display_repeat=i" => \$display_repeat,
"ttl=i" => \$ttl,
"sound_alarm" => \$sound_alarm,
"reset" => \$reset # Add the reset flag
);
die "Error: --address parameter is required.\n" unless defined $address;
die "Error: At least one --zone parameter is required.\n" unless defined $zone_array_ref && @{$zone_array_ref};
die "Error: --message parameter must be provided for each zone.\n" unless defined $message_array_ref && @{$message_array_ref} == @{$zone_array_ref};
my %zones;
foreach my $zone_str (@{$zone_array_ref}) {
my ($name, $definition) = split /=/, $zone_str, 2;
die "Error: Invalid zone format. Use name=definition.\n" unless defined $name && defined $definition;
my %props = parse_zone_string($definition);
if (exists $props{'x'} && exists $props{'y'} && exists $props{'width'} && exists $props{'height'}) {
my $x1 = $props{'x'};
my $y1 = $props{'y'};
my $x2 = $x1 + $props{'width'} - 1;
my $y2 = $y1 + $props{'height'} - 1;
$props{'rect'} = [$x1, $y1, $x2, $y2];
delete $props{'x'};
delete $props{'y'};
delete $props{'width'};
delete $props{'height'};
}
$zones{$name} = Net::Symon::NetBrite::Zone->new(%props);
}
my %messages;
foreach my $message_str (@{$message_array_ref}) {
my ($name, $text) = split /=/, $message_str, 2;
die "Error: Invalid message format. Use name=message text.\n" unless defined $name && defined $text;
die "Error: Zone '$name' not defined.\n" unless exists $zones{$name};
$messages{$name} = $text; # Use the message as-is, including any formatting tags
}
my $sign = Net::Symon::NetBrite->new(address => $address);
if ($reset) { # If --reset is provided, reboot the sign
$sign->reboot();
}
$sign->zones(%zones);
my %message_params;
$message_params{activation_delay} = $activation_delay if defined $activation_delay;
$message_params{display_delay} = $display_delay if defined $display_delay;
$message_params{display_repeat} = $display_repeat if defined $display_repeat;
$message_params{ttl} = $ttl if defined $ttl;
$message_params{sound_alarm} = 1 if defined $sound_alarm;
if ($priority) {
$message_params{priority} = PRI_OVERRIDE if $priority eq 'override';
$message_params{priority} = PRI_INTERRUPT if $priority eq 'interrupt';
$message_params{priority} = PRI_FOLLOW if $priority eq 'follow';
$message_params{priority} = PRI_YIELD if $priority eq 'yield';
$message_params{priority} = PRI_ROUNDROBIN if $priority eq 'roundrobin';
}
foreach my $name (keys %messages) {
$sign->message($name, $messages{$name}, \%message_params); # Pass raw message
}
sub parse_zone_string {
my ($str) = @_;
my %hash;
for my $pair (split /,/, $str) {
my ($key, $value) = split /=/, $pair;
$hash{$key} = $value;
}
return %hash;
}