
Commit e501416
Multi batch support
1 parent 1ad6912

7 files changed: +273 -26 lines

exporter/exporterhelper/internal/base_exporter.go (+6 -5)

@@ -90,11 +90,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
 
 	if be.queueCfg.Enabled || be.batcherCfg.Enabled {
 		qSet := queuebatch.Settings[request.Request]{
-			Signal:    signal,
-			ID:        set.ID,
-			Telemetry: set.TelemetrySettings,
-			Encoding:  be.queueBatchSettings.Encoding,
-			Sizers:    be.queueBatchSettings.Sizers,
+			Signal:      signal,
+			ID:          set.ID,
+			Telemetry:   set.TelemetrySettings,
+			Encoding:    be.queueBatchSettings.Encoding,
+			Sizers:      be.queueBatchSettings.Sizers,
+			Partitioner: be.queueBatchSettings.Partitioner,
 		}
 		be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
 		if err != nil {

exporter/exporterhelper/internal/queue_sender.go (+3 -2)

@@ -20,8 +20,9 @@ import (
 
 // QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
 type QueueBatchSettings[T any] struct {
-	Encoding queuebatch.Encoding[T]
-	Sizers   map[request.SizerType]request.Sizer[T]
+	Encoding    queuebatch.Encoding[T]
+	Sizers      map[request.SizerType]request.Sizer[T]
+	Partitioner queuebatch.Partitioner[T]
 }
 
 // NewDefaultQueueConfig returns the default config for queuebatch.Config.

exporter/exporterhelper/internal/queuebatch/default_batcher.go (+9 -8)

@@ -22,16 +22,17 @@ type batch struct {
 }
 
 type batcherSettings[T any] struct {
-	sizerType  request.SizerType
-	sizer      request.Sizer[T]
-	next       sender.SendFunc[T]
-	maxWorkers int
+	sizerType   request.SizerType
+	sizer       request.Sizer[T]
+	partitioner Partitioner[T]
+	next        sender.SendFunc[T]
+	maxWorkers  int
 }
 
 // defaultBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
 type defaultBatcher struct {
 	cfg         BatchConfig
-	workerPool  chan struct{}
+	workerPool  *chan struct{}
 	sizerType   request.SizerType
 	sizer       request.Sizer[request.Request]
 	consumeFunc sender.SendFunc[request.Request]

@@ -53,7 +54,7 @@ func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request])
 	}
 	return &defaultBatcher{
 		cfg:         bCfg,
-		workerPool:  workerPool,
+		workerPool:  &workerPool,
 		sizerType:   bSet.sizerType,
 		sizer:       bSet.sizer,
 		consumeFunc: bSet.next,

@@ -209,13 +210,13 @@ func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
 func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done Done) {
 	qb.stopWG.Add(1)
 	if qb.workerPool != nil {
-		<-qb.workerPool
+		<-*qb.workerPool
 	}
 	go func() {
 		defer qb.stopWG.Done()
 		done.OnDone(qb.consumeFunc(ctx, req))
 		if qb.workerPool != nil {
-			qb.workerPool <- struct{}{}
+			*qb.workerPool <- struct{}{}
 		}
 	}()
 }
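
Changing workerPool from chan struct{} to *chan struct{} lets every per-partition shard share a single flush pool: multiBatcher (next file) passes the same pointer into each defaultBatcher it creates, so maxWorkers caps concurrency across all partitions rather than per shard. Below is a minimal, self-contained sketch of this token-pool idiom; it is illustrative only and uses none of the collector's types.

package main

import (
	"fmt"
	"sync"
)

func main() {
	// A buffered channel acts as a token bucket shared by all shards:
	// at most two flushes run concurrently, no matter how many shards exist.
	pool := make(chan struct{}, 2)
	for i := 0; i < 2; i++ {
		pool <- struct{}{}
	}

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		<-pool // acquire a token; blocks while two flushes are in flight
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Println("flushing batch", id)
			pool <- struct{}{} // return the token for the next flush
		}(i)
	}
	wg.Wait()
}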

exporter/exporterhelper/internal/queuebatch/multi_batcher.go (new file, +92)

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
import (
	"context"
	"sync"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
)

type multiBatcher struct {
	cfg         BatchConfig
	workerPool  *chan struct{}
	sizerType   request.SizerType
	sizer       request.Sizer[request.Request]
	partitioner Partitioner[request.Request]
	consumeFunc sender.SendFunc[request.Request]

	shardMapMu sync.Mutex
	shards     map[string]*defaultBatcher
}

func newMultiBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *multiBatcher {
	// TODO: Determine what is the right behavior for this in combination with async queue.
	var workerPool chan struct{}
	if bSet.maxWorkers != 0 {
		workerPool = make(chan struct{}, bSet.maxWorkers)
		for i := 0; i < bSet.maxWorkers; i++ {
			workerPool <- struct{}{}
		}
	}
	return &multiBatcher{
		cfg:         bCfg,
		workerPool:  &workerPool,
		sizerType:   bSet.sizerType,
		sizer:       bSet.sizer,
		partitioner: bSet.partitioner,
		consumeFunc: bSet.next,
		shardMapMu:  sync.Mutex{},
		shards:      make(map[string]*defaultBatcher),
	}
}

func (qb *multiBatcher) getShard(ctx context.Context, req request.Request) *defaultBatcher {
	key := qb.partitioner.GetKey(ctx, req)

	qb.shardMapMu.Lock()
	defer qb.shardMapMu.Unlock()

	s, ok := qb.shards[key]
	if !ok {
		s = &defaultBatcher{
			cfg:         qb.cfg,
			workerPool:  qb.workerPool,
			sizerType:   qb.sizerType,
			sizer:       qb.sizer,
			consumeFunc: qb.consumeFunc,
			stopWG:      sync.WaitGroup{},
			shutdownCh:  make(chan struct{}, 1),
		}
		qb.shards[key] = s
		_ = s.Start(ctx, nil)
	}
	return s
}

func (qb *multiBatcher) Start(_ context.Context, _ component.Host) error {
	return nil
}

func (qb *multiBatcher) Consume(ctx context.Context, req request.Request, done Done) {
	shard := qb.getShard(ctx, req)
	shard.Consume(ctx, req, done)
}

func (qb *multiBatcher) Shutdown(ctx context.Context) error {
	qb.shardMapMu.Lock()
	defer qb.shardMapMu.Unlock()
	stopWG := sync.WaitGroup{}
	for _, shard := range qb.shards {
		stopWG.Add(1)
		go func() {
			_ = shard.Shutdown(ctx)
			stopWG.Done()
		}()
	}
	stopWG.Wait()
	return nil
}
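
The sketch below isolates the routing pattern multiBatcher implements: a mutex-guarded map that lazily creates one batcher per partition key on first use and never evicts it. The names here (router, shard) are hypothetical stand-ins, not collector types.

package main

import (
	"fmt"
	"sync"
)

// shard stands in for a per-partition defaultBatcher.
type shard struct{ items []string }

type router struct {
	mu     sync.Mutex
	shards map[string]*shard
}

// get mirrors multiBatcher.getShard: lock, look up, create on first use.
func (r *router) get(key string) *shard {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.shards[key]
	if !ok {
		s = &shard{}
		r.shards[key] = s // created once per key, never removed
	}
	return s
}

func main() {
	r := &router{shards: map[string]*shard{}}
	for _, kv := range [][2]string{{"p1", "a"}, {"p2", "b"}, {"p1", "c"}} {
		s := r.get(kv[0])
		s.items = append(s.items, kv[1])
	}
	fmt.Println(len(r.shards), r.get("p1").items) // 2 [a c]
}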

exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go (new file, +107)

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queuebatch

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
)

func TestMultiBatcher_NoTimeout(t *testing.T) {
	cfg := BatchConfig{
		FlushTimeout: 0,
		MinSize:      10,
	}
	sink := requesttest.NewSink()

	type partitionKey struct{}

	ba := newMultiBatcher(cfg, batcherSettings[request.Request]{
		sizerType: request.SizerTypeItems,
		sizer:     request.NewItemsSizer(),
		partitioner: NewPartitioner(func(ctx context.Context, _ request.Request) string {
			return ctx.Value(partitionKey{}).(string)
		}),
		next:       sink.Export,
		maxWorkers: 1,
	})

	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
	t.Cleanup(func() {
		require.NoError(t, ba.Shutdown(context.Background()))
	})

	done := newFakeDone()
	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)

	// Neither batch should be flushed since they haven't reached min threshold.
	assert.Equal(t, 0, sink.RequestsCount())
	assert.Equal(t, 0, sink.ItemsCount())

	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)

	assert.Eventually(t, func() bool {
		return sink.RequestsCount() == 2 && sink.ItemsCount() == 28
	}, 1*time.Second, 10*time.Millisecond)
	// Check that the done callback is called the right number of times.
	assert.EqualValues(t, 0, done.errors.Load())
	assert.EqualValues(t, 4, done.success.Load())

	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
}

func TestMultiBatcher_Timeout(t *testing.T) {
	cfg := BatchConfig{
		FlushTimeout: 100 * time.Millisecond,
		MinSize:      100,
	}
	sink := requesttest.NewSink()

	type partitionKey struct{}

	ba := newMultiBatcher(cfg, batcherSettings[request.Request]{
		sizerType: request.SizerTypeItems,
		sizer:     request.NewItemsSizer(),
		partitioner: NewPartitioner(func(ctx context.Context, _ request.Request) string {
			return ctx.Value(partitionKey{}).(string)
		}),
		next:       sink.Export,
		maxWorkers: 1,
	})

	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
	t.Cleanup(func() {
		require.NoError(t, ba.Shutdown(context.Background()))
	})

	done := newFakeDone()
	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)

	// Neither batch should be flushed since they haven't reached min threshold.
	assert.Equal(t, 0, sink.RequestsCount())
	assert.Equal(t, 0, sink.ItemsCount())

	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
	ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)

	assert.Eventually(t, func() bool {
		return sink.RequestsCount() == 2 && sink.ItemsCount() == 28
	}, 1*time.Second, 10*time.Millisecond)
	// Check that the done callback is called the right number of times.
	assert.EqualValues(t, 0, done.errors.Load())
	assert.EqualValues(t, 4, done.success.Load())

	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
}

exporter/exporterhelper/internal/queuebatch/partitioner.go (new file, +34)

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"

import (
	"context"

	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

// Partitioner is an interface that returns the partition key of the given element.
type Partitioner[T any] interface {
	GetKey(context.Context, T) string
}

type GetKeyFunc[T any] func(context.Context, T) string

func (f GetKeyFunc[T]) GetKey(ctx context.Context, t T) string {
	return f(ctx, t)
}

type BasePartitioner struct {
	GetKeyFunc[request.Request]
}

func NewPartitioner(
	getKeyFunc func(ctx context.Context, req request.Request) string,
) Partitioner[request.Request] {
	return &BasePartitioner{
		GetKeyFunc: getKeyFunc,
	}
}
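
Partitioner is a plain func-as-interface adapter, so a caller can derive the batch key from anything reachable through the context or the request. A hedged usage sketch follows, specialized to a string payload so it compiles standalone; tenantKey is a hypothetical context key, and in the real package T would be request.Request.

package main

import (
	"context"
	"fmt"
)

// Local copies of the tiny adapter from partitioner.go, specialized to a
// string payload so the sketch compiles on its own.
type Partitioner[T any] interface {
	GetKey(context.Context, T) string
}

type GetKeyFunc[T any] func(context.Context, T) string

func (f GetKeyFunc[T]) GetKey(ctx context.Context, t T) string { return f(ctx, t) }

type tenantKey struct{} // hypothetical context key, not part of the commit

func main() {
	var p Partitioner[string] = GetKeyFunc[string](func(ctx context.Context, _ string) string {
		if tenant, ok := ctx.Value(tenantKey{}).(string); ok {
			return tenant // same tenant -> same batch shard
		}
		return "default" // unkeyed requests share one shard
	})

	ctx := context.WithValue(context.Background(), tenantKey{}, "p1")
	fmt.Println(p.GetKey(ctx, "some request"))                     // p1
	fmt.Println(p.GetKey(context.Background(), "another request")) // default
}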

exporter/exporterhelper/internal/queuebatch/queue_batch.go (+22 -11)

@@ -16,11 +16,12 @@ import (
 
 // Settings defines settings for creating a QueueBatch.
 type Settings[T any] struct {
-	Signal    pipeline.Signal
-	ID        component.ID
-	Telemetry component.TelemetrySettings
-	Encoding  Encoding[T]
-	Sizers    map[request.SizerType]request.Sizer[T]
+	Signal      pipeline.Signal
+	ID          component.ID
+	Telemetry   component.TelemetrySettings
+	Encoding    Encoding[T]
+	Sizers      map[request.SizerType]request.Sizer[T]
+	Partitioner Partitioner[T]
 }
 
 type QueueBatch struct {

@@ -74,12 +75,22 @@ func newQueueBatch(
 			maxWorkers: cfg.NumConsumers,
 		})
 	} else {
-		b = newDefaultBatcher(*cfg.Batch, batcherSettings[request.Request]{
-			sizerType:  cfg.Sizer,
-			sizer:      sizer,
-			next:       next,
-			maxWorkers: cfg.NumConsumers,
-		})
+		if set.Partitioner == nil {
+			b = newDefaultBatcher(*cfg.Batch, batcherSettings[request.Request]{
+				sizerType:  cfg.Sizer,
+				sizer:      sizer,
+				next:       next,
+				maxWorkers: cfg.NumConsumers,
+			})
+		} else {
+			b = newMultiBatcher(*cfg.Batch, batcherSettings[request.Request]{
+				sizerType:   cfg.Sizer,
+				sizer:       sizer,
+				partitioner: set.Partitioner,
+				next:        next,
+				maxWorkers:  cfg.NumConsumers,
+			})
+		}
 	}
 } else {
 	b = newDisabledBatcher[request.Request](next)