Skip to content

Preserve EventMesh precomputed hashes #17959

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions components/eventing-controller/api/v1alpha2/status_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,11 @@ type EventMeshTypes struct {
// Event type that is used on the EventMesh backend.
EventMeshType string `json:"eventMeshType"`
}

// CopyHashes copies the precomputed hashes from the given backend.
func (b *Backend) CopyHashes(src Backend) {
b.Ev2hash = src.Ev2hash
b.EventMeshHash = src.EventMeshHash
b.WebhookAuthHash = src.WebhookAuthHash
b.EventMeshLocalHash = src.EventMeshLocalHash
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package v1alpha2

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestBackend_CopyHashes(t *testing.T) {
// given
b := Backend{}

// then
require.Equal(t, int64(0), b.Ev2hash)
require.Equal(t, int64(0), b.EventMeshHash)
require.Equal(t, int64(0), b.WebhookAuthHash)
require.Equal(t, int64(0), b.EventMeshLocalHash)

// given
src := Backend{
Ev2hash: int64(1118518533334734626),
EventMeshHash: int64(1748405436686967274),
WebhookAuthHash: int64(1118518533334734627),
EventMeshLocalHash: int64(1883494500014499539),
}

// when
b.CopyHashes(src)

// then
require.Equal(t, src.Ev2hash, b.Ev2hash)
require.Equal(t, src.EventMeshHash, b.EventMeshHash)
require.Equal(t, src.WebhookAuthHash, b.WebhookAuthHash)
require.Equal(t, src.EventMeshLocalHash, b.EventMeshLocalHash)
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/kyma-project/kyma/components/eventing-controller/pkg/env"
)

type syncConditionWebhookCallStatusFunc func(subscription *eventingv1alpha2.Subscription)

// Reconciler reconciles a Subscription object.
type Reconciler struct {
ctx context.Context
Expand All @@ -56,9 +58,10 @@ type Reconciler struct {
cleaner cleaner.Cleaner
oauth2credentials *eventmesh.OAuth2ClientCredentials
// nameMapper is used to map the Kyma subscription name to a subscription name on EventMesh.
nameMapper backendutils.NameMapper
sinkValidator sink.Validator
collector *metrics.Collector
nameMapper backendutils.NameMapper
sinkValidator sink.Validator
collector *metrics.Collector
syncConditionWebhookCallStatus syncConditionWebhookCallStatusFunc
}

const (
Expand All @@ -82,17 +85,18 @@ func NewReconciler(ctx context.Context, client client.Client, logger *logger.Log
panic(err)
}
return &Reconciler{
ctx: ctx,
Client: client,
logger: logger,
recorder: recorder,
Backend: eventMeshBackend,
Domain: cfg.Domain,
cleaner: cleaner,
oauth2credentials: credential,
nameMapper: mapper,
sinkValidator: validator,
collector: collector,
ctx: ctx,
Client: client,
logger: logger,
recorder: recorder,
Backend: eventMeshBackend,
Domain: cfg.Domain,
cleaner: cleaner,
oauth2credentials: credential,
nameMapper: mapper,
sinkValidator: validator,
collector: collector,
syncConditionWebhookCallStatus: syncConditionWebhookCallStatus,
}
}

Expand Down Expand Up @@ -265,7 +269,7 @@ func (r *Reconciler) handleDeleteSubscription(ctx context.Context, subscription
// update condition in subscription status
condition := eventingv1alpha2.MakeCondition(eventingv1alpha2.ConditionSubscribed,
eventingv1alpha2.ConditionReasonSubscriptionDeleted, corev1.ConditionFalse, "")
r.replaceStatusCondition(subscription, condition)
replaceStatusCondition(subscription, condition)

// remove finalizers from subscription
removeFinalizer(subscription)
Expand Down Expand Up @@ -320,7 +324,7 @@ func (r *Reconciler) syncConditionSubscribed(subscription *eventingv1alpha2.Subs
condition = eventingv1alpha2.MakeCondition(eventingv1alpha2.ConditionSubscribed, eventingv1alpha2.ConditionReasonSubscriptionCreationFailed, corev1.ConditionFalse, message)
}

r.replaceStatusCondition(subscription, condition)
replaceStatusCondition(subscription, condition)
}

// syncConditionSubscriptionActive syncs the condition ConditionSubscribed.
Expand All @@ -330,33 +334,30 @@ func (r *Reconciler) syncConditionSubscriptionActive(subscription *eventingv1alp
corev1.ConditionTrue,
"")
if !isActive {
logger.Debugw("Waiting for subscription to be active",
"name",
subscription.Name,
"status",
subscription.Status.Backend.EventMeshSubscriptionStatus.Status)

logger.Infow("Waiting for subscription to be active", "name", subscription.Name,
"status", subscription.Status.Backend.EventMeshSubscriptionStatus)
message := "Waiting for subscription to be active"
condition = eventingv1alpha2.MakeCondition(eventingv1alpha2.ConditionSubscriptionActive,
eventingv1alpha2.ConditionReasonSubscriptionNotActive,
corev1.ConditionFalse,
message)
}
r.replaceStatusCondition(subscription, condition)
replaceStatusCondition(subscription, condition)
}

// syncConditionWebhookCallStatus syncs the condition WebhookCallStatus
// checks if the last webhook call returned an error.
func (r *Reconciler) syncConditionWebhookCallStatus(subscription *eventingv1alpha2.Subscription) {
condition := eventingv1alpha2.MakeCondition(eventingv1alpha2.ConditionWebhookCallStatus, eventingv1alpha2.ConditionReasonWebhookCallStatus, corev1.ConditionFalse, "")
if isWebhookCallError, err := r.checkLastFailedDelivery(subscription); err != nil {
func syncConditionWebhookCallStatus(subscription *eventingv1alpha2.Subscription) {
condition := eventingv1alpha2.MakeCondition(eventingv1alpha2.ConditionWebhookCallStatus,
eventingv1alpha2.ConditionReasonWebhookCallStatus, corev1.ConditionFalse, "")
if isWebhookCallError, err := checkLastFailedDelivery(subscription); err != nil {
condition.Message = err.Error()
} else if isWebhookCallError {
condition.Message = subscription.Status.Backend.EventMeshSubscriptionStatus.LastFailedDeliveryReason
} else {
condition.Status = corev1.ConditionTrue
}
r.replaceStatusCondition(subscription, condition)
replaceStatusCondition(subscription, condition)
}

// syncAPIRule validate the given subscription sink URL and sync its APIRule.
Expand Down Expand Up @@ -658,6 +659,7 @@ func (r *Reconciler) syncInitialStatus(subscription *eventingv1alpha2.Subscripti
}

if len(subscription.Status.Conditions) == 0 {
expectedStatus.Backend.CopyHashes(subscription.Status.Backend)
subscription.Status = expectedStatus
} else {
requiredConditions := getRequiredConditions(subscription.Status.Conditions, expectedStatus.Conditions)
Expand Down Expand Up @@ -695,7 +697,8 @@ func getRequiredConditions(subscriptionConditions, expectedConditions []eventing

// replaceStatusCondition replaces the given condition on the subscription. Also it sets the readiness in the status.
// So make sure you always use this method then changing a condition.
func (r *Reconciler) replaceStatusCondition(subscription *eventingv1alpha2.Subscription, condition eventingv1alpha2.Condition) bool {
func replaceStatusCondition(subscription *eventingv1alpha2.Subscription,
condition eventingv1alpha2.Condition) bool {
// the subscription is ready if all conditions are fulfilled
isReady := true

Expand Down Expand Up @@ -795,7 +798,7 @@ func (r *Reconciler) checkStatusActive(subscription *eventingv1alpha2.Subscripti
}

// checkLastFailedDelivery checks if LastFailedDelivery exists and if it happened after LastSuccessfulDelivery.
func (r *Reconciler) checkLastFailedDelivery(subscription *eventingv1alpha2.Subscription) (bool, error) {
func checkLastFailedDelivery(subscription *eventingv1alpha2.Subscription) (bool, error) {
// Check if LastFailedDelivery exists.
lastFailed := subscription.Status.Backend.EventMeshSubscriptionStatus.LastFailedDelivery
if len(lastFailed) == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,117 @@ func TestReconciler_APIRuleConfig_Upgrade(t *testing.T) {
}
}

// TestReconciler_PreserveBackendHashes ensures that the precomputed EventMesh hashes in the Kyma subscription
// is preserved after reconciliation.
func TestReconciler_PreserveBackendHashes(t *testing.T) {
ctx := context.Background()
collector := metrics.NewCollector()
validator := sink.ValidatorFunc(func(s *eventingv1alpha2.Subscription) error { return nil })

const (
ev2hash = int64(118518533334734626)
eventMeshHash = int64(748405436686967274)
webhookAuthHash = int64(118518533334734627)
eventMeshLocalHash = int64(883494500014499539)
)

var testCases = []struct {
name string
givenSubscription *eventingv1alpha2.Subscription
givenReconcilerSetup func(*eventingv1alpha2.Subscription) (*Reconciler, client.Client)
wantEv2Hash int64
wantEventMeshHash int64
wantWebhookAuthHash int64
wantEventMeshLocalHash int64
wantReconcileError error
}{
{
name: "Preserve hashes if conditions are empty",
givenSubscription: func() *eventingv1alpha2.Subscription {
return reconcilertesting.NewSubscription("some-test-sub-0", "test",
reconcilertesting.WithValidSink("test", "some-test-svc-0"),
reconcilertesting.WithConditions(nil),
reconcilertesting.WithBackend(eventingv1alpha2.Backend{
Ev2hash: ev2hash,
EventMeshHash: eventMeshHash,
WebhookAuthHash: webhookAuthHash,
EventMeshLocalHash: eventMeshLocalHash,
}),
)
}(),
givenReconcilerSetup: func(s *eventingv1alpha2.Subscription) (*Reconciler, client.Client) {
te := setupTestEnvironment(t, s)
te.backend.On("Initialize", mock.Anything).Return(nil)
te.backend.On("SyncSubscription", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
return NewReconciler(ctx, te.fakeClient, te.logger, te.recorder, te.cfg, te.cleaner,
te.backend, te.credentials, te.mapper, validator, collector), te.fakeClient
},
wantEv2Hash: ev2hash,
wantEventMeshHash: eventMeshHash,
wantWebhookAuthHash: webhookAuthHash,
wantEventMeshLocalHash: eventMeshLocalHash,
wantReconcileError: nil,
},
{
name: "Preserve hashes if conditions are not empty",
givenSubscription: func() *eventingv1alpha2.Subscription {
return reconcilertesting.NewSubscription("some-test-sub-1", "test",
reconcilertesting.WithValidSink("test", "some-test-svc-1"),
reconcilertesting.WithConditions(eventingv1alpha2.MakeSubscriptionConditions()),
reconcilertesting.WithBackend(eventingv1alpha2.Backend{
Ev2hash: ev2hash,
EventMeshHash: eventMeshHash,
WebhookAuthHash: webhookAuthHash,
EventMeshLocalHash: eventMeshLocalHash,
}),
)
}(),
givenReconcilerSetup: func(s *eventingv1alpha2.Subscription) (*Reconciler, client.Client) {
te := setupTestEnvironment(t, s)
te.backend.On("Initialize", mock.Anything).Return(nil)
te.backend.On("SyncSubscription", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
return NewReconciler(ctx, te.fakeClient, te.logger, te.recorder, te.cfg, te.cleaner,
te.backend, te.credentials, te.mapper, validator, collector), te.fakeClient
},
wantEv2Hash: ev2hash,
wantEventMeshHash: eventMeshHash,
wantWebhookAuthHash: webhookAuthHash,
wantEventMeshLocalHash: eventMeshLocalHash,
wantReconcileError: nil,
},
}
featureFlagValues := []bool{true, false}
for _, testCase := range testCases {
for _, value := range featureFlagValues {
tc := testCase
flag := value
t.Run(fmt.Sprintf("%s [EventingWebhookAuthEnabled=%v]", tc.name, flag), func(t *testing.T) {
// given
featureflags.SetEventingWebhookAuthEnabled(flag)
reconciler, cli := tc.givenReconcilerSetup(tc.givenSubscription)
reconciler.syncConditionWebhookCallStatus = func(subscription *eventingv1alpha2.Subscription) {}
namespacedName := k8stypes.NamespacedName{
Namespace: tc.givenSubscription.Namespace,
Name: tc.givenSubscription.Name,
}

// when
_, err := reconciler.Reconcile(context.Background(), ctrl.Request{NamespacedName: namespacedName})
require.Equal(t, tc.wantReconcileError, err)

// then
sub := &eventingv1alpha2.Subscription{}
err = cli.Get(ctx, namespacedName, sub)
require.NoError(t, err)
require.Equal(t, tc.wantEv2Hash, sub.Status.Backend.Ev2hash)
require.Equal(t, tc.wantEventMeshHash, sub.Status.Backend.EventMeshHash)
require.Equal(t, tc.wantWebhookAuthHash, sub.Status.Backend.WebhookAuthHash)
require.Equal(t, tc.wantEventMeshLocalHash, sub.Status.Backend.EventMeshLocalHash)
})
}
}
}

func Test_replaceStatusCondition(t *testing.T) {
var testCases = []struct {
name string
Expand Down Expand Up @@ -620,13 +731,11 @@ func Test_replaceStatusCondition(t *testing.T) {
},
}

r := Reconciler{}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
sub := tt.giveSubscription
condition := tt.giveCondition
statusChanged := r.replaceStatusCondition(sub, condition)
statusChanged := replaceStatusCondition(sub, condition)
assert.Equal(t, tt.wantStatusChanged, statusChanged)
assert.Contains(t, sub.Status.Conditions, condition)
assert.Equal(t, tt.wantReady, sub.Status.Ready)
Expand Down Expand Up @@ -775,7 +884,11 @@ func Test_syncConditionSubscribed(t *testing.T) {
}

r := Reconciler{
nameMapper: backendutils.NewBEBSubscriptionNameMapper(domain, eventmesh.MaxSubscriptionNameLength),
nameMapper: backendutils.NewBEBSubscriptionNameMapper(
domain,
eventmesh.MaxSubscriptionNameLength,
),
syncConditionWebhookCallStatus: syncConditionWebhookCallStatus,
}

for _, tc := range testCases {
Expand Down Expand Up @@ -1027,7 +1140,8 @@ func Test_syncConditionWebhookCallStatus(t *testing.T) {
}

r := Reconciler{
logger: logger,
logger: logger,
syncConditionWebhookCallStatus: syncConditionWebhookCallStatus,
}

for _, tc := range testCases {
Expand Down Expand Up @@ -1223,18 +1337,9 @@ func Test_checkLastFailedDelivery(t *testing.T) {
},
}

logger, err := eventinglogger.New(string(kymalogger.JSON), string(kymalogger.INFO))
if err != nil {
t.Fatalf(`failed to initiate logger, %v`, err)
}

r := Reconciler{
logger: logger,
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result, err := r.checkLastFailedDelivery(tc.givenSubscription)
result, err := checkLastFailedDelivery(tc.givenSubscription)
assert.Equal(t, tc.wantResult, result)
if tc.wantError == nil {
assert.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ func Test_Client_Update(t *testing.T) {
WebhookURL: "www.kyma-project.io",
WebhookAuth: &types.WebhookAuth{
Type: "abc",
User: "test",
Password: "test123",
GrantType: "test",
ClientID: "123456",
ClientSecret: "qwerty",
Expand All @@ -52,8 +50,6 @@ func Test_Client_Update(t *testing.T) {
}
givenUpdateWebhook := &types.WebhookAuth{
Type: "abc",
User: "test",
Password: "test123changed",
GrantType: "test",
ClientID: "123456changed",
ClientSecret: "qwertychanged",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ type Subscriptions []Subscription

type WebhookAuth struct {
Type AuthType `json:"type,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
GrantType GrantType `json:"grantType,omitempty"`
ClientID string `json:"clientId,omitempty"`
ClientSecret string `json:"clientSecret,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func markAllV1Alpha2SubscriptionsAsNotReady(dynamicClient dynamic.Interface, log

desiredSub := sub.DuplicateWithStatusDefaults()
desiredSub.Status.Ready = false
desiredSub.Status.Backend.CopyHashes(sub.Status.Backend)
if err = backendutils.UpdateSubscriptionStatus(ctx, dynamicClient, desiredSub); err != nil {
logger.Errorw("Failed to update subscription status", "namespace", sub.Namespace, "name", sub.Name, "error", err)
}
Expand Down
Loading