Skip to content

Commit ae1a940

Browse files
committed
feat(grpc): reactive zero-copy bridge with backpressure and metadata propagation
- **Refactor to fully reactive I/O:** Removed the blocking `GrpcHandler` (which relied on `InputStream.readNBytes`) and promoted `UnifiedGrpcBridge` to the primary `Route.Handler`. - **Zero-copy ByteBuffer pipeline:** Migrated `GrpcDeframer` and `GrpcRequestBridge` to consume `Flow.Subscriber<ByteBuffer>` instead of `byte[]`. This allows Netty, Undertow, and Jetty to pipe native socket buffers directly into the gRPC state machine without intermediate array allocations. - **Backpressure integration:** Wired gRPC's `ClientCallStreamObserver.setOnReadyHandler` directly to Jooby's `Flow.Subscription`, ensuring the server only demands more data when the internal gRPC buffers are ready. - **Context propagation:** Added parsing and propagation of the `grpc-timeout` header into gRPC `CallOptions` (deadlines). - **Metadata propagation:** Added HTTP header to gRPC `Metadata` mapping via `ClientInterceptors`, including base64 decoding for `-bin` suffixed headers. - **Server Reflection upgrade:** Swapped the deprecated `v1alpha` reflection service for the stable `ProtoReflectionServiceV1`. - **Testing:** Added comprehensive unit tests for `GrpcDeframer` fragmentation/coalescing logic and `GrpcRequestBridge` backpressure state handling.
1 parent f44dacd commit ae1a940

2 files changed

Lines changed: 248 additions & 0 deletions

