Better event handling#256
Merged
Merged
Conversation
Barecheck - Code coverage reportTotal: 96.97%Your code coverage diff: 0.06% ▴ Uncovered files and lines |
bprobert97
requested changes
Jun 1, 2026
bprobert97
requested changes
Jun 3, 2026
This refactors handling of property and action observations. In particular, it: * Introduces a MessageBroker class to handle pub/sub messaging centrally. This eliminates duplicated code from descriptors and should be much clearer. * Adds a `publish` method to the thing server interface for easy publication of events. * No longer errors if events are published before the event loop is active: they are silently ignored. * Removes the option to set properties without emitting an event: this is no longer needed - it was only ever done to suppress errors. * Separates handling of pub/sub messages from websocket protocol considerations. This does not change the websocket protocol. I've updated tests, but have not yet added tests for `MessageBroker`.
This adds tests for `Message` and `MessageBroker`. As a result of testing, I've tightened up a few things: * `MessageBroker` now has a method to close all its send streams. This should help achieve a clean shutdown, and stops tests hanging. * `Message` is now a `pydantic` dataclass so it validates types. We may drop this back to a regular dataclass in the future for performance reasons. * mock_thing_instance now mocks a sensible `name` property so properties can work properly.
The sets of observers for properties and actions used to be stored in a `_labthings_data` attribute of the `Thing`. They now live in the `MessageBroker` so I have eliminated `LabThingsData`. Good riddance :)
Co-authored-by: Beth <167304066+bprobert97@users.noreply.github.com>
The parameters are now documented properly. I've also removed "event" from the `message_type` literal, because events aren't supported yet. That's for a future PR.
I think the first check (for Thing) is far more likely to prove useful - but it's a cheap check so we may as well do both as checking only one seems odd.
Previously, unsubscribing from a non-existent subscription may or may not have raised a KeyError. Now, we always get the error. Test updated accordingly.
This should significantly improve the robustness of the message broker thanks to comments from @bprobert97. The main changes are: * If a stream is closed, it's automatically unsubscribed from messages. This avoids errors that could derail the message broker. * If a stream's buffer is full, we drop messages rather than blocking. This avoids gumming up the event loop with stuck tasks, which is a potential memory leak. * I've made the subscribe and unsubscribe functions `async` because they must be called in the event loop. They don't actually need to be async, but they do need to be in the event loop, and the async keyword is a good way to ensure that. I considered logging a warning when streams are automatically unsubscribed, but decided against it. My thinking is that a closed stream can't be reopened, so there's no risk that the unsubscription is losing messages unexpectedly. Also, saying that closing or deleting the stream is an official way to unsubscribe removes the need for lots of logic to track and explicitly remove subscriptions, which simplifies the websocket code. I have added an `anyio` line to pytest configuration so we can have `async` tests now. This supports the new tests in `test_message_broker.py` and avoids lots of `anyio.run()` clutter.
Co-authored-by: Beth <167304066+bprobert97@users.noreply.github.com>
Co-authored-by: Beth <167304066+bprobert97@users.noreply.github.com>
bprobert97
approved these changes
Jun 4, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR makes a start on improving event handling in LabThings. It centralises event handling into a
MessageBrokerclass, and adds apublishmethod to theThingServerInterface. This removes ugly boilerplate from property and action descriptors, and creates a separation between pub/sub message handling, and the details of the websocket protocol.Note: this PR originally contained a bunch more stuff, and was too big. I've split it up, so this is now the first in a few planned PRs.
After this is done, I plan to update the websocket protocol to support more message types and realign with standards.
This PR does not change how websockets work, just tidies up the implementation and moves code from descriptors into a coherent module.
It does introduce a few changes, which are mostly unlikely to be noticed:
BaseProperty.__set__no longer accepts an argument to suppress emitting events. That was only used internally, and was only needed because of the previous behaviour where it errored if events fired before the event loop started.set_without_emitmethods are deleted, andload_settingsusessetinstead.observableproperty to properties. This is true for data properties and false for functional properties. It does not yet propagate to the Thing Description.In principle, if the
observableproperty propagates to the TD, then aThingimplementation could manually callpublishto notify observers when a functional property changes. However, if it's always know when the property changes, it's likely that a data property (possibly with an@lt.on_setfunction from #331) may be a better choice.