diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0ab759c..32c4092 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -35,12 +35,3 @@ repos: - "pytest" - "pytest-asyncio" - "pytest-mock" - - - repo: local - hooks: - - id: pytest - name: pytest - entry: pytest - language: system - types: [python] - pass_filenames: false diff --git a/README.md b/README.md index 3acf72e..0c4c93a 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,6 @@ [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/) -**Wersja: 0.1.0** - **Bridge** to biblioteka (SDK) i aplikacja wiersza poleceń w Pythonie, która tworzy ujednolicony interfejs do zbierania danych z różnych urządzeń EEG. Działa jako "most" między sprzętem a oprogramowaniem analitycznym. ## Główne Cechy diff --git a/bridge/eeg/brainaccess/device.py b/bridge/eeg/brainaccess/device.py index 545bf35..0655b17 100644 --- a/bridge/eeg/brainaccess/device.py +++ b/bridge/eeg/brainaccess/device.py @@ -1,7 +1,11 @@ import multiprocessing import time from logging import Logger, getLogger +from queue import Empty, Queue +from typing import Generator +import brainaccess.core.eeg_channel as eeg_channel +import numpy as np from brainaccess import core from brainaccess.core.eeg_manager import EEGManager from brainaccess.utils import acquisition @@ -14,6 +18,7 @@ DATA_COLLECTION_TIME, DEFAULT_BLUETOOTH_ADAPTER, DEFAULT_DEVICE_PORT, + GAIN_MODE, IMPEDANCE_MEASUREMENT_TIME, ) @@ -27,6 +32,8 @@ def __init__(self, logger: Logger | None = None) -> None: self._cap: dict[int, str] | None = None self._mac_address: str | None = None self._device_name: str | None = None + self._stream_queue: Queue[EEGArray] = Queue() + self._is_streaming: bool = False super().__init__(logger or getLogger(__name__)) @@ -50,6 +57,15 @@ def _connect(self, device_name: str, cap: dict[int, str]) -> None: self._logger.info("Connection successful.") + def _acq_callback(self, chunk: list[float], chunk_size: int) -> None: + """Wewnętrzny callback wywoływany przez BrainAccess SDK.""" + if self._is_streaming: + chunk_array = np.array(chunk) + num_channels = len(self._electrodes) + eeg_chunk = chunk_array[:num_channels, :].astype(np.float64) + + self._stream_queue.put(eeg_chunk) + # IM-032 def _get_device_model(self, port: int) -> str: status = self._manager.connect(port) # type: ignore[union-attr] @@ -98,17 +114,24 @@ def connect( model = self._get_device_model(port) self._cap = get_cap_from_model(model) + self._electrodes = list(self._cap.values()) + try: self._connect(self._device_name, self._cap) return except Exception as e: - self._logger.exception(e) + if self._manager: + self._manager.__exit__(None, None, None) + self._logger.exception(f"Connection failed: {e}") + raise # IM-032 def disconnect(self) -> None: self._ensure_connected() + self._is_streaming = False self._logger.debug("Disconnecting the device...") if self._manager: + self._manager.stop_stream() self._manager.disconnect() self._manager.__exit__(None, None, None) # self._manager.destroy() @@ -117,6 +140,45 @@ def disconnect(self) -> None: self._logger.info("Device disconnected successfully.") + def stream(self) -> Generator[EEGArray, None, None]: + """ + Generator strumieniujący dane EEG w czasie rzeczywistym. + Użycie: + for chunk in device.stream(): + process(chunk) + """ + self._ensure_connected() + assert self._manager is not None + + while not self._stream_queue.empty(): + self._stream_queue.get() + + num_channels = len(self._electrodes) + for i in range(num_channels): + self._manager.set_channel_enabled(eeg_channel.ELECTRODE_MEASUREMENT + i, True) + self._manager.set_channel_gain(eeg_channel.ELECTRODE_MEASUREMENT + i, GAIN_MODE) + self._manager.set_channel_bias(eeg_channel.ELECTRODE_MEASUREMENT + i, True) + + self._manager.set_channel_enabled(eeg_channel.STREAMING, True) + self._manager.set_callback_chunk(self._acq_callback) + + self._is_streaming = True + self._manager.start_stream() + self._logger.info("Started real-time stream.") + + try: + while self._is_streaming: + try: + chunk = self._stream_queue.get(timeout=1.0) + yield chunk + except Empty: + continue + finally: + self._is_streaming = False + if self._manager: + self._manager.stop_stream() + self._logger.info("Stopped real-time stream.") + # IM-032 def get_impedance(self, duration: float = IMPEDANCE_MEASUREMENT_TIME) -> list[float]: self._ensure_connected() @@ -153,16 +215,12 @@ def get_output(self, duration: float = DATA_COLLECTION_TIME, output_file: str | self._logger.info("Data acquisition completed.") return raw_data # type: ignore[no-any-return] - def get_device_data(self) -> DeviceData | None: + def get_device_data(self) -> DeviceData: self._ensure_connected() - try: - return DeviceData( - name=self._device_name, - mac_address=self._mac_address, - manufacturer=BRAINACCESS_MANUFACTURER, - electrodes_num=len(self._cap) if self._cap else None, - sample_rate=self._manager.get_sample_frequency() if self._manager else None, - ) - except Exception as e: - self._logger.exception(f"Failed to fetch device data for device {self.__class__.__name__}: {e}") - return None + return DeviceData( + name=self._device_name, + mac_address=self._mac_address, + manufacturer=BRAINACCESS_MANUFACTURER, + electrodes_num=len(self._cap) if self._cap else None, + sample_rate=self._manager.get_sample_frequency() if self._manager else None, + ) diff --git a/bridge/eeg/core/device.py b/bridge/eeg/core/device.py index e55394f..32eddb1 100644 --- a/bridge/eeg/core/device.py +++ b/bridge/eeg/core/device.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from logging import Logger, getLogger from types import TracebackType +from typing import Generator from .device_data import DeviceData from .typing import EEGArray @@ -19,15 +20,17 @@ def connect(self) -> None: def disconnect(self) -> None: pass - @abstractmethod def get_output(self, duration: float, output_file: str | None = None) -> EEGArray: - pass + raise NotImplementedError(f"Output retrieval not implemented for this class {self.__class__.__name__}.") def get_impedance(self, duration: float) -> list[float]: raise NotImplementedError(f"Impedance measurement not implemented for this class {self.__class__.__name__}.") + def stream(self) -> Generator[EEGArray, None, None]: + raise NotImplementedError(f"Streaming not implemented for this class {self.__class__.__name__}.") + @abstractmethod - def get_device_data(self) -> DeviceData | None: + def get_device_data(self) -> DeviceData: pass def __enter__(self) -> "EEGDevice": @@ -43,4 +46,3 @@ def __exit__( ) -> None: self._logger.debug("Exiting context manager...") self.disconnect() - return None diff --git a/bridge/eeg/core/device_data.py b/bridge/eeg/core/device_data.py index eb889b9..10a982f 100644 --- a/bridge/eeg/core/device_data.py +++ b/bridge/eeg/core/device_data.py @@ -1,10 +1,18 @@ from dataclasses import dataclass +from bridge.eeg.core.typing import EEGArray -@dataclass + +@dataclass(frozen=True, slots=True, kw_only=True) class DeviceData: mac_address: str | None = None name: str | None = None manufacturer: str | None = None electrodes_num: int | None = None sample_rate: int | None = None + + +@dataclass(frozen=True, slots=True, kw_only=True) +class RecordingFrame: + timestamp: float + data: EEGArray diff --git a/bridge/eeg/core/typing.py b/bridge/eeg/core/typing.py index 801aa89..fa086bd 100644 --- a/bridge/eeg/core/typing.py +++ b/bridge/eeg/core/typing.py @@ -1,4 +1,6 @@ +from typing import TypeAlias + import numpy as np from numpy.typing import NDArray -EEGArray = NDArray[np.float64] +EEGArray: TypeAlias = NDArray[np.float64] diff --git a/bridge/eeg/file/__init__.py b/bridge/eeg/file/__init__.py new file mode 100644 index 0000000..3c39b8c --- /dev/null +++ b/bridge/eeg/file/__init__.py @@ -0,0 +1,3 @@ +from .device import FileDevice + +__all__ = ["FileDevice"] diff --git a/bridge/eeg/file/device.py b/bridge/eeg/file/device.py new file mode 100644 index 0000000..60b5e84 --- /dev/null +++ b/bridge/eeg/file/device.py @@ -0,0 +1,60 @@ +import time +from logging import Logger, getLogger +from pathlib import Path +from typing import Final, Generator + +import numpy as np + +from ..core import DeviceData, EEGArray, EEGDevice + + +class FileDevice(EEGDevice): + """Emulator odtwarzający sesje z plików binarnych .npz.""" + + def __init__(self, file_path: str, sfreq: float = 250.0, logger: Logger | None = None) -> None: + super().__init__(logger or getLogger(__name__)) + self._path: Final[Path] = Path(file_path) + self._sfreq: Final[float] = sfreq + self._data: np.ndarray | None = None + self._is_connected: bool = False + + def connect(self) -> None: + if not self._path.exists(): + raise FileNotFoundError(f"Binary file not found: {self._path}") + + with np.load(self._path) as loader: + self._data = loader["data"] + + self._is_connected = True + if not self._data: + raise ValueError(f"No data found in file: {self._path}") + + self._logger.info("FileDevice connected. Loaded %d blocks.", len(self._data)) + + def disconnect(self) -> None: + self._is_connected = False + + def stream(self) -> Generator[EEGArray, None, None]: + if not self._is_connected or self._data is None: + raise RuntimeError("FileDevice not connected.") + + chunk_size: Final[int] = self._data.shape[2] + interval: Final[float] = chunk_size / self._sfreq + + start_perf: Final[float] = time.perf_counter() + + for count, chunk in enumerate(self._data, start=1): + if not self._is_connected: + break + + target: float = start_perf + (count * interval) + + while time.perf_counter() < target: + diff = target - time.perf_counter() + if diff > 0.002: + time.sleep(diff - 0.001) + + yield chunk + + def get_device_data(self) -> DeviceData: + return DeviceData(name=self._path.name, manufacturer="BinarySim", sample_rate=int(self._sfreq)) diff --git a/bridge/eeg/recorder.py b/bridge/eeg/recorder.py new file mode 100644 index 0000000..7c80e1c --- /dev/null +++ b/bridge/eeg/recorder.py @@ -0,0 +1,57 @@ +import time +from logging import Logger, getLogger +from pathlib import Path +from typing import Any, Final, Generator + +import numpy as np + +from .core import EEGArray, EEGDevice +from .core.device_data import RecordingFrame + + +class EEGRecorder: + """Rejestrator EEG wykorzystujący wysokowydajny format binarny NumPy.""" + + def __init__(self, device: EEGDevice, filename: str, logger: Logger | None = None, autosave: bool = True) -> None: + self._logger: Final[Logger] = logger or getLogger(__name__) + self._device: Final[EEGDevice] = device + self._filename: Final[str] = filename + self._autosave: Final[bool] = autosave + self._frames: list[RecordingFrame] = [] + + def __enter__(self) -> "EEGRecorder": + self._device.connect() + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + if self._autosave: + self.save() + self._device.disconnect() + + def stream(self) -> Generator[EEGArray, None, None]: + """Strumieniuje dane i buforuje je w pamięci jako RecordingFrame.""" + for chunk in self._device.stream(): + self._frames.append(RecordingFrame(timestamp=time.time(), data=chunk)) + yield chunk + + def save(self) -> None: + """Zapisuje dane do skompresowanego pliku binarnego NumPy (.npz).""" + if not self._frames: + self._logger.warning("No data to save.") + return + + try: + output_dir: Final[Path] = Path("recordings") + output_dir.mkdir(exist_ok=True) + file_path: Final[Path] = output_dir / self._filename + + timestamps: Final[np.ndarray] = np.array([f.timestamp for f in self._frames]) + data_blocks: Final[np.ndarray] = np.array([f.data for f in self._frames]) + + np.savez_compressed(file_path, timestamps=timestamps, data=data_blocks) + + self._logger.info("Saved session to binary file: %s", file_path) + + except (OSError, IOError) as e: + self._logger.error("Failed to save recording to disk: %s", e) + raise diff --git a/examples/playback_stream.py b/examples/playback_stream.py new file mode 100644 index 0000000..7ecb043 --- /dev/null +++ b/examples/playback_stream.py @@ -0,0 +1,36 @@ +from bridge.eeg.file import FileDevice + + +def playback_session() -> None: + # Ścieżka do pliku utworzonego przez poprzedni skrypt + file_path = "recordings/my_brain_data.npz" + + try: + # 1. Inicjalizujemy emulator pliku (sfreq=250 to standard dla BrainAccess) + device = FileDevice(file_path=file_path, sfreq=250.0) + + print(f"Otwieranie pliku: {file_path}") + + # 2. Łączymy się (wczytanie danych do pamięci) + with device: + info = device.get_device_data() + print(f"Symulacja urządzenia: {info.manufacturer} (Źródło: {info.name})") + + print("Rozpoczynam odtwarzanie strumienia...") + + # 3. Ta pętla działa identycznie jak przy prawdziwym czepku! + # FileDevice sam zadba o odpowiednie odstępy czasowe (timing), + # żeby symulować 250Hz. + for i, chunk in enumerate(device.stream()): + # Tutaj możesz wstawić swoją logikę analizy/procesowania + avg_signal = chunk.mean() + print(f"Ramka {i:03} | Średnie napięcie: {avg_signal:.2f} uV") + + except FileNotFoundError: + print(f"Błąd: Nie znaleziono pliku {file_path}. Najpierw uruchom record_stream.py") + except Exception as e: + print(f"Wystąpił błąd: {e}") + + +if __name__ == "__main__": + playback_session() diff --git a/examples/record_stream.py b/examples/record_stream.py new file mode 100644 index 0000000..7601e1b --- /dev/null +++ b/examples/record_stream.py @@ -0,0 +1,45 @@ +import time + +from bridge.eeg import EEGConnector, close, init +from bridge.eeg.recorder import EEGRecorder + + +def record_session() -> None: + # 1. Inicjalizacja sterowników SDK + init() + + try: + # 2. Używamy Connectora, aby automatycznie znalazł urządzenie + with EEGConnector() as connector: + device = connector._eeg_device # Pobieramy dostęp do instancji urządzenia + if not device: + print("Nie znaleziono urządzenia!") + return + + print(f"Połączono z: {device.get_device_data().name}") + + # 3. Tworzymy rekorder (automatycznie zapisze do .npz przy wyjściu z context managera) + # Plik trafi do folderu recordings/my_brain_data.npz + with EEGRecorder(device, filename="my_brain_data.npz") as recorder: + print("Rozpoczynam zbieranie danych (10 sekund)...") + + start_time = time.time() + # recorder.stream() to generator, który pod spodem wywołuje device.stream() + for chunk in recorder.stream(): + print(f"Odebrano paczkę o kształcie: {chunk.shape}") + + # Przerwij po 10 sekundach + if time.time() - start_time > 10: + break + + print("Zakończono zbieranie. Zapisywanie...") + + except Exception as e: + print(f"Wystąpił błąd: {e}") + finally: + # 4. Zwolnienie zasobów SDK + close() + + +if __name__ == "__main__": + record_session() diff --git a/pyproject.toml b/pyproject.toml index 0f25212..e5bff50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ packages = [ [project] name = "neuron-bridge" -version = "0.2.0" +version = "0.3.0" authors = [ { name = "mateusz-kow", email = "kowalski.mateusz.1lo1@gmail.com" }, ]