Skip to content

Commit 6498b2f

Browse files
committed
feat(virtq): send logs over virtq
Signed-off-by: Tomasz Andrzejak <andreiltd@gmail.com>
1 parent da79c03 commit 6498b2f

12 files changed

Lines changed: 715 additions & 140 deletions

File tree

src/hyperlight_common/src/virtq/buffer.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ pub enum AllocError {
3333
InvalidArg,
3434
#[error("Empty region")]
3535
EmptyRegion,
36-
#[error("Out of memory")]
36+
#[error("No space available")]
37+
NoSpace,
38+
#[error("Requested size exceeds pool capacity")]
3739
OutOfMemory,
3840
#[error("Overflow")]
3941
Overflow,

src/hyperlight_common/src/virtq/mod.rs

Lines changed: 181 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,13 @@ pub trait Notifier {
181181
#[derive(Error, Debug)]
182182
pub enum VirtqError {
183183
#[error("Ring error: {0}")]
184-
RingError(#[from] RingError),
184+
RingError(RingError),
185185
#[error("Allocation error: {0}")]
186-
Alloc(#[from] AllocError),
186+
Alloc(AllocError),
187+
#[error("Ring or pool temporarily full")]
188+
Backpressure,
189+
#[error("Allocation exceeds pool capacity")]
190+
OutOfMemory,
187191
#[error("Invalid token")]
188192
BadToken,
189193
#[error("Invalid chain received")]
@@ -202,6 +206,33 @@ pub enum VirtqError {
202206
NoReadableBuffer,
203207
}
204208

209+
impl VirtqError {
210+
/// Check if this error is transient or unrecoverable.
211+
#[inline(always)]
212+
pub fn is_transient(&self) -> bool {
213+
matches!(self, Self::Backpressure)
214+
}
215+
}
216+
217+
impl From<RingError> for VirtqError {
218+
fn from(e: RingError) -> Self {
219+
match e {
220+
RingError::WouldBlock => Self::Backpressure,
221+
other => Self::RingError(other),
222+
}
223+
}
224+
}
225+
226+
impl From<AllocError> for VirtqError {
227+
fn from(e: AllocError) -> Self {
228+
match e {
229+
AllocError::NoSpace => Self::Backpressure,
230+
AllocError::OutOfMemory => Self::OutOfMemory,
231+
other => Self::Alloc(other),
232+
}
233+
}
234+
}
235+
205236
/// Layout of a packed virtqueue ring in shared memory.
206237
///
207238
/// Describes the memory addresses for the descriptor table and event suppression
@@ -424,7 +455,7 @@ pub(crate) mod test_utils {
424455
let addr = self.next.fetch_add(len as u64, Ordering::Relaxed);
425456
let end = addr + len as u64;
426457
if end > self.base + self.size as u64 {
427-
return Err(AllocError::OutOfMemory);
458+
return Err(AllocError::NoSpace);
428459
}
429460
Ok(Allocation { addr, len })
430461
}
@@ -794,6 +825,153 @@ mod tests {
794825
assert_eq!(&expected_first.1[..], b"resp1");
795826
assert_eq!(&expected_second.1[..], b"resp2");
796827
}
828+
829+
/// Helper: submit a ReadOnly entry (entry data, no completion).
830+
fn send_readonly(
831+
producer: &mut VirtqProducer<TestMem, TestNotifier, TestPool>,
832+
entry_data: &[u8],
833+
) -> Token {
834+
let mut se = producer.chain().entry(entry_data.len()).build().unwrap();
835+
se.write_all(entry_data).unwrap();
836+
producer.submit(se).unwrap()
837+
}
838+
839+
#[test]
840+
fn test_reclaim_frees_ring_slots() {
841+
let ring = make_ring(4);
842+
let (mut producer, mut consumer, _) = make_test_producer(&ring);
843+
844+
// Fill the ring with ReadOnly entries
845+
send_readonly(&mut producer, b"a");
846+
send_readonly(&mut producer, b"b");
847+
send_readonly(&mut producer, b"c");
848+
send_readonly(&mut producer, b"d");
849+
850+
// Ring is now full - next submit should fail with Backpressure
851+
let mut se = producer.chain().entry(1).build().unwrap();
852+
se.write_all(b"e").unwrap();
853+
let res = producer.submit(se);
854+
assert!(
855+
matches!(res, Err(VirtqError::Backpressure)),
856+
"expected Backpressure from full ring"
857+
);
858+
859+
// Consumer acks all entries
860+
while let Some((_, completion)) = consumer.poll(1024).unwrap() {
861+
consumer.complete(completion).unwrap();
862+
}
863+
864+
// Reclaim should free ring slots without losing data
865+
let count = producer.reclaim().unwrap();
866+
assert_eq!(count, 4, "expected 4 reclaimed entries");
867+
868+
// Ring should have space now
869+
send_readonly(&mut producer, b"e");
870+
}
871+
872+
#[test]
873+
fn test_reclaim_buffers_rw_completions() {
874+
let ring = make_ring(4);
875+
let (mut producer, mut consumer, _) = make_test_producer(&ring);
876+
877+
// Submit a ReadWrite entry
878+
let tok = send_readwrite(&mut producer, b"request", 64);
879+
880+
// Consumer processes and writes response
881+
let (_, completion) = consumer.poll(1024).unwrap().unwrap();
882+
let SendCompletion::Writable(mut wc) = completion else {
883+
panic!("expected writable");
884+
};
885+
wc.write_all(b"response-data").unwrap();
886+
consumer.complete(wc.into()).unwrap();
887+
888+
// Reclaim buffers the completion (doesn't discard it)
889+
let count = producer.reclaim().unwrap();
890+
assert_eq!(count, 1);
891+
892+
// poll() should return the buffered completion
893+
let cqe = producer.poll().unwrap().unwrap();
894+
assert_eq!(cqe.token, tok);
895+
assert_eq!(&cqe.data[..], b"response-data");
896+
}
897+
898+
#[test]
899+
fn test_reclaim_then_poll_preserves_order() {
900+
let ring = make_ring(8);
901+
let (mut producer, mut consumer, _) = make_test_producer(&ring);
902+
903+
// Submit 3 entries: RO, RW, RO
904+
let tok_ro1 = send_readonly(&mut producer, b"log1");
905+
let tok_rw = send_readwrite(&mut producer, b"call", 64);
906+
let tok_ro2 = send_readonly(&mut producer, b"log2");
907+
908+
// Consumer processes all 3
909+
let (_, c1) = consumer.poll(1024).unwrap().unwrap();
910+
consumer.complete(c1).unwrap(); // ack RO
911+
912+
let (_, c2) = consumer.poll(1024).unwrap().unwrap();
913+
let SendCompletion::Writable(mut wc) = c2 else {
914+
panic!("expected writable");
915+
};
916+
wc.write_all(b"result").unwrap();
917+
consumer.complete(wc.into()).unwrap(); // complete RW
918+
919+
let (_, c3) = consumer.poll(1024).unwrap().unwrap();
920+
consumer.complete(c3).unwrap(); // ack RO
921+
922+
// Reclaim all 3
923+
let count = producer.reclaim().unwrap();
924+
assert_eq!(count, 3);
925+
926+
// poll() returns them in order
927+
let cqe1 = producer.poll().unwrap().unwrap();
928+
assert_eq!(cqe1.token, tok_ro1);
929+
assert!(cqe1.data.is_empty());
930+
931+
let cqe2 = producer.poll().unwrap().unwrap();
932+
assert_eq!(cqe2.token, tok_rw);
933+
assert_eq!(&cqe2.data[..], b"result");
934+
935+
let cqe3 = producer.poll().unwrap().unwrap();
936+
assert_eq!(cqe3.token, tok_ro2);
937+
assert!(cqe3.data.is_empty());
938+
939+
// No more
940+
assert!(producer.poll().unwrap().is_none());
941+
}
942+
943+
#[test]
944+
fn test_reclaim_mixed_with_poll() {
945+
let ring = make_ring(8);
946+
let (mut producer, mut consumer, _) = make_test_producer(&ring);
947+
948+
// Submit and complete 2 entries
949+
send_readonly(&mut producer, b"x");
950+
let tok_rw = send_readwrite(&mut producer, b"y", 64);
951+
952+
let (_, c1) = consumer.poll(1024).unwrap().unwrap();
953+
consumer.complete(c1).unwrap();
954+
955+
let (_, c2) = consumer.poll(1024).unwrap().unwrap();
956+
let SendCompletion::Writable(mut wc) = c2 else {
957+
panic!("expected writable");
958+
};
959+
wc.write_all(b"reply").unwrap();
960+
consumer.complete(wc.into()).unwrap();
961+
962+
// poll() consumes first entry directly from ring
963+
let cqe1 = producer.poll().unwrap().unwrap();
964+
assert!(cqe1.data.is_empty());
965+
966+
// reclaim() buffers second entry
967+
let count = producer.reclaim().unwrap();
968+
assert_eq!(count, 1);
969+
970+
// poll() returns the buffered one
971+
let cqe2 = producer.poll().unwrap().unwrap();
972+
assert_eq!(cqe2.token, tok_rw);
973+
assert_eq!(&cqe2.data[..], b"reply");
974+
}
797975
}
798976
#[cfg(all(test, loom))]
799977
mod fuzz {

src/hyperlight_common/src/virtq/msg.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,24 @@ pub enum MsgKind {
3434
StreamEnd = 0x04,
3535
/// Cancel a pending request.
3636
Cancel = 0x05,
37+
/// A guest log message (GuestLogData payload follows).
38+
Log = 0x06,
39+
}
40+
41+
impl TryFrom<u8> for MsgKind {
42+
type Error = u8;
43+
44+
fn try_from(value: u8) -> Result<Self, Self::Error> {
45+
match value {
46+
0x01 => Ok(Self::Request),
47+
0x02 => Ok(Self::Response),
48+
0x03 => Ok(Self::StreamChunk),
49+
0x04 => Ok(Self::StreamEnd),
50+
0x05 => Ok(Self::Cancel),
51+
0x06 => Ok(Self::Log),
52+
other => Err(other),
53+
}
54+
}
3755
}
3856

3957
/// Wire header for all virtqueue messages
@@ -72,4 +90,9 @@ impl VirtqMsgHeader {
7290
payload_len,
7391
}
7492
}
93+
94+
/// Parse the kind field into a [`MsgKind`] enum.
95+
pub fn msg_kind(&self) -> Result<MsgKind, u8> {
96+
MsgKind::try_from(self.kind)
97+
}
7598
}

src/hyperlight_common/src/virtq/pool.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ impl<const N: usize> Slab<N> {
169169
return Err(AllocError::OutOfMemory);
170170
}
171171

172-
let idx = self.find_slots(need_slots).ok_or(AllocError::OutOfMemory)?;
172+
let idx = self.find_slots(need_slots).ok_or(AllocError::NoSpace)?;
173173
self.used_slots.insert_range(idx..idx + need_slots);
174174
let addr = self.addr_of(idx).ok_or(AllocError::Overflow)?;
175175

@@ -463,7 +463,7 @@ impl<const L: usize, const U: usize> Inner<L, U> {
463463
if len <= L {
464464
match self.lower.alloc(len) {
465465
Ok(alloc) => return Ok(alloc),
466-
Err(AllocError::OutOfMemory) => {}
466+
Err(AllocError::NoSpace) => {}
467467
Err(e) => return Err(e),
468468
}
469469
}
@@ -614,7 +614,7 @@ impl BufferProvider for RecyclePool {
614614
return Err(AllocError::OutOfMemory);
615615
}
616616

617-
let addr = inner.free.pop().ok_or(AllocError::OutOfMemory)?;
617+
let addr = inner.free.pop().ok_or(AllocError::NoSpace)?;
618618

619619
Ok(Allocation {
620620
addr,
@@ -727,7 +727,7 @@ mod tests {
727727

728728
// Next allocation should fail
729729
let result = slab.alloc(256);
730-
assert!(matches!(result, Err(AllocError::OutOfMemory)));
730+
assert!(matches!(result, Err(AllocError::NoSpace)));
731731

732732
// Free one and retry
733733
slab.dealloc(a2).unwrap();
@@ -1287,7 +1287,7 @@ mod fuzz {
12871287
assert!(alloc.len >= *size);
12881288
allocations.push(alloc);
12891289
}
1290-
Err(AllocError::OutOfMemory) => {}
1290+
Err(AllocError::NoSpace | AllocError::OutOfMemory) => {}
12911291
Err(_) => {
12921292
return false;
12931293
}
@@ -1318,7 +1318,7 @@ mod fuzz {
13181318
assert!(new_alloc.len >= *new_size);
13191319
allocations[idx] = new_alloc;
13201320
}
1321-
Err(AllocError::OutOfMemory) => {}
1321+
Err(AllocError::NoSpace | AllocError::OutOfMemory) => {}
13221322
Err(_) => return false,
13231323
}
13241324
}

src/hyperlight_common/src/virtq/producer.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17+
use alloc::collections::VecDeque;
1718
use alloc::vec;
1819
use alloc::vec::Vec;
1920

@@ -124,6 +125,7 @@ pub struct VirtqProducer<M, N, P> {
124125
notifier: N,
125126
pool: P,
126127
inflight: Vec<Option<Inflight>>,
128+
pending: VecDeque<RecvCompletion>,
127129
}
128130

129131
impl<M, N, P> VirtqProducer<M, N, P>
@@ -149,11 +151,15 @@ where
149151
pool,
150152
notifier,
151153
inflight,
154+
pending: VecDeque::new(),
152155
}
153156
}
154157

155158
/// Poll for a single completion from the device.
156159
///
160+
/// Returns buffered completions from prior [`reclaim`](Self::reclaim)
161+
/// calls first, then checks the ring for new completions.
162+
///
157163
/// Returns `Ok(Some(completion))` if a completion is available, `Ok(None)` if no
158164
/// completions are ready (would block), or an error if the device misbehaved.
159165
///
@@ -167,6 +173,39 @@ where
167173
/// - [`VirtqError::InvalidState`] - Device returned invalid descriptor ID or
168174
/// wrote more data than the completion buffer capacity
169175
pub fn poll(&mut self) -> Result<Option<RecvCompletion>, VirtqError>
176+
where
177+
M: Send + 'static,
178+
P: Send + 'static,
179+
{
180+
if let Some(cqe) = self.pending.pop_front() {
181+
return Ok(Some(cqe));
182+
}
183+
self.poll_ring()
184+
}
185+
186+
/// Reclaim ring slots and pool entries from completed descriptors.
187+
///
188+
/// Processes all available used entries from the ring: frees entry
189+
/// buffer allocations immediately, and buffers completion data for
190+
/// later retrieval via [`poll`](Self::poll).
191+
///
192+
/// Use this to free resources under backpressure without losing
193+
/// completion data. Returns the number of entries reclaimed.
194+
pub fn reclaim(&mut self) -> Result<usize, VirtqError>
195+
where
196+
M: Send + 'static,
197+
P: Send + 'static,
198+
{
199+
let mut count = 0;
200+
while let Some(cqe) = self.poll_ring()? {
201+
self.pending.push_back(cqe);
202+
count += 1;
203+
}
204+
Ok(count)
205+
}
206+
207+
/// Poll one completion directly from the ring (bypassing pending buffer).
208+
fn poll_ring(&mut self) -> Result<Option<RecvCompletion>, VirtqError>
170209
where
171210
M: Send + 'static,
172211
P: Send + 'static,
@@ -363,6 +402,7 @@ where
363402
self.inner.reset();
364403
self.pool.reset();
365404
self.inflight.fill(None);
405+
self.pending.clear();
366406
}
367407
}
368408

0 commit comments

Comments
 (0)