Skip to content

Commit 8b66f00

Browse files
committed
use service lister instead of endpoints cache to get port from portName
Signed-off-by: Jan Wozniak <[email protected]>
1 parent 36b9348 commit 8b66f00

File tree

8 files changed

+136
-46
lines changed

8 files changed

+136
-46
lines changed

config/interceptor/role.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,14 @@ rules:
1212
- get
1313
- list
1414
- watch
15+
- apiGroups:
16+
- ""
17+
resources:
18+
- services
19+
verbs:
20+
- get
21+
- list
22+
- watch
1523
- apiGroups:
1624
- http.keda.sh
1725
resources:

interceptor/main.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/prometheus/client_golang/prometheus/promhttp"
1818
"golang.org/x/exp/maps"
1919
"golang.org/x/sync/errgroup"
20+
k8sinformers "k8s.io/client-go/informers"
2021
"k8s.io/client-go/kubernetes"
2122
ctrl "sigs.k8s.io/controller-runtime"
2223
"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -42,6 +43,7 @@ var (
4243

4344
// +kubebuilder:rbac:groups=http.keda.sh,resources=httpscaledobjects,verbs=get;list;watch
4445
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
46+
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch
4547

4648
func main() {
4749
timeoutCfg := config.MustParseTimeouts()
@@ -85,11 +87,10 @@ func main() {
8587
setupLog.Error(err, "creating new Kubernetes ClientSet")
8688
os.Exit(1)
8789
}
88-
endpointsCache := k8s.NewInformerBackedEndpointsCache(
89-
ctrl.Log,
90-
cl,
91-
time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS),
92-
)
90+
91+
k8sSharedInformerFactory := k8sinformers.NewSharedInformerFactory(cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))
92+
svcCache := k8s.NewInformerBackedServiceCache(ctrl.Log, cl, k8sSharedInformerFactory)
93+
endpointsCache := k8s.NewInformerBackedEndpointsCache(ctrl.Log, cl, time.Millisecond*time.Duration(servingCfg.EndpointsCachePollIntervalMS))
9394
if err != nil {
9495
setupLog.Error(err, "creating new endpoints cache")
9596
os.Exit(1)
@@ -123,6 +124,7 @@ func main() {
123124
setupLog.Info("starting the endpoints cache")
124125

125126
endpointsCache.Start(ctx)
127+
k8sSharedInformerFactory.Start(ctx.Done())
126128
return nil
127129
})
128130

