Skip to content

Add PriorityClass #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 18, 2023
1 change: 1 addition & 0 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
resources:
- manager.yaml
- service.yaml
- priority-class.yaml

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
Expand Down
1 change: 1 addition & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ spec:
app.kubernetes.io/instance: eventing-manager
app.kubernetes.io/component: eventing-manager
spec:
priorityClassName: "eventing-manager-priority-class"
# TODO(user): Uncomment the following code to configure the nodeAffinity expression
# according to the platforms which are supported by your solution.
# It is considered best practice to support multiple architectures. You can
Expand Down
7 changes: 7 additions & 0 deletions config/manager/priority-class.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: eventing-manager-priority-class
value: 2000000
globalDefault: false
description: "Scheduling priority of the Eventing-Manager module. Must not be blocked by unschedulable user workloads."
1 change: 1 addition & 0 deletions config/webhook/cronjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ spec:
annotations:
sidecar.istio.io/inject: "false"
spec:
priorityClassName: eventing-manager-priority-class
restartPolicy: Never
containers:
- name: api-gateway
Expand Down
1 change: 1 addition & 0 deletions config/webhook/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ spec:
annotations:
sidecar.istio.io/inject: "false"
spec:
priorityClassName: eventing-manager-priority-class
restartPolicy: Never
containers:
- name: api-gateway
Expand Down
12 changes: 6 additions & 6 deletions hack/e2e/common/testenvironment/test_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (te *TestEnvironment) CreateTestNamespace() error {

func (te *TestEnvironment) DeleteTestNamespace() error {
return common.Retry(FewAttempts, Interval, func() error {
// It's fine if the Namespace not exists.
// It's fine if the Namespace does not exist.
return client.IgnoreNotFound(te.K8sClient.Delete(te.Context, fixtures.Namespace(te.TestConfigs.TestNamespace)))
})
}
Expand Down Expand Up @@ -122,13 +122,13 @@ func (te *TestEnvironment) InitSinkClient() {

func (te *TestEnvironment) CreateAllSubscriptions() error {
ctx := context.TODO()
// create v1alpha1 subscriptions if not exists.
// Create v1alpha1 subscriptions if not exists.
err := te.CreateV1Alpha1Subscriptions(ctx, fixtures.V1Alpha1SubscriptionsToTest())
if err != nil {
return err
}

// create v1alpha2 subscriptions if not exists.
// Create v1alpha2 subscriptions if not exists.
return te.CreateV1Alpha2Subscriptions(ctx, fixtures.V1Alpha2SubscriptionsToTest())
}

Expand All @@ -140,7 +140,7 @@ func (te *TestEnvironment) DeleteAllSubscriptions() error {
}
}

// delete v1alpha2 subscriptions if not exists.
// Delete v1alpha2 subscriptions if not exists.
for _, subToTest := range fixtures.V1Alpha2SubscriptionsToTest() {
if err := te.DeleteSubscriptionFromK8s(subToTest.Name, te.TestConfigs.TestNamespace); err != nil {
return err
Expand All @@ -151,7 +151,7 @@ func (te *TestEnvironment) DeleteAllSubscriptions() error {

func (te *TestEnvironment) WaitForAllSubscriptions() error {
ctx := context.TODO()
// wait for v1alpha1 subscriptions to get ready.
// Wait for v1alpha1 subscriptions to get ready.
err := te.WaitForSubscriptions(ctx, fixtures.V1Alpha1SubscriptionsToTest())
if err != nil {
return err
Expand All @@ -167,7 +167,7 @@ func (te *TestEnvironment) CreateV1Alpha1Subscriptions(ctx context.Context, subL
newSub := subInfo.ToSubscriptionV1Alpha1(te.TestConfigs.SubscriptionSinkURL, te.TestConfigs.TestNamespace)
return client.IgnoreAlreadyExists(te.K8sClient.Create(ctx, newSub))
})
// return error if all retries are exhausted.
// Return error if all retries are exhausted.
if err != nil {
return err
}
Expand Down
97 changes: 84 additions & 13 deletions hack/e2e/setup/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var testEnvironment *testenvironment.TestEnvironment
func TestMain(m *testing.M) {
testEnvironment = testenvironment.NewTestEnvironment()

// create test namespace,
// Create a test namespace.
if err := testEnvironment.CreateTestNamespace(); err != nil {
testEnvironment.Logger.Fatal(err.Error())
}
Expand All @@ -53,12 +53,11 @@ func TestMain(m *testing.M) {
}

// Create the Eventing CR used for testing.

if err := testEnvironment.SetupEventingCR(); err != nil {
testEnvironment.Logger.Fatal(err.Error())
}

// wait for a testenvironment.Interval for reconciliation to update status.
// Wait for a testenvironment.Interval for reconciliation to update status.
time.Sleep(testenvironment.Interval)

// Wait for Eventing CR to get ready.
Expand All @@ -77,10 +76,7 @@ func Test_WebhookServerCertSecret(t *testing.T) {
ctx := context.TODO()
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
_, getErr := testEnvironment.K8sClientset.CoreV1().Secrets(NamespaceName).Get(ctx, WebhookServerCertSecretName, metav1.GetOptions{})
if getErr != nil {
return getErr
}
return nil
return getErr
})
require.NoError(t, err)
}
Expand All @@ -90,10 +86,20 @@ func Test_WebhookServerCertJob(t *testing.T) {
t.Parallel()
ctx := context.TODO()
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
_, getErr := testEnvironment.K8sClientset.BatchV1().Jobs(NamespaceName).Get(ctx, WebhookServerCertJobName, metav1.GetOptions{})
job, getErr := testEnvironment.K8sClientset.BatchV1().Jobs(NamespaceName).Get(ctx, WebhookServerCertJobName, metav1.GetOptions{})
if getErr != nil {
return getErr
}

// Check if the PriorityClassName was set correctly.
if job.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf("Job '%s' was expected to have PriorityClassName '%s' but has '%s'",
job.GetName(),
eventing.PriorityClassName,
job.Spec.Template.Spec.PriorityClassName,
)
}

return nil
})
require.NoError(t, err)
Expand All @@ -104,10 +110,24 @@ func Test_WebhookServerCertCronJob(t *testing.T) {
t.Parallel()
ctx := context.TODO()
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
_, getErr := testEnvironment.K8sClientset.BatchV1().CronJobs(NamespaceName).Get(ctx, WebhookServerCertJobName, metav1.GetOptions{})
job, getErr := testEnvironment.K8sClientset.BatchV1().CronJobs(NamespaceName).Get(
ctx,
WebhookServerCertJobName,
metav1.GetOptions{},
)
if getErr != nil {
return getErr
}

// Check if the PriorityClassName was set correctly.
if job.Spec.JobTemplate.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf("ChronJob '%s' was expected to have PriorityClassName '%s' but has '%s'",
job.GetName(),
eventing.PriorityClassName,
job.Spec.JobTemplate.Spec.Template.Spec.PriorityClassName,
)
}

return nil
})
require.NoError(t, err)
Expand Down Expand Up @@ -240,6 +260,15 @@ func Test_PublisherProxyDeployment(t *testing.T) {
return err
}

if gotDeployment.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf(
"error while checking deployment '%s'; PriorityClasssName was supposed to be %s, but was %s",
gotDeployment.GetName(),
eventing.PriorityClassName,
gotDeployment.Spec.Template.Spec.PriorityClassName,
)
}

return nil
})
require.NoError(t, err)
Expand All @@ -254,7 +283,7 @@ func Test_PublisherProxyPods(t *testing.T) {

ctx := context.TODO()
eventingCR := EventingCR(eventingv1alpha1.BackendType(testEnvironment.TestConfigs.BackendType))
// RetryGet the Pods and test them.
// Retry to get the Pods and test them.
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
// get publisher deployment
gotDeployment, getErr := testEnvironment.GetDeployment(eventing.GetPublisherDeploymentName(*eventingCR), NamespaceName)
Expand All @@ -280,7 +309,7 @@ func Test_PublisherProxyPods(t *testing.T) {
)
}

// Go through all Pods, check its spec. It should be same as defined in Eventing CR
// Go through all Pods, check its spec. It should be same as defined in Eventing CR.
for _, pod := range pods.Items {
// find the container.
container := FindContainerInPod(pod, PublisherContainerName)
Expand All @@ -297,13 +326,22 @@ func Test_PublisherProxyPods(t *testing.T) {
)
}

// check if the ENV `BACKEND` is defined correctly.
// Check if the PriorityClassName was set as expected.
if pod.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf("'PriorityClassName' of Pod %v should be %v but was %v",
pod.GetName(),
eventing.PriorityClassName,
pod.Spec.PriorityClassName,
)
}

// Check if the ENV `BACKEND` is defined correctly.
wantBackendENVValue := "nats"
if eventingv1alpha1.BackendType(testEnvironment.TestConfigs.BackendType) == eventingv1alpha1.EventMeshBackendType {
wantBackendENVValue = "beb"
}

// get value of ENV `BACKEND`
// Get value of ENV `BACKEND`.
gotBackendENVValue := ""
for _, envVar := range container.Env {
if envVar.Name == "BACKEND" {
Expand All @@ -325,3 +363,36 @@ func Test_PublisherProxyPods(t *testing.T) {
})
require.NoError(t, err)
}

func Test_PriorityClass(t *testing.T) {
t.Parallel()

ctx := context.TODO()

// Check if the PriorityClass exists in the cluster.
err := Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
_, getErr := testEnvironment.K8sClientset.SchedulingV1().PriorityClasses().Get(
ctx, eventing.PriorityClassName, metav1.GetOptions{})
return getErr
})
require.Nil(t, err, fmt.Errorf("error while fetching PriorityClass: %v", err))

// Check if the Eventing-Manager Deployment has the right PriorityClassName. This implicits that the
// corresponding Pod also has the right PriorityClassName.
err = Retry(testenvironment.Attempts, testenvironment.Interval, func() error {
deploy, getErr := testEnvironment.GetDeployment(ManagerDeploymentName, NamespaceName)
if getErr != nil {
return getErr
}

if deploy.Spec.Template.Spec.PriorityClassName != eventing.PriorityClassName {
return fmt.Errorf("deployment '%s' should have the PriorityClassName %s but was %s",
deploy.GetName(),
eventing.PriorityClassName,
deploy.Spec.Template.Spec.PriorityClassName,
)
}

return nil
})
}
10 changes: 10 additions & 0 deletions pkg/eventing/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (

PublisherSecretEMSURLKey = "ems-publish-url"
PublisherSecretBEBNamespaceKey = "beb-namespace"

PriorityClassName = "eventing-manager-priority-class"
)

var (
Expand All @@ -57,6 +59,7 @@ func newNATSPublisherDeployment(
WithNATSEnvVars(natsConfig, publisherConfig, eventing),
WithLogEnvVars(publisherConfig, eventing),
WithAffinity(GetPublisherDeploymentName(*eventing)),
WithPriorityClassName(PriorityClassName),
)
}

Expand All @@ -70,6 +73,7 @@ func newEventMeshPublisherDeployment(
WithContainers(publisherConfig, eventing),
WithBEBEnvVars(GetPublisherDeploymentName(*eventing), publisherConfig, eventing),
WithLogEnvVars(publisherConfig, eventing),
WithPriorityClassName(PriorityClassName),
)
}

Expand Down Expand Up @@ -136,6 +140,12 @@ func WithLabels(publisherName string, backendType v1alpha1.BackendType) DeployOp
}
}

func WithPriorityClassName(name string) DeployOpt {
return func(deployment *appsv1.Deployment) {
deployment.Spec.Template.Spec.PriorityClassName = name
}
}

func WithAffinity(publisherName string) DeployOpt {
return func(d *appsv1.Deployment) {
d.Spec.Template.Spec.Affinity = &v1.Affinity{
Expand Down
3 changes: 2 additions & 1 deletion pkg/eventing/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func Test_NewDeploymentSecurityContext(t *testing.T) {
testutils.WithEventingCRNamespace("test-namespace"),
)
deployment := newDeployment(givenEventing, config.PublisherConfig,
WithContainers(config.PublisherConfig, givenEventing))
WithContainers(config.PublisherConfig, givenEventing),
)

// when
podSecurityContext := deployment.Spec.Template.Spec.SecurityContext
Expand Down