6
6
package kubejwt
7
7
8
8
import (
9
- "context"
10
9
"fmt"
11
10
"strings"
12
11
13
- discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
14
12
"google.golang.org/grpc"
15
13
"google.golang.org/grpc/metadata"
16
14
"k8s.io/client-go/kubernetes"
@@ -36,56 +34,33 @@ func NewJWTAuthInterceptor(clientset *kubernetes.Clientset, issuer, audience str
36
34
}
37
35
}
38
36
39
- type wrappedStream struct {
40
- grpc.ServerStream
41
- ctx context.Context
42
- interceptor * JWTAuthInterceptor
43
- validated bool
37
+ // Stream intercepts streaming gRPC calls for authentication.
38
+ func (i * JWTAuthInterceptor ) Stream () grpc.StreamServerInterceptor {
39
+ return func (srv any , ss grpc.ServerStream , info * grpc.StreamServerInfo , handler grpc.StreamHandler ) error {
40
+ if err := i .authorize (ss ); err != nil {
41
+ return err
42
+ }
43
+ return handler (srv , ss )
44
+ }
44
45
}
45
46
46
- func (w * wrappedStream ) RecvMsg (m any ) error {
47
- err := w .ServerStream .RecvMsg (m )
48
- if err != nil {
49
- return err
47
+ // authorize validates the Kubernetes Service Account JWT token from the metadata.
48
+ func (i * JWTAuthInterceptor ) authorize (ss grpc.ServerStream ) error {
49
+ md , ok := metadata .FromIncomingContext (ss .Context ())
50
+ if ! ok {
51
+ return fmt .Errorf ("missing metadata" )
50
52
}
51
53
52
- if ! w .validated {
53
- if req , ok := m .(* discoveryv3.DeltaDiscoveryRequest ); ok {
54
- if req .Node == nil || req .Node .Id == "" {
55
- return fmt .Errorf ("missing node ID in request" )
56
- }
57
- nodeID := req .Node .Id
58
-
59
- md , ok := metadata .FromIncomingContext (w .ctx )
60
- if ! ok {
61
- return fmt .Errorf ("missing metadata" )
62
- }
63
-
64
- authHeader := md .Get ("authorization" )
65
- if len (authHeader ) == 0 {
66
- return fmt .Errorf ("missing authorization token in metadata: %s" , md )
67
- }
68
- token := strings .TrimPrefix (authHeader [0 ], "Bearer " )
69
-
70
- if err := w .interceptor .validateKubeJWT (w .ctx , token , nodeID ); err != nil {
71
- return fmt .Errorf ("failed to validate token: %w" , err )
72
- }
54
+ authHeader , exists := md ["authorization" ]
55
+ if ! exists || len (authHeader ) == 0 {
56
+ return fmt .Errorf ("missing authorization token in metadata: %s" , md )
57
+ }
58
+ tokenStr := strings .TrimPrefix (authHeader [0 ], "Bearer " )
73
59
74
- w .validated = true
75
- }
60
+ err := i .validateKubeJWT (ss .Context (), tokenStr )
61
+ if err != nil {
62
+ return fmt .Errorf ("failed to validate token: %w" , err )
76
63
}
77
64
78
65
return nil
79
66
}
80
-
81
- func newWrappedStream (s grpc.ServerStream , ctx context.Context , interceptor * JWTAuthInterceptor ) grpc.ServerStream {
82
- return & wrappedStream {s , ctx , interceptor , false }
83
- }
84
-
85
- // Stream intercepts streaming gRPC calls for authorization.
86
- func (i * JWTAuthInterceptor ) Stream () grpc.StreamServerInterceptor {
87
- return func (srv any , ss grpc.ServerStream , info * grpc.StreamServerInfo , handler grpc.StreamHandler ) error {
88
- wrapped := newWrappedStream (ss , ss .Context (), i )
89
- return handler (srv , wrapped )
90
- }
91
- }
0 commit comments