Skip to content
This repository was archived by the owner on Nov 23, 2025. It is now read-only.

Commit dc93048

Browse files
committed
* Updated interface execute_instruction prototype to take in a pre-built
packet rather than all of these needing to build the packet internally. * Shifted NPC based functionality into an explict class that bundles this. ``abstractions.NPCPacket``
1 parent 75a481b commit dc93048

7 files changed

Lines changed: 133 additions & 87 deletions

File tree

CHANGES.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ Version 0.5.0
2424
* Removed get gpio config as this functionality is no longer applicable in
2525
the new UOS design.
2626
* Updated the get_gpio_input interface to allow for enabling pull-up.
27+
* Updated interface execute_instruction prototype to take in a pre-built
28+
packet rather than all of these needing to build the packet internally.
29+
* Shifted NPC based functionality into an explict class that bundles this.
30+
``abstractions.NPCPacket``
2731

2832
Version 0.4.0
2933
-------------

tests/interface/test_package.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import pytest
55

66
from uoshardware import UOSCommunicationError
7+
from uoshardware.abstractions import NPCPacket
78
from uoshardware.interface.serial import Serial
89

910
# Allow access to protected members in test module.
@@ -36,7 +37,7 @@ def test_basic_functions(npc_serial_port: Serial):
3637
assert npc_serial_port.open() is None
3738
assert npc_serial_port._device is not None
3839
sleep(2) # Allow the system time to boot
39-
assert npc_serial_port.execute_instruction(64, (13, 0, 1)).status
40+
assert npc_serial_port.execute_instruction(NPCPacket(61, 0, (13, 1))).status
4041
response = npc_serial_port.read_response(expect_packets=1, timeout_s=2)
4142
assert response.status
4243
assert npc_serial_port.hard_reset()
@@ -55,6 +56,6 @@ def test_basic_fault_cases(invalid_serial_port: Serial):
5556
invalid_serial_port.open()
5657
assert invalid_serial_port.close() is None
5758
with pytest.raises(UOSCommunicationError):
58-
invalid_serial_port.execute_instruction(64, (13, 0, 1))
59+
invalid_serial_port.execute_instruction(NPCPacket(64, 0, (13, 0, 1)))
5960
with pytest.raises(UOSCommunicationError):
6061
invalid_serial_port.read_response(expect_packets=1, timeout_s=1)

tests/test_abstractions.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
"""Unit tests for the abstractions module."""
1+
"""Unit tests for the `abstractions` module."""
22
import pytest
33

44
from tests import Packet
55
from uoshardware import UOSUnsupportedError
6-
from uoshardware.abstractions import UOSInterface
6+
from uoshardware.abstractions import NPCPacket, UOSFunction, UOSFunctions, UOSInterface
77

