11"""Module defining the base class and static func for interfaces."""
22from abc import ABCMeta , abstractmethod
33from dataclasses import dataclass , field
4+ from datetime import datetime
45
5- from uoshardware import Persistence , UOSUnsupportedError
6+ from uoshardware import Persistence , UOSRuntimeError , UOSUnsupportedError , logger
67
78
89@dataclass (frozen = True )
@@ -153,6 +154,15 @@ class ComResult:
153154 rx_packets : list = field (default_factory = list )
154155 tx_packet : NPCPacket | None = None
155156
157+ def get_rx_payload (self , packet_index : int ) -> list [int ]:
158+ """Return just the payload portion of a rx packet."""
159+ if len (self .rx_packets ) <= packet_index :
160+ raise UOSRuntimeError (
161+ f"Can't index payload { packet_index } of "
162+ f"{ len (self .rx_packets )} rx packet(s)."
163+ )
164+ return self .rx_packets [packet_index ][4 :- 2 ]
165+
156166
157167@dataclass
158168class InstructionArguments :
@@ -255,7 +265,44 @@ def enumerate_devices() -> list:
255265 )
256266
257267
258- @dataclass (frozen = True )
268+ @dataclass (init = False )
269+ class Sample :
270+ """A converted response from a reading on a pin."""
271+
272+ raw_value : int
273+ value : float
274+ time : datetime
275+
276+
277+ @dataclass (init = False )
278+ class ADCSample (Sample ):
279+ """ADC specific Sample constructor for ADC readings."""
280+
281+ def __init__ (self , raw_value : list [int ], steps : int , reference : float ):
282+ """Create an ADC Sample."""
283+ self .raw_value = int (bytes (raw_value ).hex (), 16 ) # convert bytes to int
284+ self .value = (self .raw_value / steps ) * reference
285+ self .time = datetime .now ()
286+ logger .debug (
287+ "Constructing ADCSample from `%s` with adc steps `%s` and ref `%s`" ,
288+ self .raw_value ,
289+ steps ,
290+ reference ,
291+ )
292+
293+
294+ @dataclass (init = False )
295+ class DigitalSample (Sample ):
296+ """Digital specific sample constructor for gpio reads."""
297+
298+ def __init__ (self , raw_value : int ):
299+ """Create a Digital GPIO Sample."""
300+ self .raw_value = raw_value
301+ self .value = raw_value
302+ self .time = datetime .now ()
303+
304+
305+ @dataclass
259306class Pin :
260307 """Defines supported features of the pin."""
261308
@@ -271,6 +318,10 @@ class Pin:
271318 pull_down : bool = False
272319 aliases : list = field (default_factory = list )
273320
321+ # Values updated during runtime.
322+ gpio_reading : DigitalSample | None = None
323+ adc_reading : ADCSample | None = None
324+
274325
275326@dataclass (frozen = True )
276327class Device :
@@ -279,7 +330,7 @@ class Device:
279330 name : str
280331 interfaces : list
281332 functions_enabled : dict
282- pins : dict = field (default_factory = dict )
333+ pins : dict [ int , Pin ] = field (default_factory = dict )
283334 aux_params : dict = field (default_factory = dict )
284335
285336 def get_compatible_pins (self , function : UOSFunction ) -> dict :
@@ -302,3 +353,55 @@ def get_compatible_pins(self, function: UOSFunction) -> dict:
302353 getattr (pin , requirement ) for requirement in function .pin_requirements
303354 )
304355 }
356+
357+ def update_adc_samples (self , result : ComResult ):
358+ """Update the pin samples with the response of a get_adc_input."""
359+ if not result .status or len (result .exception ) != 0 :
360+ raise UOSRuntimeError ("Can't update ADC samples from a failed response." )
361+ if result .tx_packet is None or len (result .rx_packets ) < 1 :
362+ raise UOSRuntimeError ("Can't update ADC samples without a valid result." )
363+ if (
364+ "adc_reference" not in self .aux_params
365+ or "adc_resolution" not in self .aux_params
366+ ):
367+ raise UOSRuntimeError ("Device not properly defined for ADC updates." )
368+ sample_values = result .get_rx_payload (0 )
369+ logger .debug ("Device returned sampled adc values %s" , sample_values )
370+ for sample_index , pin in enumerate (result .tx_packet .payload ):
371+ if pin not in self .pins :
372+ raise UOSRuntimeError (
373+ f"Can't update ADC samples on pin { pin } as it's invalid for { self .name } ."
374+ )
375+ self .pins [pin ].adc_reading = ADCSample (
376+ sample_values [sample_index * 2 : sample_index * 2 + 2 ],
377+ steps = pow (2 , self .aux_params ["adc_resolution" ]),
378+ reference = self .aux_params ["adc_reference" ],
379+ )
380+ logger .debug (
381+ "Setting pin %s adc reading to %s" ,
382+ pin ,
383+ # This is a false call as it can't be None here.
384+ self .pins [pin ].adc_reading .value , # type: ignore
385+ )
386+
387+ def update_gpio_samples (self , result : ComResult ):
388+ """Update the pin samples with the response of a get_gpio_inpout."""
389+ if not result .status or len (result .exception ) != 0 :
390+ raise UOSRuntimeError ("Can't update GPIO samples from a failed responsee." )
391+ if result .tx_packet is None or len (result .rx_packets ) < 1 :
392+ raise UOSRuntimeError ("Can't update GPIO samples without a valid result." )
393+ sample_values = result .get_rx_payload (0 )
394+ logger .debug ("Device returned sampled gpio values %s" , sample_values )
395+ for sample_index , pin in enumerate (sample_values ):
396+ pin = result .tx_packet .payload [2 * sample_index ]
397+ if pin not in self .pins :
398+ raise UOSRuntimeError (
399+ f"Can't update GPIO samples on pin { pin } as it's invalid for { self .name } ."
400+ )
401+ self .pins [pin ].gpio_reading = DigitalSample (sample_values [sample_index ])
402+ logger .debug (
403+ "Setting pin %s gpio reading to %s" ,
404+ pin ,
405+ # This is a false call as it can't be None here.
406+ self .pins [pin ].gpio_reading .value , # type: ignore
407+ )
0 commit comments