Skip to content

Commit 5e697d5

Browse files
committed
Make the number of worker processing federated query configurable
Signed-off-by: SungJin1212 <[email protected]>
1 parent c124391 commit 5e697d5

File tree

6 files changed

+27
-16
lines changed

6 files changed

+27
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
1919
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
2020
* [FEATURE] Ruler: Add support for per-user external labels #6340
21-
* [ENHANCEMENT] Querier: Add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query when the `-tenant-federation.enabled=true`. #6449
21+
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
2222
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
2323
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
2424
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ tenant_federation:
157157
# CLI flag: -tenant-federation.enabled
158158
[enabled: <boolean> | default = false]
159159

160+
# The number of workers used for processing federated query.
161+
# CLI flag: -tenant-federation.max-concurrent
162+
[max_concurrent: <int> | default = 16]
163+
160164
# The ruler_config configures the Cortex ruler.
161165
[ruler: <ruler_config>]
162166

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
274274
// single tenant. This allows for a less impactful enabling of tenant
275275
// federation.
276276
byPassForSingleQuerier := true
277-
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier, prometheus.DefaultRegisterer))
277+
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
278278
}
279279
return nil, nil
280280
}

pkg/querier/tenantfederation/merge_queryable.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ import (
2121
)
2222

2323
const (
24-
defaultTenantLabel = "__tenant_id__"
25-
retainExistingPrefix = "original_"
26-
maxConcurrency = 16
24+
defaultTenantLabel = "__tenant_id__"
25+
retainExistingPrefix = "original_"
26+
defaultMaxConcurrency = 16
2727
)
2828

2929
// NewQueryable returns a queryable that iterates through all the tenant IDs
@@ -38,8 +38,8 @@ const (
3838
// If the label "__tenant_id__" is already existing, its value is overwritten
3939
// by the tenant ID and the previous value is exposed through a new label
4040
// prefixed with "original_". This behaviour is not implemented recursively.
41-
func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
42-
return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg)
41+
func NewQueryable(upstream storage.Queryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
42+
return NewMergeQueryable(defaultTenantLabel, maxConcurrent, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg)
4343
}
4444

4545
func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback {
@@ -81,9 +81,10 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids
8181
// If the label `idLabelName` is already existing, its value is overwritten and
8282
// the previous value is exposed through a new label prefixed with "original_".
8383
// This behaviour is not implemented recursively.
84-
func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
84+
func NewMergeQueryable(idLabelName string, maxConcurrent int, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
8585
return &mergeQueryable{
8686
idLabelName: idLabelName,
87+
maxConcurrent: maxConcurrent,
8788
callback: callback,
8889
byPassWithSingleQuerier: byPassWithSingleQuerier,
8990

@@ -98,6 +99,7 @@ func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPass
9899

99100
type mergeQueryable struct {
100101
idLabelName string
102+
maxConcurrent int
101103
byPassWithSingleQuerier bool
102104
callback MergeQuerierCallback
103105
tenantsPerQuery prometheus.Histogram
@@ -108,6 +110,7 @@ type mergeQueryable struct {
108110
func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error) {
109111
return &mergeQuerier{
110112
idLabelName: m.idLabelName,
113+
maxConcurrent: m.maxConcurrent,
111114
mint: mint,
112115
maxt: maxt,
113116
byPassWithSingleQuerier: m.byPassWithSingleQuerier,
@@ -123,9 +126,10 @@ func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error
123126
// the previous value is exposed through a new label prefixed with "original_".
124127
// This behaviour is not implemented recursively
125128
type mergeQuerier struct {
126-
idLabelName string
127-
mint, maxt int64
128-
callback MergeQuerierCallback
129+
idLabelName string
130+
mint, maxt int64
131+
callback MergeQuerierCallback
132+
maxConcurrent int
129133

130134
byPassWithSingleQuerier bool
131135
tenantsPerQuery prometheus.Histogram
@@ -273,7 +277,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(ctx context.Context,
273277
return nil
274278
}
275279

276-
err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
280+
err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run)
277281
if err != nil {
278282
return nil, nil, err
279283
}
@@ -370,7 +374,7 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
370374
return nil
371375
}
372376

373-
if err := concurrency.ForEach(ctx, jobs, maxConcurrency, run); err != nil {
377+
if err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run); err != nil {
374378
return storage.ErrSeriesSet(err)
375379
}
376380

pkg/querier/tenantfederation/merge_queryable_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ type mergeQueryableScenario struct {
302302
func (s *mergeQueryableScenario) init() (storage.Querier, prometheus.Gatherer, error) {
303303
// initialize with default tenant label
304304
reg := prometheus.NewPedanticRegistry()
305-
q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier, reg)
305+
q := NewQueryable(&s.queryable, defaultMaxConcurrency, !s.doNotByPassSingleQuerier, reg)
306306

307307
// retrieve querier
308308
querier, err := q.Querier(mint, maxt)
@@ -384,7 +384,7 @@ func TestMergeQueryable_Querier(t *testing.T) {
384384
t.Run("querying without a tenant specified should error", func(t *testing.T) {
385385
t.Parallel()
386386
queryable := &mockTenantQueryableWithFilter{}
387-
q := NewQueryable(queryable, false /* byPassWithSingleQuerier */, nil)
387+
q := NewQueryable(queryable, defaultMaxConcurrency, false /* byPassWithSingleQuerier */, nil)
388388

389389
querier, err := q.Querier(mint, maxt)
390390
require.NoError(t, err)
@@ -1115,7 +1115,7 @@ func TestTracingMergeQueryable(t *testing.T) {
11151115
// set a multi tenant resolver
11161116
tenant.WithDefaultResolver(tenant.NewMultiResolver())
11171117
filter := mockTenantQueryableWithFilter{}
1118-
q := NewQueryable(&filter, false, nil)
1118+
q := NewQueryable(&filter, defaultMaxConcurrency, false, nil)
11191119
// retrieve querier if set
11201120
querier, err := q.Querier(mint, maxt)
11211121
require.NoError(t, err)

pkg/querier/tenantfederation/tenant_federation.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ import (
77
type Config struct {
88
// Enabled switches on support for multi tenant query federation
99
Enabled bool `yaml:"enabled"`
10+
// MaxConcurrent The number of workers used for processing federated query.
11+
MaxConcurrent int `yaml:"max_concurrent"`
1012
}
1113

1214
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
1315
f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).")
16+
f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used for processing federated query.")
1417
}

0 commit comments

Comments
 (0)