Skip to content

Commit 5f30660

Browse files
committed
fix(rs): allow indexing partial paths
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
1 parent 8d8d510 commit 5f30660

7 files changed

Lines changed: 264 additions & 142 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,6 @@ wrpc-cli = { version = "0.3", path = "./crates/cli", default-features = false }
141141
wrpc-introspect = { version = "0.4.1", default-features = false, path = "./crates/introspect" }
142142
wrpc-runtime-wasmtime = { version = "0.23", path = "./crates/runtime-wasmtime", default-features = false }
143143
wrpc-transport = { version = "0.27", path = "./crates/transport", default-features = false }
144-
wrpc-transport-nats = { version = "0.24", path = "./crates/transport-nats", default-features = false }
145-
wrpc-transport-quic = { version = "0.2", path = "./crates/transport-quic", default-features = false }
144+
wrpc-transport-nats = { version = "0.24.1", path = "./crates/transport-nats", default-features = false }
145+
wrpc-transport-quic = { version = "0.2.1", path = "./crates/transport-quic", default-features = false }
146146
wrpc-wasmtime-nats-cli = { version = "0.9", path = "./crates/wasmtime-nats-cli", default-features = false }

crates/transport-nats/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wrpc-transport-nats"
3-
version = "0.24.0"
3+
version = "0.24.1"
44
description = "wRPC NATS transport"
55

66
authors.workspace = true

