Skip to content

Commit 23f9d07

Browse files
committed
use watch instead of polling
Signed-off-by: itspooya <[email protected]>
1 parent 06bc039 commit 23f9d07

File tree

4 files changed

+505
-103
lines changed

4 files changed

+505
-103
lines changed

README.md

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ and was created for lack of an alternative.
1010

1111
## How it works
1212

13-
The controller simply gets all Pod Disruption Budgets for each namespace and
14-
compares them to Deployments and StatefulSets. For any resource with more than
15-
1 replica and no matching Pod Disruption Budget, a default PDB will be created:
13+
The controller uses Kubernetes informers and watch functionality to detect changes in Deployments, StatefulSets and PodDisruptionBudgets. It automatically gets all Pod Disruption Budgets for each namespace and compares them to Deployments and StatefulSets. For any resource with more than 1 replica and no matching Pod Disruption Budget, a default PDB will be created:
1614

1715
```yaml
1816
apiVersion: policy/v1beta1
@@ -70,10 +68,6 @@ Deploy it by running:
7068
$ kubectl apply -f docs/deployment.yaml
7169
```
7270

73-
## TODO
74-
75-
* [ ] Instead of long polling, add a Watch feature.
76-
7771
## LICENSE
7872

7973
See [LICENSE](LICENSE) file.

controller.go

Lines changed: 191 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,18 @@ import (
77
"time"
88

99
log "github.com/sirupsen/logrus"
10+
appsv1 "k8s.io/api/apps/v1"
1011
v1 "k8s.io/api/core/v1"
1112
pv1 "k8s.io/api/policy/v1"
1213
"k8s.io/apimachinery/pkg/api/equality"
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/runtime"
1416
"k8s.io/apimachinery/pkg/util/intstr"
17+
"k8s.io/apimachinery/pkg/watch"
1518
"k8s.io/client-go/kubernetes"
19+
"k8s.io/client-go/tools/cache"
1620
"k8s.io/client-go/util/retry"
21+
"k8s.io/client-go/util/workqueue"
1722
)
1823

1924
const (
@@ -32,81 +37,230 @@ var (
3237
// if missing.
3338
type PDBController struct {
3439
kubernetes.Interface
35-
interval time.Duration
36-
pdbNameSuffix string
37-
nonReadyTTL time.Duration
38-
parentResourceHash bool
39-
maxUnavailable intstr.IntOrString
40+
interval time.Duration // kept for backward compatibility
41+
pdbNameSuffix string
42+
nonReadyTTL time.Duration
43+
parentResourceHash bool
44+
maxUnavailable intstr.IntOrString
45+
queue workqueue.TypedRateLimitingInterface[string]
46+
deploymentInformer cache.SharedIndexInformer
47+
statefulSetInformer cache.SharedIndexInformer
48+
pdbInformer cache.SharedIndexInformer
4049
}
4150

4251
// NewPDBController initializes a new PDBController.
4352
func NewPDBController(interval time.Duration, client kubernetes.Interface, pdbNameSuffix string, nonReadyTTL time.Duration, parentResourceHash bool, maxUnavailable intstr.IntOrString) *PDBController {
44-
return &PDBController{
53+
log.Info("Initializing PDB controller with TypedRateLimitingInterface - v20250304")
54+
controller := &PDBController{
4555
Interface: client,
4656
interval: interval,
4757
pdbNameSuffix: pdbNameSuffix,
4858
nonReadyTTL: nonReadyTTL,
4959
parentResourceHash: parentResourceHash,
5060
maxUnavailable: maxUnavailable,
61+
queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
5162
}
52-
}
5363

54-
// Run runs the controller loop until it receives a stop signal over the stop
55-
// channel.
56-
func (n *PDBController) Run(ctx context.Context) {
57-
for {
58-
log.Debug("Running main control loop.")
59-
err := n.runOnce(ctx)
60-
if err != nil {
61-
log.Error(err)
62-
}
64+
// Setup Deployment informer
65+
controller.deploymentInformer = cache.NewSharedIndexInformer(
66+
&cache.ListWatch{
67+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
68+
return client.AppsV1().Deployments(v1.NamespaceAll).List(context.Background(), options)
69+
},
70+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
71+
return client.AppsV1().Deployments(v1.NamespaceAll).Watch(context.Background(), options)
72+
},
73+
},
74+
&appsv1.Deployment{},
75+
0, // resync disabled
76+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
77+
)
78+
79+
// Setup StatefulSet informer
80+
controller.statefulSetInformer = cache.NewSharedIndexInformer(
81+
&cache.ListWatch{
82+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
83+
return client.AppsV1().StatefulSets(v1.NamespaceAll).List(context.Background(), options)
84+
},
85+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
86+
return client.AppsV1().StatefulSets(v1.NamespaceAll).Watch(context.Background(), options)
87+
},
88+
},
89+
&appsv1.StatefulSet{},
90+
0, // resync disabled
91+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
92+
)
93+
94+
// Setup PodDisruptionBudget informer
95+
controller.pdbInformer = cache.NewSharedIndexInformer(
96+
&cache.ListWatch{
97+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
98+
return client.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).List(context.Background(), options)
99+
},
100+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
101+
return client.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).Watch(context.Background(), options)
102+
},
103+
},
104+
&pv1.PodDisruptionBudget{},
105+
0, // resync disabled
106+
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
107+
)
108+
109+
// Setup event handlers
110+
controller.deploymentInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
111+
AddFunc: controller.enqueueResource,
112+
UpdateFunc: func(old, new interface{}) {
113+
controller.enqueueResource(new)
114+
},
115+
DeleteFunc: controller.enqueueResource,
116+
})
63117