@@ -173,10 +175,11 @@ func main() {
173175
eg.Go(func() error {
174176
proxyTLSConfig := map[string]string{"certificatePath": servingCfg.TLSCertPath, "keyPath": servingCfg.TLSKeyPath, "certstorePaths": servingCfg.TLSCertStorePaths}
175177
proxyTLSPort := servingCfg.TLSPort
178+
k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
176179

177180
setupLog.Info("starting the proxy server with TLS enabled", "port", proxyTLSPort)
178181

179-
if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, endpointsCache, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) {
182+
if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, svcCache, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) {
180183
setupLog.Error(err, "tls proxy server failed")
181184
return err
182185
}
@@ -186,9 +189,11 @@ func main() {
186189

187190
// start a proxy server without TLS.
188191
eg.Go(func() error {
192+
k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
189193
setupLog.Info("starting the proxy server with TLS disabled", "port", proxyPort)
190194

191-
if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, endpointsCache, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) {
195+
k8sSharedInformerFactory.WaitForCacheSync(ctx.Done())
196+
if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, svcCache, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) {
192197
setupLog.Error(err, "proxy server failed")
193198
return err
194199
}
@@ -369,7 +374,7 @@ func runProxyServer(
369374
q queue.Counter,
370375
waitFunc forwardWaitFunc,
371376
routingTable routing.Table,
372-
endpointsCache k8s.EndpointsCache,
377+
svcCache k8s.ServiceCache,
373378
timeouts *config.Timeouts,
374379
port int,
375380
tlsEnabled bool,
@@ -417,7 +422,7 @@ func runProxyServer(
417422
routingTable,
418423
probeHandler,
419424
upstreamHandler,
420-
endpointsCache,
425+
svcCache,
421426
tlsEnabled,
422427
)
423428
rootHandler = middleware.NewLogging(

interceptor/main_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) {
6363
// server
6464
routingTable := routingtest.NewTable()
6565
routingTable.Memory[host] = httpso
66-
endpointsCache := k8s.NewFakeEndpointsCache()
66+
svcCache := k8s.NewFakeServiceCache()
6767

6868
timeouts := &config.Timeouts{}
6969
waiterCh := make(chan struct{})
@@ -78,7 +78,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) {
7878
q,
7979
waitFunc,
8080
routingTable,
81-
endpointsCache,
81+
svcCache,
8282
timeouts,
8383
port,
8484
false,
@@ -196,7 +196,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) {
196196
// server
197197
routingTable := routingtest.NewTable()
198198
routingTable.Memory[host] = httpso
199-
endpointsCache := k8s.NewFakeEndpointsCache()
199+
svcCache := k8s.NewFakeServiceCache()
200200

201201
timeouts := &config.Timeouts{}
202202
waiterCh := make(chan struct{})
@@ -212,7 +212,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) {
212212
q,
213213
waitFunc,
214214
routingTable,
215-
endpointsCache,
215+
svcCache,
216216
timeouts,
217217
port,
218218
true,
@@ -343,7 +343,7 @@ func TestRunProxyServerWithMultipleCertsTLSCountMiddleware(t *testing.T) {
343343
// server
344344
routingTable := routingtest.NewTable()
345345
routingTable.Memory[host] = httpso
346-
endpointsCache := k8s.NewFakeEndpointsCache()
346+
svcCache := k8s.NewFakeServiceCache()
347347

348348
timeouts := &config.Timeouts{}
349349
waiterCh := make(chan struct{})
@@ -359,7 +359,7 @@ func TestRunProxyServerWithMultipleCertsTLSCountMiddleware(t *testing.T) {
359359
q,
360360
waitFunc,
361361
routingTable,
362-
endpointsCache,
362+
svcCache,
363363
timeouts,
364364
port,
365365
true,

interceptor/middleware/routing.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package middleware
22

33
import (
4+
"context"
45
"fmt"
56
"net/http"
67
"net/url"
@@ -22,16 +23,16 @@ type Routing struct {
2223
routingTable routing.Table
2324
probeHandler http.Handler
2425
upstreamHandler http.Handler
25-
endpointsCache k8s.EndpointsCache
26+
svcCache k8s.ServiceCache
2627
tlsEnabled bool
2728
}
2829

29-
func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, endpointsCache k8s.EndpointsCache, tlsEnabled bool) *Routing {
30+
func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, svcCache k8s.ServiceCache, tlsEnabled bool) *Routing {
3031
return &Routing{
3132
routingTable: routingTable,
3233
probeHandler: probeHandler,
3334
upstreamHandler: upstreamHandler,
34-
endpointsCache: endpointsCache,
35+
svcCache: svcCache,
3536
tlsEnabled: tlsEnabled,
3637
}
3738
}
@@ -55,7 +56,7 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
5556
}
5657
r = r.WithContext(util.ContextWithHTTPSO(r.Context(), httpso))
5758

58-
stream, err := rm.streamFromHTTPSO(httpso)
59+
stream, err := rm.streamFromHTTPSO(r.Context(), httpso)
5960
if err != nil {
6061
sh := handler.NewStatic(http.StatusInternalServerError, err)
6162
sh.ServeHTTP(w, r)
@@ -67,29 +68,27 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
6768
rm.upstreamHandler.ServeHTTP(w, r)
6869
}
6970

70-
func (rm *Routing) getPort(httpso *httpv1alpha1.HTTPScaledObject) (int32, error) {
71+
func (rm *Routing) getPort(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (int32, error) {
7172
if httpso.Spec.ScaleTargetRef.Port != 0 {
7273
return httpso.Spec.ScaleTargetRef.Port, nil
7374
}
7475
if httpso.Spec.ScaleTargetRef.PortName == "" {
75-
return 0, fmt.Errorf("must specify either port or portName")
76+
return 0, fmt.Errorf(`must specify either "port" or "portName"`)
7677
}
77-
endpoints, err := rm.endpointsCache.Get(httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service)
78+
svc, err := rm.svcCache.Get(ctx, httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service)
7879
if err != nil {
79-
return 0, fmt.Errorf("failed to get Endpoints: %w", err)
80+
return 0, fmt.Errorf("failed to get Service: %w", err)
8081
}
81-
for _, subset := range endpoints.Subsets {
82-
for _, port := range subset.Ports {
83-
if port.Name == httpso.Spec.ScaleTargetRef.PortName {
84-
return port.Port, nil
85-
}
82+
for _, port := range svc.Spec.Ports {
83+
if port.Name == httpso.Spec.ScaleTargetRef.PortName {
84+
return port.Port, nil
8685
}
8786
}
88-
return 0, fmt.Errorf("portName %s not found in Endpoints", httpso.Spec.ScaleTargetRef.PortName)
87+
return 0, fmt.Errorf("portName %q not found in Service", httpso.Spec.ScaleTargetRef.PortName)
8988
}
9089

91-
func (rm *Routing) streamFromHTTPSO(httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) {
92-
port, err := rm.getPort(httpso)
90+
func (rm *Routing) streamFromHTTPSO(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) {
91+
port, err := rm.getPort(ctx, httpso)
9392
if err != nil {
9493
return nil, fmt.Errorf("failed to get port: %w", err)
9594
}

interceptor/middleware/routing_test.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ var _ = Describe("RoutingMiddleware", func() {
2525
emptyHandler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
2626
probeHandler.Handle("/probe", emptyHandler)
2727
upstreamHandler.Handle("/upstream", emptyHandler)
28-
endpointsCache := k8s.NewFakeEndpointsCache()
28+
svcCache := k8s.NewFakeServiceCache()
2929

30-
rm := NewRouting(routingTable, probeHandler, upstreamHandler, endpointsCache, false)
30+
rm := NewRouting(routingTable, probeHandler, upstreamHandler, svcCache, false)
3131
Expect(rm).NotTo(BeNil())
3232
Expect(rm.routingTable).To(Equal(routingTable))
3333
Expect(rm.probeHandler).To(Equal(probeHandler))
@@ -44,7 +44,7 @@ var _ = Describe("RoutingMiddleware", func() {
4444
var (
4545
upstreamHandler *http.ServeMux
4646
probeHandler *http.ServeMux
47-
endpointsCache *k8s.FakeEndpointsCache
47+
svcCache *k8s.FakeServiceCache
4848
routingTable *routingtest.Table
4949
routingMiddleware *Routing
5050
w *httptest.ResponseRecorder
@@ -76,18 +76,16 @@ var _ = Describe("RoutingMiddleware", func() {
7676
},
7777
},
7878
}
79-
endpoints = corev1.Endpoints{
79+
svc = &corev1.Service{
8080
ObjectMeta: metav1.ObjectMeta{
8181
Name: "keda-svc",
8282
Namespace: "default",
8383
},
84-
Subsets: []corev1.EndpointSubset{
85-
{
86-
Ports: []corev1.EndpointPort{
87-
{
88-
Name: "http",
89-
Port: 80,
90-
},
84+
Spec: corev1.ServiceSpec{
85+
Ports: []corev1.ServicePort{
86+
{
87+
Name: "http",
88+
Port: 80,
9189
},
9290
},
9391
},
@@ -98,8 +96,8 @@ var _ = Describe("RoutingMiddleware", func() {
9896
upstreamHandler = http.NewServeMux()
9997
probeHandler = http.NewServeMux()
10098
routingTable = routingtest.NewTable()
101-
endpointsCache = k8s.NewFakeEndpointsCache()
102-
routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler, endpointsCache, false)
99+
svcCache = k8s.NewFakeServiceCache()
100+
routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler, svcCache, false)
103101

104102
w = httptest.NewRecorder()
105103

@@ -141,7 +139,7 @@ var _ = Describe("RoutingMiddleware", func() {
141139

142140
When("route is found with portName", func() {
143141
It("routes to the upstream handler", func() {
144-
endpointsCache.Set(endpoints)
142+
svcCache.Add(*svc)
145143
var (
146144
sc = http.StatusTeapot
147145
st = http.StatusText(sc)

interceptor/proxy_handlers_integration_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ func newHarness(
281281
},
282282
)
283283

284+
svcCache := k8s.NewFakeServiceCache()
284285
endpCache := k8s.NewFakeEndpointsCache()
285286
waitFunc := newWorkloadReplicasForwardWaitFunc(
286287
logr.Discard(),
@@ -308,7 +309,7 @@ func newHarness(
308309
respHeaderTimeout: time.Second,
309310
},
310311
&tls.Config{}),
311-
endpCache,
312+
svcCache,
312313
false,
313314
)
314315

pkg/k8s/svc_cache.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package k8s
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/go-logr/logr"
9+
v1 "k8s.io/api/core/v1"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/client-go/informers"
12+
"k8s.io/client-go/kubernetes"
13+
listerv1 "k8s.io/client-go/listers/core/v1"
14+
)
15+
16+
// ServiceCache is an interface for caching service objects
17+
type ServiceCache interface {
18+
// Get gets a service with the given namespace and name from the cache
19+
// If the service doesn't exist in the cache, it will be fetched from the API server
20+
Get(ctx context.Context, namespace, name string) (*v1.Service, error)
21+
}
22+
23+
// InformerBackedServicesCache is a cache of services backed by a shared informer
24+
type InformerBackedServicesCache struct {
25+
lggr logr.Logger
26+
cl kubernetes.Interface
27+
svcLister listerv1.ServiceLister
28+
}
29+
30+
// FakeServiceCache is a fake implementation of a ServiceCache for testing
31+
type FakeServiceCache struct {
32+
current map[string]v1.Service
33+
mut sync.RWMutex
34+
}
35+
36+
// NewInformerBackedServiceCache creates a new InformerBackedServicesCache
37+
func NewInformerBackedServiceCache(lggr logr.Logger, cl kubernetes.Interface, factory informers.SharedInformerFactory) *InformerBackedServicesCache {
38+
return &InformerBackedServicesCache{
39+
lggr: lggr.WithName("InformerBackedServicesCache"),
40+
cl: cl,
41+
svcLister: factory.Core().V1().Services().Lister(),
42+
}
43+
}
44+
45+
// Get gets a service with the given namespace and name from the cache and as a fallback from the API server
46+
func (c *InformerBackedServicesCache) Get(ctx context.Context, namespace, name string) (*v1.Service, error) {
47+
svc, err := c.svcLister.Services(namespace).Get(name)
48+
if err == nil {
49+
c.lggr.V(1).Info("Service found in cache", "namespace", namespace, "name", name)
50+
return svc, nil
51+
}
52+
c.lggr.V(1).Info("Service not found in cache, fetching from API server", "namespace", namespace, "name", name, "error", err)
53+
return c.cl.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
54+
}
55+
56+
// NewFakeServiceCache creates a new FakeServiceCache
57+
func NewFakeServiceCache() *FakeServiceCache {
58+
return &FakeServiceCache{current: make(map[string]v1.Service)}
59+
}
60+
61+
// Get gets a service with the given namespace and name from the cache
62+
func (c *FakeServiceCache) Get(ctx context.Context, namespace, name string) (*v1.Service, error) {
63+
c.mut.RLock()
64+
defer c.mut.RUnlock()
65+
svc, ok := c.current[key(namespace, name)]
66+
if !ok {
67+
return nil, fmt.Errorf("service not found")
68+
}
69+
return &svc, nil
70+
}
71+
72+
// Add adds a service to the cache
73+
func (c *FakeServiceCache) Add(svc v1.Service) {
74+
c.mut.Lock()
75+
defer c.mut.Unlock()
76+
c.current[key(svc.Namespace, svc.Name)] = svc
77+
}

0 commit comments

Comments
 (0)