Skip to content

[server] Start the server as a Fx application #6793

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 92 additions & 114 deletions cmd/server/cadence/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,109 +21,25 @@
package cadence

import (
"context"
"fmt"
"log"
stdLog "log"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/urfave/cli/v2"
"go.uber.org/fx"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/common/log/logfx"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/tools/cassandra"
"github.com/uber/cadence/tools/sql"
)

// validServices is the list of all valid cadence services
var validServices = service.ShortNames(service.List)

// startHandler is the handler for the cli start command
func startHandler(c *cli.Context) error {
env := getEnvironment(c)
zone := getZone(c)
configDir := getConfigDir(c)
rootDir := getRootDir(c)

log.Printf("Loading config; env=%v,zone=%v,configDir=%v\n", env, zone, configDir)

var cfg config.Config
err := config.Load(env, configDir, zone, &cfg)
if err != nil {
return fmt.Errorf("Config file corrupted: %w", err)
}
if cfg.Log.Level == "debug" {
log.Printf("config=%v", cfg.String())
}
if cfg.DynamicConfig.Client == "" {
cfg.DynamicConfigClient.Filepath = constructPathIfNeed(rootDir, cfg.DynamicConfigClient.Filepath)
} else {
cfg.DynamicConfig.FileBased.Filepath = constructPathIfNeed(rootDir, cfg.DynamicConfig.FileBased.Filepath)
}

if err := cfg.ValidateAndFillDefaults(); err != nil {
return fmt.Errorf("config validation failed: %w", err)
}
// cassandra schema version validation
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence, gocql.Quorum); err != nil {
return fmt.Errorf("cassandra schema version compatibility check failed: %w", err)
}
// sql schema version validation
if err := sql.VerifyCompatibleVersion(cfg.Persistence); err != nil {
return fmt.Errorf("sql schema version compatibility check failed: %w", err)
}

var daemons []common.Daemon
services := getServices(c)
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT)
for _, svc := range services {
server := newServer(svc, &cfg)
daemons = append(daemons, server)
server.Start()
}

<-sigc
log.Println("Received SIGTERM signal, initiating shutdown.")
for _, daemon := range daemons {
daemon.Stop()
}
return nil
}

func getEnvironment(c *cli.Context) string {
return strings.TrimSpace(c.String("env"))
}

func getZone(c *cli.Context) string {
return strings.TrimSpace(c.String("zone"))
}

// getServices parses the services arg from cli
// and returns a list of services to start
func getServices(c *cli.Context) []string {

val := strings.TrimSpace(c.String("services"))
tokens := strings.Split(val, ",")

if len(tokens) == 0 {
log.Fatal("list of services is empty")
}

for _, t := range tokens {
if !isValidService(t) {
log.Fatalf("invalid service `%v` in service list [%v]", t, val)
}
}

return tokens
}

