Skip to content

Add sharddistributor outbounds #6616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ type (
// To use Async APIs for a domain first specify the queue using Admin API.
// Either refer to one of the predefined queues in this config or alternatively specify the queue details inline in the API call.
AsyncWorkflowQueues map[string]AsyncWorkflowQueueProvider `yaml:"asyncWorkflowQueues"`
// ShardDistributorClient is the config for shard distributor client
// Shard distributor is used to distribute shards across multiple cadence service instances
// Note: This is not recommended for use, it's still experimental
ShardDistributorClient ShardDistributorClient `yaml:"shardDistributorClient"`
}

// Membership holds peer provider configuration.
Expand Down Expand Up @@ -592,6 +596,12 @@ type (
URI string `yaml:"URI"`
}

// ShardDistributorClient contains the config items for shard distributor
ShardDistributorClient struct {
// The host and port of the shard distributor server
HostPort string `yaml:"hostPort"`
}

// YamlNode is a lazy-unmarshaler, because *yaml.Node only exists in gopkg.in/yaml.v3, not v2,
// and go.uber.org/config currently uses only v2.
YamlNode struct {
Expand Down
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,8 @@ func (v FailoverType) String() string {
return "Unknown"
}
}

const (
ShardModeHashRing = "hash-ring"
ShardModeShardDistributor = "shard-distributor"
)
20 changes: 20 additions & 0 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,23 @@ func IsGRPCOutbound(config transport.ClientConfig) bool {
}
return namer.TransportName() == grpc.TransportName
}
func NewSingleGRPCOutboundBuilder(outboundName string, serviceName string, address string) OutboundsBuilder {
return singleGRPCOutbound{outboundName, serviceName, address}
}

type singleGRPCOutbound struct {
outboundName string
serviceName string
address string
}

func (b singleGRPCOutbound) Build(grpc *grpc.Transport, _ *tchannel.Transport) (*Outbounds, error) {
return &Outbounds{
Outbounds: yarpc.Outbounds{
b.outboundName: {
ServiceName: b.serviceName,
Unary: grpc.NewSingleOutbound(b.address),
},
},
}, nil
}
12 changes: 12 additions & 0 deletions common/rpc/outbounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ func TestDirectOutbound(t *testing.T) {
assert.NotNil(t, outbounds["cadence-history"].Unary)
}

func TestSingleGRPCOutbound(t *testing.T) {
grpc := &grpc.Transport{}
tchannel := &tchannel.Transport{}

builder := NewSingleGRPCOutboundBuilder("grpc-only-out", "grpc-service-name", "http://example.com:1234")

outBound, err := builder.Build(grpc, tchannel)
assert.NoError(t, err)
assert.Equal(t, "grpc-service-name", outBound.Outbounds["grpc-only-out"].ServiceName)
assert.NotNil(t, outBound.Outbounds["grpc-only-out"].Unary)
}

func TestIsGRPCOutboud(t *testing.T) {
assert.True(t, IsGRPCOutbound(&transport.OutboundConfig{Outbounds: transport.Outbounds{Unary: (&grpc.Transport{}).NewSingleOutbound("localhost:1234")}}))
assert.False(t, IsGRPCOutbound(&transport.OutboundConfig{Outbounds: transport.Outbounds{Unary: (&tchannel.Transport{}).NewSingleOutbound("localhost:1234")}}))
Expand Down
57 changes: 33 additions & 24 deletions common/rpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,31 +133,40 @@ func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Coll
}
}

return Params{
ServiceName: serviceName,
HTTP: http,
TChannelAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.Port))),
GRPCAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.GRPCPort))),
GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize,
OutboundsBuilder: CombineOutbounds(
NewDirectOutboundBuilder(
service.History,
enableGRPCOutbound,
outboundTLS[service.History],
NewDirectPeerChooserFactory(service.History, logger, metricsCl),
dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser),
),
NewDirectOutboundBuilder(
service.Matching,
enableGRPCOutbound,
outboundTLS[service.Matching],
NewDirectPeerChooserFactory(service.Matching, logger, metricsCl),
dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser),
),
publicClientOutbound,
outboundsBuilders := []OutboundsBuilder{
NewDirectOutboundBuilder(
service.History,
enableGRPCOutbound,
outboundTLS[service.History],
NewDirectPeerChooserFactory(service.History, logger, metricsCl),
dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser),
),
NewDirectOutboundBuilder(
service.Matching,
enableGRPCOutbound,
outboundTLS[service.Matching],
NewDirectPeerChooserFactory(service.Matching, logger, metricsCl),
dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser),
),
InboundTLS: inboundTLS,
OutboundTLS: outboundTLS,
publicClientOutbound,
}
if config.ShardDistributorClient.HostPort != "" {
outboundsBuilders = append(outboundsBuilders, NewSingleGRPCOutboundBuilder(
service.ShardDistributor,
service.ShardDistributor,
config.ShardDistributorClient.HostPort,
))
}

