Backfilling the new limits when updating the metrics #5955

Merged: 2 commits, May 16, 2024

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
* [ENHANCEMENT] Distributor/Querier: Clean stale per-ingester metrics after ingester restarts. #5930
* [ENHANCEMENT] Distributor/Ring: Allow disabling detailed ring metrics by ring member. #5931
* [ENHANCEMENT] KV: Etcd Added etcd.ping-without-stream-allowed parameter to disable/enable PermitWithoutStream #5933
* [ENHANCEMENT] Ingester: Add a new `max_series_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allows users to define the maximum number of series per LabelSet. #5950
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
* [CHANGE] Query Frontend/Ruler: Omit empty data field in API response. #5953 #5954
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
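
As a rough illustration of the limit described in the CHANGELOG entry above, here is a minimal, self-contained Go sketch of the shape of a per-label-set limit. `LabelSet` and `Hash` appear in the diff below; the `Limit` field name and the local stand-in type (instead of the real `validation.MaxSeriesPerLabelSet`) are assumptions for illustration only.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// maxSeriesPerLabelSet mirrors the shape suggested by validation.MaxSeriesPerLabelSet
// in this PR (LabelSet and Hash appear in the diff); the Limit field name is assumed.
type maxSeriesPerLabelSet struct {
	Limit    int           // assumed: cap on active series matching LabelSet
	LabelSet labels.Labels // the label set the limit applies to
	Hash     uint64        // precomputed hash used for lookups and sharding
}

func main() {
	ls := labels.FromStrings("label1", "value1")
	limit := maxSeriesPerLabelSet{
		Limit:    3, // at most 3 active series matching {label1="value1"}
		LabelSet: ls,
		Hash:     ls.Hash(),
	}
	fmt.Printf("limit %d on %s (hash %d)\n", limit.Limit, limit.LabelSet.String(), limit.Hash)
}
```
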
37 changes: 22 additions & 15 deletions pkg/ingester/ingester.go
@@ -877,7 +877,7 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
i.stoppedMtx.RUnlock()

case <-activeSeriesTickerChan:
i.updateActiveSeries()
i.updateActiveSeries(ctx)
case <-maxInflightRequestResetTicker.C:
i.maxInflightQueryRequests.Tick()
case <-userTSDBConfigTicker.C:
@@ -929,7 +929,7 @@ func (i *Ingester) getMaxExemplars(userID string) int64 {
return int64(maxExemplarsFromLimits)
}

func (i *Ingester) updateActiveSeries() {
func (i *Ingester) updateActiveSeries(ctx context.Context) {
purgeTime := time.Now().Add(-i.cfg.ActiveSeriesMetricsIdleTimeout)

for _, userID := range i.getTSDBUsers() {
@@ -940,7 +940,9 @@ func (i *Ingester) updateActiveSeries() {

userDB.activeSeries.Purge(purgeTime)
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
userDB.labelSetCounter.UpdateMetric(userDB, i.metrics.activeSeriesPerLabelSet)
if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics.activeSeriesPerLabelSet); err != nil {
level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err)
}
}
}

@@ -1054,18 +1056,19 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// Keep track of some stats which are tracked only if the samples will be
// successfully committed
var (
succeededSamplesCount = 0
failedSamplesCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
nativeHistogramCount = 0
succeededSamplesCount = 0
failedSamplesCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perLabelSetSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
nativeHistogramCount = 0

updateFirstPartial = func(errFn func() error) {
if firstPartialErr == nil {
@@ -1150,6 +1153,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
})
continue
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
perLabelSetSeriesLimitCount++
updateFirstPartial(func() error {
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
@@ -1245,6 +1249,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if perMetricSeriesLimitCount > 0 {
validation.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
}
if perLabelSetSeriesLimitCount > 0 {
validation.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID).Add(float64(perLabelSetSeriesLimitCount))
}

if nativeHistogramCount > 0 {
validation.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(nativeHistogramCount))
52 changes: 37 additions & 15 deletions pkg/ingester/ingester_test.go
@@ -109,6 +109,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))

ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry)
registry.MustRegister(validation.DiscardedSamples)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
// Wait until it's ACTIVE
@@ -132,13 +133,13 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
}
}

ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
`), "cortex_ingester_active_series_per_labelset"))
`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))

// Should impose limits
for _, set := range limits.MaxSeriesPerLabelSet {
@@ -154,13 +155,16 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
require.ErrorContains(t, err, set.Id)
}

ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 2
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
`), "cortex_ingester_active_series_per_labelset"))
`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))

// Should apply composite limits
limits.MaxSeriesPerLabelSet = append(limits.MaxSeriesPerLabelSet,
@@ -187,6 +191,21 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)

// Should backfill
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 2
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",user="1"} 0
cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\"}",user="1"} 0
cortex_ingester_active_series_per_labelset{labelset="{comp2=\"compValue2\"}",user="1"} 0
cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))

// Adding 5 metrics with only 1 label
for i := 0; i < 5; i++ {
lbls := []string{labels.MetricName, "metric_name", "comp1", "compValue1"}
@@ -211,16 +230,19 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
require.ErrorContains(t, err, labels.FromStrings("comp1", "compValue1", "comp2", "compValue2").String())

ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 3
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",user="1"} 2
cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\"}",user="1"} 7
cortex_ingester_active_series_per_labelset{labelset="{comp2=\"compValue2\"}",user="1"} 2
`), "cortex_ingester_active_series_per_labelset"))
`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))

// Should bootstrap and apply limits when configuration change
limits.MaxSeriesPerLabelSet = append(limits.MaxSeriesPerLabelSet,
@@ -249,7 +271,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
require.ErrorContains(t, err, labels.FromStrings(lbls...).String())

ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
@@ -267,7 +289,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
require.NoError(t, err)
require.NoError(t, limits.UnmarshalJSON(b))
tenantLimits.setLimits(userID, &limits)
ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
@@ -281,7 +303,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
ing, err = prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
ing.updateActiveSeries()
ing.updateActiveSeries(ctx)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
# TYPE cortex_ingester_active_series_per_labelset gauge
@@ -1207,7 +1229,7 @@ func TestIngester_Push(t *testing.T) {

// Update active series for metrics check.
if !testData.disableActiveSeries {
i.updateActiveSeries()
i.updateActiveSeries(ctx)
}

// Append additional metrics to assert on.
@@ -1274,7 +1296,7 @@ func TestIngester_Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testi
}

// Update active series for metrics check.
i.updateActiveSeries()
i.updateActiveSeries(context.Background())

// Check tracked Prometheus metrics
expectedMetrics := `
@@ -1361,7 +1383,7 @@ func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) {
time.Sleep(200 * time.Millisecond)

// Update active series for metrics check. This will remove inactive series.
i.updateActiveSeries()
i.updateActiveSeries(context.Background())

// Check tracked Prometheus metrics
expectedMetrics := `
@@ -3733,7 +3755,7 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) {
})

pushSingleSampleWithMetadata(t, i)
i.updateActiveSeries()
i.updateActiveSeries(context.Background())

require.Equal(t, int64(1), i.TSDBState.seriesCount.Load())

@@ -3774,7 +3796,7 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) {
})

require.Greater(t, testutil.ToFloat64(i.TSDBState.idleTsdbChecks.WithLabelValues(string(tsdbIdleClosed))), float64(0))
i.updateActiveSeries()
i.updateActiveSeries(context.Background())
require.Equal(t, int64(0), i.TSDBState.seriesCount.Load()) // Flushing removed all series from memory.

// Verify that user has disappeared from metrics.
@@ -3799,7 +3821,7 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) {

// Pushing another sample will recreate TSDB.
pushSingleSampleWithMetadata(t, i)
i.updateActiveSeries()
i.updateActiveSeries(context.Background())

// User is back.
require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
2 changes: 2 additions & 0 deletions pkg/ingester/limiter.go
@@ -105,6 +105,8 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int
return errMaxMetadataPerUserLimitExceeded
}

// AssertMaxSeriesPerLabelSet checks that no max-series-per-label-set limit matching
// the given metric has been reached, and returns an error if one has.
func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.MaxSeriesPerLabelSet) (int, error)) error {
m := l.maxSeriesPerLabelSet(userID, metric)
for _, limit := range m {
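
To make the new limiter hook concrete, below is a small, self-contained sketch of the callback-driven check that `AssertMaxSeriesPerLabelSet` performs: the caller supplies a function that reports the current series count for each matching limit, and an error is returned once a cap is reached. The type and error names in the sketch are illustrative only, not Cortex identifiers; in the ingester the callback is backed by `labelSetCounter` (see `user_state.go` below).

```go
package main

import (
	"errors"
	"fmt"
)

// seriesLimit is an illustrative stand-in for a single per-label-set limit.
type seriesLimit struct {
	id        string // label set rendered as a string, used as a lookup key here
	maxSeries int    // cap on active series matching this label set
}

var errLimitExceeded = errors.New("per-labelset series limit exceeded")

// assertLimits mimics the limiter hook added in this PR: for each limit that
// matches the incoming metric, ask the callback for the current series count
// and fail once the cap is already reached.
func assertLimits(matching []seriesLimit, currentCount func(seriesLimit) (int, error)) error {
	for _, l := range matching {
		n, err := currentCount(l)
		if err != nil {
			return err
		}
		if n >= l.maxSeries {
			return fmt.Errorf("%w: %s", errLimitExceeded, l.id)
		}
	}
	return nil
}

func main() {
	counts := map[string]int{`{label1="value1"}`: 3} // already at the cap
	limits := []seriesLimit{{id: `{label1="value1"}`, maxSeries: 3}}
	err := assertLimits(limits, func(l seriesLimit) (int, error) { return counts[l.id], nil })
	fmt.Println(err) // per-labelset series limit exceeded: {label1="value1"}
}
```
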
88 changes: 52 additions & 36 deletions pkg/ingester/user_state.go
@@ -125,45 +125,49 @@ func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTS
s.RUnlock()

// We still dont keep track of this label value so we need to backfill
ir, err := u.db.Head().Index()
if err != nil {
return 0, err
}
return m.backFillLimit(ctx, u, set, s)
})
}

defer ir.Close()
func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.MaxSeriesPerLabelSet, s *labelSetCounterShard) (int, error) {
ir, err := u.db.Head().Index()
if err != nil {
return 0, err
}

s.Lock()
defer s.Unlock()
if r, ok := s.valuesCounter[set.Hash]; !ok {
postings := make([]index.Postings, 0, len(set.LabelSet))
for _, lbl := range set.LabelSet {
p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
if err != nil {
return 0, err
}
postings = append(postings, p)
defer ir.Close()

s.Lock()
defer s.Unlock()
if r, ok := s.valuesCounter[limit.Hash]; !ok {
postings := make([]index.Postings, 0, len(limit.LabelSet))
for _, lbl := range limit.LabelSet {
p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
if err != nil {
return 0, err
}
postings = append(postings, p)
}

p := index.Intersect(postings...)
p := index.Intersect(postings...)

totalCount := 0
for p.Next() {
totalCount++
}
totalCount := 0
for p.Next() {
totalCount++
}

if p.Err() != nil {
return 0, p.Err()
}
if p.Err() != nil {
return 0, p.Err()
}

s.valuesCounter[set.Hash] = &labelSetCounterEntry{
count: totalCount,
labels: set.LabelSet,
}
return totalCount, nil
} else {
return r.count, nil
s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
count: totalCount,
labels: limit.LabelSet,
}
})
return totalCount, nil
} else {
return r.count, nil
}
}

func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
@@ -195,24 +199,36 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe
}
}

func (m *labelSetCounter) UpdateMetric(u *userTSDB, vec *prometheus.GaugeVec) {
currentLbsLimitHash := map[uint64]struct{}{}
func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, vec *prometheus.GaugeVec) error {
currentLbsLimitHash := map[uint64]validation.MaxSeriesPerLabelSet{}
for _, l := range m.limiter.limits.MaxSeriesPerLabelSet(u.userID) {
currentLbsLimitHash[l.Hash] = struct{}{}
currentLbsLimitHash[l.Hash] = l
}

for i := 0; i < numMetricCounterShards; i++ {
s := m.shards[i]
s.RLock()
for h, entry := range s.valuesCounter {
// This limit no longer ecists
// This limit no longer exists
if _, ok := currentLbsLimitHash[h]; !ok {
vec.DeleteLabelValues(u.userID, entry.labels.String())
continue
}

delete(currentLbsLimitHash, h)
vec.WithLabelValues(u.userID, entry.labels.String()).Set(float64(entry.count))
}
s.RUnlock()
}

// Backfill all limits that are not being tracked yet
for _, l := range currentLbsLimitHash {
s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
count, err := m.backFillLimit(ctx, u, l, s)
if err != nil {
return err
}
vec.WithLabelValues(u.userID, l.LabelSet.String()).Set(float64(count))
}

return nil
}