Skip to content

Commit d8b0343

Browse files
[release-1.9] 🌱 inmemory: fix watch to continue serving based on resourceVersion parameter (#11710)
* fix resourceVersion to increment across a tracker * inmemory: implement continuing watch based on resourceVersion * test fixes * inmemory: add some debug log lines for watches * stream all events if resourceVersion is not given * fixes * set proper resourceVersion on list calls * fixup tests * another fixes * review fixes * fixup --------- Co-authored-by: Christian Schlotter <[email protected]>
1 parent 508792d commit d8b0343

File tree

6 files changed

+140
-35
lines changed

6 files changed

+140
-35
lines changed

test/infrastructure/inmemory/pkg/runtime/cache/cache.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ type resourceGroupTracker struct {
8989
lock sync.RWMutex
9090
objects map[schema.GroupVersionKind]map[types.NamespacedName]client.Object
9191
// ownedObjects tracks ownership. Key is the owner, values are the owned objects.
92-
ownedObjects map[ownReference]map[ownReference]struct{}
92+
ownedObjects map[ownReference]map[ownReference]struct{}
93+
lastResourceVersion uint64
9394
}
9495

9596
type ownReference struct {
@@ -159,8 +160,9 @@ func (c *cache) AddResourceGroup(name string) {
159160
return
160161
}
161162
c.resourceGroups[name] = &resourceGroupTracker{
162-
objects: map[schema.GroupVersionKind]map[types.NamespacedName]client.Object{},
163-
ownedObjects: map[ownReference]map[ownReference]struct{}{},
163+
objects: map[schema.GroupVersionKind]map[types.NamespacedName]client.Object{},
164+
ownedObjects: map[ownReference]map[ownReference]struct{}{},
165+
lastResourceVersion: 0,
164166
}
165167
}
166168

test/infrastructure/inmemory/pkg/runtime/cache/client.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ func (c *cache) List(resourceGroup string, list client.ObjectList, opts ...clien
145145
if err := meta.SetList(list, items); err != nil {
146146
return apierrors.NewInternalError(err)
147147
}
148+
149+
list.SetResourceVersion(fmt.Sprintf("%d", tracker.lastResourceVersion))
150+
148151
return nil
149152
}
150153

@@ -212,7 +215,7 @@ func (c *cache) store(resourceGroup string, obj client.Object, replaceExisting b
212215
return apierrors.NewConflict(unsafeGuessGroupVersionResource(objGVK).GroupResource(), objKey.String(), fmt.Errorf("object has been modified"))
213216
}
214217

215-
c.beforeUpdate(resourceGroup, trackedObj, obj)
218+
c.beforeUpdate(resourceGroup, trackedObj, obj, &tracker.lastResourceVersion)
216219

217220
tracker.objects[objGVK][objKey] = obj.DeepCopyObject().(client.Object)
218221
updateTrackerOwnerReferences(tracker, trackedObj, obj, objRef)
@@ -226,7 +229,7 @@ func (c *cache) store(resourceGroup string, obj client.Object, replaceExisting b
226229
return apierrors.NewNotFound(unsafeGuessGroupVersionResource(objGVK).GroupResource(), objKey.String())
227230
}
228231

229-
c.beforeCreate(resourceGroup, obj)
232+
c.beforeCreate(resourceGroup, obj, &tracker.lastResourceVersion)
230233

231234
tracker.objects[objGVK][objKey] = obj.DeepCopyObject().(client.Object)
232235
updateTrackerOwnerReferences(tracker, nil, obj, objRef)
@@ -422,7 +425,7 @@ func (c *cache) doTryDeleteLocked(resourceGroup string, tracker *resourceGroupTr
422425
oldObj := obj.DeepCopyObject().(client.Object)
423426
now := metav1.Time{Time: time.Now().UTC()}
424427
obj.SetDeletionTimestamp(&now)
425-
c.beforeUpdate(resourceGroup, oldObj, obj)
428+
c.beforeUpdate(resourceGroup, oldObj, obj, &tracker.lastResourceVersion)
426429

427430
objects[objKey] = obj
428431
c.afterUpdate(resourceGroup, oldObj, obj)

test/infrastructure/inmemory/pkg/runtime/cache/client_test.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func Test_cache_client(t *testing.T) {
104104
r := c.resourceGroups["foo"].objects[cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)][key]
105105
g.Expect(r.GetObjectKind().GroupVersionKind()).To(BeComparableTo(cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)), "gvk must be set")
106106
g.Expect(r.GetName()).To(Equal("bar"), "name must be equal to object tracker key")
107-
g.Expect(r.GetResourceVersion()).To(Equal("v1"), "resourceVersion must be set")
107+
g.Expect(r.GetResourceVersion()).To(Equal("1"), "resourceVersion must be set")
108108
g.Expect(r.GetCreationTimestamp()).ToNot(BeZero(), "creation timestamp must be set")
109109
g.Expect(r.GetAnnotations()).To(HaveKey(lastSyncTimeAnnotation), "last sync annotation must exists")
110110

@@ -271,7 +271,7 @@ func Test_cache_client(t *testing.T) {
271271
// Check all the computed fields are as expected.
272272
g.Expect(obj.GetObjectKind().GroupVersionKind()).To(BeComparableTo(cloudv1.GroupVersion.WithKind(cloudv1.CloudMachineKind)), "gvk must be set")
273273
g.Expect(obj.GetName()).To(Equal("bar"), "name must be equal to object tracker key")
274-
g.Expect(obj.GetResourceVersion()).To(Equal("v1"), "resourceVersion must be set")
274+
g.Expect(obj.GetResourceVersion()).To(Equal("2"), "resourceVersion must be set")
275275
g.Expect(obj.GetCreationTimestamp()).ToNot(BeZero(), "creation timestamp must be set")
276276
g.Expect(obj.GetAnnotations()).To(HaveKey(lastSyncTimeAnnotation), "last sync annotation must be set")
277277
})
@@ -435,6 +435,8 @@ func Test_cache_client(t *testing.T) {
435435
g.Expect(objBefore.GetResourceVersion()).ToNot(Equal(objUpdate.GetResourceVersion()), "Object version must be changed")
436436
objBefore.SetResourceVersion(objUpdate.GetResourceVersion())
437437
objBefore.Labels = objUpdate.Labels
438+
g.Expect(objUpdate.GetGeneration()).To(Equal(objBefore.GetGeneration()+1), "Object Generation must increment")
439+
objBefore.Generation = objUpdate.GetGeneration()
438440
g.Expect(objBefore).To(BeComparableTo(objUpdate), "everything else must be the same")
439441

440442
g.Expect(h.Events()).To(ContainElement("foo, CloudMachine=baz, Updated"))
@@ -670,6 +672,8 @@ func Test_cache_client(t *testing.T) {
670672
g.Expect(objBefore.GetResourceVersion()).ToNot(Equal(objAfterUpdate.GetResourceVersion()), "Object version must be changed")
671673
objBefore.SetResourceVersion(objAfterUpdate.GetResourceVersion())
672674
objBefore.Labels = objAfterUpdate.Labels
675+
g.Expect(objAfterUpdate.GetGeneration()).To(Equal(objBefore.GetGeneration()+1), "Object Generation must increment")
676+
objBefore.Generation = objAfterUpdate.GetGeneration()
673677
g.Expect(objBefore).To(BeComparableTo(objAfterUpdate), "everything else must be the same")
674678

675679
g.Expect(h.Events()).To(ContainElement("foo, CloudMachine=baz, Deleted"))

test/infrastructure/inmemory/pkg/runtime/cache/hooks.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,30 @@ package cache
1919
import (
2020
"fmt"
2121
"reflect"
22-
"strconv"
23-
"strings"
2422
"time"
2523

2624
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2725
"sigs.k8s.io/controller-runtime/pkg/client"
2826
)
2927

30-
func (c *cache) beforeCreate(_ string, obj client.Object) {
28+
func (c *cache) beforeCreate(_ string, obj client.Object, resourceVersion *uint64) {
3129
now := time.Now().UTC()
3230
obj.SetCreationTimestamp(metav1.Time{Time: now})
3331
// TODO: UID
3432
obj.SetAnnotations(appendAnnotations(obj, lastSyncTimeAnnotation, now.Format(time.RFC3339)))
35-
obj.SetResourceVersion(fmt.Sprintf("v%d", 1))
33+
*resourceVersion++
34+
obj.SetResourceVersion(fmt.Sprintf("%d", *resourceVersion))
35+
obj.SetGeneration(1)
3636
}
3737

3838
func (c *cache) afterCreate(resourceGroup string, obj client.Object) {
3939
c.informCreate(resourceGroup, obj)
4040
}
4141

42-
func (c *cache) beforeUpdate(_ string, oldObj, newObj client.Object) {
42+
func (c *cache) beforeUpdate(_ string, oldObj, newObj client.Object, resourceVersion *uint64) {
4343
newObj.SetCreationTimestamp(oldObj.GetCreationTimestamp())
4444
newObj.SetResourceVersion(oldObj.GetResourceVersion())
45+
newObj.SetGeneration(oldObj.GetGeneration())
4546
// TODO: UID
4647
newObj.SetAnnotations(appendAnnotations(newObj, lastSyncTimeAnnotation, oldObj.GetAnnotations()[lastSyncTimeAnnotation]))
4748
if !oldObj.GetDeletionTimestamp().IsZero() {
@@ -51,8 +52,9 @@ func (c *cache) beforeUpdate(_ string, oldObj, newObj client.Object) {
5152
now := time.Now().UTC()
5253
newObj.SetAnnotations(appendAnnotations(newObj, lastSyncTimeAnnotation, now.Format(time.RFC3339)))
5354

54-
oldResourceVersion, _ := strconv.Atoi(strings.TrimPrefix(oldObj.GetResourceVersion(), "v"))
55-
newObj.SetResourceVersion(fmt.Sprintf("v%d", oldResourceVersion+1))
55+
*resourceVersion++
56+
newObj.SetResourceVersion(fmt.Sprintf("%d", *resourceVersion))
57+
newObj.SetGeneration(oldObj.GetGeneration() + 1)
5658
}
5759
}
5860

test/infrastructure/inmemory/pkg/server/api/handler.go

+26-14
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"sigs.k8s.io/controller-runtime/pkg/client"
5151

5252
inmemoryruntime "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/runtime"
53+
inmemoryclient "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/runtime/client"
5354
inmemoryportforward "sigs.k8s.io/cluster-api/test/infrastructure/inmemory/pkg/server/api/portforward"
5455
)
5556

@@ -315,6 +316,25 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
315316
return
316317
}
317318

319+
h.log.V(3).Info(fmt.Sprintf("Serving List for %v", req.Request.URL), "resourceGroup", resourceGroup)
320+
321+
list, err := h.v1List(ctx, req, *gvk, inmemoryClient)
322+
if err != nil {
323+
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
324+
_ = resp.WriteHeaderAndEntity(int(status.Status().Code), status)
325+
return
326+
}
327+
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
328+
return
329+
}
330+
331+
if err := resp.WriteEntity(list); err != nil {
332+
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
333+
return
334+
}
335+
}
336+
337+
func (h *apiServerHandler) v1List(ctx context.Context, req *restful.Request, gvk schema.GroupVersionKind, inmemoryClient inmemoryclient.Client) (*unstructured.UnstructuredList, error) {
318338
// Reads and returns the requested data.
319339
list := &unstructured.UnstructuredList{}
320340
list.SetAPIVersion(gvk.GroupVersion().String())
@@ -328,33 +348,23 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
328348
// TODO: The only field Selector which works is for `spec.nodeName` on pods.
329349
fieldSelector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
330350
if err != nil {
331-
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
332-
return
351+
return nil, err
333352
}
334353
if fieldSelector != nil {
335354
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: fieldSelector})
336355
}
337356

