6
6
package kubejwt
7
7
8
8
import (
9
+ "context"
9
10
"fmt"
10
11
"strings"
11
12
13
+ discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
12
14
"google.golang.org/grpc"
13
15
"google.golang.org/grpc/metadata"
14
16
"k8s.io/client-go/kubernetes"
@@ -34,33 +36,56 @@ func NewJWTAuthInterceptor(clientset *kubernetes.Clientset, issuer, audience str
34
36
}
35
37
}
36
38
37
- // Stream intercepts streaming gRPC calls for authentication.
38
- func (i * JWTAuthInterceptor ) Stream () grpc.StreamServerInterceptor {
39
- return func (srv any , ss grpc.ServerStream , info * grpc.StreamServerInfo , handler grpc.StreamHandler ) error {
40
- if err := i .authorize (ss ); err != nil {
41
- return err
42
- }
43
- return handler (srv , ss )
44
- }
39
+ type wrappedStream struct {
40
+ grpc.ServerStream
41
+ ctx context.Context
42
+ interceptor * JWTAuthInterceptor
43
+ validated bool
45
44
}
46
45
47
- // authorize validates the Kubernetes Service Account JWT token from the metadata.
48
- func (i * JWTAuthInterceptor ) authorize (ss grpc.ServerStream ) error {
49
- md , ok := metadata .FromIncomingContext (ss .Context ())
50
- if ! ok {
51
- return fmt .Errorf ("missing metadata" )
46
+ func (w * wrappedStream ) RecvMsg (m any ) error {
47
+ err := w .ServerStream .RecvMsg (m )
48
+ if err != nil {
49
+ return err
52
50
}
53
51
54
- authHeader , exists := md ["authorization" ]
55
- if ! exists || len (authHeader ) == 0 {
56
- return fmt .Errorf ("missing authorization token in metadata: %s" , md )
57
- }
58
- tokenStr := strings .TrimPrefix (authHeader [0 ], "Bearer " )
52
+ if ! w .validated {
53
+ if req , ok := m .(* discoveryv3.DeltaDiscoveryRequest ); ok {
54
+ if req .Node == nil || req .Node .Id == "" {
55
+ return fmt .Errorf ("missing node ID in request" )
56
+ }
57
+ nodeID := req .Node .Id
59
58
60
- err := i .validateKubeJWT (ss .Context (), tokenStr )
61
- if err != nil {
62
- return fmt .Errorf ("failed to validate token: %w" , err )
59
+ md , ok := metadata .FromIncomingContext (w .ctx )
60
+ if ! ok {
61
+ return fmt .Errorf ("missing metadata" )
62
+ }
63
+
64
+ authHeader := md .Get ("authorization" )
65
+ if len (authHeader ) == 0 {
66
+ return fmt .Errorf ("missing authorization token in metadata: %s" , md )
67
+ }
68
+ token := strings .TrimPrefix (authHeader [0 ], "Bearer " )
69
+
70
+ if err := w .interceptor .validateKubeJWT (w .ctx , token , nodeID ); err != nil {
71
+ return fmt .Errorf ("failed to validate token: %w" , err )
72
+ }
73
+
74
+ w .validated = true
75
+ }
63
76
}
64
77
65
78
return nil
66
79
}
80
+
81
+ func newWrappedStream (s grpc.ServerStream , ctx context.Context , interceptor * JWTAuthInterceptor ) grpc.ServerStream {
82
+ return & wrappedStream {s , ctx , interceptor , false }
83
+ }
84
+
85
+ // Stream intercepts streaming gRPC calls for authorization.
86
+ func (i * JWTAuthInterceptor ) Stream () grpc.StreamServerInterceptor {
87
+ return func (srv any , ss grpc.ServerStream , info * grpc.StreamServerInfo , handler grpc.StreamHandler ) error {
88
+ wrapped := newWrappedStream (ss , ss .Context (), i )
89
+ return handler (srv , wrapped )
90
+ }
91
+ }
0 commit comments