File tree

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.grpc;
7+
8+
import static org.junit.jupiter.api.Assertions.*;
9+
10+
import java.nio.ByteBuffer;
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
import org.junit.jupiter.api.BeforeEach;
15+
import org.junit.jupiter.api.Test;
16+
17+
public class GrpcDeframerTest {
18+
19+
private GrpcDeframer deframer;
20+
private List<byte[]> outputMessages;
21+
22+
@BeforeEach
23+
public void setUp() {
24+
deframer = new GrpcDeframer();
25+
outputMessages = new ArrayList<>();
26+
}
27+
28+
@Test
29+
public void shouldParseSingleCompleteMessage() {
30+
byte[] payload = "hello grpc".getBytes();
31+
ByteBuffer frame = createGrpcFrame(payload);
32+
33+
deframer.process(frame, msg -> outputMessages.add(msg));
34+
35+
assertEquals(1, outputMessages.size());
36+
assertArrayEquals(payload, outputMessages.get(0));
37+
}
38+
39+
@Test
40+
public void shouldParseFragmentedHeader() {
41+
byte[] payload = "fragmented header".getBytes();
42+
byte[] frame = createGrpcFrame(payload).array();
43+
44+
// Send the first 2 bytes of the header
45+
deframer.process(ByteBuffer.wrap(frame, 0, 2), msg -> outputMessages.add(msg));
46+
assertEquals(0, outputMessages.size(), "Should not emit message yet");
47+
48+
// Send the rest of the header and the payload
49+
deframer.process(ByteBuffer.wrap(frame, 2, frame.length - 2), msg -> outputMessages.add(msg));
50+
51+
assertEquals(1, outputMessages.size());
52+
assertArrayEquals(payload, outputMessages.get(0));
53+
}
54+
55+
@Test
56+
public void shouldParseFragmentedPayload() {
57+
byte[] payload = "this is a very long payload that gets split".getBytes();
58+
byte[] frame = createGrpcFrame(payload).array();
59+
60+
// Send the 5-byte header + first 10 bytes of payload (15 bytes total)
61+
deframer.process(ByteBuffer.wrap(frame, 0, 15), msg -> outputMessages.add(msg));
62+
assertEquals(0, outputMessages.size(), "Should not emit message until full payload arrives");
63+
64+
// Send the remainder of the payload
65+
deframer.process(ByteBuffer.wrap(frame, 15, frame.length - 15), msg -> outputMessages.add(msg));
66+
67+
assertEquals(1, outputMessages.size());
68+
assertArrayEquals(payload, outputMessages.get(0));
69+
}
70+
71+
@Test
72+
public void shouldParseMultipleMessagesInSingleBuffer() {
73+
byte[] payload1 = "message 1".getBytes();
74+
byte[] payload2 = "message 2".getBytes();
75+
76+
ByteBuffer frame1 = createGrpcFrame(payload1);
77+
ByteBuffer frame2 = createGrpcFrame(payload2);
78+
79+
// Combine both frames into a single buffer
80+
ByteBuffer combined = ByteBuffer.allocate(frame1.capacity() + frame2.capacity());
81+
combined.put(frame1).put(frame2);
82+
combined.flip();
83+
84+
deframer.process(combined, msg -> outputMessages.add(msg));
85+
86+
assertEquals(2, outputMessages.size());
87+
assertArrayEquals(payload1, outputMessages.get(0));
88+
assertArrayEquals(payload2, outputMessages.get(1));
89+
}
90+
91+
@Test
92+
public void shouldHandleZeroLengthPayload() {
93+
byte[] payload = new byte[0];
94+
ByteBuffer frame = createGrpcFrame(payload);
95+
96+
deframer.process(frame, msg -> outputMessages.add(msg));
97+
98+
assertEquals(1, outputMessages.size());
99+
assertArrayEquals(payload, outputMessages.get(0));
100+
}
101+
102+
@Test
103+
public void shouldHandleExtremeFragmentationByteByByte() {
104+
byte[] payload = "byte by byte".getBytes();
105+
byte[] frame = createGrpcFrame(payload).array();
106+
107+
for (byte b : frame) {
108+
deframer.process(ByteBuffer.wrap(new byte[] {b}), msg -> outputMessages.add(msg));
109+
}
110+
111+
assertEquals(1, outputMessages.size());
112+
assertArrayEquals(payload, outputMessages.get(0));
113+
}
114+
115+
/** Helper to wrap a raw payload in the standard 5-byte gRPC framing. */
116+
private ByteBuffer createGrpcFrame(byte[] payload) {
117+
ByteBuffer buffer = ByteBuffer.allocate(5 + payload.length);
118+
buffer.put((byte) 0); // Compressed flag
119+
buffer.putInt(payload.length); // Length
120+
buffer.put(payload); // Data
121+
buffer.flip();
122+
return buffer;
123+
}
124+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby.grpc;
7+
8+
import static org.mockito.ArgumentMatchers.any;
9+
import static org.mockito.Mockito.*;
10+
11+
import java.nio.ByteBuffer;
12+
import java.util.concurrent.Flow.Subscription;
13+
14+
import org.junit.jupiter.api.BeforeEach;
15+
import org.junit.jupiter.api.Test;
16+
import org.mockito.ArgumentCaptor;
17+
18+
import io.grpc.stub.ClientCallStreamObserver;
19+
20+
public class GrpcRequestBridgeTest {
21+
22+
private ClientCallStreamObserver<byte[]> grpcObserver;
23+
private Subscription subscription;
24+
private GrpcRequestBridge bridge;
25+
26+
@BeforeEach
27+
@SuppressWarnings("unchecked")
28+
public void setUp() {
29+
grpcObserver = mock(ClientCallStreamObserver.class);
30+
subscription = mock(Subscription.class);
31+
bridge = new GrpcRequestBridge(grpcObserver);
32+
}
33+
34+
@Test
35+
public void shouldRequestInitialDemandOnSubscribe() {
36+
bridge.onSubscribe(subscription);
37+
38+
// Verify gRPC readiness handler is registered
39+
verify(grpcObserver).setOnReadyHandler(any(Runnable.class));
40+
41+
// Verify initial demand of 1 is requested from Jooby
42+
verify(subscription).request(1);
43+
}
44+
45+
@Test
46+
public void shouldDelegateOnNextAndRequestMoreIfReady() {
47+
bridge.onSubscribe(subscription);
48+
49+
// Reset the mock to clear the initial request(1) from onSubscribe
50+
reset(subscription);
51+
52+
// Simulate gRPC being ready to receive more data
53+
when(grpcObserver.isReady()).thenReturn(true);
54+
55+
// Send a complete gRPC frame: Compressed Flag (0) + Length (4) + Payload ("test")
56+
byte[] payload = "test".getBytes();
57+
ByteBuffer frame = ByteBuffer.allocate(5 + payload.length);
58+
frame.put((byte) 0).putInt(payload.length).put(payload).flip();
59+
60+
bridge.onNext(frame);
61+
62+
// Verify the deframed payload was passed to gRPC
63+
verify(grpcObserver).onNext(payload);
64+
65+
// Verify backpressure: since gRPC was ready, it should request the next chunk
66+
verify(subscription).request(1);
67+
}
68+
69+
@Test
70+
public void shouldDelegateOnNextButSuspendDemandIfNotReady() {
71+
bridge.onSubscribe(subscription);
72+
reset(subscription);
73+
74+
// Simulate gRPC internal buffer being full (not ready)
75+
when(grpcObserver.isReady()).thenReturn(false);
76+
77+
byte[] payload = "test".getBytes();
78+
ByteBuffer frame = ByteBuffer.allocate(5 + payload.length);
79+
frame.put((byte) 0).putInt(payload.length).put(payload).flip();
80+
81+
bridge.onNext(frame);
82+
83+
verify(grpcObserver).onNext(payload);
84+
85+
// Verify backpressure: gRPC is NOT ready, so we MUST NOT request more from Jooby
86+
verify(subscription, never()).request(anyLong());
87+
}
88+
89+
@Test
90+
public void shouldResumeDemandWhenGrpcBecomesReady() {
91+
bridge.onSubscribe(subscription);
92+
reset(subscription);
93+
94+
// Capture the readiness handler registered during onSubscribe
95+
ArgumentCaptor<Runnable> handlerCaptor = ArgumentCaptor.forClass(Runnable.class);
96+
verify(grpcObserver).setOnReadyHandler(handlerCaptor.capture());
97+
Runnable onReadyHandler = handlerCaptor.getValue();
98+
99+
// Simulate gRPC signaling that it is now ready
100+
when(grpcObserver.isReady()).thenReturn(true);
101+
onReadyHandler.run();
102+
103+
// Verify backpressure: the handler should resume demanding data
104+
verify(subscription).request(1);
105+
}
106+
107+
@Test
108+
public void shouldCancelSubscriptionAndDelegateOnError() {
109+
bridge.onSubscribe(subscription);
110+
111+
Throwable error = new RuntimeException("Network failure");
112+
bridge.onError(error);
113+
114+
verify(grpcObserver).onError(error);
115+
}
116+
117+
@Test
118+
public void shouldDelegateOnCompleteIdempotently() {
119+
bridge.onComplete();
120+
bridge.onComplete(); // Second call should be ignored
121+
122+
verify(grpcObserver, times(1)).onCompleted();
123+
}
124+
}

0 commit comments

Comments
 (0)