Skip to content

Commit 6770ac5

Browse files
committed
WIP: gRpc: works for jetty, almost work for undertow
- grpcurl list doesn't work for undertow (sadly)
1 parent 6295db7 commit 6770ac5

12 files changed

Lines changed: 623 additions & 593 deletions

File tree

dump.txt

Lines changed: 0 additions & 423 deletions
Large diffs are not rendered by default.

modules/jooby-grpc/src/main/java/io/jooby/grpc/GrpcRequestBridge.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public void onNext(byte[] item) {
5555

5656
log.info("asking for more request(1)");
5757
internalObserver.request(1);
58+
// subscription.request(1);
5859
} catch (Throwable t) {
5960
subscription.cancel();
6061
internalObserver.onError(t);

modules/jooby-grpc/src/main/java/io/jooby/grpc/UnifiedGrpcBridge.java

Lines changed: 54 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public StreamObserver<byte[]> startCall(Context ctx) {
7171
var descriptor = methodRegistry.get(path.substring(1));
7272
if (descriptor == null) {
7373
terminateWithStatus(
74-
null,
74+
ctx,
7575
Status.UNIMPLEMENTED.withDescription("Method not found in bridge registry: " + path));
7676
return null;
7777
}
@@ -90,94 +90,55 @@ public StreamObserver<byte[]> startCall(Context ctx) {
9090

9191
ClientResponseObserver<byte[], byte[]> responseObserver;
9292
log.info("method type: {}", method.getType());
93-
if (method.getType() == MethodDescriptor.MethodType.UNARY) {
94-
// Atomic guard to prevent multiple terminal calls
95-
var isFinished = new AtomicBoolean(false);
96-
// 3. Unified Response Observer (Handles data coming BACK from the server)
97-
responseObserver =
98-
new ClientResponseObserver<>() {
99-
@Override
100-
public void beforeStart(ClientCallStreamObserver<byte[]> requestStream) {
101-
requestStream.disableAutoInboundFlowControl();
102-
}
103-
104-
@Override
105-
public void onNext(byte[] value) {
106-
if (isFinished.get()) return;
107-
log.info("onNext Send {}", HexFormat.of().formatHex(value));
108-
109-
// Professional Framing: 5-byte header + payload
110-
ctx.setResponseTrailer("grpc-status", "0");
111-
byte[] framed = addGrpcHeader(value);
112-
ctx.send(framed);
113-
}
114-
115-
@Override
116-
public void onError(Throwable t) {
117-
if (isFinished.compareAndSet(false, true)) {
118-
log.info(" error", t);
119-
terminateWithStatus(ctx, Status.fromThrowable(t));
120-
}
121-
}
122-
123-
@Override
124-
public void onCompleted() {
125-
if (isFinished.compareAndSet(false, true)) {
126-
log.info("onCompleted");
127-
terminateWithStatus(ctx, Status.OK);
128-
}
129-
}
130-
};
131-
} else {
132-
var sender = ctx.responseSender(false);
133-
// Atomic guard to prevent multiple terminal calls
134-
var isFinished = new AtomicBoolean(false);
135-
// 3. Unified Response Observer (Handles data coming BACK from the server)
136-
responseObserver =
137-
new ClientResponseObserver<>() {
138-
@Override
139-
public void beforeStart(ClientCallStreamObserver<byte[]> requestStream) {
140-
requestStream.disableAutoInboundFlowControl();
141-
}
142-
143-
@Override
144-
public void onNext(byte[] value) {
145-
if (isFinished.get()) return;
146-
log.info("onNext Send {}", HexFormat.of().formatHex(value));
147-
148-
// Professional Framing: 5-byte header + payload
149-
sender.setTrailer("grpc-status", "0");
150-
byte[] framed = addGrpcHeader(value);
151-
sender.write(
152-
framed,
153-
new Sender.Callback() {
154-
@Override
155-
public void onComplete(@NonNull Context ctx, @Nullable Throwable cause) {
156-
log.info("onNext Sent {}", ctx);
157-
if (cause != null) {
158-
onError(cause);
159-
}
93+
var sender = ctx.responseSender(false);
94+
// Atomic guard to prevent multiple terminal calls
95+
var isFinished = new AtomicBoolean(false);
96+
sender.setTrailer("grpc-status", "0");
97+
// 3. Unified Response Observer (Handles data coming BACK from the server)
98+
responseObserver =
99+
new ClientResponseObserver<>() {
100+
@Override
101+
public void beforeStart(ClientCallStreamObserver<byte[]> requestStream) {
102+
requestStream.disableAutoInboundFlowControl();
103+
}
104+
105+
@Override
106+
public void onNext(byte[] value) {
107+
if (isFinished.get()) return;
108+
log.info("onNext Send {}", HexFormat.of().formatHex(value));
109+
110+
// Professional Framing: 5-byte header + payload
111+
112+
byte[] framed = addGrpcHeader(value);
113+
sender.write(
114+
framed,
115+
new Sender.Callback() {
116+
@Override
117+
public void onComplete(@NonNull Context ctx, @Nullable Throwable cause) {
118+
log.info("onNext Sent {}", ctx);
119+
if (cause != null) {
120+
onError(cause);
160121
}
161-
});
162-
}
163-
164-
@Override
165-
public void onError(Throwable t) {
166-
if (isFinished.compareAndSet(false, true)) {
167-
log.info(" error", t);
168-
terminateWithStatus(ctx, Status.fromThrowable(t));
169-
}
122+
}
123+
});
124+
}
125+
126+
@Override
127+
public void onError(Throwable t) {
128+
if (isFinished.compareAndSet(false, true)) {
129+
log.info(" error", t);
130+
terminateWithStatus(sender, Status.fromThrowable(t));
170131
}
132+
}
171133

172-
@Override
173-
public void onCompleted() {
174-
if (isFinished.compareAndSet(false, true)) {
175-
log.info("onCompleted");
176-
terminateWithStatus(ctx, Status.OK);
177-
}
134+
@Override
135+
public void onCompleted() {
136+
if (isFinished.compareAndSet(false, true)) {
137+
log.info("onCompleted");
138+
terminateWithStatus(sender, Status.OK);
178139
}
179-
};
180-
}
140+
}
141+
};
181142

182143
// 4. Map gRPC Method Type to the correct ClientCalls utility
183144
return switch (method.getType()) {
@@ -228,6 +189,14 @@ public void onCompleted() {
228189
};
229190
}
230191

192+
private void terminateWithStatus(Sender ctx, Status status) {
193+
ctx.setTrailer("grpc-status", String.valueOf(status.getCode().value()));
194+
if (status.getDescription() != null) {
195+
ctx.setTrailer("grpc-message", status.getDescription());
196+
}
197+
ctx.close();
198+
}
199+
231200
/**
232201
* Professional Status Termination. Sets gRPC trailers and closes the Jetty response correctly.
233202
*/

modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettyRequestPublisher.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ private void process(String call) {
8787
buffer.get(bytes);
8888

8989
log.info("{}- byte read: {}", call, HexFormat.of().formatHex(bytes));
90-
// demand.decrementAndGet();
9190
subscriber.onNext(bytes);
9291
}
9392
chunk.release();

modules/jooby-jetty/src/main/java/io/jooby/internal/jetty/JettySender.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,25 @@ public Sender write(@NonNull Output output, @NonNull Callback callback) {
4848
}
4949

5050
public Sender write(@NonNull ByteBuffer buffer, @NonNull Callback callback) {
51-
response.write(false, buffer, toJettyCallback(ctx, callback));
51+
if (trailers != null) {
52+
var copy = HttpFields.build(trailers);
53+
response.setTrailersSupplier(() -> copy);
54+
this.trailers = null;
55+
}
56+
response.write(
57+
false,
58+
buffer,
59+
new org.eclipse.jetty.util.Callback() {
60+
@Override
61+
public void succeeded() {
62+
org.eclipse.jetty.util.Callback.super.succeeded();
63+
}
64+
65+
@Override
66+
public void failed(Throwable x) {
67+
org.eclipse.jetty.util.Callback.super.failed(x);
68+
}
69+
});
5270
// if (trailers == null) {
5371
// response.write(false, buffer, toJettyCallback(ctx, callback));
5472
// } else {
@@ -63,10 +81,23 @@ public Sender write(@NonNull ByteBuffer buffer, @NonNull Callback callback) {
6381

6482
@Override
6583
public void close() {
66-
if (trailers != null) {
67-
response.setTrailersSupplier(() -> trailers);
68-
response.write(true, null, ctx);
69-
}
84+
// if (trailers != null) {
85+
// response.setTrailersSupplier(() -> trailers);
86+
// response.write(true, null, new org.eclipse.jetty.util.Callback() {
87+
// @Override
88+
// public void succeeded() {
89+
// System.out.println("Succeed");
90+
// }
91+
//
92+
// @Override
93+
// public void failed(Throwable throwable) {
94+
// System.out.println("Failed");
95+
// throwable.printStackTrace();
96+
// }
97+
// });
98+
// } else {
99+
response.write(true, null, ctx);
100+
// }
70101
// if (pending != null) {
71102
// response.setTrailersSupplier(() -> trailers);
72103
// response.write(true, pending, ctx);

modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ public Context send(@NonNull ByteBuffer[] data) {
487487
public Context send(@NonNull ByteBuffer data) {
488488
ifUnDispatch(data);
489489
exchange.setResponseContentLength(data.remaining());
490-
exchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, Long.toString(data.remaining()));
490+
// exchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, Long.toString(data.remaining()));
491491
exchange.getResponseSender().send(data, this);
492492
return this;
493493
}

modules/jooby-undertow/src/main/java/io/jooby/internal/undertow/UndertowHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
6161
.get(Headers.CONTENT_TYPE)
6262
.getFirst()
6363
.contains("application/grpc")) {
64-
// var route = router.match(context);
65-
// context.setRoute(route.route());
6664
var subscriber = router.require(ServiceKey.key(Function.class, "gRPC"));
6765
new UndertowGrpcHandler(this, router, bufferSize, subscriber).handleRequest(exchange);
6866
return;

0 commit comments

Comments
 (0)