Skip to content

add pingsource finalizer #2759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions config/200-jobrunner-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,17 @@ rules:
- get
- list
- watch
- patch
- apiGroups:
- sources.knative.dev
resources:
- pingsources/finalizers
verbs:
- "patch"
- apiGroups:
- ""
resources:
- events
verbs:
- "create"
- "patch"
2 changes: 1 addition & 1 deletion pkg/reconciler/pingsource/controller/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Reconciler struct {
metricsConfig *metrics.ExporterOptions
}

// Check that our Reconciler implements controller.Reconciler
// Check that our Reconciler implements ReconcileKind
var _ pingsourcereconciler.Interface = (*Reconciler)(nil)

func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha1.PingSource) pkgreconciler.Event {
Expand Down
10 changes: 6 additions & 4 deletions pkg/reconciler/pingsource/jobrunner/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"knative.dev/pkg/logging"
"knative.dev/pkg/source"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
pingsourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha2/pingsource"
pingsourcereconciler "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha2/pingsource"
"knative.dev/eventing/pkg/kncloudevents"
Expand All @@ -40,7 +41,7 @@ const (

// controllerAgentName is the string used by this controller to identify
// itself when creating events.
controllerAgentName = "ping-source-dispatcher"
controllerAgentName = "ping-source-job-runner"
)

// NewController initializes the controller and is called by the generated code.
Expand All @@ -60,9 +61,10 @@ func NewController(
pingsourceInformer := pingsourceinformer.Get(ctx)

r := &Reconciler{
pingsourceLister: pingsourceInformer.Lister(),
entryidMu: sync.Mutex{},
entryids: make(map[string]cron.EntryID),
eventingClientSet: eventingclient.Get(ctx),
pingsourceLister: pingsourceInformer.Lister(),
entryidMu: sync.Mutex{},
entryids: make(map[string]cron.EntryID),
}

impl := pingsourcereconciler.NewImpl(ctx, r)
Expand Down
37 changes: 23 additions & 14 deletions pkg/reconciler/pingsource/jobrunner/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ import (

"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/apis/sources/v1alpha2"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
pingsourcereconciler "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha2/pingsource"
sourceslisters "knative.dev/eventing/pkg/client/listers/sources/v1alpha2"
"knative.dev/eventing/pkg/logging"
)

// Reconciler reconciles PingSources
type Reconciler struct {
cronRunner *cronJobsRunner
pingsourceLister sourceslisters.PingSourceLister
cronRunner *cronJobsRunner
eventingClientSet clientset.Interface
pingsourceLister sourceslisters.PingSourceLister

entryidMu sync.Mutex
entryids map[string]cron.EntryID // key: resource namespace/name
Expand All @@ -44,6 +46,9 @@ type Reconciler struct {
// Check that our Reconciler implements ReconcileKind.
var _ pingsourcereconciler.Interface = (*Reconciler)(nil)

// Check that our Reconciler implements FinalizeKind.
var _ pingsourcereconciler.Finalizer = (*Reconciler)(nil)

func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha2.PingSource) pkgreconciler.Event {
scope, ok := source.Annotations[eventing.ScopeAnnotationKey]
if ok && scope != eventing.ScopeCluster {
Expand All @@ -62,23 +67,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha2.PingSou
} else {
logging.FromContext(ctx).Debug("PingSource reconciled")
}
return nil
return reconcileErr
}

func (r *Reconciler) reconcile(ctx context.Context, source *v1alpha2.PingSource) error {
key := fmt.Sprintf("%s/%s", source.Namespace, source.Name)
if source.DeletionTimestamp != nil {
if id, ok := r.entryids[key]; ok {
r.cronRunner.RemoveSchedule(id)

r.entryidMu.Lock()
delete(r.entryids, key)
r.entryidMu.Unlock()
}
return nil
}
logging.FromContext(ctx).Info("synchronizing schedule")

key := fmt.Sprintf("%s/%s", source.Namespace, source.Name)
// Is the schedule already cached?
if id, ok := r.entryids[key]; ok {
r.cronRunner.RemoveSchedule(id)
Expand All @@ -93,3 +88,17 @@ func (r *Reconciler) reconcile(ctx context.Context, source *v1alpha2.PingSource)

return nil
}

func (r *Reconciler) FinalizeKind(ctx context.Context, source *v1alpha2.PingSource) pkgreconciler.Event {
key := fmt.Sprintf("%s/%s", source.Namespace, source.Name)

if id, ok := r.entryids[key]; ok {
r.cronRunner.RemoveSchedule(id)

r.entryidMu.Lock()
delete(r.entryids, key)
r.entryidMu.Unlock()
}

return nil
}
121 changes: 112 additions & 9 deletions pkg/reconciler/pingsource/jobrunner/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ import (
"testing"

"github.com/robfig/cron"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
clientgotesting "k8s.io/client-go/testing"
sourcesv1alpha2 "knative.dev/eventing/pkg/apis/sources/v1alpha2"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha2/pingsource"
kncetesting "knative.dev/eventing/pkg/kncloudevents/testing"
Expand All @@ -39,11 +43,12 @@ import (
)

const (
testNS = "test-namespace"
pingSourceName = "test-pingsource"
testSchedule = "*/2 * * * *"
testData = "data"
sinkName = "mysink"
testNS = "test-namespace"
pingSourceName = "test-pingsource"
testSchedule = "*/2 * * * *"
testData = "data"
sinkName = "mysink"
defaultFinalizerName = "pingsources.sources.knative.dev"
)

var (
Expand Down Expand Up @@ -95,6 +100,85 @@ func TestAllCases(t *testing.T) {
WithPingSourceV1A2EventType,
),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "%s" finalizers`, pingSourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, pingSourceName, defaultFinalizerName),
},
WantErr: false,
}, {
Name: "valid schedule, with finalizer",
Key: pingsourceKey,
Objects: []runtime.Object{
NewPingSourceV1Alpha2(pingSourceName, testNS,
WithPingSourceV1A2Spec(sourcesv1alpha2.PingSourceSpec{
Schedule: testSchedule,
JsonData: testData,
SourceSpec: duckv1.SourceSpec{
Sink: sinkDest,
CloudEventOverrides: nil,
},
}),
WithInitPingSourceV1A2Conditions,
WithValidPingSourceV1A2Schedule,
WithPingSourceV1A2Deployed,
WithPingSourceV1A2Sink(sinkURI),
WithPingSourceV1A2EventType,
WithPingSourceV1A2Finalizers(defaultFinalizerName),
),
},
WantErr: false,
}, {
Name: "valid schedule, deleted with finalizer",
Key: pingsourceKey,
Objects: []runtime.Object{
NewPingSourceV1Alpha2(pingSourceName, testNS,
WithPingSourceV1A2Spec(sourcesv1alpha2.PingSourceSpec{
Schedule: testSchedule,
JsonData: testData,
SourceSpec: duckv1.SourceSpec{
Sink: sinkDest,
CloudEventOverrides: nil,
},
}),
WithInitPingSourceV1A2Conditions,
WithValidPingSourceV1A2Schedule,
WithPingSourceV1A2Deployed,
WithPingSourceV1A2Sink(sinkURI),
WithPingSourceV1A2EventType,
WithPingSourceV1A2Finalizers(defaultFinalizerName),
WithPingSourceV1A2Deleted,
),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "%s" finalizers`, pingSourceName),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, pingSourceName, ""),
},
WantErr: false,
}, {
Name: "valid schedule, deleted without finalizer",
Key: pingsourceKey,
Objects: []runtime.Object{
NewPingSourceV1Alpha2(pingSourceName, testNS,
WithPingSourceV1A2Spec(sourcesv1alpha2.PingSourceSpec{
Schedule: testSchedule,
JsonData: testData,
SourceSpec: duckv1.SourceSpec{
Sink: sinkDest,
CloudEventOverrides: nil,
},
}),
WithInitPingSourceV1A2Conditions,
WithValidPingSourceV1A2Schedule,
WithPingSourceV1A2Deployed,
WithPingSourceV1A2Sink(sinkURI),
WithPingSourceV1A2EventType,
WithPingSourceV1A2Deleted,
),
},
WantErr: false,
},
}
Expand All @@ -105,14 +189,33 @@ func TestAllCases(t *testing.T) {

table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler {
r := &Reconciler{
pingsourceLister: listers.GetPingSourceV1alpha2Lister(),
cronRunner: NewCronJobsRunner(ce, reporter, logger),
entryidMu: sync.Mutex{},
entryids: make(map[string]cron.EntryID),
eventingClientSet: eventingclient.Get(ctx),
pingsourceLister: listers.GetPingSourceV1alpha2Lister(),
cronRunner: NewCronJobsRunner(ce, reporter, logger),
entryidMu: sync.Mutex{},
entryids: make(map[string]cron.EntryID),
}
return pingsource.NewReconciler(ctx, logging.FromContext(ctx),
fakeeventingclient.Get(ctx), listers.GetPingSourceV1alpha2Lister(),
controller.GetEventRecorder(ctx), r)
}, false, logger))

}

func patchFinalizers(namespace, name string, finalizers string) clientgotesting.PatchActionImpl {
fstr := ""
if finalizers != "" {
fstr = `"` + finalizers + `"`
}
return clientgotesting.PatchActionImpl{
ActionImpl: clientgotesting.ActionImpl{
Namespace: namespace,
Verb: "patch",
Resource: schema.GroupVersionResource{Group: "sources.knative.dev", Version: "v1alpha2", Resource: "pingsources"},
Subresource: "",
},
Name: name,
PatchType: "application/merge-patch+json",
Patch: []byte(`{"metadata":{"finalizers":[` + fstr + `],"resourceVersion":""}}`),
}
}
11 changes: 11 additions & 0 deletions pkg/reconciler/testing/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,14 @@ func WithPingSourceObjectMetaGeneration(generation int64) PingSourceOption {
c.ObjectMeta.Generation = generation
}
}

func WithPingSourceV1A2Finalizers(finalizers ...string) PingSourceV1A2Option {
return func(c *v1alpha2.PingSource) {
c.Finalizers = finalizers
}
}

func WithPingSourceV1A2Deleted(c *v1alpha2.PingSource) {
t := metav1.NewTime(time.Unix(1e9, 0))
c.SetDeletionTimestamp(&t)
}