33 * Apache License Version 2.0 https://jooby.io/LICENSE.txt
44 * Copyright 2014 Edgar Espina
55 */
6- package io .jooby .grpc ;
6+ package io .jooby .internal . grpc ;
77
88import java .io .ByteArrayInputStream ;
99import java .io .IOException ;
1616import org .slf4j .Logger ;
1717import org .slf4j .LoggerFactory ;
1818
19+ import edu .umd .cs .findbugs .annotations .NonNull ;
1920import io .grpc .CallOptions ;
20- import io .grpc .ClientCall ;
2121import io .grpc .ManagedChannel ;
2222import io .grpc .MethodDescriptor ;
2323import io .grpc .Status ;
2727import io .jooby .GrpcExchange ;
2828import io .jooby .GrpcProcessor ;
2929
30- public class UnifiedGrpcBridge implements GrpcProcessor {
30+ public class DefaultGrpcProcessor implements GrpcProcessor {
3131
3232 // Minimal Marshaller to pass raw bytes through the bridge
3333 private static class RawMarshaller implements MethodDescriptor .Marshaller <byte []> {
@@ -48,24 +48,39 @@ public byte[] parse(InputStream stream) {
4848
4949 private final Logger log = LoggerFactory .getLogger (getClass ());
5050 private final ManagedChannel channel ;
51- private final GrpcMethodRegistry methodRegistry ;
51+ private final Map < String , MethodDescriptor <?, ?>> registry ;
5252
53- public UnifiedGrpcBridge (ManagedChannel channel , GrpcMethodRegistry methodRegistry ) {
53+ public DefaultGrpcProcessor (
54+ ManagedChannel channel , Map <String , MethodDescriptor <?, ?>> registry ) {
5455 this .channel = channel ;
55- this .methodRegistry = methodRegistry ;
56+ this .registry = registry ;
5657 }
5758
5859 @ Override
59- public Flow .Subscriber <ByteBuffer > process (GrpcExchange exchange ) {
60+ public boolean isGrpcMethod (String path ) {
61+ // gRPC paths typically come in as "/package.Service/Method"
62+ // Our registry stores them as "package.Service/Method"
63+ String methodName = path .startsWith ("/" ) ? path .substring (1 ) : path ;
64+
65+ // Quick O(1) hash map lookup
66+ return registry .get (methodName ) != null ;
67+ }
68+
69+ @ Override
70+ public @ NonNull Flow .Subscriber <ByteBuffer > process (@ NonNull GrpcExchange exchange ) {
6071 // Route paths: /{package.Service}/{Method}
6172 String path = exchange .getRequestPath ();
6273 // Remove the leading slash to match the gRPC method registry format
63- var descriptor = methodRegistry .get (path .substring (1 ));
74+ var descriptor = registry .get (path .substring (1 ));
6475
6576 if (descriptor == null ) {
66- log .warn ("Method not found in bridge registry: {}" , path );
67- exchange .close (Status .UNIMPLEMENTED .getCode ().value (), "Method not found" );
68- return null ;
77+ // MUST never occur, it is guarded by {@link #isGrpcMethod}
78+ throw new IllegalStateException (
79+ "Unregistered gRPC method: '"
80+ + path
81+ + "'. "
82+ + "This request bypassed the GrpcProcessor.isGrpcMethod() guard, "
83+ + "indicating a bug or misconfiguration in the native server interceptor." );
6984 }
7085
7186 var method =
@@ -76,25 +91,24 @@ public Flow.Subscriber<ByteBuffer> process(GrpcExchange exchange) {
7691 .setResponseMarshaller (new RawMarshaller ())
7792 .build ();
7893
79- CallOptions callOptions = extractCallOptions (exchange );
80- io . grpc . Metadata metadata = extractMetadata (exchange );
94+ var callOptions = extractCallOptions (exchange );
95+ var metadata = extractMetadata (exchange );
8196
82- io . grpc . Channel interceptedChannel =
97+ var interceptedChannel =
8398 io .grpc .ClientInterceptors .intercept (
8499 channel , io .grpc .stub .MetadataUtils .newAttachHeadersInterceptor (metadata ));
85100
86- ClientCall < byte [], byte []> call = interceptedChannel .newCall (method , callOptions );
87- AtomicBoolean isFinished = new AtomicBoolean (false );
101+ var call = interceptedChannel .newCall (method , callOptions );
102+ var isFinished = new AtomicBoolean (false );
88103
89104 boolean isUnaryOrServerStreaming =
90105 method .getType () == MethodDescriptor .MethodType .UNARY
91106 || method .getType () == MethodDescriptor .MethodType .SERVER_STREAMING ;
92107
93- // 1. Create the effectively final bridge
94- GrpcRequestBridge requestBridge = new GrpcRequestBridge (call , method .getType ());
108+ var requestBridge = new GrpcRequestBridge (call , method .getType ());
95109
96- ClientResponseObserver < byte [], byte []> responseObserver =
97- new ClientResponseObserver <>() {
110+ var responseObserver =
111+ new ClientResponseObserver <byte [], byte [] >() {
98112
99113 @ Override
100114 public void beforeStart (ClientCallStreamObserver <byte []> requestStream ) {
@@ -158,10 +172,10 @@ private CallOptions extractCallOptions(GrpcExchange exchange) {
158172 }
159173
160174 try {
161- char unit = timeout .charAt (timeout .length () - 1 );
162- long value = Long .parseLong (timeout .substring (0 , timeout .length () - 1 ));
175+ var unit = timeout .charAt (timeout .length () - 1 );
176+ var value = Long .parseLong (timeout .substring (0 , timeout .length () - 1 ));
163177
164- java . util . concurrent . TimeUnit timeUnit =
178+ var timeUnit =
165179 switch (unit ) {
166180 case 'H' -> java .util .concurrent .TimeUnit .HOURS ;
167181 case 'M' -> java .util .concurrent .TimeUnit .MINUTES ;
@@ -184,10 +198,10 @@ private CallOptions extractCallOptions(GrpcExchange exchange) {
184198
185199 /** Maps standard HTTP headers from the GrpcExchange into gRPC Metadata. */
186200 private io .grpc .Metadata extractMetadata (GrpcExchange exchange ) {
187- io . grpc . Metadata metadata = new io .grpc .Metadata ();
201+ var metadata = new io .grpc .Metadata ();
188202
189- for (Map . Entry < String , String > header : exchange .getHeaders ().entrySet ()) {
190- String key = header .getKey ().toLowerCase ();
203+ for (var header : exchange .getHeaders ().entrySet ()) {
204+ var key = header .getKey ().toLowerCase ();
191205
192206 if (key .startsWith (":" )
193207 || key .startsWith ("grpc-" )
@@ -212,7 +226,7 @@ private io.grpc.Metadata extractMetadata(GrpcExchange exchange) {
212226
213227 /** Prepends the 5-byte gRPC header and returns a ready-to-write ByteBuffer. */
214228 private ByteBuffer addGrpcHeader (byte [] payload ) {
215- ByteBuffer buffer = ByteBuffer .allocate (5 + payload .length );
229+ var buffer = ByteBuffer .allocate (5 + payload .length );
216230 buffer .put ((byte ) 0 ); // Compressed flag (0 = none)
217231 buffer .putInt (payload .length );
218232 buffer .put (payload );
0 commit comments