crates/transport-nats/src/lib.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ impl SubscriberTree {
306306

307307
pub struct Reader {
308308
buffer: Bytes,
309-
incoming: Subscriber,
309+
incoming: Option<Subscriber>,
310310
nested: Arc<std::sync::Mutex<SubscriberTree>>,
311311
path: Box<[usize]>,
312312
}
@@ -323,9 +323,7 @@ impl wrpc_transport::Index<Self> for Reader {
323323
trace!("taking index subscription");
324324
let mut p = self.path.to_vec();
325325
p.extend_from_slice(path);
326-
let incoming = nested
327-
.take(&p)
328-
.with_context(|| format!("unknown subscription for path `{p:?}`"))?;
326+
let incoming = nested.take(&p);
329327
Ok(Self {
330328
buffer: Bytes::default(),
331329
incoming,
@@ -358,8 +356,14 @@ impl AsyncRead for Reader {
358356
}
359357
return Poll::Ready(Ok(()));
360358
}
359+
let Some(incoming) = self.incoming.as_mut() else {
360+
return Poll::Ready(Err(std::io::Error::new(
361+
std::io::ErrorKind::NotFound,
362+
format!("subscription not found for path {:?}", self.path),
363+
)));
364+
};
361365
trace!("polling for next message");
362-
match self.incoming.poll_next_unpin(cx) {
366+
match incoming.poll_next_unpin(cx) {
363367
Poll::Ready(Some(Message { mut payload, .. })) => {
364368
trace!(?payload, "received message");
365369
if payload.is_empty() {
@@ -961,7 +965,7 @@ impl wrpc_transport::Invoke for Client {
961965
)),
962966
Reader {
963967
buffer: Bytes::default(),
964-
incoming: result_rx,
968+
incoming: Some(result_rx),
965969
nested: Arc::new(std::sync::Mutex::new(nested)),
966970
path: Box::default(),
967971
},
@@ -1042,7 +1046,7 @@ impl wrpc_transport::Serve for Client {
10421046
SubjectWriter::new((*nats).clone(), Subject::from(result_subject(&tx))),
10431047
Reader {
10441048
buffer: payload,
1045-
incoming: param_rx,
1049+
incoming: Some(param_rx),
10461050
nested: Arc::new(std::sync::Mutex::new(nested)),
10471051
path: Box::default(),
10481052
},

crates/transport-quic/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "wrpc-transport-quic"
3-
version = "0.2.0"
3+
version = "0.2.1"
44
description = "wRPC QUIC transport"
55

66
authors.workspace = true

crates/transport-quic/src/lib.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl<P: AsRef<[Option<usize>]>> FromIterator<P> for IndexTree {
134134
}
135135

136136
impl IndexTree {
137-
#[instrument(level = "trace", skip(self))]
137+
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
138138
fn take_rx(&mut self, path: &[usize]) -> Option<oneshot::Receiver<RecvStream>> {
139139
let Some((i, path)) = path.split_first() else {
140140
return match self {
@@ -168,7 +168,7 @@ impl IndexTree {
168168
}
169169
}
170170

171-
#[instrument(level = "trace", skip(self))]
171+
#[instrument(level = "trace", skip(self), ret(level = "trace"))]
172172
fn take_tx(&mut self, path: &[usize]) -> Option<oneshot::Sender<RecvStream>> {
173173
let Some((i, path)) = path.split_first() else {
174174
return match self {
@@ -204,7 +204,7 @@ impl IndexTree {
204204

205205
/// Inserts `sender` and `receiver` under a `path` - returns `false` if it failed and `true` if it succeeded.
206206
/// Tree state after `false` is returned is undefined
207-
#[instrument(level = "trace", skip(self, sender, receiver), ret)]
207+
#[instrument(level = "trace", skip(self, sender, receiver), ret(level = "trace"))]
208208
fn insert(
209209
&mut self,
210210
path: &[Option<usize>],
@@ -366,7 +366,7 @@ pin_project! {
366366
index: Arc<std::sync::Mutex<IndexTree>>,
367367
path: Arc<[usize]>,
368368
#[pin]
369-
rx: oneshot::Receiver<RecvStream>,
369+
rx: Option<oneshot::Receiver<RecvStream>>,
370370
io: Arc<JoinSet<std::io::Result<()>>>,
371371
},
372372
Active {
@@ -406,12 +406,7 @@ impl wrpc_transport::Index<Self> for Incoming {
406406
std::io::Error::new(std::io::ErrorKind::Other, err.to_string())
407407
})?;
408408
trace!(?path, "taking index subscription");
409-
let rx = lock.take_rx(&path).ok_or_else(|| {
410-
std::io::Error::new(
411-
std::io::ErrorKind::NotFound,
412-
format!("`{path:?}` subscription not found"),
413-
)
414-
})?;
409+
let rx = lock.take_rx(&path);
415410
Ok(Self::Accepting {
416411
index: Arc::clone(index),
417412
path,
@@ -424,7 +419,7 @@ impl wrpc_transport::Index<Self> for Incoming {
424419
}
425420

426421
impl AsyncRead for Incoming {
427-
#[instrument(level = "trace", skip_all, ret)]
422+
#[instrument(level = "trace", skip_all, ret(level = "trace"))]
428423
fn poll_read(
429424
mut self: Pin<&mut Self>,
430425
cx: &mut Context<'_>,
@@ -434,11 +429,17 @@ impl AsyncRead for Incoming {
434429
IncomingProj::Accepting {
435430
index,
436431
path,
437-
mut rx,
432+
rx,
438433
io,
439434
} => {
435+
let Some(rx) = rx.as_pin_mut() else {
436+
return Poll::Ready(Err(std::io::Error::new(
437+
std::io::ErrorKind::NotFound,
438+
format!("subscription not found for path {:?}", path),
439+
)));
440+
};
440441
trace!(?path, "polling channel");
441-
let rx = ready!(rx.as_mut().poll(cx))
442+
let rx = ready!(rx.poll(cx))
442443
.map_err(|err| std::io::Error::new(std::io::ErrorKind::BrokenPipe, err))?;
443444
*self = Self::Active {
444445
index: Arc::clone(index),
@@ -536,7 +537,7 @@ fn poll_write_header(
536537
}
537538

538539
impl Outgoing {
539-
#[instrument(level = "trace", skip_all, ret)]
540+
#[instrument(level = "trace", skip_all, ret(level = "trace"))]
540541
fn poll_flush_header(
541542
mut self: Pin<&mut Self>,
542543
cx: &mut Context<'_>,
@@ -571,7 +572,7 @@ fn corrupted_memory_error() -> std::io::Error {
571572
}
572573

573574
impl AsyncWrite for Outgoing {
574-
#[instrument(level = "trace", skip_all, ret, fields(buf = format!("{buf:02x?}")))]
575+
#[instrument(level = "trace", skip_all, fields(buf = format!("{buf:02x?}")), ret(level = "trace"))]
575576
fn poll_write(
576577
mut self: Pin<&mut Self>,
577578
cx: &mut Context<'_>,
@@ -587,7 +588,7 @@ impl AsyncWrite for Outgoing {
587588
}
588589
}
589590

590-
#[instrument(level = "trace", skip_all, ret)]
591+
#[instrument(level = "trace", skip_all, ret(level = "trace"))]
591592
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
592593
ready!(self.as_mut().poll_flush_header(cx))?;
593594
match self.as_mut().project() {
@@ -599,7 +600,7 @@ impl AsyncWrite for Outgoing {
599600
}
600601
}
601602

602-
#[instrument(level = "trace", skip_all, ret)]
603+
#[instrument(level = "trace", skip_all, ret(level = "trace"))]
603604
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
604605
ready!(self.as_mut().poll_flush_header(cx))?;
605606
match self.as_mut().project() {

0 commit comments

Comments
 (0)