Use slice pooling to populate the query stream response #6466

Merged (1 commit, Dec 31, 2024)
58 changes: 38 additions & 20 deletions pkg/ingester/ingester.go
@@ -33,6 +33,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/wlog"
"github.com/prometheus/prometheus/util/zeropool"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/shipper"
@@ -95,6 +96,8 @@ const (
var (
	errExemplarRef      = errors.New("exemplars not ingested because series not already present")
	errIngesterStopping = errors.New("ingester stopping")

	tsChunksPool zeropool.Pool[[]client.TimeSeriesChunk]
)

// Config for an Ingester.
@@ -2045,7 +2048,8 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
		return 0, 0, 0, 0, ss.Err()
	}

	chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
	chunkSeries := getTimeSeriesChunksSlice()
	defer putTimeSeriesChunksSlice(chunkSeries)
	batchSizeBytes := 0
	var it chunks.Iterator
	for ss.Next() {
@@ -3062,6 +3066,31 @@ func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) {
	_, _ = w.Write([]byte(respMsg))
}

func (i *Ingester) getInstanceLimits() *InstanceLimits {
	// Don't apply any limits while starting. We especially don't want to apply series in memory limit while replaying WAL.
	if i.State() == services.Starting {
		return nil
	}

	if i.cfg.InstanceLimitsFn == nil {
		return defaultInstanceLimits
	}

	l := i.cfg.InstanceLimitsFn()
	if l == nil {
		return defaultInstanceLimits
	}

	return l
}

// stopIncomingRequests is called during the shutdown process.
func (i *Ingester) stopIncomingRequests() {
	i.stoppedMtx.Lock()
	defer i.stoppedMtx.Unlock()
	i.stopped = true
}

Comment on lines +3069 to +3093

Member Author:

No changes to these functions; I just moved them to be close to the other Ingester functions.

// metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester.
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryIngestersWithin time.Duration) (mint, maxt int64, err error) {
	if queryIngestersWithin > 0 {
@@ -3119,27 +3148,16 @@ func wrappedTSDBIngestExemplarErr(ingestErr error, timestamp model.Time, seriesL
)
}

(Removed from this location: getInstanceLimits and stopIncomingRequests, unchanged; they were moved up to sit with the other Ingester functions, as shown above.)

func getTimeSeriesChunksSlice() []client.TimeSeriesChunk {
	if p := tsChunksPool.Get(); p != nil {
		return p
	}

	return make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
}

func putTimeSeriesChunksSlice(p []client.TimeSeriesChunk) {
	if p != nil {
		tsChunksPool.Put(p[:0])
	}
}
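The two helpers above implement a get/reset/put slice-pooling pattern: Get hands back a previously returned slice (or nil, in which case a fresh slice with the batch capacity is allocated), and Put truncates the slice to length zero while keeping its backing array. Below is a minimal, self-contained sketch of the same pattern; it assumes, as the nil checks in the new helpers imply, that a zero-value zeropool.Pool is usable and that Get returns the element type's zero value (nil for a slice) when the pool is empty. The element type, capacity, and names are illustrative, not the PR's.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/util/zeropool"
)

// Zero-value pool, declared the same way as tsChunksPool in the PR. When the
// pool is empty, Get returns the zero value of the element type (nil for a
// slice), which is what the nil checks below rely on.
var slicePool zeropool.Pool[[]int]

// getSlice returns a pooled slice when one is available, otherwise a fresh
// slice with a fixed capacity (a stand-in for queryStreamBatchSize).
func getSlice() []int {
	if s := slicePool.Get(); s != nil {
		return s
	}
	return make([]int, 0, 128)
}

// putSlice truncates the slice to length zero (keeping its capacity) and
// hands it back to the pool so the backing array can be reused.
func putSlice(s []int) {
	if s != nil {
		slicePool.Put(s[:0])
	}
}

func main() {
	s := getSlice() // first call: pool is empty, so this allocates
	s = append(s, 1, 2, 3)
	fmt.Println(len(s), cap(s)) // 3 128
	putSlice(s)

	// A later caller can get the same backing array back with length 0.
	// (Pools backed by sync.Pool give no hard guarantee of reuse.)
	s2 := getSlice()
	fmt.Println(len(s2), cap(s2))
}

In the PR itself the Put is deferred, so the slice only goes back to the pool once queryStreamChunks has finished streaming the response.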
24 changes: 19 additions & 5 deletions pkg/ingester/ingester_test.go
@@ -3424,10 +3424,25 @@ func (m *mockQueryStreamServer) Context() context.Context {
}

func BenchmarkIngester_QueryStream_Chunks(b *testing.B) {
	benchmarkQueryStream(b)
	tc := []struct {
		samplesCount, seriesCount int
	}{
		{samplesCount: 10, seriesCount: 10},
		{samplesCount: 10, seriesCount: 50},
		{samplesCount: 10, seriesCount: 100},
		{samplesCount: 50, seriesCount: 10},
		{samplesCount: 50, seriesCount: 50},
		{samplesCount: 50, seriesCount: 100},
	}

	for _, c := range tc {
		b.Run(fmt.Sprintf("samplesCount=%v; seriesCount=%v", c.samplesCount, c.seriesCount), func(b *testing.B) {
			benchmarkQueryStream(b, c.samplesCount, c.seriesCount)
		})
	}
}

func benchmarkQueryStream(b *testing.B) {
func benchmarkQueryStream(b *testing.B, samplesCount, seriesCount int) {
	cfg := defaultIngesterTestConfig(b)

	// Create ingester.
@@ -3444,7 +3459,6 @@ func benchmarkQueryStream(b *testing.B) {
	// Push series.
	ctx := user.InjectOrgID(context.Background(), userID)

	const samplesCount = 1000
	samples := make([]cortexpb.Sample, 0, samplesCount)

	for i := 0; i < samplesCount; i++ {
@@ -3454,15 +3468,14 @@ func ...
		})
	}

	const seriesCount = 100
	for s := 0; s < seriesCount; s++ {
		_, err = i.Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: strconv.Itoa(s)}}, samples))
		require.NoError(b, err)
	}

	req := &client.QueryRequest{
		StartTimestampMs: 0,
		EndTimestampMs: samplesCount + 1,
		EndTimestampMs: int64(samplesCount + 1),

		Matchers: []*client.LabelMatcher{{
			Type: client.EQUAL,
@@ -3474,6 +3487,7 @@ func ...
	mockStream := &mockQueryStreamServer{ctx: ctx}

	b.ResetTimer()
	b.ReportAllocs()

	for ix := 0; ix < b.N; ix++ {
		err := i.QueryStream(req, mockStream)
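The test change turns a single fixed-size benchmark into a table of sub-benchmarks and enables allocation reporting, which is what makes the effect of the pooling visible per case. A minimal, self-contained sketch of that pattern follows; the package, names, and workload are placeholders, not the ingester benchmark.

// Save as a *_test.go file in any package.
package example

import (
	"fmt"
	"testing"
)

func BenchmarkBuildBatch(b *testing.B) {
	cases := []struct {
		samplesCount, seriesCount int
	}{
		{samplesCount: 10, seriesCount: 10},
		{samplesCount: 50, seriesCount: 100},
	}

	for _, c := range cases {
		// One named sub-benchmark per case, mirroring the fmt.Sprintf naming above.
		b.Run(fmt.Sprintf("samplesCount=%v; seriesCount=%v", c.samplesCount, c.seriesCount), func(b *testing.B) {
			b.ReportAllocs() // surfaces allocs/op and B/op in the benchmark output
			for i := 0; i < b.N; i++ {
				// Placeholder workload sized by the case.
				batch := make([]int, 0, c.samplesCount*c.seriesCount)
				for j := 0; j < cap(batch); j++ {
					batch = append(batch, j)
				}
			}
		})
	}
}

Running something like go test -bench=QueryStream_Chunks -benchmem ./pkg/ingester/ should then report allocs/op for each case, which makes it straightforward to compare the results before and after the pooling change.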