88
TEST_PACKETS = [
99
Packet(
@@ -34,7 +34,7 @@ def test_execute_instruction():
3434
"""Using the base class directly should throw an error."""
3535
with pytest.raises(UOSUnsupportedError):
3636
# noinspection PyTypeChecker
37-
UOSInterface.execute_instruction(self=None, address=10, payload=())
37+
UOSInterface.execute_instruction(self=None, packet=NPCPacket(0, 10, tuple([])))
3838

3939

4040
def test_read_response():
@@ -107,7 +107,7 @@ def test_close():
107107
def test_get_npc_checksum(test_packet_data: tuple, expected_lrc: int):
108108
"""Checks the computation of LRC checksums for some known packets."""
109109
print(f"\n -> packet: {test_packet_data}, lrc:{expected_lrc}")
110-
assert UOSInterface.get_npc_checksum(test_packet_data) == expected_lrc
110+
assert NPCPacket.get_npc_checksum(test_packet_data) == expected_lrc
111111

112112

113113
@pytest.mark.parametrize(
@@ -120,10 +120,23 @@ def test_get_npc_packet(test_packet: Packet):
120120
f"payload: {test_packet.payload}, packet: {test_packet.binary!r}"
121121
)
122122
assert (
123-
UOSInterface.get_npc_packet(
123+
NPCPacket(
124124
test_packet.address_to,
125125
test_packet.address_from,
126126
tuple(test_packet.payload),
127-
)
127+
).packet
128128
== test_packet.binary
129129
)
130+
131+
132+
@pytest.mark.parametrize(
133+
"address,function",
134+
[
135+
(60, UOSFunctions.set_gpio_output),
136+
(79, UOSFunctions.reset_all_io),
137+
(257, None),
138+
],
139+
)
140+
def test_get_uos_function_from_address(address: int, function: UOSFunction):
141+
"""Checks the function for looking up a function from its UOS addr."""
142+
assert UOSFunctions.get_from_address(address) == function

uoshardware/abstractions.py

Lines changed: 76 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
"""Module defining the base class and static func for interfaces."""
22
from abc import ABCMeta, abstractmethod
33
from dataclasses import dataclass, field
4-
from functools import lru_cache
54

65
from uoshardware import Persistence, UOSUnsupportedError
76

@@ -69,6 +68,14 @@ def enumerate_functions() -> list:
6968
if isinstance(getattr(UOSFunctions, member_name), UOSFunction)
7069
]
7170

71+
@staticmethod
72+
def get_from_address(address: int) -> UOSFunction | None:
73+
"""Look up function from the address."""
74+
for function in UOSFunctions.enumerate_functions():
75+
if address in function.address_lut.values():
76+
return function # function located.
77+
return None # function not found.
78+
7279

7380
@dataclass
7481
class ComResult:
@@ -78,7 +85,6 @@ class ComResult:
7885
exception: str = ""
7986
ack_packet: list = field(default_factory=list)
8087
rx_packets: list = field(default_factory=list)
81-
aux_data: dict = field(default_factory=dict)
8288

8389

8490
@dataclass
@@ -91,18 +97,81 @@ class InstructionArguments:
9197
volatility: Persistence = Persistence.NONE
9298

9399

100+
@dataclass(init=False)
101+
class NPCPacket:
102+
"""Class contains functions and data for the packet based communication."""
103+
104+
to_address: int
105+
from_address: int
106+
payload: tuple[int, ...]
107+
packet: bytes
108+
109+
def __init__(self, to_address: int, from_address: int, payload: tuple[int, ...]):
110+
"""Construct a new packet object."""
111+
self.to_address = to_address
112+
self.from_address = from_address
113+
self.payload = payload
114+
self.packet = self.compute_packet()
115+
116+
def compute_packet(self) -> bytes:
117+
"""Generate a standardised NPC binary packet."""
118+
if (
119+
self.to_address < 256
120+
and self.from_address < 256
121+
and len(self.payload) < 256
122+
): # check input is possible to parse
123+
packet_data = tuple(
124+
[self.to_address, self.from_address, len(self.payload)]
125+
+ list(self.payload)
126+
)
127+
lrc = NPCPacket.get_npc_checksum(packet_data)
128+
return bytes(
129+
[0x3E, packet_data[0], packet_data[1], len(self.payload)]
130+
+ list(self.payload)
131+
+ [lrc, 0x3C]
132+
)
133+
return bytes([])
134+
135+
@staticmethod
136+
def get_npc_checksum(packet_data: tuple[int, ...]) -> int:
137+
"""Generate a NPC LRC checksum.
138+
139+
:param packet_data: List of the uint8 values from an NPC packet.
140+
:return: NPC checksum as an 8-bit integer.
141+
"""
142+
lrc = 0
143+
for byte in packet_data:
144+
lrc = (lrc + byte) & 0xFF
145+
return ((lrc ^ 0xFF) + 1) & 0xFF
146+
147+
def expects_ack(self) -> bool:
148+
"""Check if this packet is expected to be acknowledged."""
149+
if function := UOSFunctions.get_from_address(self.to_address):
150+
return function.ack
151+
raise UOSUnsupportedError(
152+
"When checking `gets_ack`, "
153+
f"function for address {self.to_address} could not be located.",
154+
)
155+
156+
def expects_rx_packets(self) -> list[int]:
157+
"""Check if this packet expects rx packets from the function def."""
158+
if function := UOSFunctions.get_from_address(self.to_address):
159+
return function.rx_packets_expected
160+
raise UOSUnsupportedError(
161+
"When checking `expects_rx_packets, "
162+
f"function for address {self.to_address} could not be located."
163+
)
164+
165+
94166
class UOSInterface(metaclass=ABCMeta):
95167
"""Base class for low level UOS interfaces classes to inherit."""
96168

97169
# Dead code suppression used as abstract interfaces are false positives.
98170
@abstractmethod
99-
def execute_instruction(
100-
self, address: int, payload: tuple[int, ...], **kwargs # dead: disable
101-
) -> ComResult:
171+
def execute_instruction(self, packet: NPCPacket) -> ComResult: # dead: disable
102172
"""Abstract method for executing instructions on UOSInterfaces.
103173
104-
:param address: An 8-bit unsigned integer of the UOS subsystem targeted by the instruction.
105-
:param payload: A tuple containing the uint8 parameters of the UOS instruction.
174+
:param packet: A tuple containing the uint8 npc packet for the UOS instruction.
106175
:returns: ComResult object.
107176
:raises: UOSUnsupportedError if the interface hasn't been built correctly.
108177
:raises: UOSCommunicationError if there is a problem completing the action.
@@ -184,40 +253,6 @@ def enumerate_devices() -> list:
184253
f"UOSInterfaces must over-ride {UOSInterface.enumerate_devices.__name__} prototype."
185254
)
186255

187-
@staticmethod
188-
@lru_cache(maxsize=100)
189-
def get_npc_packet(to_addr: int, from_addr: int, payload: tuple[int, ...]) -> bytes:
190-
"""Generate a standardised NPC binary packet.
191-
192-
:param to_addr: An 8-bit unsigned integer of the UOS subsystem targeted by the instruction.
193-
:param from_addr: An 8-bit unsigned integer of the host system, usually 0.
194-
:param payload: A tuple containing the unsigned 8-bit integers of the command.
195-
:return: NPC packet as a bytes object. No bytes returned on fault.
196-
"""
197-
if (
198-
to_addr < 256 and from_addr < 256 and len(payload) < 256
199-
): # check input is possible to parse
200-
packet_data = tuple([to_addr, from_addr, len(payload)] + list(payload))
201-
lrc = UOSInterface.get_npc_checksum(packet_data)
202-
return bytes(
203-
[0x3E, packet_data[0], packet_data[1], len(payload)]
204-
+ list(payload)
205-
+ [lrc, 0x3C]
206-
)
207-
return bytes([])
208-
209-
@staticmethod
210-
def get_npc_checksum(packet_data: tuple[int, ...]) -> int:
211-
"""Generate a NPC LRC checksum.
212-
213-
:param packet_data: List of the uint8 values from an NPC packet.
214-
:return: NPC checksum as a 8-bit integer.
215-
"""
216-
lrc = 0
217-
for byte in packet_data:
218-
lrc = (lrc + byte) & 0xFF
219-
return ((lrc ^ 0xFF) + 1) & 0xFF
220-
221256

222257
@dataclass(frozen=True)
223258
class Pin:

uoshardware/api.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
ComResult,
55
Device,
66
InstructionArguments,
7+
NPCPacket,
78
UOSFunction,
89
UOSFunctions,
910
UOSInterface,
@@ -262,11 +263,13 @@ def __execute_instruction(
262263
self.open()
263264
if function.address_lut[instruction_data.volatility] >= 0:
264265
# a normal instruction
265-
tx_response = self.__device_interface.execute_instruction(
266-
function.address_lut[instruction_data.volatility],
267-
instruction_data.payload,
268-
function=function,
266+
packet = NPCPacket(
267+
to_address=function.address_lut[instruction_data.volatility],
268+
from_address=0,
269+
payload=instruction_data.payload,
269270
)
271+
logger.debug("Function %s assembled packet: %s", function.name, packet)
272+
tx_response = self.__device_interface.execute_instruction(packet)
270273
if tx_response.status:
271274
rx_response = self.__device_interface.read_response(
272275
instruction_data.expected_rx_packets, 2
@@ -279,10 +282,8 @@ def __execute_instruction(
279282
if count == 0
280283
else rx_response.rx_packets[count - 1]
281284
)
282-
computed_checksum = (
283-
self.__device_interface.get_npc_checksum(
284-
current_packet[1:-2]
285-
)
285+
computed_checksum = NPCPacket.get_npc_checksum(
286+
current_packet[1:-2]
286287
)
287288
logger.debug(
288289
"Calculated checksum %s must match rx %s",
@@ -297,9 +298,8 @@ def __execute_instruction(
297298
finally: # Safety check for lazy loading being used outside of context manager
298299
if self.loading == Loading.LAZY: # Lazy loaded
299300
self.close()
300-
if (
301-
not rx_response.status and retry
302-
): # allow one retry per instruction due to DTR resets
301+
if not rx_response.status and retry:
302+
# allow one retry per instruction due to DTR resets
303303
return self.__execute_instruction(function, instruction_data, False)
304304
return rx_response
305305

uoshardware/interface/serial.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from serial.tools import list_ports
88

99
from uoshardware import UOSCommunicationError, logger
10-
from uoshardware.abstractions import ComResult, UOSInterface
10+
from uoshardware.abstractions import ComResult, NPCPacket, UOSInterface
1111

1212
if platform.system() == "Linux":
1313
import termios # pylint: disable=E0401
@@ -96,22 +96,18 @@ def close(self):
9696
logger.debug("Connection closed successfully")
9797
self._device = None
9898

99-
# Kwargs is defined in the abstractmethod definition, false positive.
100-
def execute_instruction(self, address, payload, **kwargs): # dead: disable
99+
def execute_instruction(self, packet: NPCPacket):
101100
"""Build and execute a new instruction packet.
102101
103-
:param address: An 8-bit unsigned integer of the UOS subsystem targeted by the instruction.
104-
:param payload: A tuple containing the uint8 parameters of the UOS instruction.
102+
:param packet: A tuple containing the uint8 npc packet for the UOS instruction.
105103
:return: Tuple containing a status boolean and index 0 and a result-set dict at index 1.
106104
"""
107105
if self._device is None:
108106
raise UOSCommunicationError(
109107
"Connection must be open to execute instructions."
110108
)
111-
packet = self.get_npc_packet(to_addr=address, from_addr=0, payload=payload)
112-
logger.debug("packet formed %s", packet)
113109
try: # Send the packet.
114-
num_bytes = self._device.write(packet)
110+
num_bytes = self._device.write(packet.packet)
115111
self._device.flush()
116112
logger.debug("Sent %s bytes of data", num_bytes)
117113
except serial.SerialException as exception:
@@ -120,7 +116,7 @@ def execute_instruction(self, address, payload, **kwargs): # dead: disable
120116
) from exception
121117
finally:
122118
self._device.reset_output_buffer()
123-
return ComResult(num_bytes == len(packet))
119+
return ComResult(num_bytes == len(packet.packet))
124120

125121
def read_response(self, expect_packets: int, timeout_s: float):
126122
"""Read ACK and response packets from the serial device.

uoshardware/interface/stub.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Package is used as a simulated UOSInterface for test purposes."""
22
from uoshardware import UOSCommunicationError
3-
from uoshardware.abstractions import ComResult, UOSInterface
3+
from uoshardware.abstractions import ComResult, NPCPacket, UOSInterface
44

55

66
class Stub(UOSInterface):
@@ -13,13 +13,7 @@ def __init__(self, connection: str, errored: int = 0):
1313
self.errored = errored
1414
self.connection = connection
1515

16-
def execute_instruction(
17-
self,
18-
address: int,
19-
# Dead code false positive as this is over-riding an interface.
20-
payload: tuple[int, ...], # dead: disable
21-
**kwargs,
22-
) -> ComResult:
16+
def execute_instruction(self, packet: NPCPacket) -> ComResult:
2317
"""Simulate executing an instruction on a UOS endpoint.
2418
2519
Should check whether the last instruction was valid and store
@@ -28,16 +22,19 @@ def execute_instruction(
2822
"""
2923
if not self.__open:
3024
raise UOSCommunicationError("Port must be open to execute instructions.")
31-
function = kwargs["function"]
32-
if function is not None:
33-
if function.ack:
34-
self.__packet_buffer.append(self.get_npc_packet(0, address, tuple([0])))
35-
for rx_packet in function.rx_packets_expected:
36-
self.__packet_buffer.append(
37-
self.get_npc_packet(0, address, tuple(0 for _ in range(rx_packet)))
38-
)
39-
return ComResult(True)
40-
raise UOSCommunicationError("Cannot execute function 'None'.")
25+
if packet.expects_ack():
26+
# Dummy an ack packet.
27+
self.__packet_buffer.append(
28+
NPCPacket(0, packet.to_address, tuple([0])).packet
29+
)
30+
for rx_packet in packet.expects_rx_packets():
31+
# Dummy response packets
32+
self.__packet_buffer.append(
33+
NPCPacket(
34+
0, packet.to_address, tuple(0 for _ in range(rx_packet))
35+
).packet
36+
)
37+
return ComResult(True)
4138

4239
# Dead code detection false positive due to abstract interface.
4340
def read_response(

0 commit comments

Comments
 (0)