Skip to content

receive adapters can now run in leader elected mode #3370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions pkg/adapter/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,29 @@ import (
"encoding/json"

"go.uber.org/zap"
tracingconfig "knative.dev/pkg/tracing/config"

duckv1 "knative.dev/pkg/apis/duck/v1"
kle "knative.dev/pkg/leaderelection"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
tracingconfig "knative.dev/pkg/tracing/config"

"knative.dev/eventing/pkg/tracing"
)

type EnvConfigConstructor func() EnvConfigAccessor

const (
EnvConfigComponent = "K_COMPONENT"
EnvConfigNamespace = "NAMESPACE"
EnvConfigName = "NAME"
EnvConfigResourceGroup = "K_RESOURCE_GROUP"
EnvConfigSink = "K_SINK"
EnvConfigCEOverrides = "K_CE_OVERRIDES"
EnvConfigMetricsConfig = "K_METRICS_CONFIG"
EnvConfigLoggingConfig = "K_LOGGING_CONFIG"
EnvConfigTracingConfig = "K_TRACING_CONFIG"
EnvConfigComponent = "K_COMPONENT"
EnvConfigNamespace = "NAMESPACE"
EnvConfigName = "NAME"
EnvConfigResourceGroup = "K_RESOURCE_GROUP"
EnvConfigSink = "K_SINK"
EnvConfigCEOverrides = "K_CE_OVERRIDES"
EnvConfigMetricsConfig = "K_METRICS_CONFIG"
EnvConfigLoggingConfig = "K_LOGGING_CONFIG"
EnvConfigTracingConfig = "K_TRACING_CONFIG"
EnvConfigLeaderElectionConfig = "K_LEADER_ELECTION_CONFIG"
)

// EnvConfig is the minimal set of configuration parameters
Expand Down Expand Up @@ -79,6 +81,9 @@ type EnvConfig struct {
// a config map inside the controllers namespace and copied here.
// Default is no-op.
TracingConfigJson string `envconfig:"K_TRACING_CONFIG"`

// LeaderElectionConfigJson is the leader election component configuration.
LeaderElectionConfigJson string `envconfig:"K_LEADER_ELECTION_CONFIG"`
}

// EnvConfigAccessor defines accessors for the minimal
Expand All @@ -105,6 +110,9 @@ type EnvConfigAccessor interface {
SetupTracing(*zap.SugaredLogger) error

GetCloudEventOverrides() (*duckv1.CloudEventOverrides, error)

// GetLeaderElectionConfig returns leader election configuration.
GetLeaderElectionConfig() (*kle.ComponentConfig, error)
}

var _ EnvConfigAccessor = (*EnvConfig)(nil)
Expand Down Expand Up @@ -167,3 +175,21 @@ func (e *EnvConfig) GetCloudEventOverrides() (*duckv1.CloudEventOverrides, error
}
return &ceOverrides, nil
}

func (e *EnvConfig) GetLeaderElectionConfig() (*kle.ComponentConfig, error) {
if e.LeaderElectionConfigJson == "" {
return defaultLeaderElectionConfig(), nil
}

var config kle.ComponentConfig
if err := json.Unmarshal([]byte(e.LeaderElectionConfigJson), &config); err != nil {
return defaultLeaderElectionConfig(), err
}
return &config, nil
}