64-
select {
65-
case <-time.After(n.interval):
66-
case <-ctx.Done():
67-
log.Info("Terminating main controller loop.")
68-
return
69-
}
70-
}
118+
controller.statefulSetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
119+
AddFunc: controller.enqueueResource,
120+
UpdateFunc: func(old, new interface{}) {
121+
controller.enqueueResource(new)
122+
},
123+
DeleteFunc: controller.enqueueResource,
124+
})
125+
126+
controller.pdbInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
127+
DeleteFunc: controller.enqueuePDB,
128+
})
129+
130+
return controller
71131
}
72132

73-
// runOnce runs the main reconcilation loop of the controller.
74-
func (n *PDBController) runOnce(ctx context.Context) error {
75-
allPDBs, err := n.PolicyV1().PodDisruptionBudgets(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
133+
func (n *PDBController) enqueueResource(obj interface{}) {
134+
key, err := cache.MetaNamespaceKeyFunc(obj)
76135
if err != nil {
77-
return err
136+
log.Errorf("Couldn't get key for object %+v: %v", obj, err)
137+
return
138+
}
139+
n.queue.Add(key)
140+
}
141+
142+
func (n *PDBController) enqueuePDB(obj interface{}) {
143+
pdb, ok := obj.(*pv1.PodDisruptionBudget)
144+
if !ok {
145+
log.Errorf("Expected PodDisruptionBudget but got %+v", obj)
146+
return
78147
}
79148

80-
managedPDBs, unmanagedPDBs := filterPDBs(allPDBs.Items)
149+
// Only enqueue for our managed PDBs
150+
if !containLabels(pdb.Labels, ownerLabels) {
151+
return
152+
}
81153

82-
deployments, err := n.AppsV1().Deployments(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
154+
key, err := cache.MetaNamespaceKeyFunc(obj)
83155
if err != nil {
84-
return err
156+
log.Errorf("Couldn't get key for object %+v: %v", obj, err)
157+
return
158+
}
159+
n.queue.Add(key)
160+
}
161+
162+
// Run runs the controller with the specified number of workers.
163+
func (n *PDBController) Run(ctx context.Context) {
164+
defer n.queue.ShutDown()
165+
166+
log.Info("Starting PDB controller")
167+
168+
// Start the informers
169+
go n.deploymentInformer.Run(ctx.Done())
170+
go n.statefulSetInformer.Run(ctx.Done())
171+
go n.pdbInformer.Run(ctx.Done())
172+
173+
// Wait for the caches to be synced
174+
log.Info("Waiting for informer caches to sync")
175+
if !cache.WaitForCacheSync(ctx.Done(),
176+
n.deploymentInformer.HasSynced,
177+
n.statefulSetInformer.HasSynced,
178+
n.pdbInformer.HasSynced) {
179+
log.Fatal("Failed to wait for caches to sync")
180+
return
85181
}
182+
log.Info("Informer caches synced")
86183

87-
statefulSets, err := n.AppsV1().StatefulSets(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
184+
// Run the reconcile loop
185+
go n.worker(ctx)
186+
187+
<-ctx.Done()
188+
log.Info("Shutting down PDB controller")
189+
}
190+
191+
func (n *PDBController) worker(ctx context.Context) {
192+
for n.processNextItem(ctx) {
193+
}
194+
}
195+
196+
func (n *PDBController) processNextItem(ctx context.Context) bool {
197+
key, quit := n.queue.Get()
198+
if quit {
199+
return false
200+
}
201+
defer n.queue.Done(key)
202+
203+
err := n.reconcile(ctx, key)
88204
if err != nil {
89-
return err
205+
log.Errorf("Error processing item %s: %v", key, err)
206+
n.queue.AddRateLimited(key)
207+
return true
208+
}
209+
210+
n.queue.Forget(key)
211+
return true
212+
}
213+
214+
func (n *PDBController) reconcile(ctx context.Context, key string) error {
215+
log.Debugf("Processing key: %s", key)
216+
217+
// Process all resources and PDBs
218+
return n.runOnce(ctx)
219+
}
220+
221+
// runOnce runs the main reconcilation loop of the controller.
222+
func (n *PDBController) runOnce(ctx context.Context) error {
223+
// Get all PDBs from the informer
224+
var allPDBs []pv1.PodDisruptionBudget
225+
for _, obj := range n.pdbInformer.GetStore().List() {
226+
pdb, ok := obj.(*pv1.PodDisruptionBudget)
227+
if !ok {
228+
continue
229+
}
230+
allPDBs = append(allPDBs, *pdb)
90231
}
91232

92-
resources := make([]kubeResource, 0, len(deployments.Items)+len(statefulSets.Items))
233+
managedPDBs, unmanagedPDBs := filterPDBs(allPDBs)
93234

94-
for _, d := range deployments.Items {
235+
// Get all resources from the informers
236+
resources := make([]kubeResource, 0)
237+
238+
// Process Deployments
239+
for _, obj := range n.deploymentInformer.GetStore().List() {
240+
d, ok := obj.(*appsv1.Deployment)
241+
if !ok {
242+
continue
243+
}
95244
// manually set Kind and APIVersion because of a bug in
96245
// client-go
97246
// https://github.com/kubernetes/client-go/issues/308
98247
d.Kind = "Deployment"
99248
d.APIVersion = "apps/v1"
100-
resources = append(resources, deployment{d})
249+
resources = append(resources, deployment{*d})
101250
}
102251

103-
for _, s := range statefulSets.Items {
252+
// Process StatefulSets
253+
for _, obj := range n.statefulSetInformer.GetStore().List() {
254+
s, ok := obj.(*appsv1.StatefulSet)
255+
if !ok {
256+
continue
257+
}
104258
// manually set Kind and APIVersion because of a bug in
105259
// client-go
106260
// https://github.com/kubernetes/client-go/issues/308
107261
s.Kind = "StatefulSet"
108262
s.APIVersion = "apps/v1"
109-
resources = append(resources, statefulSet{s})
263+
resources = append(resources, statefulSet{*s})
110264
}
111265

112266
desiredPDBs := n.generateDesiredPDBs(resources, managedPDBs, unmanagedPDBs)

0 commit comments

Comments
 (0)