return Params{
ServiceName: serviceName,
HTTP: http,
TChannelAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.Port))),
GRPCAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.GRPCPort))),
GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize,
OutboundsBuilder: CombineOutbounds(outboundsBuilders...),
InboundTLS: inboundTLS,
OutboundTLS: outboundTLS,
InboundMiddleware: yarpc.InboundMiddleware{
// order matters: ForwardPartitionConfigMiddleware must be applied after ClientPartitionConfigMiddleware
Unary: yarpc.UnaryInboundMiddleware(&PinotComparatorMiddleware{}, &InboundMetricsMiddleware{}, &ClientPartitionConfigMiddleware{}, &ForwardPartitionConfigMiddleware{}),
Expand Down
5 changes: 3 additions & 2 deletions common/rpc/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ func TestNewParams(t *testing.T) {
dc := dynamicconfig.NewNopCollection()
makeConfig := func(svc config.Service) *config.Config {
return &config.Config{
PublicClient: config.PublicClient{HostPort: "localhost:9999"},
Services: map[string]config.Service{"frontend": svc}}
PublicClient: config.PublicClient{HostPort: "localhost:9999"},
ShardDistributorClient: config.ShardDistributorClient{HostPort: "localhost:9998"},
Services: map[string]config.Service{"frontend": svc}}
}
logger := testlogger.New(t)
metricsCl := metrics.NewNoopMetricsClient()
Expand Down
3 changes: 3 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,6 @@ dynamicconfig:
blobstore:
filestore:
outputDirectory: "/tmp/blobstore"

shardDistributorClient:
hostPort: "localhost:7943"
23 changes: 2 additions & 21 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
"go.uber.org/cadence/compatibility"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"

adminClient "github.com/uber/cadence/client/admin"
frontendClient "github.com/uber/cadence/client/frontend"
Expand Down Expand Up @@ -1138,8 +1136,8 @@ func (c *cadenceImpl) newRPCFactory(serviceName string, host membership.HostInfo

// For integration tests to generate client out of the same outbound.
OutboundsBuilder: rpc.CombineOutbounds(
&singleGRPCOutbound{testOutboundName(serviceName), serviceName, grpcAddress},
&singleGRPCOutbound{rpc.OutboundPublicClient, service.Frontend, frontendGrpcAddress},
rpc.NewSingleGRPCOutboundBuilder(testOutboundName(serviceName), serviceName, grpcAddress),
rpc.NewSingleGRPCOutboundBuilder(rpc.OutboundPublicClient, service.Frontend, frontendGrpcAddress),
rpc.NewCrossDCOutbounds(c.clusterMetadata.GetAllClusterInfo(), rpc.NewDNSPeerChooserFactory(0, c.logger)),
rpc.NewDirectOutboundBuilder(service.History, true, nil, directOutboundPCF, directConnRetainFn),
rpc.NewDirectOutboundBuilder(service.Matching, true, nil, directOutboundPCF, directConnRetainFn),
Expand All @@ -1152,23 +1150,6 @@ func testOutboundName(name string) string {
return "test-" + name
}

type singleGRPCOutbound struct {
outboundName string
serviceName string
address string
}

func (b singleGRPCOutbound) Build(grpc *grpc.Transport, _ *tchannel.Transport) (*rpc.Outbounds, error) {
return &rpc.Outbounds{
Outbounds: yarpc.Outbounds{
b.outboundName: {
ServiceName: b.serviceName,
Unary: grpc.NewSingleOutbound(b.address),
},
},
}, nil
}

type versionMiddleware struct {
}

Expand Down
Loading