Cherry-pick fixes to release 1.15 branch #5241

Merged (5 commits, Apr 1, 2023)
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
* [CHANGE] Tracing: Use the default OTEL trace sampler when `-tracing.otel.exporter-type` is set to `awsxray`. #5141
* [CHANGE] Ingester partial error log line to debug level. #5192
* [CHANGE] Change HTTP status code from 503/422 to 499 if a request is canceled. #5220
* [CHANGE] Store Gateway: the following summary metrics have been converted to histograms: `cortex_bucket_store_series_blocks_queried`, `cortex_bucket_store_series_data_fetched`, `cortex_bucket_store_series_data_size_touched_bytes`, `cortex_bucket_store_series_data_size_fetched_bytes`, `cortex_bucket_store_series_data_touched`, `cortex_bucket_store_series_result_series`. #5239
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size`, allowing the size of the in-memory queue used before flushing chunks to disk to be configured. #5000
@@ -44,6 +45,7 @@
* [ENHANCEMENT] Distributor: Reuse byte slices when serializing requests from distributors to ingesters. #5193
* [ENHANCEMENT] Query Frontend: Add number of chunks and samples fetched in query stats. #5198
* [ENHANCEMENT] Implement grpc.Compressor.DecompressedSize for snappy to optimize memory allocations. #5213
* [ENHANCEMENT] Querier: Batch iterator optimization to avoid traversing it multiple times when query range steps do not overlap. #5237
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
* [BUGFIX] Fixed no-compact blocks being grouped by the shuffle sharding grouper. #5055
@@ -57,6 +59,7 @@
* [BUGFIX] Compactor: Fix issue where the shuffle sharding planner returns an error if a block is being visited by another compactor. #5188
* [BUGFIX] Fix S3 BucketWithRetries uploading empty content. #5217
* [BUGFIX] Query Frontend: Disable `absent`, `absent_over_time` and `scalar` for vertical sharding. #5221
* [BUGFIX] Catch context error in the s3 bucket client. #5240

## 1.14.0 2022-12-02

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.15.0-rc.0
1.15.0-rc.1
14 changes: 14 additions & 0 deletions pkg/querier/batch/batch.go
@@ -43,6 +43,9 @@ type iterator interface {
// Seek or Next have returned true.
AtTime() int64

// MaxCurrentChunkTime returns the maximum timestamp of the current chunk.
MaxCurrentChunkTime() int64

// Batch returns the current batch. Must only be called after Seek or Next
// have returned true.
Batch() promchunk.Batch
@@ -98,6 +101,17 @@ func (a *iteratorAdapter) Seek(t int64) bool {
a.curr.Index++
}
return true
} else if t <= a.underlying.MaxCurrentChunkTime() {
// In this case, some timestamp inside the current underlying chunk can fulfill the seek,
// so we call Next until we find that sample: it is faster than calling `a.underlying.Seek`
// directly, which would restart iteration from the beginning of the chunk.
// See: https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/querier/batch/chunk.go#L26-L45
// https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/chunk/encoding/prometheus_chunk.go#L90-L95
for a.Next() {
if t <= a.curr.Timestamps[a.curr.Index] {
return true
}
}
}
}

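The new fast path above skips a full re-seek when the target timestamp still falls inside the chunk the iterator is currently positioned on. Below is a minimal standalone sketch of the same idea; the `sampleIterator` type and its fields are illustrative only, not the actual Cortex implementation.

```go
package main

import "fmt"

// sampleIterator is a toy iterator over a sorted slice of timestamps,
// with a known upper bound for the "current chunk".
type sampleIterator struct {
	timestamps []int64
	idx        int
	chunkMaxT  int64 // max timestamp of the current chunk
}

func (it *sampleIterator) Next() bool {
	it.idx++
	return it.idx < len(it.timestamps)
}

func (it *sampleIterator) AtTime() int64 { return it.timestamps[it.idx] }

// fullSeek models the expensive path: restart from the beginning of the chunk.
func (it *sampleIterator) fullSeek(t int64) bool {
	for i, ts := range it.timestamps {
		if ts >= t {
			it.idx = i
			return true
		}
	}
	return false
}

// seek applies the optimization from this PR: if t is still covered by the
// current chunk, step forward with Next instead of re-seeking from scratch.
func (it *sampleIterator) seek(t int64) bool {
	if it.idx >= 0 && it.idx < len(it.timestamps) && t <= it.AtTime() {
		return true // already at or past t
	}
	if t <= it.chunkMaxT {
		for it.Next() {
			if t <= it.AtTime() {
				return true
			}
		}
		return false
	}
	return it.fullSeek(t)
}

func main() {
	it := &sampleIterator{timestamps: []int64{10, 20, 30, 40, 50}, chunkMaxT: 50}
	fmt.Println(it.seek(35), it.AtTime()) // true 40
}
```

This matters most for range queries whose step is smaller than the chunk span: consecutive Seek calls then usually land in the same chunk, which is exactly the scenario the new `BenchmarkNewChunkMergeIterator_Seek` below exercises.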
59 changes: 54 additions & 5 deletions pkg/querier/batch/batch_test.go
@@ -35,7 +35,7 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
scenario.duplicationFactor,
scenario.enc.String())

chunks := createChunks(b, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)
chunks := createChunks(b, step, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)

b.Run(name, func(b *testing.B) {
b.ReportAllocs()
@@ -55,10 +55,59 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
}
}

func BenchmarkNewChunkMergeIterator_Seek(b *testing.B) {
scenarios := []struct {
numChunks int
numSamplesPerChunk int
duplicationFactor int
seekStep time.Duration
scrapeInterval time.Duration
enc promchunk.Encoding
}{
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second / 2, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 2, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 10, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 30, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 50, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 100, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 30 * time.Second, seekStep: 30 * time.Second * 200, enc: promchunk.PrometheusXorChunk},

{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second / 2, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 2, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 10, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 30, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 50, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 100, enc: promchunk.PrometheusXorChunk},
{numChunks: 1000, numSamplesPerChunk: 120, duplicationFactor: 3, scrapeInterval: 10 * time.Second, seekStep: 10 * time.Second * 200, enc: promchunk.PrometheusXorChunk},
}

for _, scenario := range scenarios {
name := fmt.Sprintf("scrapeInterval %vs seekStep: %vs",
scenario.scrapeInterval.Seconds(),
scenario.seekStep.Seconds())

chunks := createChunks(b, scenario.scrapeInterval, scenario.numChunks, scenario.numSamplesPerChunk, scenario.duplicationFactor, scenario.enc)

b.Run(name, func(b *testing.B) {
b.ReportAllocs()

for n := 0; n < b.N; n++ {
it := NewChunkMergeIterator(chunks, 0, 0)
i := int64(0)
for it.Seek(i*scenario.seekStep.Milliseconds()) != chunkenc.ValNone {
i++
}
}
})
}
}

func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
t.Parallel()
chunkOne := mkChunk(t, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
chunkTwo := mkChunk(t, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
chunkOne := mkChunk(t, step, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
chunkTwo := mkChunk(t, step, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
chunks := []chunk.Chunk{chunkOne, chunkTwo}

sut := NewChunkMergeIterator(chunks, 0, 0)
@@ -72,13 +121,13 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
require.Equal(t, int64(1*time.Second/time.Millisecond), actual)
}

func createChunks(b *testing.B, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
func createChunks(b *testing.B, step time.Duration, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
result := make([]chunk.Chunk, 0, numChunks)

for d := 0; d < duplicationFactor; d++ {
for c := 0; c < numChunks; c++ {
minTime := step * time.Duration(c*numSamplesPerChunk)
result = append(result, mkChunk(b, model.Time(minTime.Milliseconds()), numSamplesPerChunk, enc))
result = append(result, mkChunk(b, step, model.Time(minTime.Milliseconds()), numSamplesPerChunk, enc))
}
}

4 changes: 4 additions & 0 deletions pkg/querier/batch/chunk.go
@@ -21,6 +21,10 @@ func (i *chunkIterator) reset(chunk GenericChunk) {
i.batch.Index = 0
}

func (i *chunkIterator) MaxCurrentChunkTime() int64 {
return i.chunk.MaxTime
}

// Seek advances the iterator forward to the value at or after
// the given timestamp.
func (i *chunkIterator) Seek(t int64, size int) bool {
4 changes: 2 additions & 2 deletions pkg/querier/batch/chunk_test.go
@@ -44,7 +44,7 @@ func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) {
}
}

func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk {
func mkChunk(t require.TestingT, step time.Duration, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk {
metric := labels.Labels{
{Name: model.MetricNameLabel, Value: "foo"},
}
@@ -65,7 +65,7 @@ func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Enco
}

func mkGenericChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) GenericChunk {
ck := mkChunk(t, from, points, enc)
ck := mkChunk(t, step, from, points, enc)
return NewGenericChunk(int64(ck.From), int64(ck.Through), ck.Data.NewIterator)
}

8 changes: 8 additions & 0 deletions pkg/querier/batch/merge.go
@@ -128,6 +128,14 @@ func (c *mergeIterator) AtTime() int64 {
return c.batches[0].Timestamps[0]
}

func (c *mergeIterator) MaxCurrentChunkTime() int64 {
if len(c.h) < 1 {
return -1
}

return c.h[0].MaxCurrentChunkTime()
}

func (c *mergeIterator) Batch() promchunk.Batch {
return c.batches[0]
}
4 changes: 4 additions & 0 deletions pkg/querier/batch/non_overlapping.go
@@ -32,6 +32,10 @@ func (it *nonOverlappingIterator) Seek(t int64, size int) bool {
}
}

func (it *nonOverlappingIterator) MaxCurrentChunkTime() int64 {
return it.iter.MaxCurrentChunkTime()
}

func (it *nonOverlappingIterator) Next(size int) bool {
for {
if it.iter.Next(size) {
2 changes: 1 addition & 1 deletion pkg/storage/bucket/s3/bucket_client.go
@@ -124,7 +124,7 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error, operation
level.Error(b.logger).Log("msg", "bucket operation fail after retries", "err", lastErr, "operation", operationInfo)
return lastErr
}
return nil
return retries.Err()
}

func (b *BucketWithRetries) Name() string {
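The one-line change above makes the retry wrapper return the reason the retry loop stopped (typically the context error) instead of nil when no attempt succeeded. A rough sketch of that pattern, using a hypothetical `doWithRetries` helper rather than the real `BucketWithRetries` code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doWithRetries retries f until it succeeds, the retry budget is exhausted,
// or ctx is canceled. Crucially, when the loop exits without a successful
// attempt it must return why it stopped rather than nil.
func doWithRetries(ctx context.Context, attempts int, f func() error) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		if err := ctx.Err(); err != nil {
			// Mirrors `return retries.Err()`: surface the context error
			// instead of pretending the operation succeeded.
			return err
		}
		if lastErr = f(); lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(10 * time.Millisecond): // backoff placeholder
		}
	}
	return lastErr
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // canceled before the first attempt

	err := doWithRetries(ctx, 5, func() error { return errors.New("boom") })
	fmt.Println(errors.Is(err, context.Canceled)) // true
}
```

With a canceled context the caller now sees `context.Canceled` instead of a nil error paired with empty content, which is what the new `TestBucketWithRetries_ContextCanceled` test below asserts.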
21 changes: 20 additions & 1 deletion pkg/storage/bucket/s3/bucket_client_test.go
@@ -73,6 +73,25 @@ func TestBucketWithRetries_UploadFailed(t *testing.T) {
require.ErrorContains(t, err, "failed upload: ")
}

func TestBucketWithRetries_ContextCanceled(t *testing.T) {
t.Parallel()

m := mockBucket{}
b := BucketWithRetries{
logger: log.NewNopLogger(),
bucket: &m,
operationRetries: 5,
retryMinBackoff: 10 * time.Millisecond,
retryMaxBackoff: time.Second,
}

ctx, cancel := context.WithCancel(context.Background())
cancel()
obj, err := b.GetRange(ctx, "dummy", 0, 10)
require.ErrorIs(t, err, context.Canceled)
require.Nil(t, obj)
}

type fakeReader struct {
}

@@ -121,7 +140,7 @@ func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error

// GetRange mocks objstore.Bucket.GetRange()
func (m *mockBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return nil, nil
return io.NopCloser(bytes.NewBuffer(bytes.Repeat([]byte{1}, int(length)))), nil
}

// Exists mocks objstore.Bucket.Exists()
12 changes: 6 additions & 6 deletions pkg/storegateway/bucket_store_metrics.go
@@ -214,16 +214,16 @@ func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric) {

data.SendSumOfGaugesPerUser(out, m.blocksLoaded, "thanos_bucket_store_blocks_loaded")

data.SendSumOfSummariesWithLabels(out, m.seriesDataTouched, "thanos_bucket_store_series_data_touched", "data_type")
data.SendSumOfSummariesWithLabels(out, m.seriesDataFetched, "thanos_bucket_store_series_data_fetched", "data_type")
data.SendSumOfSummariesWithLabels(out, m.seriesDataSizeTouched, "thanos_bucket_store_series_data_size_touched_bytes", "data_type")
data.SendSumOfSummariesWithLabels(out, m.seriesDataSizeFetched, "thanos_bucket_store_series_data_size_fetched_bytes", "data_type")
data.SendSumOfSummariesWithLabels(out, m.seriesBlocksQueried, "thanos_bucket_store_series_blocks_queried")
data.SendSumOfHistogramsWithLabels(out, m.seriesDataTouched, "thanos_bucket_store_series_data_touched", "data_type")
data.SendSumOfHistogramsWithLabels(out, m.seriesDataFetched, "thanos_bucket_store_series_data_fetched", "data_type")
data.SendSumOfHistogramsWithLabels(out, m.seriesDataSizeTouched, "thanos_bucket_store_series_data_size_touched_bytes", "data_type")
data.SendSumOfHistogramsWithLabels(out, m.seriesDataSizeFetched, "thanos_bucket_store_series_data_size_fetched_bytes", "data_type")
data.SendSumOfHistogramsWithLabels(out, m.seriesBlocksQueried, "thanos_bucket_store_series_blocks_queried")

data.SendSumOfHistograms(out, m.seriesGetAllDuration, "thanos_bucket_store_series_get_all_duration_seconds")
data.SendSumOfHistograms(out, m.seriesMergeDuration, "thanos_bucket_store_series_merge_duration_seconds")
data.SendSumOfCounters(out, m.seriesRefetches, "thanos_bucket_store_series_refetches_total")
data.SendSumOfSummaries(out, m.resultSeriesCount, "thanos_bucket_store_series_result_series")
data.SendSumOfHistograms(out, m.resultSeriesCount, "thanos_bucket_store_series_result_series")
data.SendSumOfCounters(out, m.queriesDropped, "thanos_bucket_store_queries_dropped_total")

data.SendSumOfCountersWithLabels(out, m.cachedPostingsCompressions, "thanos_bucket_store_cached_postings_compressions_total", "op")
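For context, the edit above switches the aggregation helpers from summaries to histograms for these per-Series metrics. A minimal `prometheus/client_golang` illustration of the same summary-to-histogram change; the metric name and bucket boundaries are assumptions for the example, not the values used by Thanos or Cortex:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Before: a summary vector; quantiles computed per process cannot be
	// meaningfully merged across store-gateway replicas.
	_ = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Name: "bucket_store_series_data_touched",
		Help: "Number of items touched per Series() call (summary form).",
	}, []string{"data_type"})

	// After: a histogram vector with explicit buckets; bucket counts sum
	// cleanly when collected from many processes.
	touched := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "bucket_store_series_data_touched",
		Help:    "Number of items touched per Series() call (histogram form).",
		Buckets: prometheus.ExponentialBuckets(10, 2, 10), // illustrative buckets
	}, []string{"data_type"})

	reg.MustRegister(touched)
	touched.WithLabelValues("postings").Observe(123)
}
```

Histogram buckets aggregate across replicas, whereas summary quantiles cannot be combined, which is the usual motivation for this kind of conversion and what the corresponding CHANGELOG entry (#5239) describes.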