
Commit fd94cc0

Batch DescribeLogGroups calls
Lint
1 parent 8ad482f commit fd94cc0
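
The commit batches retention lookups: rather than issuing one DescribeLogGroups call per target, targets are accumulated in a map keyed by log group name and flushed in a single call either when the batch reaches logGroupIdentifierLimit (50) identifiers or when a 5-second ticker fires. The sketch below is a minimal, self-contained illustration of that pattern, not the plugin code itself; Target and logGroupIdentifierLimit mirror the diff, while batchLoop and the flush callback are hypothetical stand-ins for processDescribeLogGroup and updateTargetBatch.

package main

import (
    "fmt"
    "time"
)

// Assumed constant and type, mirroring the diff below.
const logGroupIdentifierLimit = 50

type Target struct {
    Group     string
    Stream    string
    Retention int
}

// batchLoop is a hypothetical stand-in for processDescribeLogGroup:
// it collects incoming targets and flushes them when the batch is full
// or when the ticker fires; flush stands in for updateTargetBatch.
func batchLoop(in <-chan Target, flush func(map[string]Target)) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    batch := make(map[string]Target, logGroupIdentifierLimit)
    for {
        select {
        case target := <-in:
            batch[target.Group] = target // dedupe by log group name
            if len(batch) == logGroupIdentifierLimit {
                flush(batch)
                batch = make(map[string]Target, logGroupIdentifierLimit)
            }
        case <-ticker.C:
            if len(batch) > 0 {
                flush(batch)
                batch = make(map[string]Target, logGroupIdentifierLimit)
            }
        }
    }
}

func main() {
    in := make(chan Target)
    go batchLoop(in, func(batch map[string]Target) {
        // A single DescribeLogGroups call would cover every group in the batch.
        fmt.Printf("flushing %d log groups in one call\n", len(batch))
    })

    for i := 0; i < 5; i++ {
        in <- Target{Group: fmt.Sprintf("group-%d", i), Stream: "stream", Retention: 7}
    }
    time.Sleep(6 * time.Second) // wait for the ticker-driven flush
}

Keying the batch by Group also dedupes repeated requests for the same log group before the API call is made.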

2 files changed: +185 -121 lines changed


plugins/outputs/cloudwatchlogs/internal/pusher/target.go

Lines changed: 43 additions & 30 deletions
@@ -16,8 +16,10 @@ import (
 )
 
 const (
-    cacheTTL             = 5 * time.Second
-    retentionChannelSize = 100
+    cacheTTL                      = 5 * time.Second
+    logGroupIdentifierLimit       = 50
+    describeLogGroupChannelSize   = 50
+    putRetentionPolicyChannelSize = 100
     // max wait time with backoff and jittering:
     // 0 + 2.4 + 4.8 + 9.6 + 10 ~= 26.8 sec
     baseRetryDelay = 1 * time.Second
@@ -52,8 +54,8 @@ func NewTargetManager(logger telegraf.Logger, service cloudWatchLogsService) Tar
         service:  service,
         cache:    make(map[Target]time.Time),
         cacheTTL: cacheTTL,
-        dlg:      make(chan Target, retentionChannelSize),
-        prp:      make(chan Target, retentionChannelSize),
+        dlg:      make(chan Target, describeLogGroupChannelSize),
+        prp:      make(chan Target, putRetentionPolicyChannelSize),
     }
 
     go tm.processDescribeLogGroup()
@@ -174,44 +176,55 @@ func (m *targetManager) createLogStream(t Target) error {
 }
 
 func (m *targetManager) processDescribeLogGroup() {
-    for target := range m.dlg {
-        for attempt := 0; attempt < numBackoffRetries; attempt++ {
-            currentRetention, err := m.getRetention(target)
-            if err != nil {
-                m.logger.Errorf("failed to describe log group retention for target %v: %v", target, err)
-                time.Sleep(m.calculateBackoff(attempt))
-                continue
+    t := time.NewTicker(5 * time.Second)
+    defer t.Stop()
+
+    batch := make(map[string]Target, logGroupIdentifierLimit)
+
+    for {
+        select {
+        case target := <-m.dlg:
+            batch[target.Group] = target
+            if len(batch) == logGroupIdentifierLimit {
+                m.updateTargetBatch(batch)
+                // Reset batch
+                batch = make(map[string]Target, logGroupIdentifierLimit)
             }
-
-            if currentRetention != target.Retention && target.Retention > 0 {
-                m.logger.Debugf("queueing log group %v to update retention policy", target.Group)
-                m.prp <- target
+        case <-t.C:
+            if len(batch) > 0 {
+                m.updateTargetBatch(batch)
+                // Reset batch
+                batch = make(map[string]Target, logGroupIdentifierLimit)
             }
-            break // no change in retention
         }
     }
 }
 
-func (m *targetManager) getRetention(target Target) (int, error) {
-    input := &cloudwatchlogs.DescribeLogGroupsInput{
-        LogGroupNamePrefix: aws.String(target.Group),
+func (m *targetManager) updateTargetBatch(targets map[string]Target) {
+    var identifiers []*string
+    for logGroup := range targets {
+        identifiers = append(identifiers, aws.String(logGroup))
     }
-
-    output, err := m.service.DescribeLogGroups(input)
-    if err != nil {
-        return 0, fmt.Errorf("describe log groups failed: %w", err)
+    describeLogGroupsInput := &cloudwatchlogs.DescribeLogGroupsInput{
+        LogGroupIdentifiers: identifiers,
     }
+    for attempt := 0; attempt < numBackoffRetries; attempt++ {
+        output, err := m.service.DescribeLogGroups(describeLogGroupsInput)
+        if err != nil {
+            m.logger.Errorf("failed to describe log group retention for targets %v: %v", targets, err)
+            time.Sleep(m.calculateBackoff(attempt))
+            continue
+        }
 
-    for _, group := range output.LogGroups {
-        if *group.LogGroupName == target.Group {
-            if group.RetentionInDays == nil {
-                return 0, nil
+        for _, logGroups := range output.LogGroups {
+            target := targets[*logGroups.LogGroupName]
+            if target.Retention != int(*logGroups.RetentionInDays) && target.Retention > 0 {
+                m.logger.Debugf("queueing log group %v to update retention policy", target.Group)
+                m.prp <- target
             }
-            return int(*group.RetentionInDays), nil
         }
+        break
     }
-
-    return 0, fmt.Errorf("log group %v not found", target.Group)
 }
 
 func (m *targetManager) processPutRetentionPolicy() {

plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go

Lines changed: 142 additions & 91 deletions
@@ -4,6 +4,7 @@
 package pusher
 
 import (
+    "fmt"
     "sync"
     "sync/atomic"
     "testing"
@@ -120,97 +121,6 @@ func TestTargetManager(t *testing.T) {
         assertCacheLen(t, manager, 0)
     })
 
-    t.Run("SetRetentionPolicy", func(t *testing.T) {
-        target := Target{Group: "G", Stream: "S", Retention: 7}
-
-        mockService := new(mockLogsService)
-        mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
-        mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
-            LogGroups: []*cloudwatchlogs.LogGroup{
-                {
-                    LogGroupName:    aws.String(target.Group),
-                    RetentionInDays: aws.Int64(0),
-                },
-            },
-        }, nil).Once()
-        mockService.On("PutRetentionPolicy", mock.Anything).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()
-
-        manager := NewTargetManager(logger, mockService)
-        err := manager.InitTarget(target)
-        assert.NoError(t, err)
-        // Wait for async operations to complete
-        time.Sleep(100 * time.Millisecond)
-        mockService.AssertExpectations(t)
-        assertCacheLen(t, manager, 1)
-    })
-
-    t.Run("SetRetentionPolicy/NoChange", func(t *testing.T) {
-        target := Target{Group: "G", Stream: "S", Retention: 7}
-
-        mockService := new(mockLogsService)
-        mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
-        mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
-            LogGroups: []*cloudwatchlogs.LogGroup{
-                {
-                    LogGroupName:    aws.String(target.Group),
-                    RetentionInDays: aws.Int64(7),
-                },
-            },
-        }, nil).Once()
-
-        manager := NewTargetManager(logger, mockService)
-        err := manager.InitTarget(target)
-        assert.NoError(t, err)
-        time.Sleep(100 * time.Millisecond)
-        mockService.AssertExpectations(t)
-        mockService.AssertNotCalled(t, "PutRetentionPolicy")
-        assertCacheLen(t, manager, 1)
-    })
-
-    t.Run("SetRetentionPolicy/LogGroupNotFound", func(t *testing.T) {
-        t.Parallel()
-        target := Target{Group: "G", Stream: "S", Retention: 7}
-
-        mockService := new(mockLogsService)
-        mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
-        mockService.On("DescribeLogGroups", mock.Anything).
-            Return(&cloudwatchlogs.DescribeLogGroupsOutput{}, &cloudwatchlogs.ResourceNotFoundException{}).Times(numBackoffRetries)
-
-        manager := NewTargetManager(logger, mockService)
-        err := manager.InitTarget(target)
-        assert.NoError(t, err)
-        time.Sleep(30 * time.Second)
-        mockService.AssertExpectations(t)
-        mockService.AssertNotCalled(t, "PutRetentionPolicy")
-        assertCacheLen(t, manager, 1)
-    })
-
-    t.Run("SetRetentionPolicy/Error", func(t *testing.T) {
-        t.Parallel()
-        target := Target{Group: "G", Stream: "S", Retention: 7}
-
-        mockService := new(mockLogsService)
-        mockService.On("CreateLogStream", mock.Anything).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil).Once()
-        mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
-            LogGroups: []*cloudwatchlogs.LogGroup{
-                {
-                    LogGroupName:    aws.String(target.Group),
-                    RetentionInDays: aws.Int64(0),
-                },
-            },
-        }, nil).Once()
-        mockService.On("PutRetentionPolicy", mock.Anything).
-            Return(&cloudwatchlogs.PutRetentionPolicyOutput{},
-                awserr.New("SomeAWSError", "Failed to set retention policy", nil)).Times(numBackoffRetries)
-
-        manager := NewTargetManager(logger, mockService)
-        err := manager.InitTarget(target)
-        assert.NoError(t, err)
-        time.Sleep(30 * time.Second)
-        mockService.AssertExpectations(t)
-        assertCacheLen(t, manager, 1)
-    })
-
     t.Run("SetRetentionPolicy/Negative", func(t *testing.T) {
         target := Target{Group: "G", Stream: "S", Retention: -1}
 
@@ -344,6 +254,147 @@ func TestTargetManager(t *testing.T) {
     })
 }
 
+func TestDescribeLogGroupsBatching(t *testing.T) {
+    logger := testutil.NewNopLogger()
+
+    t.Run("ProcessBatchOnLimit", func(t *testing.T) {
+        mockService := new(mockLogsService)
+
+        // Setup mock to expect a batch of 50 log groups
+        mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
+            return len(input.LogGroupIdentifiers) == logGroupIdentifierLimit
+        })).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
+            LogGroups: []*cloudwatchlogs.LogGroup{},
+        }, nil).Once()
+
+        manager := NewTargetManager(logger, mockService)
+        tm := manager.(*targetManager)
+
+        for i := 0; i < logGroupIdentifierLimit; i++ {
+            target := Target{
+                Group:     fmt.Sprintf("group-%d", i),
+                Stream:    "stream",
+                Retention: 7,
+            }
+            tm.dlg <- target
+        }
+
+        time.Sleep(100 * time.Millisecond)
+
+        mockService.AssertExpectations(t)
+    })
+
+    t.Run("ProcessBatchOnTimer", func(t *testing.T) {
+        mockService := new(mockLogsService)
+
+        // Setup mock to expect a batch of less than 50 log groups
+        mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
+            return len(input.LogGroupIdentifiers) == 5
+        })).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
+            LogGroups: []*cloudwatchlogs.LogGroup{},
+        }, nil).Once()
+
+        manager := NewTargetManager(logger, mockService)
+        tm := manager.(*targetManager)
+
+        for i := 0; i < 5; i++ {
+            target := Target{
+                Group:     fmt.Sprintf("group-%d", i),
+                Stream:    "stream",
+                Retention: 7,
+            }
+            tm.dlg <- target
+        }
+
+        // Wait for ticker to fire (slightly longer than 5 seconds)
+        time.Sleep(5100 * time.Millisecond)
+
+        mockService.AssertExpectations(t)
+    })
+
+    t.Run("ProcessBatchInvalidGroups", func(t *testing.T) {
+        mockService := new(mockLogsService)
+
+        // Return empty result
+        mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
+            LogGroups: []*cloudwatchlogs.LogGroup{},
+        }, nil).Once()
+
+        manager := NewTargetManager(logger, mockService)
+        tm := manager.(*targetManager)
+
+        batch := make(map[string]Target)
+        batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
+        batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}
+        tm.updateTargetBatch(batch)
+
+        // Wait for ticker to fire (slightly longer than 5 seconds)
+        time.Sleep(5100 * time.Millisecond)
+
+        mockService.AssertNotCalled(t, "PutRetentionPolicy")
+    })
+
+    t.Run("RetentionPolicyUpdate", func(t *testing.T) {
+        mockService := new(mockLogsService)
+
+        mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
+            LogGroups: []*cloudwatchlogs.LogGroup{
+                {
+                    LogGroupName:    aws.String("group-1"),
+                    RetentionInDays: aws.Int64(1),
+                },
+                {
+                    LogGroupName:    aws.String("group-2"),
+                    RetentionInDays: aws.Int64(7),
+                },
+            },
+        }, nil).Once()
+
+        // Setup mock for PutRetentionPolicy (should only be called for group-1)
+        mockService.On("PutRetentionPolicy", mock.MatchedBy(func(input *cloudwatchlogs.PutRetentionPolicyInput) bool {
+            return *input.LogGroupName == "group-1" && *input.RetentionInDays == 7
+        })).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()
+
+        manager := NewTargetManager(logger, mockService)
+        tm := manager.(*targetManager)
+
+        // Create a batch with two targets, one needing retention update
+        batch := make(map[string]Target)
+        batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
+        batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}
+
+        tm.updateTargetBatch(batch)
+        time.Sleep(100 * time.Millisecond)
+
+        mockService.AssertExpectations(t)
+    })
+
+    t.Run("BatchRetryOnError", func(t *testing.T) {
+        mockService := new(mockLogsService)
+
+        // Setup mock to fail once then succeed
+        mockService.On("DescribeLogGroups", mock.Anything).
+            Return(&cloudwatchlogs.DescribeLogGroupsOutput{}, fmt.Errorf("internal error")).Once()
+        mockService.On("DescribeLogGroups", mock.Anything).
+            Return(&cloudwatchlogs.DescribeLogGroupsOutput{
+                LogGroups: []*cloudwatchlogs.LogGroup{},
+            }, nil).Once()
+
+        manager := NewTargetManager(logger, mockService)
+        tm := manager.(*targetManager)
+
+        // Create a batch with one target
+        batch := make(map[string]Target)
+        batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
+
+        tm.updateTargetBatch(batch)
+        // Sleep enough for retry
+        time.Sleep(2 * time.Second)
+
+        mockService.AssertExpectations(t)
+    })
+}
+
 func TestCalculateBackoff(t *testing.T) {
     manager := &targetManager{}
     // should never exceed 30sec of total wait time