Skip to content

Commit 3b22c25

Browse files
authored
[server] Start the server as a Fx application (#6793)
* [server] Start the server as a Fx application
1 parent ab2e31b commit 3b22c25

File tree

10 files changed

+522
-144
lines changed

10 files changed

+522
-144
lines changed

cmd/server/cadence/cadence.go

Lines changed: 92 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -21,109 +21,25 @@
2121
package cadence
2222

2323
import (
24+
"context"
2425
"fmt"
25-
"log"
26+
stdLog "log"
2627
"os"
27-
"os/signal"
2828
"path/filepath"
2929
"strings"
30-
"syscall"
3130

3231
"github.com/urfave/cli/v2"
32+
"go.uber.org/fx"
3333

34-
"github.com/uber/cadence/common"
3534
"github.com/uber/cadence/common/client"
3635
"github.com/uber/cadence/common/config"
37-
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
36+
"github.com/uber/cadence/common/log/logfx"
3837
"github.com/uber/cadence/common/service"
39-
"github.com/uber/cadence/tools/cassandra"
40-
"github.com/uber/cadence/tools/sql"
4138
)
4239

4340
// validServices is the list of all valid cadence services
4441
var validServices = service.ShortNames(service.List)
4542

46-
// startHandler is the handler for the cli start command
47-
func startHandler(c *cli.Context) error {
48-
env := getEnvironment(c)
49-
zone := getZone(c)
50-
configDir := getConfigDir(c)
51-
rootDir := getRootDir(c)
52-
53-
log.Printf("Loading config; env=%v,zone=%v,configDir=%v\n", env, zone, configDir)
54-
55-
var cfg config.Config
56-
err := config.Load(env, configDir, zone, &cfg)
57-
if err != nil {
58-
return fmt.Errorf("Config file corrupted: %w", err)
59-
}
60-
if cfg.Log.Level == "debug" {
61-
log.Printf("config=%v", cfg.String())
62-
}
63-
if cfg.DynamicConfig.Client == "" {
64-
cfg.DynamicConfigClient.Filepath = constructPathIfNeed(rootDir, cfg.DynamicConfigClient.Filepath)
65-
} else {
66-
cfg.DynamicConfig.FileBased.Filepath = constructPathIfNeed(rootDir, cfg.DynamicConfig.FileBased.Filepath)
67-
}
68-
69-
if err := cfg.ValidateAndFillDefaults(); err != nil {
70-
return fmt.Errorf("config validation failed: %w", err)
71-
}
72-
// cassandra schema version validation
73-
if err := cassandra.VerifyCompatibleVersion(cfg.Persistence, gocql.Quorum); err != nil {
74-
return fmt.Errorf("cassandra schema version compatibility check failed: %w", err)
75-
}
76-
// sql schema version validation
77-
if err := sql.VerifyCompatibleVersion(cfg.Persistence); err != nil {
78-
return fmt.Errorf("sql schema version compatibility check failed: %w", err)
79-
}
80-
81-
var daemons []common.Daemon
82-
services := getServices(c)
83-
sigc := make(chan os.Signal, 1)
84-
signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT)
85-
for _, svc := range services {
86-
server := newServer(svc, &cfg)
87-
daemons = append(daemons, server)
88-
server.Start()
89-
}
90-
91-
<-sigc
92-
log.Println("Received SIGTERM signal, initiating shutdown.")
93-
for _, daemon := range daemons {
94-
daemon.Stop()
95-
}
96-
return nil
97-
}
98-
99-
func getEnvironment(c *cli.Context) string {
100-
return strings.TrimSpace(c.String("env"))
101-
}
102-
103-
func getZone(c *cli.Context) string {
104-
return strings.TrimSpace(c.String("zone"))
105-
}
106-
107-
// getServices parses the services arg from cli
108-
// and returns a list of services to start
109-
func getServices(c *cli.Context) []string {
110-
111-
val := strings.TrimSpace(c.String("services"))
112-
tokens := strings.Split(val, ",")
113-
114-
if len(tokens) == 0 {
115-
log.Fatal("list of services is empty")
116-
}
117-
118-
for _, t := range tokens {
119-
if !isValidService(t) {
120-
log.Fatalf("invalid service `%v` in service list [%v]", t, val)
121-
}
122-
}
123-
124-
return tokens
125-
}
126-
12743
func isValidService(in string) bool {
12844
for _, s := range validServices {
12945
if s == in {
@@ -133,31 +49,6 @@ func isValidService(in string) bool {
13349
return false
13450
}
13551

136-
func getConfigDir(c *cli.Context) string {
137-
return constructPathIfNeed(getRootDir(c), c.String("config"))
138-
}
139-
140-
func getRootDir(c *cli.Context) string {
141-
dirpath := c.String("root")
142-
if len(dirpath) == 0 {
143-
cwd, err := os.Getwd()
144-
if err != nil {
145-
log.Fatalf("os.Getwd() failed, err=%v", err)
146-
}
147-
return cwd
148-
}
149-
return dirpath
150-
}
151-
152-
// constructPathIfNeed would append the dir as the root dir
153-
// when the file wasn't absolute path.
154-
func constructPathIfNeed(dir string, file string) string {
155-
if !filepath.IsAbs(file) {
156-
return dir + "/" + file
157-
}
158-
return file
159-
}
160-
16152
// BuildCLI is the main entry point for the cadence server
16253
func BuildCLI(releaseVersion string, gitRevision string) *cli.App {
16354
version := fmt.Sprintf(" Release version: %v \n"+
@@ -217,11 +108,98 @@ func BuildCLI(releaseVersion string, gitRevision string) *cli.App {
217108
},
218109
},
219110
Action: func(c *cli.Context) error {
220-
return startHandler(c)
111+
fxApp := fx.New(
112+
config.Module,
113+
logfx.Module,
114+
fx.Provide(func() appContext {
115+
return appContext{
116+
CfgContext: config.Context{
117+
Environment: getEnvironment(c),
118+
Zone: getZone(c),
119+
},
120+
ConfigDir: getConfigDir(c),
121+
RootDir: getRootDir(c),
122+
Services: getServices(c),
123+
}
124+
}),
125+
Module,
126+
)
127+
128+
ctx := context.Background()
129+
if err := fxApp.Start(ctx); err != nil {
130+
return err
131+
}
132+
133+
// Block until FX receives a shutdown signal
134+
<-fxApp.Done()
135+
136+
// Stop the application
137+
return fxApp.Stop(ctx)
221138
},
222139
},
223140
}
224141

225142
return app
226143

227144
}
145+
146+
type appContext struct {
147+
fx.Out
148+
149+
CfgContext config.Context
150+
ConfigDir string `name:"config-dir"`
151+
RootDir string `name:"root-dir"`
152+
Services []string `name:"services"`
153+
}
154+
155+
func getEnvironment(c *cli.Context) string {
156+
return strings.TrimSpace(c.String("env"))
157+
}
158+
159+
func getZone(c *cli.Context) string {
160+
return strings.TrimSpace(c.String("zone"))
161+
}
162+
163+
// getServices parses the services arg from cli
164+
// and returns a list of services to start
165+
func getServices(c *cli.Context) []string {
166+
val := strings.TrimSpace(c.String("services"))
167+
tokens := strings.Split(val, ",")
168+
169+
if len(tokens) == 0 {
170+
stdLog.Fatal("list of services is empty")
171+
}
172+
173+
for _, t := range tokens {
174+
if !isValidService(t) {
175+
stdLog.Fatalf("invalid service `%v` in service list [%v]", t, val)
176+
}
177+
}
178+
179+
return tokens
180+
}
181+
182+
func getConfigDir(c *cli.Context) string {
183+
return constructPathIfNeed(getRootDir(c), c.String("config"))
184+
}
185+
186+
func getRootDir(c *cli.Context) string {
187+
dirpath := c.String("root")
188+
if len(dirpath) == 0 {
189+
cwd, err := os.Getwd()
190+
if err != nil {
191+
stdLog.Fatalf("os.Getwd() failed, err=%v", err)
192+
}
193+
return cwd
194+
}
195+
return dirpath
196+
}
197+
198+
// constructPathIfNeed would append the dir as the root dir
199+
// when the file wasn't absolute path.
200+
func constructPathIfNeed(dir string, file string) string {
201+
if !filepath.IsAbs(file) {
202+
return dir + "/" + file
203+
}
204+
return file
205+
}

cmd/server/cadence/fx.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package cadence
24+
25+
import (
26+
"context"
27+
"fmt"
28+
29+
"go.uber.org/fx"
30+
31+
"github.com/uber/cadence/common"
32+
"github.com/uber/cadence/common/config"
33+
"github.com/uber/cadence/common/log"
34+
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
35+
"github.com/uber/cadence/tools/cassandra"
36+
"github.com/uber/cadence/tools/sql"
37+
)
38+
39+
// Module provides a cadence server initialization with root components.
40+
// AppParams allows to provide optional/overrides for implementation specific dependencies.
41+
var Module = fx.Options(
42+
fx.Provide(NewApp),
43+
// empty invoke so fx won't drop the application from the dependencies.
44+
fx.Invoke(func(a *App) {}),
45+
)
46+
47+
type AppParams struct {
48+
fx.In
49+
50+
RootDir string `name:"root-dir"`
51+
Services []string `name:"services"`
52+
AppContext config.Context
53+
Config config.Config
54+
Logger log.Logger
55+
}
56+
57+
// NewApp created a new Application from pre initalized config and logger.
58+
func NewApp(params AppParams) *App {
59+
app := &App{
60+
cfg: params.Config,
61+
rootDir: params.RootDir,
62+
logger: params.Logger,
63+
services: params.Services,
64+
}
65+
return app
66+
}
67+
68+
// App is a fx application that registers itself into fx.Lifecycle and runs.
69+
// It is done implicitly, since it provides methods Start and Stop which are picked up by fx.
70+
type App struct {
71+
cfg config.Config
72+
rootDir string
73+
logger log.Logger
74+
75+
daemons []common.Daemon
76+
services []string
77+
}
78+
79+
func (a *App) Start(_ context.Context) error {
80+
if a.cfg.DynamicConfig.Client == "" {
81+
a.cfg.DynamicConfigClient.Filepath = constructPathIfNeed(a.rootDir, a.cfg.DynamicConfigClient.Filepath)
82+
} else {
83+
a.cfg.DynamicConfig.FileBased.Filepath = constructPathIfNeed(a.rootDir, a.cfg.DynamicConfig.FileBased.Filepath)
84+
}
85+
86+
if err := a.cfg.ValidateAndFillDefaults(); err != nil {
87+
return fmt.Errorf("config validation failed: %w", err)
88+
}
89+
// cassandra schema version validation
90+
if err := cassandra.VerifyCompatibleVersion(a.cfg.Persistence, gocql.Quorum); err != nil {
91+
return fmt.Errorf("cassandra schema version compatibility check failed: %w", err)
92+
}
93+
// sql schema version validation
94+
if err := sql.VerifyCompatibleVersion(a.cfg.Persistence); err != nil {
95+
return fmt.Errorf("sql schema version compatibility check failed: %w", err)
96+
}
97+
98+
var daemons []common.Daemon
99+
for _, svc := range a.services {
100+
server := newServer(svc, a.cfg, a.logger)
101+
daemons = append(daemons, server)
102+
server.Start()
103+
}
104+
105+
return nil
106+
}
107+
108+
func (a *App) Stop(ctx context.Context) error {
109+
for _, daemon := range a.daemons {
110+
daemon.Stop()
111+
}
112+
return nil
113+
}

0 commit comments

Comments
 (0)