Skip to content

Better event handling#256

Merged
rwb27 merged 14 commits into
mainfrom
publish-event
Jun 4, 2026
Merged

Better event handling#256
rwb27 merged 14 commits into
mainfrom
publish-event

Conversation

@rwb27
Copy link
Copy Markdown
Collaborator

@rwb27 rwb27 commented Feb 2, 2026

This PR makes a start on improving event handling in LabThings. It centralises event handling into a MessageBroker class, and adds a publish method to the ThingServerInterface. This removes ugly boilerplate from property and action descriptors, and creates a separation between pub/sub message handling, and the details of the websocket protocol.

Note: this PR originally contained a bunch more stuff, and was too big. I've split it up, so this is now the first in a few planned PRs.
After this is done, I plan to update the websocket protocol to support more message types and realign with standards.

This PR does not change how websockets work, just tidies up the implementation and moves code from descriptors into a coherent module.

It does introduce a few changes, which are mostly unlikely to be noticed:

  • Messages published before the event loop starts are silently ignored. This feels reasonable, as there's (currently) no way to subscribe to them before the event loop starts.
  • BaseProperty.__set__ no longer accepts an argument to suppress emitting events. That was only used internally, and was only needed because of the previous behaviour where it errored if events fired before the event loop started.
  • Corresponding set_without_emit methods are deleted, and load_settings uses set instead.
  • I've added an observable property to properties. This is true for data properties and false for functional properties. It does not yet propagate to the Thing Description.

In principle, if the observable property propagates to the TD, then a Thing implementation could manually call publish to notify observers when a functional property changes. However, if it's always know when the property changes, it's likely that a data property (possibly with an @lt.on_set function from #331) may be a better choice.

@rwb27 rwb27 added this to the v0.3.0 milestone May 7, 2026
@barecheck
Copy link
Copy Markdown

barecheck Bot commented May 28, 2026

@rwb27 rwb27 marked this pull request as ready for review May 28, 2026 23:04
Comment thread src/labthings_fastapi/exceptions.py Outdated
Comment thread src/labthings_fastapi/message_broker.py Outdated
Comment thread src/labthings_fastapi/message_broker.py
Comment thread src/labthings_fastapi/message_broker.py
Comment thread src/labthings_fastapi/message_broker.py
Comment thread src/labthings_fastapi/message_broker.py Outdated
Comment thread src/labthings_fastapi/message_broker.py
Comment thread src/labthings_fastapi/websockets.py
Comment thread tests/test_message_broker.py
Comment thread src/labthings_fastapi/message_broker.py Outdated
Comment thread src/labthings_fastapi/message_broker.py Outdated
Comment thread src/labthings_fastapi/message_broker.py Outdated
Comment thread src/labthings_fastapi/message_broker.py Outdated
Comment thread src/labthings_fastapi/message_broker.py Outdated
Comment thread src/labthings_fastapi/message_broker.py Outdated
Comment thread tests/test_message_broker.py Outdated
Comment thread src/labthings_fastapi/exceptions.py
Comment thread tests/test_message_broker.py Outdated
Comment thread tests/test_message_broker.py
rwb27 and others added 13 commits June 3, 2026 21:53
This refactors handling of property and action observations. In particular, it:

* Introduces a MessageBroker class to handle pub/sub messaging centrally. This eliminates duplicated code from descriptors and should be much clearer.
* Adds a `publish` method to the thing server interface for easy publication of events.
* No longer errors if events are published before the event loop is active: they are silently ignored.
* Removes the option to set properties without emitting an event: this is no longer needed - it was only ever done to suppress errors.
* Separates handling of pub/sub messages from websocket protocol considerations.

This does not change the websocket protocol.

I've updated tests, but have not yet added tests for `MessageBroker`.
This adds tests for `Message` and `MessageBroker`. As a result of testing, I've tightened up a few things:
* `MessageBroker` now has a method to close all its send streams.
  This should help achieve a clean shutdown, and stops tests hanging.
* `Message` is now a `pydantic` dataclass so it validates types. We may drop this back to a regular
  dataclass in the future for performance reasons.
* mock_thing_instance now mocks a sensible `name` property so properties
  can work properly.
The sets of observers for properties and actions used to be stored in a
`_labthings_data` attribute of the `Thing`.
They now live in the `MessageBroker` so I have eliminated `LabThingsData`.
Good riddance :)
Co-authored-by: Beth <167304066+bprobert97@users.noreply.github.com>
The parameters are now documented properly.

I've also removed "event" from the `message_type` literal, because
events aren't supported yet. That's for a future PR.
I think the first check (for Thing) is far more likely to prove useful - but
it's a cheap check so we may as well do both as checking only one seems
odd.
Previously, unsubscribing from a non-existent subscription may or
may not have raised a KeyError.
Now, we always get the error.

Test updated accordingly.
This should significantly improve the robustness of the message broker
thanks to comments from @bprobert97. The main changes are:

* If a stream is closed, it's automatically unsubscribed from
  messages. This avoids errors that could derail the message broker.
* If a stream's buffer is full, we drop messages rather than blocking. This
  avoids gumming up the event loop with stuck tasks, which is a potential
  memory leak.
* I've made the subscribe and unsubscribe functions `async`
  because they must be called in the event loop. They don't actually need
  to be async, but they do need to be in the event loop, and the async
  keyword is a good way to ensure that.

I considered logging a warning when streams are automatically
unsubscribed, but decided against it.
My thinking is that a closed stream can't be reopened, so there's no risk
that the unsubscription is losing messages unexpectedly.
Also, saying that closing or deleting the stream is an official way to unsubscribe
removes the need for lots of logic to track and explicitly remove subscriptions, which simplifies the websocket code.

I have added an `anyio` line to pytest configuration so we can have `async`
tests now. This supports the new
tests in `test_message_broker.py` and
avoids lots of `anyio.run()` clutter.
Co-authored-by: Beth <167304066+bprobert97@users.noreply.github.com>
Co-authored-by: Beth <167304066+bprobert97@users.noreply.github.com>
@rwb27 rwb27 requested a review from bprobert97 June 3, 2026 21:57
@rwb27 rwb27 merged commit 755f1ca into main Jun 4, 2026
16 checks passed
@rwb27 rwb27 deleted the publish-event branch June 4, 2026 09:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants