@@ -48,11 +48,11 @@ import (
48
48
"knative.dev/pkg/controller"
49
49
pkgreconciler "knative.dev/pkg/reconciler"
50
50
51
- "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1 "
51
+ "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1beta1 "
52
52
kafkaclientset "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned"
53
53
kafkaScheme "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned/scheme"
54
- kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1alpha1 /kafkachannel"
55
- listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1alpha1 "
54
+ kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1beta1 /kafkachannel"
55
+ listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1beta1 "
56
56
"knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/controller/resources"
57
57
"knative.dev/eventing-contrib/kafka/channel/pkg/utils"
58
58
)
@@ -139,7 +139,7 @@ type envConfig struct {
139
139
var _ kafkaChannelReconciler.Interface = (* Reconciler )(nil )
140
140
var _ kafkaChannelReconciler.Finalizer = (* Reconciler )(nil )
141
141
142
- func (r * Reconciler ) ReconcileKind (ctx context.Context , kc * v1alpha1 .KafkaChannel ) pkgreconciler.Event {
142
+ func (r * Reconciler ) ReconcileKind (ctx context.Context , kc * v1beta1 .KafkaChannel ) pkgreconciler.Event {
143
143
kc .Status .InitializeConditions ()
144
144
145
145
logger := logging .FromContext (ctx )
@@ -247,7 +247,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1alpha1.KafkaChanne
247
247
return newReconciledNormal (kc .Namespace , kc .Name )
248
248
}
249
249
250
- func (r * Reconciler ) reconcileDispatcher (ctx context.Context , scope string , dispatcherNamespace string , kc * v1alpha1 .KafkaChannel ) (* appsv1.Deployment , error ) {
250
+ func (r * Reconciler ) reconcileDispatcher (ctx context.Context , scope string , dispatcherNamespace string , kc * v1beta1 .KafkaChannel ) (* appsv1.Deployment , error ) {
251
251
if scope == scopeNamespace {
252
252
// Configure RBAC in namespace to access the configmaps
253
253
sa , err := r .reconcileServiceAccount (ctx , dispatcherNamespace , kc )
@@ -311,7 +311,7 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp
311
311
return d , nil
312
312
}
313
313
314
- func (r * Reconciler ) reconcileServiceAccount (ctx context.Context , dispatcherNamespace string , kc * v1alpha1 .KafkaChannel ) (* corev1.ServiceAccount , error ) {
314
+ func (r * Reconciler ) reconcileServiceAccount (ctx context.Context , dispatcherNamespace string , kc * v1beta1 .KafkaChannel ) (* corev1.ServiceAccount , error ) {
315
315
sa , err := r .serviceAccountLister .ServiceAccounts (dispatcherNamespace ).Get (dispatcherName )
316
316
if err != nil {
317
317
if apierrs .IsNotFound (err ) {
@@ -332,7 +332,7 @@ func (r *Reconciler) reconcileServiceAccount(ctx context.Context, dispatcherName
332
332
return sa , err
333
333
}
334
334
335
- func (r * Reconciler ) reconcileRoleBinding (ctx context.Context , name string , ns string , kc * v1alpha1 .KafkaChannel , clusterRoleName string , sa * corev1.ServiceAccount ) (* rbacv1.RoleBinding , error ) {
335
+ func (r * Reconciler ) reconcileRoleBinding (ctx context.Context , name string , ns string , kc * v1beta1 .KafkaChannel , clusterRoleName string , sa * corev1.ServiceAccount ) (* rbacv1.RoleBinding , error ) {
336
336
rb , err := r .roleBindingLister .RoleBindings (ns ).Get (name )
337
337
if err != nil {
338
338
if apierrs .IsNotFound (err ) {
@@ -352,7 +352,7 @@ func (r *Reconciler) reconcileRoleBinding(ctx context.Context, name string, ns s
352
352
return rb , err
353
353
}
354
354
355
- func (r * Reconciler ) reconcileDispatcherService (ctx context.Context , dispatcherNamespace string , kc * v1alpha1 .KafkaChannel ) (* corev1.Service , error ) {
355
+ func (r * Reconciler ) reconcileDispatcherService (ctx context.Context , dispatcherNamespace string , kc * v1beta1 .KafkaChannel ) (* corev1.Service , error ) {
356
356
svc , err := r .serviceLister .Services (dispatcherNamespace ).Get (dispatcherName )
357
357
if err != nil {
358
358
if apierrs .IsNotFound (err ) {
@@ -380,7 +380,7 @@ func (r *Reconciler) reconcileDispatcherService(ctx context.Context, dispatcherN
380
380
return svc , nil
381
381
}
382
382
383
- func (r * Reconciler ) reconcileChannelService (ctx context.Context , dispatcherNamespace string , channel * v1alpha1 .KafkaChannel ) (* corev1.Service , error ) {
383
+ func (r * Reconciler ) reconcileChannelService (ctx context.Context , dispatcherNamespace string , channel * v1beta1 .KafkaChannel ) (* corev1.Service , error ) {
384
384
logger := logging .FromContext (ctx )
385
385
// Get the Service and propagate the status to the Channel in case it does not exist.
386
386
// We don't do anything with the service because it's status contains nothing useful, so just do
@@ -425,7 +425,7 @@ func (r *Reconciler) reconcileChannelService(ctx context.Context, dispatcherName
425
425
return svc , nil
426
426
}
427
427
428
- func (r * Reconciler ) createClient (ctx context.Context , kc * v1alpha1 .KafkaChannel ) (sarama.ClusterAdmin , error ) {
428
+ func (r * Reconciler ) createClient (ctx context.Context , kc * v1beta1 .KafkaChannel ) (sarama.ClusterAdmin , error ) {
429
429
// We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time.
430
430
// This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162.
431
431
// Once the issue is fixed we should use a shared cluster admin client. Also, r.kafkaClusterAdmin is currently
@@ -441,7 +441,7 @@ func (r *Reconciler) createClient(ctx context.Context, kc *v1alpha1.KafkaChannel
441
441
return kafkaClusterAdmin , nil
442
442
}
443
443
444
- func (r * Reconciler ) createTopic (ctx context.Context , channel * v1alpha1 .KafkaChannel , kafkaClusterAdmin sarama.ClusterAdmin ) error {
444
+ func (r * Reconciler ) createTopic (ctx context.Context , channel * v1beta1 .KafkaChannel , kafkaClusterAdmin sarama.ClusterAdmin ) error {
445
445
logger := logging .FromContext (ctx )
446
446
447
447
topicName := utils .TopicName (utils .KafkaChannelSeparator , channel .Namespace , channel .Name )
@@ -460,7 +460,7 @@ func (r *Reconciler) createTopic(ctx context.Context, channel *v1alpha1.KafkaCha
460
460
return err
461
461
}
462
462
463
- func (r * Reconciler ) deleteTopic (ctx context.Context , channel * v1alpha1 .KafkaChannel , kafkaClusterAdmin sarama.ClusterAdmin ) error {
463
+ func (r * Reconciler ) deleteTopic (ctx context.Context , channel * v1beta1 .KafkaChannel , kafkaClusterAdmin sarama.ClusterAdmin ) error {
464
464
logger := logging .FromContext (ctx )
465
465
466
466
topicName := utils .TopicName (utils .KafkaChannelSeparator , channel .Namespace , channel .Name )
@@ -488,7 +488,7 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co
488
488
r .kafkaConfigError = err
489
489
}
490
490
491
- func (r * Reconciler ) FinalizeKind (ctx context.Context , kc * v1alpha1 .KafkaChannel ) pkgreconciler.Event {
491
+ func (r * Reconciler ) FinalizeKind (ctx context.Context , kc * v1beta1 .KafkaChannel ) pkgreconciler.Event {
492
492
// Do not attempt retrying creating the client because it might be a permanent error
493
493
// in which case the finalizer will never get removed.
494
494
if kafkaClusterAdmin , err := r .createClient (ctx , kc ); err == nil && r .kafkaConfig != nil {
0 commit comments