Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,3 @@ repos:
- "pytest"
- "pytest-asyncio"
- "pytest-mock"

- repo: local
hooks:
- id: pytest
name: pytest
entry: pytest
language: system
types: [python]
pass_filenames: false
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/)

**Wersja: 0.1.0**

**Bridge** to biblioteka (SDK) i aplikacja wiersza poleceń w Pythonie, która tworzy ujednolicony interfejs do zbierania danych z różnych urządzeń EEG. Działa jako "most" między sprzętem a oprogramowaniem analitycznym.

## Główne Cechy
Expand Down
84 changes: 71 additions & 13 deletions bridge/eeg/brainaccess/device.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import multiprocessing
import time
from logging import Logger, getLogger
from queue import Empty, Queue
from typing import Generator

import brainaccess.core.eeg_channel as eeg_channel
import numpy as np
from brainaccess import core
from brainaccess.core.eeg_manager import EEGManager
from brainaccess.utils import acquisition
Expand All @@ -14,6 +18,7 @@
DATA_COLLECTION_TIME,
DEFAULT_BLUETOOTH_ADAPTER,
DEFAULT_DEVICE_PORT,
GAIN_MODE,
IMPEDANCE_MEASUREMENT_TIME,
)

Expand All @@ -27,6 +32,8 @@ def __init__(self, logger: Logger | None = None) -> None:
self._cap: dict[int, str] | None = None
self._mac_address: str | None = None
self._device_name: str | None = None
self._stream_queue: Queue[EEGArray] = Queue()
self._is_streaming: bool = False

super().__init__(logger or getLogger(__name__))

Expand All @@ -50,6 +57,15 @@ def _connect(self, device_name: str, cap: dict[int, str]) -> None:

self._logger.info("Connection successful.")

def _acq_callback(self, chunk: list[float], chunk_size: int) -> None:
"""Wewnętrzny callback wywoływany przez BrainAccess SDK."""
if self._is_streaming:
chunk_array = np.array(chunk)
num_channels = len(self._electrodes)
eeg_chunk = chunk_array[:num_channels, :].astype(np.float64)

self._stream_queue.put(eeg_chunk)