func defaultLeaderElectionConfig() *kle.ComponentConfig {
return &kle.ComponentConfig{
LeaderElect: false,
}
}
5 changes: 5 additions & 0 deletions pkg/adapter/v2/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestEnvConfig(t *testing.T) {
os.Setenv("K_METRICS_CONFIG", "metrics")
os.Setenv("K_LOGGING_CONFIG", "logging")
os.Setenv("K_TRACING_CONFIG", "tracing")
os.Setenv("K_LEADER_ELECTION_CONFIG", "leaderelection")
os.Setenv("MODE", "mymode") // note: custom to this test impl

var env myEnvConfig
Expand All @@ -49,4 +50,8 @@ func TestEnvConfig(t *testing.T) {
if env.Sink != "http://sink" {
t.Errorf("Expected sinkURI http://sink, got: %s", env.Sink)
}

if env.LeaderElectionConfigJson != "leaderelection" {
t.Errorf("Expected LeaderElectionConfigJson leaderelection, got: %s", env.LeaderElectionConfigJson)
}
}
23 changes: 19 additions & 4 deletions pkg/adapter/v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/profiling"
Expand Down Expand Up @@ -105,16 +106,30 @@ func MainWithContext(ctx context.Context, component string, ector EnvConfigConst

eventsClient, err := NewCloudEventsClient(env.GetSink(), ceOverrides, reporter)
if err != nil {
logger.Fatal("error building cloud event client", zap.Error(err))
logger.Fatal("Error building cloud event client", zap.Error(err))
}

// Configuring the adapter
adapter := ctor(ctx, env, eventsClient)

logger.Info("Starting Receive Adapter", zap.Any("adapter", adapter))
run := func(ctx context.Context) {
logger.Info("Starting Receive Adapter", zap.Any("adapter", adapter))

if err := adapter.Start(ctx); err != nil {
logger.Warn("start returned an error", zap.Error(err))
if err := adapter.Start(ctx); err != nil {
logger.Warn("Start returned an error", zap.Error(err))
}
}

leConfig, err := env.GetLeaderElectionConfig()
if err != nil {
logger.Error("Error loading the leader election configuration", zap.Error(err))
}

if leConfig.LeaderElect {
sharedmain.RunLeaderElected(ctx, logger, run, *leConfig)
} else {
logger.Infof("%v will not run in leader-elected mode", component)
run(ctx)
}
}

Expand Down
22 changes: 19 additions & 3 deletions pkg/adapter/v2/main_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"net/http"
"time"

"knative.dev/pkg/injection/sharedmain"

"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
Expand Down Expand Up @@ -114,9 +116,23 @@ func MainMessageAdapterWithContext(ctx context.Context, component string, ector
// Configuring the adapter
adapter := ctor(ctx, env, httpBindingsSender, reporter)

logger.Info("Starting Receive MessageAdapter", zap.Any("adapter", adapter))
run := func(ctx context.Context) {
logger.Info("Starting Receive MessageAdapter", zap.Any("adapter", adapter))

if err := adapter.Start(ctx); err != nil {
logger.Warn("start returned an error", zap.Error(err))
}
}

leConfig, err := env.GetLeaderElectionConfig()
if err != nil {
logger.Error("Error loading the leader election configuration", zap.Error(err))
}

if err := adapter.Start(ctx); err != nil {
logger.Warn("start returned an error", zap.Error(err))
if leConfig.LeaderElect {
sharedmain.RunLeaderElected(ctx, logger, run, *leConfig)
} else {
logger.Infof("%v will not run in leader-elected mode", component)
run(ctx)
}
}
9 changes: 9 additions & 0 deletions pkg/adapter/v2/main_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestMainMessageAdapter(t *testing.T) {
os.Setenv("K_METRICS_CONFIG", "metrics")
os.Setenv("K_LOGGING_CONFIG", "logging")
os.Setenv("MODE", "mymode")
os.Setenv("K_LEADER_ELECTION_CONFIG", "")

ctx, cancel := context.WithCancel(context.TODO())

Expand All @@ -51,6 +52,14 @@ func TestMainMessageAdapter(t *testing.T) {
if env.Sink != "http://sink" {
t.Errorf("Expected sinkURI http://sink, got: %s", env.Sink)
}

leConfig, err := env.GetLeaderElectionConfig()
if err != nil {
t.Errorf("Expected no error: %v", err)
}
if leConfig.LeaderElect {
t.Errorf("Expected LeaderElect to be false, got: %t", leConfig.LeaderElect)
}
return &myAdapterBindings{}
})

Expand Down
9 changes: 9 additions & 0 deletions pkg/adapter/v2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestMainWithContext(t *testing.T) {
os.Setenv("K_METRICS_CONFIG", "metrics")
os.Setenv("K_LOGGING_CONFIG", "logging")
os.Setenv("MODE", "mymode")
os.Setenv("K_LEADER_ELECTION_CONFIG", "")

ctx, cancel := context.WithCancel(context.TODO())

Expand All @@ -49,6 +50,14 @@ func TestMainWithContext(t *testing.T) {
if env.Sink != "http://sink" {
t.Errorf("Expected sinkURI http://sink, got: %s", env.Sink)
}

leConfig, err := env.GetLeaderElectionConfig()
if err != nil {
t.Errorf("Expected no error: %v", err)
}
if leConfig.LeaderElect {
t.Errorf("Expected LeaderElect to be false, got: %t", leConfig.LeaderElect)
}
return &myAdapter{}
})

Expand Down