|
60 | 60 | import io.quarkus.runtime.ShutdownContext;
|
61 | 61 | import io.quarkus.runtime.annotations.Recorder;
|
62 | 62 | import io.quarkus.vertx.http.runtime.PortSystemProperties;
|
| 63 | +import io.quarkus.vertx.http.runtime.security.HttpAuthenticator; |
63 | 64 | import io.quarkus.virtual.threads.VirtualThreadsRecorder;
|
64 | 65 | import io.vertx.core.AbstractVerticle;
|
65 | 66 | import io.vertx.core.AsyncResult;
|
@@ -99,7 +100,7 @@ public void initializeGrpcServer(RuntimeValue<Vertx> vertxSupplier,
|
99 | 100 | ShutdownContext shutdown,
|
100 | 101 | Map<String, List<String>> blockingMethodsPerService,
|
101 | 102 | Map<String, List<String>> virtualMethodsPerService,
|
102 |
| - LaunchMode launchMode, boolean securityPresent) { |
| 103 | + LaunchMode launchMode, boolean securityPresent, Map<Integer, Handler<RoutingContext>> securityHandlers) { |
103 | 104 | GrpcContainer grpcContainer = Arc.container().instance(GrpcContainer.class).get();
|
104 | 105 | if (grpcContainer == null) {
|
105 | 106 | throw new IllegalStateException("gRPC not initialized, GrpcContainer not found");
|
@@ -137,15 +138,16 @@ public void initializeGrpcServer(RuntimeValue<Vertx> vertxSupplier,
|
137 | 138 | }
|
138 | 139 | } else {
|
139 | 140 | buildGrpcServer(vertx, configuration, routerSupplier, shutdown, blockingMethodsPerService, virtualMethodsPerService,
|
140 |
| - grpcContainer, launchMode, securityPresent); |
| 141 | + grpcContainer, launchMode, securityPresent, securityHandlers); |
141 | 142 | }
|
142 | 143 | }
|
143 | 144 |
|
144 | 145 | // TODO -- handle XDS
|
145 | 146 | private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, RuntimeValue<Router> routerSupplier,
|
146 | 147 | ShutdownContext shutdown, Map<String, List<String>> blockingMethodsPerService,
|
147 | 148 | Map<String, List<String>> virtualMethodsPerService,
|
148 |
| - GrpcContainer grpcContainer, LaunchMode launchMode, boolean securityPresent) { |
| 149 | + GrpcContainer grpcContainer, LaunchMode launchMode, boolean securityPresent, |
| 150 | + Map<Integer, Handler<RoutingContext>> securityHandlers) { |
149 | 151 |
|
150 | 152 | GrpcServerOptions options = new GrpcServerOptions();
|
151 | 153 | if (!configuration.maxInboundMessageSize.isEmpty()) {
|
@@ -193,8 +195,45 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration,
|
193 | 195 |
|
194 | 196 | initHealthStorage();
|
195 | 197 |
|
| 198 | + Router router = routerSupplier.getValue(); |
| 199 | + if (securityHandlers != null) { |
| 200 | + for (Map.Entry<Integer, Handler<RoutingContext>> e : securityHandlers.entrySet()) { |
| 201 | + Handler<RoutingContext> handler = e.getValue(); |
| 202 | + Route route = router.route().order(e.getKey()).handler(new Handler<RoutingContext>() { |
| 203 | + @Override |
| 204 | + public void handle(RoutingContext ctx) { |
| 205 | + if (!isGrpc(ctx)) { |
| 206 | + ctx.next(); |
| 207 | + } else if (ctx.get(HttpAuthenticator.class.getName()) != null) { |
| 208 | + // this IF branch shouldn't be invoked with current implementation |
| 209 | + // when gRPC is attached to the main router when the root path is not '/' |
| 210 | + // because HTTP authenticator and authorizer handlers are not added by default on the main |
| 211 | + // router; adding it in case someone made changes without consider this use case |
| 212 | + // so that we prevent repeated authentication |
| 213 | + ctx.next(); |
| 214 | + } else { |
| 215 | + if (!Context.isOnEventLoopThread()) { |
| 216 | + Context capturedVertxContext = Vertx.currentContext(); |
| 217 | + if (capturedVertxContext != null) { |
| 218 | + capturedVertxContext.runOnContext(new Handler<Void>() { |
| 219 | + @Override |
| 220 | + public void handle(Void unused) { |
| 221 | + handler.handle(ctx); |
| 222 | + } |
| 223 | + }); |
| 224 | + return; |
| 225 | + } |
| 226 | + } |
| 227 | + handler.handle(ctx); |
| 228 | + } |
| 229 | + } |
| 230 | + }); |
| 231 | + shutdown.addShutdownTask(route::remove); // remove this route at shutdown, this should reset it |
| 232 | + } |
| 233 | + } |
| 234 | + |
196 | 235 | LOGGER.info("Starting new Quarkus gRPC server (using Vert.x transport)...");
|
197 |
| - Route route = routerSupplier.getValue().route().handler(ctx -> { |
| 236 | + Route route = router.route().handler(ctx -> { |
198 | 237 | if (!isGrpc(ctx)) {
|
199 | 238 | ctx.next();
|
200 | 239 | } else {
|
|
0 commit comments