func isValidService(in string) bool {
for _, s := range validServices {
if s == in {
Expand All @@ -133,31 +49,6 @@ func isValidService(in string) bool {
return false
}

func getConfigDir(c *cli.Context) string {
return constructPathIfNeed(getRootDir(c), c.String("config"))
}

func getRootDir(c *cli.Context) string {
dirpath := c.String("root")
if len(dirpath) == 0 {
cwd, err := os.Getwd()
if err != nil {
log.Fatalf("os.Getwd() failed, err=%v", err)
}
return cwd
}
return dirpath
}

// constructPathIfNeed would append the dir as the root dir
// when the file wasn't absolute path.
func constructPathIfNeed(dir string, file string) string {
if !filepath.IsAbs(file) {
return dir + "/" + file
}
return file
}

// BuildCLI is the main entry point for the cadence server
func BuildCLI(releaseVersion string, gitRevision string) *cli.App {
version := fmt.Sprintf(" Release version: %v \n"+
Expand Down Expand Up @@ -217,11 +108,98 @@ func BuildCLI(releaseVersion string, gitRevision string) *cli.App {
},
},
Action: func(c *cli.Context) error {
return startHandler(c)
fxApp := fx.New(
config.Module,
logfx.Module,
fx.Provide(func() appContext {
return appContext{
CfgContext: config.Context{
Environment: getEnvironment(c),
Zone: getZone(c),
},
ConfigDir: getConfigDir(c),
RootDir: getRootDir(c),
Services: getServices(c),
}
}),
Module,
)

ctx := context.Background()
if err := fxApp.Start(ctx); err != nil {
return err
}

// Block until FX receives a shutdown signal
<-fxApp.Done()

// Stop the application
return fxApp.Stop(ctx)
},
},
}

return app

}

type appContext struct {
fx.Out

CfgContext config.Context
ConfigDir string `name:"config-dir"`
RootDir string `name:"root-dir"`
Services []string `name:"services"`
}

func getEnvironment(c *cli.Context) string {
return strings.TrimSpace(c.String("env"))
}

func getZone(c *cli.Context) string {
return strings.TrimSpace(c.String("zone"))
}

// getServices parses the services arg from cli
// and returns a list of services to start
func getServices(c *cli.Context) []string {
val := strings.TrimSpace(c.String("services"))
tokens := strings.Split(val, ",")

if len(tokens) == 0 {
stdLog.Fatal("list of services is empty")
}

for _, t := range tokens {
if !isValidService(t) {
stdLog.Fatalf("invalid service `%v` in service list [%v]", t, val)
}
}

return tokens
}

func getConfigDir(c *cli.Context) string {
return constructPathIfNeed(getRootDir(c), c.String("config"))
}

func getRootDir(c *cli.Context) string {
dirpath := c.String("root")
if len(dirpath) == 0 {
cwd, err := os.Getwd()
if err != nil {
stdLog.Fatalf("os.Getwd() failed, err=%v", err)
}
return cwd
}
return dirpath
}

// constructPathIfNeed would append the dir as the root dir
// when the file wasn't absolute path.
func constructPathIfNeed(dir string, file string) string {
if !filepath.IsAbs(file) {
return dir + "/" + file
}
return file
}
113 changes: 113 additions & 0 deletions cmd/server/cadence/fx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package cadence

import (
"context"
"fmt"

"go.uber.org/fx"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/tools/cassandra"
"github.com/uber/cadence/tools/sql"
)

// Module provides a cadence server initialization with root components.
// AppParams allows to provide optional/overrides for implementation specific dependencies.
var Module = fx.Options(
fx.Provide(NewApp),
// empty invoke so fx won't drop the application from the dependencies.
fx.Invoke(func(a *App) {}),
)

type AppParams struct {
fx.In

RootDir string `name:"root-dir"`
Services []string `name:"services"`
AppContext config.Context
Config config.Config
Logger log.Logger
}

// NewApp created a new Application from pre initalized config and logger.
func NewApp(params AppParams) *App {
app := &App{
cfg: params.Config,
rootDir: params.RootDir,
logger: params.Logger,
services: params.Services,
}
return app
}

// App is a fx application that registers itself into fx.Lifecycle and runs.
// It is done implicitly, since it provides methods Start and Stop which are picked up by fx.
type App struct {
cfg config.Config
rootDir string
logger log.Logger

daemons []common.Daemon
services []string
}

func (a *App) Start(_ context.Context) error {
if a.cfg.DynamicConfig.Client == "" {
a.cfg.DynamicConfigClient.Filepath = constructPathIfNeed(a.rootDir, a.cfg.DynamicConfigClient.Filepath)
} else {
a.cfg.DynamicConfig.FileBased.Filepath = constructPathIfNeed(a.rootDir, a.cfg.DynamicConfig.FileBased.Filepath)
}

if err := a.cfg.ValidateAndFillDefaults(); err != nil {
return fmt.Errorf("config validation failed: %w", err)
}
// cassandra schema version validation
if err := cassandra.VerifyCompatibleVersion(a.cfg.Persistence, gocql.Quorum); err != nil {
return fmt.Errorf("cassandra schema version compatibility check failed: %w", err)
}
// sql schema version validation
if err := sql.VerifyCompatibleVersion(a.cfg.Persistence); err != nil {
return fmt.Errorf("sql schema version compatibility check failed: %w", err)
}

var daemons []common.Daemon
for _, svc := range a.services {
server := newServer(svc, a.cfg, a.logger)
daemons = append(daemons, server)
server.Start()
}

return nil
}

func (a *App) Stop(ctx context.Context) error {
for _, daemon := range a.daemons {
daemon.Stop()
}
return nil
}
Loading