From 6796b8373b2bdbe1ef2eca2041080619419b00cf Mon Sep 17 00:00:00 2001 From: Stephen DeRosa Date: Thu, 14 May 2026 11:25:37 -0600 Subject: [PATCH 1/6] DataTrackStreamReadResponse has optional DataTrackStreamEOS field --- include/livekit/data_track_stream.h | 11 +++-- src/data_track_stream.cpp | 52 +++++++++++++++++++---- src/tests/unit/test_data_track_stream.cpp | 47 ++++++++++++++++++++ 3 files changed, 99 insertions(+), 11 deletions(-) diff --git a/include/livekit/data_track_stream.h b/include/livekit/data_track_stream.h index 5c8c3cc3..5c3c5042 100644 --- a/include/livekit/data_track_stream.h +++ b/include/livekit/data_track_stream.h @@ -18,8 +18,6 @@ #include #include -#include -#include #include #include @@ -32,7 +30,8 @@ namespace livekit { namespace proto { class FfiEvent; -} +class DataTrackStreamReadResponse; +} // namespace proto /** * Represents a pull-based stream of frames from a remote data track. @@ -111,6 +110,12 @@ class LIVEKIT_API DataTrackStream { /// FFI event handler, called by FfiClient. void onFfiEvent(const proto::FfiEvent& event); + /// Handle the immediate response returned by a read request. + void handleReadResponse(const proto::DataTrackStreamReadResponse& response); + + /// Mark the stream failed due to an invalid FFI protocol response. + [[noreturn]] void failProtocolError(const char* message); + /// Push a received DataTrackFrame to the internal storage. void pushFrame(DataTrackFrame&& frame); diff --git a/src/data_track_stream.cpp b/src/data_track_stream.cpp index b7dca73f..b9f1cf16 100644 --- a/src/data_track_stream.cpp +++ b/src/data_track_stream.cpp @@ -16,6 +16,7 @@ #include "livekit/data_track_stream.h" +#include #include #include "data_track.pb.h" @@ -27,6 +28,19 @@ namespace livekit { using proto::FfiEvent; +namespace { + +constexpr char kMissingReadResponseError[] = "DataTrackStream::read: FFI response missing data_track_stream_read"; + +std::optional terminalErrorFromEos(const proto::DataTrackStreamEOS& eos) { + if (!eos.has_error()) { + return std::nullopt; + } + return SubscribeDataTrackError::fromProto(eos.error()); +} + +} // namespace + DataTrackStream::~DataTrackStream() { close(); } void DataTrackStream::init(FfiHandle subscription_handle) { @@ -36,6 +50,9 @@ void DataTrackStream::init(FfiHandle subscription_handle) { } bool DataTrackStream::read(DataTrackFrame& out) { + proto::DataTrackStreamReadResponse read_response; + bool missing_read_response = false; + { const std::scoped_lock lock(mutex_); if (closed_ || eof_) { @@ -50,9 +67,20 @@ bool DataTrackStream::read(DataTrackFrame& out) { proto::FfiRequest req; auto* msg = req.mutable_data_track_stream_read(); msg->set_stream_handle(subscription_handle); - FfiClient::instance().sendRequest(req); + const proto::FfiResponse resp = FfiClient::instance().sendRequest(req); + if (!resp.has_data_track_stream_read()) { + missing_read_response = true; + } else { + read_response = resp.data_track_stream_read(); + } + } + + if (missing_read_response) { + failProtocolError(kMissingReadResponseError); } + handleReadResponse(read_response); + std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return frame_.has_value() || eof_ || closed_; }); @@ -113,15 +141,23 @@ void DataTrackStream::onFfiEvent(const FfiEvent& event) { DataTrackFrame frame = DataTrackFrame::fromOwnedInfo(fr); pushFrame(std::move(frame)); } else if (dts.has_eos()) { - std::optional error; - const auto& eos = dts.eos(); - if (eos.has_error()) { - error = SubscribeDataTrackError::fromProto(eos.error()); - } - pushEos(std::move(error)); + pushEos(terminalErrorFromEos(dts.eos())); } } +void DataTrackStream::handleReadResponse(const proto::DataTrackStreamReadResponse& response) { + if (!response.has_eos_event()) { + return; + } + pushEos(terminalErrorFromEos(response.eos_event())); +} + +void DataTrackStream::failProtocolError(const char* message) { + LK_LOG_ERROR("{}", message); + pushEos(SubscribeDataTrackError{SubscribeDataTrackErrorCode::PROTOCOL_ERROR, message}); + close(); +} + void DataTrackStream::pushFrame(DataTrackFrame&& frame) { const std::scoped_lock lock(mutex_); @@ -141,7 +177,7 @@ void DataTrackStream::pushFrame(DataTrackFrame&& frame) { void DataTrackStream::pushEos(std::optional error) { { const std::scoped_lock lock(mutex_); - if (eof_) { + if (closed_ || eof_) { return; } eof_ = true; diff --git a/src/tests/unit/test_data_track_stream.cpp b/src/tests/unit/test_data_track_stream.cpp index 71d14e8c..dd15d3d6 100644 --- a/src/tests/unit/test_data_track_stream.cpp +++ b/src/tests/unit/test_data_track_stream.cpp @@ -33,6 +33,12 @@ class DataTrackStreamTest : public ::testing::Test { static void pushEvent(DataTrackStream& stream, const proto::FfiEvent& event) { stream.onFfiEvent(event); } + static void handleReadResponse(DataTrackStream& stream, const proto::DataTrackStreamReadResponse& response) { + stream.handleReadResponse(response); + } + + static void failProtocolError(DataTrackStream& stream, const char* message) { stream.failProtocolError(message); } + static proto::FfiEvent makeEosEvent(std::optional code = std::nullopt, const std::string& message = {}) { proto::FfiEvent event; @@ -47,6 +53,18 @@ class DataTrackStreamTest : public ::testing::Test { return event; } + static proto::DataTrackStreamReadResponse makeEosReadResponse( + std::optional code = std::nullopt, const std::string& message = {}) { + proto::DataTrackStreamReadResponse response; + auto* eos = response.mutable_eos_event(); + if (code.has_value()) { + auto* error = eos->mutable_error(); + error->set_code(code.value()); + error->set_message(message); + } + return response; + } + static proto::FfiEvent makeAudioStreamEvent() { proto::FfiEvent event; event.mutable_audio_stream_event()->set_stream_handle(0); @@ -71,6 +89,15 @@ TEST_F(DataTrackStreamTest, TerminalErrorEmptyForNormalEos) { EXPECT_FALSE(stream->terminalError().has_value()); } +TEST_F(DataTrackStreamTest, ReadResponseNormalEosEndsStreamWithoutTerminalError) { + auto stream = makeStream(); + handleReadResponse(*stream, makeEosReadResponse()); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + EXPECT_FALSE(stream->terminalError().has_value()); +} + TEST_F(DataTrackStreamTest, TerminalErrorStoredForSubscribeFailureEos) { auto stream = makeStream(); pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_UNPUBLISHED, @@ -85,6 +112,26 @@ TEST_F(DataTrackStreamTest, TerminalErrorStoredForSubscribeFailureEos) { EXPECT_EQ(error->message, "track unpublished before subscription completed"); } +TEST_F(DataTrackStreamTest, ReadResponseSubscribeFailureEosStoresTerminalError) { + auto stream = makeStream(); + handleReadResponse(*stream, makeEosReadResponse(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_UNPUBLISHED, + "track unpublished before read completed")); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + expectTerminalError(*stream, SubscribeDataTrackErrorCode::UNPUBLISHED, "track unpublished before read completed"); +} + +TEST_F(DataTrackStreamTest, ProtocolErrorClosesStreamAndStoresTerminalError) { + auto stream = makeStream(); + + EXPECT_NO_THROW(failProtocolError(*stream, "malformed FFI response")); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + expectTerminalError(*stream, SubscribeDataTrackErrorCode::PROTOCOL_ERROR, "malformed FFI response"); +} + TEST_F(DataTrackStreamTest, CloseBeforeEosSuppressesLaterTerminalError) { auto stream = makeStream(); stream->close(); From 87752af2ea9e29f3f760cc919e05d1d922914bc5 Mon Sep 17 00:00:00 2001 From: Stephen DeRosa Date: Wed, 20 May 2026 10:56:21 -0700 Subject: [PATCH 2/6] update rust hash --- client-sdk-rust | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client-sdk-rust b/client-sdk-rust index d74892de..cfe8af57 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit d74892de57724bd08dc2307b68e4251b8eae2f33 +Subproject commit cfe8af57de249cf4b1a051372740179a079e4900 From c1ddfca88f5c710eb6395f70451f377a22b44a5c Mon Sep 17 00:00:00 2001 From: Stephen DeRosa Date: Wed, 20 May 2026 11:35:37 -0700 Subject: [PATCH 3/6] rm no return --- include/livekit/data_track_stream.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/livekit/data_track_stream.h b/include/livekit/data_track_stream.h index 5c3c5042..a88661a1 100644 --- a/include/livekit/data_track_stream.h +++ b/include/livekit/data_track_stream.h @@ -114,7 +114,7 @@ class LIVEKIT_API DataTrackStream { void handleReadResponse(const proto::DataTrackStreamReadResponse& response); /// Mark the stream failed due to an invalid FFI protocol response. - [[noreturn]] void failProtocolError(const char* message); + void failProtocolError(const char* message); /// Push a received DataTrackFrame to the internal storage. void pushFrame(DataTrackFrame&& frame); From 257a75f6fa128c76d8f2ad5ec255505270d6657f Mon Sep 17 00:00:00 2001 From: Stephen DeRosa Date: Wed, 20 May 2026 11:36:05 -0700 Subject: [PATCH 4/6] fix utests --- include/livekit/data_track_stream.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/livekit/data_track_stream.h b/include/livekit/data_track_stream.h index a88661a1..6328d152 100644 --- a/include/livekit/data_track_stream.h +++ b/include/livekit/data_track_stream.h @@ -148,7 +148,7 @@ class LIVEKIT_API DataTrackStream { FfiHandle subscription_handle_; /** FfiClient listener id for routing FfiEvent callbacks to this object. */ - std::int32_t listener_id_{0}; + std::int32_t listener_id_{-1}; }; } // namespace livekit From 6d521a70886287a753bf14b93abec5c507dfd0bd Mon Sep 17 00:00:00 2001 From: Stephen DeRosa Date: Wed, 20 May 2026 16:18:20 -0700 Subject: [PATCH 5/6] update rust hash --- client-sdk-rust | 2 +- src/data_track_stream.cpp | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/client-sdk-rust b/client-sdk-rust index cfe8af57..516e43c2 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit cfe8af57de249cf4b1a051372740179a079e4900 +Subproject commit 516e43c23577cb9a3ab57dae7fc805984acb9c11 diff --git a/src/data_track_stream.cpp b/src/data_track_stream.cpp index b9f1cf16..155f4788 100644 --- a/src/data_track_stream.cpp +++ b/src/data_track_stream.cpp @@ -77,6 +77,7 @@ bool DataTrackStream::read(DataTrackFrame& out) { if (missing_read_response) { failProtocolError(kMissingReadResponseError); + return false; } handleReadResponse(read_response); From 4b60ee8278dbf3bb34e1e787a1538e3f8c15b2c2 Mon Sep 17 00:00:00 2001 From: Stephen DeRosa Date: Wed, 20 May 2026 16:46:30 -0700 Subject: [PATCH 6/6] restore rust hash --- client-sdk-rust | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client-sdk-rust b/client-sdk-rust index 516e43c2..d74892de 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit 516e43c23577cb9a3ab57dae7fc805984acb9c11 +Subproject commit d74892de57724bd08dc2307b68e4251b8eae2f33