Skip to content

[Experimental] Signing write requests #5430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* [ENHANCEMENT] Store Gateway: Add new metrics `cortex_bucket_store_sent_chunk_size_bytes`, `cortex_bucket_store_postings_size_bytes` and `cortex_bucket_store_empty_postings_total`. #5397
* [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404
* [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
* [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign-write-requests` flag to sign the write requests. #5430
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2191,6 +2191,11 @@ ha_tracker:
# CLI flag: -distributor.extend-writes
[extend_writes: <boolean> | default = true]

# EXPERIMENTAL: If enabled, sign the write request between distributors and
# ingesters.
# CLI flag: -distributor.sign-write-requests
[sign_write_requests: <boolean> | default = false]

ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ require (
sigs.k8s.io/yaml v1.3.0
)

require github.com/google/go-cmp v0.5.9
require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/google/go-cmp v0.5.9
)

require (
cloud.google.com/go v0.110.0 // indirect
Expand Down Expand Up @@ -109,7 +112,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
10 changes: 10 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/util/grpcclient"

"github.com/cortexproject/cortex/pkg/alertmanager"
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
"github.com/cortexproject/cortex/pkg/api"
Expand Down Expand Up @@ -355,6 +357,7 @@ func New(cfg Config) (*Cortex, error) {

cortex.setupThanosTracing()
cortex.setupGRPCHeaderForwarding()
cortex.setupRequestSigning()

if err := cortex.setupModuleManager(); err != nil {
return nil, err
Expand All @@ -379,6 +382,13 @@ func (t *Cortex) setupGRPCHeaderForwarding() {
}
}

// setupRequestSigning appends grpcclient.UnarySigningServerInterceptor to the
// server's gRPC middleware chain when the distributor's experimental
// write-request signing option is enabled. It is a no-op otherwise.
func (t *Cortex) setupRequestSigning() {
	if !t.Cfg.Distributor.SignWriteRequestsEnabled {
		return
	}

	// The feature is experimental, so surface its use to operators.
	util_log.WarnExperimentalUse("Distributor SignWriteRequestsEnabled")

	t.Cfg.Server.GRPCMiddleware = append(
		t.Cfg.Server.GRPCMiddleware,
		grpcclient.UnarySigningServerInterceptor,
	)
}

// Run starts Cortex running, and blocks until a Cortex stops.
func (t *Cortex) Run() error {
// Register custom process metrics.
Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (t *Cortex) initOverridesExporter() (services.Service, error) {
func (t *Cortex) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled

// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
Expand Down
107 changes: 107 additions & 0 deletions pkg/cortexpb/extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package cortexpb

import (
"context"
"fmt"
"strconv"
"sync"

"github.com/cespare/xxhash/v2"

"github.com/cortexproject/cortex/pkg/tenant"
)

const (
	// maxBufferSize is the capacity, in bytes, of the scratch buffer that
	// each signer allocates for its fast path.
	maxBufferSize = 1024

	// signVersion is the version prefix placed before the hash in the
	// signatures returned by WriteRequest.Sign.
	signVersion = "v1"
)

// signerPool recycles signer values between Sign calls so that the digest and
// the scratch buffer do not have to be allocated for every request.
var signerPool = sync.Pool{
	New: func() interface{} { return newSigner() },
}

// signer hashes the concatenation of the strings passed to WriteString.
//
// While the accumulated input fits in b it is only buffered, and Sum64 hashes
// it in a single xxhash.Sum64 call. Once the input no longer fits, the signer
// switches to feeding the streaming digest h.
type signer struct {
	// h is the streaming digest used after the input has outgrown b.
	h *xxhash.Digest
	// b holds the input written so far while the signer is on the fast path.
	b []byte
	// optimized is true while all input written so far is held in b.
	optimized bool
}

// newSigner returns a signer with a fresh xxhash digest and an empty buffer of
// capacity maxBufferSize, already reset and ready to accept input.
func newSigner() *signer {
	sig := new(signer)
	sig.h = xxhash.New()
	sig.b = make([]byte, 0, maxBufferSize)
	sig.Reset()
	return sig
}

// Reset returns the signer to its initial state: the buffer is truncated
// (keeping its capacity), the streaming digest is cleared and the fast path is
// re-enabled.
func (s *signer) Reset() {
	s.optimized = true
	s.b = s.b[:0]
	s.h.Reset()
}

// WriteString adds val to the data being hashed.
//
// As long as everything written so far fits in the buffer's capacity, val is
// only appended to the buffer so that Sum64 can hash it in one call. The first
// value that does not fit flushes the buffer into the streaming digest, and
// every later value is written straight to the digest.
func (s *signer) WriteString(val string) {
	// Slow path already active: stream directly into the digest.
	if !s.optimized {
		_, _ = s.h.WriteString(val)
		return
	}

	// Fast path: val still fits, so just buffer it.
	if len(s.b)+len(val) <= cap(s.b) {
		s.b = append(s.b, val...)
		return
	}

	// val does not fit. Rather than growing the buffer, hand what has been
	// buffered so far plus val to the digest and leave the fast path.
	_, _ = s.h.Write(s.b)
	_, _ = s.h.WriteString(val)
	s.optimized = false
}

// Sum64 returns the 64-bit xxhash of everything written since the last Reset.
func (s *signer) Sum64() uint64 {
	if !s.optimized {
		// The input went through the streaming digest.
		return s.h.Sum64()
	}

	// Everything is still in the buffer: hash it in one shot.
	return xxhash.Sum64(s.b)
}

// VerifySign reports whether signature equals the signature that Sign computes
// for this request with the tenant carried by ctx.
//
// If Sign fails, its error is returned and the result is always false, so an
// empty signature can never be reported as valid alongside an error.
func (w *WriteRequest) VerifySign(ctx context.Context, signature string) (bool, error) {
	expected, err := w.Sign(ctx)
	if err != nil {
		// Sign returns "" on failure; comparing it would match an empty
		// signature, so bail out before the comparison.
		return false, err
	}

	return expected == signature, nil
}

// Sign computes the signature of the write request for the tenant found in ctx.
//
// The hash input is, in order: the tenant ID; for every metadata entry its
// type (as a decimal string), metric family name, help and unit; and for every
// timeseries its label names and values followed by the label names and values
// of each of its exemplars. No separators are written between the pieces.
//
// The result has the form "<signVersion>/<hash>", where hash is the decimal
// 64-bit xxhash of that input. An error is returned only when the tenant ID
// cannot be resolved from ctx.
func (w *WriteRequest) Sign(ctx context.Context) (string, error) {
	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return "", err
	}

	sig := signerPool.Get().(*signer)
	// Clean the signer before handing it back to the pool.
	defer func() {
		sig.Reset()
		signerPool.Put(sig)
	}()

	// writeLabels feeds each label's name and value, in order, to the signer.
	writeLabels := func(labels []LabelAdapter) {
		for i := range labels {
			sig.WriteString(labels[i].Name)
			sig.WriteString(labels[i].Value)
		}
	}

	sig.WriteString(userID)

	for _, meta := range w.Metadata {
		sig.WriteString(strconv.Itoa(int(meta.Type)))
		sig.WriteString(meta.MetricFamilyName)
		sig.WriteString(meta.Help)
		sig.WriteString(meta.Unit)
	}

	for i := range w.Timeseries {
		series := w.Timeseries[i]
		writeLabels(series.Labels)

		for j := range series.Exemplars {
			writeLabels(series.Exemplars[j].Labels)
		}
	}

	return fmt.Sprintf("%v/%v", signVersion, sig.Sum64()), nil
}
100 changes: 100 additions & 0 deletions pkg/cortexpb/extensions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package cortexpb

import (
"context"
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
)

// TestWriteRequest_Sign pins the signature produced for a set of fixed write
// requests and signs each of them from many goroutines at once, to check that
// concurrent use of Sign still yields the expected value.
func TestWriteRequest_Sign(t *testing.T) {
	ctx := context.Background()
	ctx = user.InjectOrgID(ctx, "user-1")

	tests := map[string]struct {
		w            *WriteRequest
		expectedSign string
	}{
		"small write with exemplar": {
			w:            createWriteRequest(10, true, "family1", "help1", "unit"),
			expectedSign: "v1/9125893422459502203",
		},
		"small write with exemplar and changed md": {
			w:            createWriteRequest(10, true, "family2", "help1", "unit"),
			expectedSign: "v1/18044786562323437562",
		},
		"small write without exemplar": {
			w:            createWriteRequest(10, false, "family1", "help1", "unit"),
			expectedSign: "v1/7697478040597284323",
		},
		"big write with exemplar": {
			w:            createWriteRequest(10000, true, "family1", "help1", "unit"),
			expectedSign: "v1/18402783317092766507",
		},
		"big write without exemplar": {
			w:            createWriteRequest(10000, false, "family1", "help1", "unit"),
			expectedSign: "v1/14973071954515615892",
		},
	}

	// signResult is the outcome of one concurrent Sign call.
	type signResult struct {
		sign string
		err  error
	}

	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			// running multiple times in parallel to make sure no race
			itNumber := 1000
			results := make([]signResult, itNumber)

			wg := sync.WaitGroup{}
			wg.Add(itNumber)
			for i := 0; i < itNumber; i++ {
				go func(i int) {
					defer wg.Done()
					// Only record the outcome here: require calls
					// t.FailNow, which must run on the test goroutine.
					// Each goroutine writes to its own slot.
					results[i].sign, results[i].err = tc.w.Sign(ctx)
				}(i)
			}
			wg.Wait()

			for _, res := range results {
				require.NoError(t, res.err)
				// Make sure this sign doesn't change
				require.Equal(t, tc.expectedSign, res.sign)
			}
		})
	}
}

// createWriteRequest builds a WriteRequest with a single metadata entry made
// of family, help and unit, and numTs timeseries. Timeseries i carries one
// label "Name-i"="Value-i" and, when exemplar is true, one exemplar with the
// label "Ex-Name-i"="Ex-Value-i".
func createWriteRequest(numTs int, exemplar bool, family string, help string, unit string) *WriteRequest {
	req := &WriteRequest{
		Metadata: []*MetricMetadata{
			{MetricFamilyName: family, Help: help, Unit: unit},
		},
	}

	for idx := 0; idx < numTs; idx++ {
		series := &TimeSeries{
			Labels: []LabelAdapter{
				{
					Name:  fmt.Sprintf("Name-%v", idx),
					Value: fmt.Sprintf("Value-%v", idx),
				},
			},
		}

		if exemplar {
			series.Exemplars = []Exemplar{
				{
					Labels: []LabelAdapter{
						{
							Name:  fmt.Sprintf("Ex-Name-%v", idx),
							Value: fmt.Sprintf("Ex-Value-%v", idx),
						},
					},
				},
			}
		}

		req.Timeseries = append(req.Timeseries, PreallocTimeseries{TimeSeries: series})
	}

	return req
}
9 changes: 6 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ type Config struct {
RemoteTimeout time.Duration `yaml:"remote_timeout"`
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
ExtendWrites bool `yaml:"extend_writes"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
ExtendWrites bool `yaml:"extend_writes"`
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`

// Distributors ring
DistributorRing RingConfig `yaml:"ring"`
Expand Down Expand Up @@ -163,6 +164,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")

Expand All @@ -181,6 +183,7 @@ func (cfg *Config) Validate(limits validation.Limits) error {
}

haHATrackerConfig := cfg.HATrackerConfig.ToHATrackerConfig()

return haHATrackerConfig.Validate()
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type Config struct {
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"`
BackoffConfig backoff.Config `yaml:"backoff_config"`

TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
SignWriteRequestsEnabled bool `yaml:"-"`
}

// RegisterFlags registers flags.
Expand Down Expand Up @@ -91,6 +92,10 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep
unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...)
}

if cfg.SignWriteRequestsEnabled {
unaryClientInterceptors = append(unaryClientInterceptors, UnarySigningClientInterceptor)
}

return append(
opts,
grpc.WithDefaultCallOptions(cfg.CallOptions()...),
Expand Down
Loading