# IM-032
def _get_device_model(self, port: int) -> str:
status = self._manager.connect(port) # type: ignore[union-attr]
Expand Down Expand Up @@ -98,17 +114,24 @@ def connect(
model = self._get_device_model(port)
self._cap = get_cap_from_model(model)

self._electrodes = list(self._cap.values())

try:
self._connect(self._device_name, self._cap)
return
except Exception as e:
self._logger.exception(e)
if self._manager:
self._manager.__exit__(None, None, None)
self._logger.exception(f"Connection failed: {e}")
raise

# IM-032
def disconnect(self) -> None:
self._ensure_connected()
self._is_streaming = False
self._logger.debug("Disconnecting the device...")
if self._manager:
self._manager.stop_stream()
self._manager.disconnect()
self._manager.__exit__(None, None, None)
# self._manager.destroy()
Expand All @@ -117,6 +140,45 @@ def disconnect(self) -> None:

self._logger.info("Device disconnected successfully.")

def stream(self) -> Generator[EEGArray, None, None]:
"""
Generator strumieniujący dane EEG w czasie rzeczywistym.
Użycie:
for chunk in device.stream():
process(chunk)
"""
self._ensure_connected()
assert self._manager is not None

while not self._stream_queue.empty():
self._stream_queue.get()

num_channels = len(self._electrodes)
for i in range(num_channels):
self._manager.set_channel_enabled(eeg_channel.ELECTRODE_MEASUREMENT + i, True)
self._manager.set_channel_gain(eeg_channel.ELECTRODE_MEASUREMENT + i, GAIN_MODE)
self._manager.set_channel_bias(eeg_channel.ELECTRODE_MEASUREMENT + i, True)

self._manager.set_channel_enabled(eeg_channel.STREAMING, True)
self._manager.set_callback_chunk(self._acq_callback)

self._is_streaming = True
self._manager.start_stream()
self._logger.info("Started real-time stream.")

try:
while self._is_streaming:
try:
chunk = self._stream_queue.get(timeout=1.0)
yield chunk
except Empty:
continue
finally:
self._is_streaming = False
if self._manager:
self._manager.stop_stream()
self._logger.info("Stopped real-time stream.")

# IM-032
def get_impedance(self, duration: float = IMPEDANCE_MEASUREMENT_TIME) -> list[float]:
self._ensure_connected()
Expand Down Expand Up @@ -153,16 +215,12 @@ def get_output(self, duration: float = DATA_COLLECTION_TIME, output_file: str |
self._logger.info("Data acquisition completed.")
return raw_data # type: ignore[no-any-return]

def get_device_data(self) -> DeviceData | None:
def get_device_data(self) -> DeviceData:
self._ensure_connected()
try:
return DeviceData(
name=self._device_name,
mac_address=self._mac_address,
manufacturer=BRAINACCESS_MANUFACTURER,
electrodes_num=len(self._cap) if self._cap else None,
sample_rate=self._manager.get_sample_frequency() if self._manager else None,
)
except Exception as e:
self._logger.exception(f"Failed to fetch device data for device {self.__class__.__name__}: {e}")
return None
return DeviceData(
name=self._device_name,
mac_address=self._mac_address,
manufacturer=BRAINACCESS_MANUFACTURER,
electrodes_num=len(self._cap) if self._cap else None,
sample_rate=self._manager.get_sample_frequency() if self._manager else None,
)
10 changes: 6 additions & 4 deletions bridge/eeg/core/device.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from logging import Logger, getLogger
from types import TracebackType
from typing import Generator

from .device_data import DeviceData
from .typing import EEGArray
Expand All @@ -19,15 +20,17 @@ def connect(self) -> None:
def disconnect(self) -> None:
pass

@abstractmethod
def get_output(self, duration: float, output_file: str | None = None) -> EEGArray:
pass
raise NotImplementedError(f"Output retrieval not implemented for this class {self.__class__.__name__}.")

def get_impedance(self, duration: float) -> list[float]:
raise NotImplementedError(f"Impedance measurement not implemented for this class {self.__class__.__name__}.")

def stream(self) -> Generator[EEGArray, None, None]:
raise NotImplementedError(f"Streaming not implemented for this class {self.__class__.__name__}.")

@abstractmethod
def get_device_data(self) -> DeviceData | None:
def get_device_data(self) -> DeviceData:
pass

def __enter__(self) -> "EEGDevice":
Expand All @@ -43,4 +46,3 @@ def __exit__(
) -> None:
self._logger.debug("Exiting context manager...")
self.disconnect()
return None
10 changes: 9 additions & 1 deletion bridge/eeg/core/device_data.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from dataclasses import dataclass

from bridge.eeg.core.typing import EEGArray

@dataclass

@dataclass(frozen=True, slots=True, kw_only=True)
class DeviceData:
mac_address: str | None = None
name: str | None = None
manufacturer: str | None = None
electrodes_num: int | None = None
sample_rate: int | None = None


@dataclass(frozen=True, slots=True, kw_only=True)
class RecordingFrame:
timestamp: float
data: EEGArray
4 changes: 3 additions & 1 deletion bridge/eeg/core/typing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import TypeAlias

import numpy as np
from numpy.typing import NDArray

EEGArray = NDArray[np.float64]
EEGArray: TypeAlias = NDArray[np.float64]
3 changes: 3 additions & 0 deletions bridge/eeg/file/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .device import FileDevice

__all__ = ["FileDevice"]
60 changes: 60 additions & 0 deletions bridge/eeg/file/device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import time
from logging import Logger, getLogger
from pathlib import Path
from typing import Final, Generator

import numpy as np

from ..core import DeviceData, EEGArray, EEGDevice


class FileDevice(EEGDevice):
"""Emulator odtwarzający sesje z plików binarnych .npz."""

def __init__(self, file_path: str, sfreq: float = 250.0, logger: Logger | None = None) -> None:
super().__init__(logger or getLogger(__name__))
self._path: Final[Path] = Path(file_path)
self._sfreq: Final[float] = sfreq
self._data: np.ndarray | None = None
self._is_connected: bool = False

def connect(self) -> None:
if not self._path.exists():
raise FileNotFoundError(f"Binary file not found: {self._path}")

with np.load(self._path) as loader:
self._data = loader["data"]

self._is_connected = True
if not self._data:
raise ValueError(f"No data found in file: {self._path}")

self._logger.info("FileDevice connected. Loaded %d blocks.", len(self._data))

def disconnect(self) -> None:
self._is_connected = False

def stream(self) -> Generator[EEGArray, None, None]:
if not self._is_connected or self._data is None:
raise RuntimeError("FileDevice not connected.")

chunk_size: Final[int] = self._data.shape[2]
interval: Final[float] = chunk_size / self._sfreq

start_perf: Final[float] = time.perf_counter()

for count, chunk in enumerate(self._data, start=1):
if not self._is_connected:
break

target: float = start_perf + (count * interval)

while time.perf_counter() < target:
diff = target - time.perf_counter()
if diff > 0.002:
time.sleep(diff - 0.001)

yield chunk

def get_device_data(self) -> DeviceData:
return DeviceData(name=self._path.name, manufacturer="BinarySim", sample_rate=int(self._sfreq))
57 changes: 57 additions & 0 deletions bridge/eeg/recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import time
from logging import Logger, getLogger
from pathlib import Path
from typing import Any, Final, Generator

import numpy as np

from .core import EEGArray, EEGDevice
from .core.device_data import RecordingFrame


class EEGRecorder:
"""Rejestrator EEG wykorzystujący wysokowydajny format binarny NumPy."""

def __init__(self, device: EEGDevice, filename: str, logger: Logger | None = None, autosave: bool = True) -> None:
self._logger: Final[Logger] = logger or getLogger(__name__)
self._device: Final[EEGDevice] = device
self._filename: Final[str] = filename
self._autosave: Final[bool] = autosave
self._frames: list[RecordingFrame] = []

def __enter__(self) -> "EEGRecorder":
self._device.connect()
return self

def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
if self._autosave:
self.save()
self._device.disconnect()

def stream(self) -> Generator[EEGArray, None, None]:
"""Strumieniuje dane i buforuje je w pamięci jako RecordingFrame."""
for chunk in self._device.stream():
self._frames.append(RecordingFrame(timestamp=time.time(), data=chunk))
yield chunk

def save(self) -> None:
"""Zapisuje dane do skompresowanego pliku binarnego NumPy (.npz)."""
if not self._frames:
self._logger.warning("No data to save.")
return

try:
output_dir: Final[Path] = Path("recordings")
output_dir.mkdir(exist_ok=True)
file_path: Final[Path] = output_dir / self._filename

timestamps: Final[np.ndarray] = np.array([f.timestamp for f in self._frames])
data_blocks: Final[np.ndarray] = np.array([f.data for f in self._frames])

np.savez_compressed(file_path, timestamps=timestamps, data=data_blocks)

self._logger.info("Saved session to binary file: %s", file_path)

except (OSError, IOError) as e:
self._logger.error("Failed to save recording to disk: %s", e)
raise
36 changes: 36 additions & 0 deletions examples/playback_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from bridge.eeg.file import FileDevice


def playback_session() -> None:
# Ścieżka do pliku utworzonego przez poprzedni skrypt
file_path = "recordings/my_brain_data.npz"

try:
# 1. Inicjalizujemy emulator pliku (sfreq=250 to standard dla BrainAccess)
device = FileDevice(file_path=file_path, sfreq=250.0)

print(f"Otwieranie pliku: {file_path}")

# 2. Łączymy się (wczytanie danych do pamięci)
with device:
info = device.get_device_data()
print(f"Symulacja urządzenia: {info.manufacturer} (Źródło: {info.name})")

print("Rozpoczynam odtwarzanie strumienia...")

# 3. Ta pętla działa identycznie jak przy prawdziwym czepku!
# FileDevice sam zadba o odpowiednie odstępy czasowe (timing),
# żeby symulować 250Hz.
for i, chunk in enumerate(device.stream()):
# Tutaj możesz wstawić swoją logikę analizy/procesowania
avg_signal = chunk.mean()
print(f"Ramka {i:03} | Średnie napięcie: {avg_signal:.2f} uV")

except FileNotFoundError:
print(f"Błąd: Nie znaleziono pliku {file_path}. Najpierw uruchom record_stream.py")
except Exception as e:
print(f"Wystąpił błąd: {e}")


if __name__ == "__main__":
playback_session()
Loading
Loading