338357
labelSelector, err := labels.Parse(req.QueryParameter("labelSelector"))
339358
if err != nil {
340-
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
341-
return
359+
return nil, err
342360
}
343361
if labelSelector != nil {
344362
listOpts = append(listOpts, client.MatchingLabelsSelector{Selector: labelSelector})
345363
}
346364
if err := inmemoryClient.List(ctx, list, listOpts...); err != nil {
347-
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
348-
_ = resp.WriteHeaderAndEntity(int(status.Status().Code), status)
349-
return
350-
}
351-
_ = resp.WriteHeaderAndEntity(http.StatusInternalServerError, err.Error())
352-
return
353-
}
354-
if err := resp.WriteEntity(list); err != nil {
355-
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
356-
return
365+
return nil, err
357366
}
367+
return list, nil
358368
}
359369

360370
func (h *apiServerHandler) apiV1Watch(req *restful.Request, resp *restful.Response) {
@@ -372,6 +382,8 @@ func (h *apiServerHandler) apiV1Watch(req *restful.Request, resp *restful.Respon
372382
return
373383
}
374384

385+
h.log.V(3).Info(fmt.Sprintf("Serving Watch for %v", req.Request.URL), "resourceGroup", resourceGroup)
386+
375387
// If the request is a Watch handle it using watchForResource.
376388
err = h.watchForResource(req, resp, resourceGroup, *gvk)
377389
if err != nil {

test/infrastructure/inmemory/pkg/server/api/watch.go

+88-6
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,21 @@ import (
2020
"context"
2121
"fmt"
2222
"net/http"
23+
"strconv"
2324
"time"
2425

2526
"github.com/emicklei/go-restful/v3"
2627
"github.com/pkg/errors"
27-
"k8s.io/apimachinery/pkg/runtime"
2828
"k8s.io/apimachinery/pkg/runtime/schema"
2929
"k8s.io/apimachinery/pkg/watch"
30+
ctrl "sigs.k8s.io/controller-runtime"
3031
"sigs.k8s.io/controller-runtime/pkg/client"
3132
)
3233

3334
// Event records a lifecycle event for a Kubernetes object.
3435
type Event struct {
3536
Type watch.EventType `json:"type,omitempty"`
36-
Object runtime.Object `json:"object,omitempty"`
37+
Object client.Object `json:"object,omitempty"`
3738
}
3839

3940
// WatchEventDispatcher dispatches events for a single resourceGroup.
@@ -88,13 +89,15 @@ func (m *WatchEventDispatcher) OnGeneric(resourceGroup string, o client.Object)
8889

8990
func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.Response, resourceGroup string, gvk schema.GroupVersionKind) (reterr error) {
9091
ctx := req.Request.Context()
92+
log := h.log.WithValues("resourceGroup", resourceGroup, "gvk", gvk.String())
93+
ctx = ctrl.LoggerInto(ctx, log)
9194
queryTimeout := req.QueryParameter("timeoutSeconds")
95+
resourceVersion := req.QueryParameter("resourceVersion")
9296
c := h.manager.GetCache()
9397
i, err := c.GetInformerForKind(ctx, gvk)
9498
if err != nil {
9599
return err
96100
}
97-
h.log.Info(fmt.Sprintf("Serving Watch for %v", req.Request.URL))
98101
// With an unbuffered event channel RemoveEventHandler could be blocked because it requires a lock on the informer.
99102
// When Run stops reading from the channel the informer could be blocked with an unbuffered chanel and then RemoveEventHandler never goes through.
100103
// 1000 is used to avoid deadlocks in clusters with a higher number of Machines/Nodes.
@@ -115,7 +118,12 @@ func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.
115118
L:
116119
for {
117120
select {
118-
case <-events:
121+
case event, ok := <-events:
122+
if !ok {
123+
// End of results.
124+
break L
125+
}
126+
log.V(4).Info("Missed event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
119127
default:
120128
break L
121129
}
@@ -124,11 +132,49 @@ func (h *apiServerHandler) watchForResource(req *restful.Request, resp *restful.
124132
// Note: After we removed the handler, no new events will be written to the events channel.
125133
}()
126134

127-
return watcher.Run(ctx, queryTimeout, resp)
135+
// Get at client to the resource group and list all relevant objects.
136+
inmemoryClient := h.manager.GetResourceGroup(resourceGroup).GetClient()
137+
list, err := h.v1List(ctx, req, gvk, inmemoryClient)
138+
if err != nil {
139+
return err
140+
}
141+
142+
// If resourceVersion was set parse to uint64 which is the representation in the simulated apiserver.
143+
var parsedResourceVersion uint64
144+
if resourceVersion != "" {
145+
parsedResourceVersion, err = strconv.ParseUint(resourceVersion, 10, 64)
146+
if err != nil {
147+
return err
148+
}
149+
}
150+
151+
initialEvents := []Event{}
152+
153+
// Loop over all items and fill the list of events with objects which have a newer resourceVersion.
154+
for _, obj := range list.Items {
155+
if resourceVersion != "" {
156+
objResourceVersion, err := strconv.ParseUint(obj.GetResourceVersion(), 10, 64)
157+
if err != nil {
158+
return err
159+
}
160+
if objResourceVersion <= parsedResourceVersion {
161+
continue
162+
}
163+
}
164+
eventType := watch.Modified
165+
// kube-apiserver emits all events as ADDED when no resourceVersion is given.
166+
if obj.GetGeneration() == 1 || resourceVersion == "" {
167+
eventType = watch.Added
168+
}
169+
initialEvents = append(initialEvents, Event{Type: eventType, Object: &obj})
170+
}
171+
172+
return watcher.Run(ctx, queryTimeout, initialEvents, list.GetResourceVersion(), resp)
128173
}
129174

130175
// Run serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
131-
func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.ResponseWriter) error {
176+
func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, initialEvents []Event, initialResourceVersion string, w http.ResponseWriter) error {
177+
log := ctrl.LoggerFrom(ctx)
132178
flusher, ok := w.(http.Flusher)
133179
if !ok {
134180
return errors.New("can't start Watch: can't get http.Flusher")
@@ -139,6 +185,16 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R
139185
}
140186
w.Header().Set("Transfer-Encoding", "chunked")
141187
w.WriteHeader(http.StatusOK)
188+
189+
// Write all initial events.
190+
for _, event := range initialEvents {
191+
if err := resp.WriteEntity(event); err != nil {
192+
log.Error(err, "Error writing initial event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
193+
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
194+
} else {
195+
log.V(4).Info("Wrote initial event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
196+
}
197+
}
142198
flusher.Flush()
143199

144200
timeoutTimer, seconds, err := setTimer(timeout)
@@ -149,6 +205,15 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R
149205
ctx, cancel := context.WithTimeout(ctx, seconds)
150206
defer cancel()
151207
defer timeoutTimer.Stop()
208+
209+
// Use the resourceVersion of the list to filter out events from the channel
210+
// which are already written above.
211+
minResourceVersion, err := strconv.ParseUint(initialResourceVersion, 10, 64)
212+
if err != nil {
213+
return err
214+
}
215+
216+
var objResourceVersion uint64
152217
for {
153218
select {
154219
case <-ctx.Done():
@@ -160,8 +225,25 @@ func (m *WatchEventDispatcher) Run(ctx context.Context, timeout string, w http.R
160225
// End of results.
161226
return nil
162227
}
228+
229+
// Parse and check if the object has a higher resource version than we allow.
230+
objResourceVersion, err = strconv.ParseUint(event.Object.GetResourceVersion(), 10, 64)
231+
if err != nil {
232+
log.Error(err, "Parsing object resource version", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
233+
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
234+
continue
235+
}
236+
237+
// Skip objects which were already written.
238+
if objResourceVersion <= minResourceVersion {
239+
continue
240+
}
241+
163242
if err := resp.WriteEntity(event); err != nil {
243+
log.Error(err, "Error writing event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
164244
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
245+
} else {
246+
log.V(4).Info("Wrote event", "eventType", event.Type, "objectName", event.Object.GetName(), "resourceVersion", event.Object.GetResourceVersion())
165247
}
166248
if len(m.events) == 0 {
167249
flusher.Flush()

0 commit comments

Comments
 (0)