Skip to content

Commit 8139b04

Browse files
committed
Introduce metrics client to nosql stores
1 parent 07426f2 commit 8139b04

33 files changed

+144
-61
lines changed

cmd/server/cadence/fx_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@ import (
3131
"github.com/uber/cadence/common/config"
3232
"github.com/uber/cadence/common/dynamicconfig/dynamicconfigfx"
3333
"github.com/uber/cadence/common/log/logfx"
34+
"github.com/uber/cadence/common/metrics/metricsfx"
3435
)
3536

3637
func TestFxDependencies(t *testing.T) {
3738
err := fx.ValidateApp(config.Module,
3839
logfx.Module,
40+
metricsfx.Module,
3941
dynamicconfigfx.Module,
4042
fx.Supply(appContext{
4143
CfgContext: config.Context{

common/dynamicconfig/configstore/config_store_client.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
3737
"github.com/uber/cadence/common/log"
3838
"github.com/uber/cadence/common/log/tag"
39+
"github.com/uber/cadence/common/metrics"
3940
"github.com/uber/cadence/common/persistence"
4041
"github.com/uber/cadence/common/persistence/nosql"
4142
"github.com/uber/cadence/common/persistence/sql"
@@ -82,9 +83,11 @@ type cacheEntry struct {
8283
}
8384

8485
// NewConfigStoreClient creates a config store client
85-
func NewConfigStoreClient(clientCfg *csc.ClientConfig,
86+
func NewConfigStoreClient(
87+
clientCfg *csc.ClientConfig,
8688
persistenceCfg *config.Persistence,
8789
logger log.Logger,
90+
metricsClient metrics.Client,
8891
configType persistence.ConfigType,
8992
) (Client, error) {
9093
if persistenceCfg == nil {
@@ -101,7 +104,7 @@ func NewConfigStoreClient(clientCfg *csc.ClientConfig,
101104
clientCfg = defaultConfigValues
102105
}
103106

104-
client, err := newConfigStoreClient(clientCfg, &ds, logger, configType)
107+
client, err := newConfigStoreClient(clientCfg, &ds, logger, metricsClient, configType)
105108
if err != nil {
106109
return nil, err
107110
}
@@ -116,15 +119,16 @@ func newConfigStoreClient(
116119
clientCfg *csc.ClientConfig,
117120
ds *config.DataStore,
118121
logger log.Logger,
122+
metricsClient metrics.Client,
119123
configType persistence.ConfigType,
120124
) (*configStoreClient, error) {
121125
var store persistence.ConfigStore
122126
var err error
123127
switch {
124128
case ds.ShardedNoSQL != nil:
125-
store, err = nosql.NewNoSQLConfigStore(*ds.ShardedNoSQL, logger, nil)
129+
store, err = nosql.NewNoSQLConfigStore(*ds.ShardedNoSQL, logger, metricsClient, nil)
126130
case ds.NoSQL != nil:
127-
store, err = nosql.NewNoSQLConfigStore(*ds.NoSQL.ConvertToShardedNoSQLConfig(), logger, nil)
131+
store, err = nosql.NewNoSQLConfigStore(*ds.NoSQL.ConvertToShardedNoSQLConfig(), logger, metricsClient, nil)
128132
case ds.SQL != nil:
129133
var db sqlplugin.DB
130134
db, err = sql.NewSQLDB(ds.SQL)

common/dynamicconfig/configstore/config_store_client_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
c "github.com/uber/cadence/common/dynamicconfig/configstore/config"
3737
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
3838
"github.com/uber/cadence/common/log"
39+
"github.com/uber/cadence/common/metrics"
3940
p "github.com/uber/cadence/common/persistence"
4041
"github.com/uber/cadence/common/persistence/nosql"
4142
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
@@ -318,7 +319,7 @@ func (s *configStoreClientSuite) SetupTest() {
318319
DefaultShard: config.NonShardedStoreName,
319320
Connections: connections,
320321
},
321-
}, log.NewNoop(), p.DynamicConfig)
322+
}, log.NewNoop(), metrics.NewNoopMetricsClient(), p.DynamicConfig)
322323
s.Require().NoError(err)
323324

324325
s.mockManager = p.NewMockConfigStoreManager(s.mockController)
@@ -984,7 +985,7 @@ func (s *configStoreClientSuite) TestValidateKeyDataBlobPair() {
984985
}
985986

986987
func (s *configStoreClientSuite) TestNewConfigStoreClient_NilPersistenceConfig() {
987-
_, err := NewConfigStoreClient(&c.ClientConfig{}, nil, log.NewNoop(), p.DynamicConfig)
988+
_, err := NewConfigStoreClient(&c.ClientConfig{}, nil, log.NewNoop(), metrics.NewNoopMetricsClient(), p.DynamicConfig)
988989
s.Require().Error(err, "should fail when persistence config is nil")
989990
s.Require().EqualError(err, "persistence cfg is nil")
990991
}
@@ -993,7 +994,7 @@ func (s *configStoreClientSuite) TestNewConfigStoreClient_MissingDefaultPersiste
993994
persistenceCfg := &config.Persistence{
994995
DataStores: map[string]config.DataStore{},
995996
}
996-
_, err := NewConfigStoreClient(&c.ClientConfig{}, persistenceCfg, log.NewNoop(), p.DynamicConfig)
997+
_, err := NewConfigStoreClient(&c.ClientConfig{}, persistenceCfg, log.NewNoop(), metrics.NewNoopMetricsClient(), p.DynamicConfig)
997998
s.Require().Error(err, "should fail when default persistence config is missing")
998999
s.Require().EqualError(err, "default persistence config missing")
9991000
}
@@ -1009,7 +1010,7 @@ func (s *configStoreClientSuite) TestNewConfigStoreClient_InvalidClientConfig()
10091010
PollInterval: time.Millisecond,
10101011
}
10111012
logger := log.NewNoop()
1012-
_, err := NewConfigStoreClient(clientCfg, persistenceCfg, logger, p.DynamicConfig)
1013+
_, err := NewConfigStoreClient(clientCfg, persistenceCfg, logger, metrics.NewNoopMetricsClient(), p.DynamicConfig)
10131014
s.Require().Error(err, "should fail when client config is invalid")
10141015
}
10151016

common/dynamicconfig/dynamicconfigfx/fx.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
3434
"github.com/uber/cadence/common/log"
3535
"github.com/uber/cadence/common/log/tag"
36+
"github.com/uber/cadence/common/metrics"
3637
"github.com/uber/cadence/common/persistence"
3738
)
3839

@@ -43,9 +44,10 @@ var Module = fx.Options(fx.Provide(New))
4344
type Params struct {
4445
fx.In
4546

46-
Cfg config.Config
47-
Logger log.Logger
48-
RootDir string `name:"root-dir"`
47+
Cfg config.Config
48+
Logger log.Logger
49+
MetricsClient metrics.Client
50+
RootDir string `name:"root-dir"`
4951

5052
Lifecycle fx.Lifecycle
5153
}
@@ -85,6 +87,7 @@ func New(p Params) Result {
8587
&p.Cfg.DynamicConfig.ConfigStore,
8688
&p.Cfg.Persistence,
8789
p.Logger,
90+
p.MetricsClient,
8891
persistence.DynamicConfig,
8992
)
9093
case dynamicconfig.FileBasedClient:

common/dynamicconfig/dynamicconfigfx/fx_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,25 @@ import (
3131
"github.com/uber/cadence/common/config"
3232
"github.com/uber/cadence/common/dynamicconfig"
3333
"github.com/uber/cadence/common/log/testlogger"
34+
"github.com/uber/cadence/common/metrics"
3435
)
3536

3637
func TestModule(t *testing.T) {
3738
app := fxtest.New(t,
3839
testlogger.Module(t),
39-
fx.Provide(func() config.Config {
40-
return config.Config{
41-
ClusterGroupMetadata: &config.ClusterGroupMetadata{},
42-
}
43-
},
40+
fx.Provide(
41+
func() config.Config {
42+
return config.Config{
43+
ClusterGroupMetadata: &config.ClusterGroupMetadata{},
44+
}
45+
},
4446
func() fxRoot {
4547
return fxRoot{
4648
RootDir: "../../../",
4749
}
48-
}),
50+
},
51+
metrics.NewNoopMetricsClient,
52+
),
4953
Module,
5054
fx.Invoke(func(c dynamicconfig.Client) {}),
5155
)

common/metrics/defs.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,6 +2146,9 @@ const (
21462146
PersistenceSampledCounterPerDomain
21472147
PersistenceEmptyResponseCounterPerDomain
21482148

2149+
NoSQLShardStoreReadFromOriginalColumnCounter
2150+
NoSQLShardStoreReadFromDataBlobCounter
2151+
21492152
CadenceClientRequests
21502153
CadenceClientFailures
21512154
CadenceClientLatency
@@ -2874,6 +2877,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
28742877
PersistenceErrDBUnavailableCounterPerDomain: {metricName: "persistence_errors_db_unavailable_per_domain", metricRollupName: "persistence_errors_db_unavailable", metricType: Counter},
28752878
PersistenceSampledCounterPerDomain: {metricName: "persistence_sampled_per_domain", metricRollupName: "persistence_sampled", metricType: Counter},
28762879
PersistenceEmptyResponseCounterPerDomain: {metricName: "persistence_empty_response_per_domain", metricRollupName: "persistence_empty_response", metricType: Counter},
2880+
NoSQLShardStoreReadFromOriginalColumnCounter: {metricName: "nosql_shard_store_read_from_original_column", metricType: Counter},
2881+
NoSQLShardStoreReadFromDataBlobCounter: {metricName: "nosql_shard_store_read_from_data_blob", metricType: Counter},
28772882
CadenceClientRequests: {metricName: "cadence_client_requests", metricType: Counter},
28782883
CadenceClientFailures: {metricName: "cadence_client_errors", metricType: Counter},
28792884
CadenceClientLatency: {metricName: "cadence_client_latency", metricType: Timer},

common/persistence/client/factory.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -529,11 +529,11 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
529529
parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW)
530530
taskSerializer := serialization.NewTaskSerializer(parser)
531531
shardedNoSQLConfig := defaultCfg.NoSQL.ConvertToShardedNoSQLConfig()
532-
defaultDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, taskSerializer, parser, f.dc)
532+
defaultDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc)
533533
case defaultCfg.ShardedNoSQL != nil:
534534
parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW)
535535
taskSerializer := serialization.NewTaskSerializer(parser)
536-
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.ShardedNoSQL, clusterName, f.logger, taskSerializer, parser, f.dc)
536+
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.ShardedNoSQL, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc)
537537
case defaultCfg.SQL != nil:
538538
if defaultCfg.SQL.EncodingType == "" {
539539
defaultCfg.SQL.EncodingType = string(constants.EncodingTypeThriftRW)
@@ -579,7 +579,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
579579
parser := getParser(f.logger, constants.EncodingTypeThriftRW, constants.EncodingTypeThriftRW)
580580
taskSerializer := serialization.NewTaskSerializer(parser)
581581
shardedNoSQLConfig := visibilityCfg.NoSQL.ConvertToShardedNoSQLConfig()
582-
visibilityDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, taskSerializer, parser, f.dc)
582+
visibilityDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.metricsClient, taskSerializer, parser, f.dc)
583583
case visibilityCfg.SQL != nil:
584584
var decodingTypes []constants.EncodingType
585585
for _, dt := range visibilityCfg.SQL.DecodingTypes {

common/persistence/nosql/factory.go

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/uber/cadence/common/config"
2727
"github.com/uber/cadence/common/log"
28+
"github.com/uber/cadence/common/metrics"
2829
"github.com/uber/cadence/common/persistence"
2930
"github.com/uber/cadence/common/persistence/serialization"
3031
)
@@ -36,6 +37,7 @@ type (
3637
cfg config.ShardedNoSQL
3738
clusterName string
3839
logger log.Logger
40+
metricsClient metrics.Client
3941
execStoreFactory *executionStoreFactory
4042
dc *persistence.DynamicConfiguration
4143
parser serialization.Parser
@@ -51,11 +53,12 @@ type (
5153

5254
// NewFactory returns an instance of a factory object which can be used to create
5355
// datastores that are backed by cassandra
54-
func NewFactory(cfg config.ShardedNoSQL, clusterName string, logger log.Logger, taskSerializer serialization.TaskSerializer, parser serialization.Parser, dc *persistence.DynamicConfiguration) *Factory {
56+
func NewFactory(cfg config.ShardedNoSQL, clusterName string, logger log.Logger, metricsClient metrics.Client, taskSerializer serialization.TaskSerializer, parser serialization.Parser, dc *persistence.DynamicConfiguration) *Factory {
5557
return &Factory{
5658
cfg: cfg,
5759
clusterName: clusterName,
5860
logger: logger,
61+
metricsClient: metricsClient,
5962
taskSerializer: taskSerializer,
6063
dc: dc,
6164
parser: parser,
@@ -64,22 +67,22 @@ func NewFactory(cfg config.ShardedNoSQL, clusterName string, logger log.Logger,
6467

6568
// NewTaskStore returns a new task store
6669
func (f *Factory) NewTaskStore() (persistence.TaskStore, error) {
67-
return newNoSQLTaskStore(f.cfg, f.logger, f.dc)
70+
return newNoSQLTaskStore(f.cfg, f.logger, f.metricsClient, f.dc)
6871
}
6972

7073
// NewShardStore returns a new shard store
7174
func (f *Factory) NewShardStore() (persistence.ShardStore, error) {
72-
return newNoSQLShardStore(f.cfg, f.clusterName, f.logger, f.dc, f.parser)
75+
return newNoSQLShardStore(f.cfg, f.clusterName, f.logger, f.metricsClient, f.dc, f.parser)
7376
}
7477

7578
// NewHistoryStore returns a new history store
7679
func (f *Factory) NewHistoryStore() (persistence.HistoryStore, error) {
77-
return newNoSQLHistoryStore(f.cfg, f.logger, f.dc)
80+
return newNoSQLHistoryStore(f.cfg, f.logger, f.metricsClient, f.dc)
7881
}
7982

8083
// NewDomainStore returns a metadata store that understands only v2
8184
func (f *Factory) NewDomainStore() (persistence.DomainStore, error) {
82-
return newNoSQLDomainStore(f.cfg, f.clusterName, f.logger, f.dc)
85+
return newNoSQLDomainStore(f.cfg, f.clusterName, f.logger, f.metricsClient, f.dc)
8386
}
8487

8588
// NewExecutionStore returns an ExecutionStore for a given shardID
@@ -93,17 +96,17 @@ func (f *Factory) NewExecutionStore(shardID int) (persistence.ExecutionStore, er
9396

9497
// NewVisibilityStore returns a visibility store
9598
func (f *Factory) NewVisibilityStore(sortByCloseTime bool) (persistence.VisibilityStore, error) {
96-
return newNoSQLVisibilityStore(sortByCloseTime, f.cfg, f.logger, f.dc)
99+
return newNoSQLVisibilityStore(sortByCloseTime, f.cfg, f.logger, f.metricsClient, f.dc)
97100
}
98101

99102
// NewQueue returns a new queue backed by cassandra
100103
func (f *Factory) NewQueue(queueType persistence.QueueType) (persistence.Queue, error) {
101-
return newNoSQLQueueStore(f.cfg, f.logger, queueType, f.dc)
104+
return newNoSQLQueueStore(f.cfg, f.logger, f.metricsClient, queueType, f.dc)
102105
}
103106

104107
// NewConfigStore returns a new config store
105108
func (f *Factory) NewConfigStore() (persistence.ConfigStore, error) {
106-
return NewNoSQLConfigStore(f.cfg, f.logger, f.dc)
109+
return NewNoSQLConfigStore(f.cfg, f.logger, f.metricsClient, f.dc)
107110
}
108111

109112
// Close closes the factory
@@ -128,7 +131,7 @@ func (f *Factory) executionStoreFactory() (*executionStoreFactory, error) {
128131
return f.execStoreFactory, nil
129132
}
130133

131-
factory, err := newExecutionStoreFactory(f.cfg, f.logger, f.taskSerializer, f.dc)
134+
factory, err := newExecutionStoreFactory(f.cfg, f.logger, f.metricsClient, f.taskSerializer, f.dc)
132135
if err != nil {
133136
return nil, err
134137
}
@@ -140,10 +143,11 @@ func (f *Factory) executionStoreFactory() (*executionStoreFactory, error) {
140143
func newExecutionStoreFactory(
141144
cfg config.ShardedNoSQL,
142145
logger log.Logger,
146+
metricsClient metrics.Client,
143147
taskSerializer serialization.TaskSerializer,
144148
dc *persistence.DynamicConfiguration,
145149
) (*executionStoreFactory, error) {
146-
s, err := newShardedNosqlStore(cfg, logger, dc)
150+
s, err := newShardedNosqlStore(cfg, logger, metricsClient, dc)
147151
if err != nil {
148152
return nil, err
149153
}

common/persistence/nosql/nosql_config_store.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/uber/cadence/common/config"
2828
"github.com/uber/cadence/common/log"
29+
"github.com/uber/cadence/common/metrics"
2930
"github.com/uber/cadence/common/persistence"
3031
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
3132
)
@@ -37,9 +38,10 @@ type nosqlConfigStore struct {
3738
func NewNoSQLConfigStore(
3839
cfg config.ShardedNoSQL,
3940
logger log.Logger,
41+
metricsClient metrics.Client,
4042
dc *persistence.DynamicConfiguration,
4143
) (persistence.ConfigStore, error) {
42-
shardedStore, err := newShardedNosqlStore(cfg, logger, dc)
44+
shardedStore, err := newShardedNosqlStore(cfg, logger, metricsClient, dc)
4345
if err != nil {
4446
return nil, err
4547
}

common/persistence/nosql/nosql_domain_store.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/uber/cadence/common/config"
3030
"github.com/uber/cadence/common/constants"
3131
"github.com/uber/cadence/common/log"
32+
"github.com/uber/cadence/common/metrics"
3233
"github.com/uber/cadence/common/persistence"
3334
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
3435
"github.com/uber/cadence/common/types"
@@ -44,9 +45,10 @@ func newNoSQLDomainStore(
4445
cfg config.ShardedNoSQL,
4546
currentClusterName string,
4647
logger log.Logger,
48+
metricsClient metrics.Client,
4749
dc *persistence.DynamicConfiguration,
4850
) (persistence.DomainStore, error) {
49-
shardedStore, err := newShardedNosqlStore(cfg, logger, dc)
51+
shardedStore, err := newShardedNosqlStore(cfg, logger, metricsClient, dc)
5052
if err != nil {
5153
return nil, err
5254
}

common/persistence/nosql/nosql_history_store.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/uber/cadence/common/constants"
2929
"github.com/uber/cadence/common/log"
3030
"github.com/uber/cadence/common/log/tag"
31+
"github.com/uber/cadence/common/metrics"
3132
"github.com/uber/cadence/common/persistence"
3233
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
3334
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
@@ -42,9 +43,10 @@ type nosqlHistoryStore struct {
4243
func newNoSQLHistoryStore(
4344
cfg config.ShardedNoSQL,
4445
logger log.Logger,
46+
metricsClient metrics.Client,
4547
dc *persistence.DynamicConfiguration,
4648
) (persistence.HistoryStore, error) {
47-
s, err := newShardedNosqlStore(cfg, logger, dc)
49+
s, err := newShardedNosqlStore(cfg, logger, metricsClient, dc)
4850
if err != nil {
4951
return nil, err
5052
}

common/persistence/nosql/nosql_history_store_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/uber/cadence/common"
3737
"github.com/uber/cadence/common/constants"
3838
"github.com/uber/cadence/common/log"
39+
"github.com/uber/cadence/common/metrics"
3940
"github.com/uber/cadence/common/persistence"
4041
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
4142
"github.com/uber/cadence/common/types"
@@ -100,7 +101,7 @@ func TestNewNoSQLHistoryStore(t *testing.T) {
100101
registerCassandraMock(t)
101102
cfg := getValidShardedNoSQLConfig()
102103

103-
store, err := newNoSQLHistoryStore(cfg, log.NewNoop(), nil)
104+
store, err := newNoSQLHistoryStore(cfg, log.NewNoop(), metrics.NewNoopMetricsClient(), nil)
104105
assert.NoError(t, err)
105106
assert.NotNil(t, store)
106107
}

0 commit comments

Comments
 (0)