Batch DescribeLogGroups calls #1717

Merged
merged 17 commits into from Jun 13, 2025
69 changes: 41 additions & 28 deletions plugins/outputs/cloudwatchlogs/internal/pusher/target.go
@@ -16,8 +16,9 @@ import (
)

const (
cacheTTL = 5 * time.Second
retentionChannelSize = 100
retentionChannelSize = 100
cacheTTL = 5 * time.Second
logGroupIdentifierLimit = 50
Contributor

Why 50? I'm guessing this is the API limit?

Contributor Author
Yep, it's the API limit :/
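
For reference, DescribeLogGroups accepts at most 50 entries in LogGroupIdentifiers per request, so larger sets have to be split across calls. A minimal sketch of that chunking (a hypothetical chunkIdentifiers helper for illustration only; the PR itself accumulates a map and flushes it on a ticker, as shown below):

// chunkIdentifiers splits log group names into slices no longer than limit,
// mirroring the 50-identifier cap on DescribeLogGroups.
func chunkIdentifiers(groups []string, limit int) [][]string {
	chunks := make([][]string, 0, (len(groups)+limit-1)/limit)
	for len(groups) > limit {
		chunks = append(chunks, groups[:limit])
		groups = groups[limit:]
	}
	if len(groups) > 0 {
		chunks = append(chunks, groups)
	}
	return chunks
}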

// max wait time with backoff and jittering:
// 0 + 2.4 + 4.8 + 9.6 + 10 ~= 26.8 sec
baseRetryDelay = 1 * time.Second
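// (illustrative derivation, an assumption not stated in the diff: if the
// first try is immediate and retry n waits up to baseRetryDelay * 2^n with
// 1.2x jitter, capped at 10s, the worst-case delays are 0, 2.4s, 4.8s,
// 9.6s, 10s, which sums to the ~26.8s above)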
@@ -174,44 +175,56 @@ func (m *targetManager) processDescribeLogGroup() {
}

func (m *targetManager) processDescribeLogGroup() {
for target := range m.dlg {
for attempt := 0; attempt < numBackoffRetries; attempt++ {
currentRetention, err := m.getRetention(target)
if err != nil {
m.logger.Errorf("failed to describe log group retention for target %v: %v", target, err)
time.Sleep(m.calculateBackoff(attempt))
continue
t := time.NewTicker(5 * time.Second)
Contributor

I'm sure it's fine, but what's the reasoning behind the ticker vs. the timer? Are we anticipating DescribeLogGroup calls minutes after startup?

Contributor Author
@duhminick Jun 13, 2025
I think it is possible, but maybe this covers the scenario I'm thinking of:

func (l *LogAgent) checkRetentionAlreadyAttempted(retention int, logGroup string) int {

Another scenario: my thinking is that it's not safe to assume this will only need to run once, as it would with a timer. The system could be slow to initialize the targets, so a one-shot timer could miss those log groups.
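
To illustrate the distinction being discussed (a sketch with assumed names, not code from this PR): a one-shot time.Timer fires once, so anything queued after that firing is stranded, while a time.Ticker keeps flushing whatever has accumulated on each tick.

package main

import (
	"fmt"
	"time"
)

func main() {
	pending := make(chan string, 10)
	ticker := time.NewTicker(1 * time.Second) // the PR uses 5s
	defer ticker.Stop()

	go func() {
		for i := 0; i < 5; i++ { // targets trickling in slowly after startup
			pending <- fmt.Sprintf("group-%d", i)
			time.Sleep(700 * time.Millisecond)
		}
	}()

	deadline := time.After(4 * time.Second)
	var batch []string
	for {
		select {
		case item := <-pending:
			batch = append(batch, item)
		case <-ticker.C:
			fmt.Println("flush:", batch) // fires on every tick, not just once
			batch = nil
		case <-deadline:
			return
		}
	}
}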

defer t.Stop()

batch := make(map[string]Target, logGroupIdentifierLimit)

for {
select {
case target := <-m.dlg:
batch[target.Group] = target
if len(batch) == logGroupIdentifierLimit {
m.updateTargetBatch(batch)
// Reset batch
batch = make(map[string]Target, logGroupIdentifierLimit)
}

if currentRetention != target.Retention && target.Retention > 0 {
m.logger.Debugf("queueing log group %v to update retention policy", target.Group)
m.prp <- target
case <-t.C:
if len(batch) > 0 {
m.updateTargetBatch(batch)
// Reset batch
batch = make(map[string]Target, logGroupIdentifierLimit)
}
break // no change in retention
}
}
}

func (m *targetManager) getRetention(target Target) (int, error) {
input := &cloudwatchlogs.DescribeLogGroupsInput{
LogGroupNamePrefix: aws.String(target.Group),
func (m *targetManager) updateTargetBatch(targets map[string]Target) {
identifiers := make([]*string, 0, len(targets))
for logGroup := range targets {
identifiers = append(identifiers, aws.String(logGroup))
}

output, err := m.service.DescribeLogGroups(input)
if err != nil {
return 0, fmt.Errorf("describe log groups failed: %w", err)
describeLogGroupsInput := &cloudwatchlogs.DescribeLogGroupsInput{
LogGroupIdentifiers: identifiers,
Limit: aws.Int64(50),
}
for attempt := 0; attempt < numBackoffRetries; attempt++ {
output, err := m.service.DescribeLogGroups(describeLogGroupsInput)
if err != nil {
m.logger.Errorf("failed to describe log group retention for targets %v: %v", targets, err)
time.Sleep(m.calculateBackoff(attempt))
continue
}

for _, group := range output.LogGroups {
if *group.LogGroupName == target.Group {
if group.RetentionInDays == nil {
return 0, nil
for _, logGroups := range output.LogGroups {
target := targets[*logGroups.LogGroupName]
if (logGroups.RetentionInDays == nil || target.Retention != int(*logGroups.RetentionInDays)) && target.Retention > 0 {
m.logger.Debugf("queueing log group %v to update retention policy", target.Group)
m.prp <- target
}
return int(*group.RetentionInDays), nil
}
break
}

return 0, fmt.Errorf("log group %v not found", target.Group)
}

func (m *targetManager) processPutRetentionPolicy() {
194 changes: 191 additions & 3 deletions plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go
@@ -4,12 +4,13 @@
package pusher

import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -121,6 +122,8 @@ func TestTargetManager(t *testing.T) {
})

t.Run("SetRetentionPolicy", func(t *testing.T) {
t.Parallel()

target := Target{Group: "G", Stream: "S", Retention: 7}

mockService := new(mockLogsService)
@@ -139,12 +142,14 @@
err := manager.InitTarget(target)
assert.NoError(t, err)
// Wait for async operations to complete
time.Sleep(100 * time.Millisecond)
time.Sleep(7 * time.Second)
mockService.AssertExpectations(t)
assertCacheLen(t, manager, 1)
})

t.Run("SetRetentionPolicy/NoChange", func(t *testing.T) {
t.Parallel()

target := Target{Group: "G", Stream: "S", Retention: 7}

mockService := new(mockLogsService)
Expand All @@ -161,7 +166,7 @@ func TestTargetManager(t *testing.T) {
manager := NewTargetManager(logger, mockService)
err := manager.InitTarget(target)
assert.NoError(t, err)
time.Sleep(100 * time.Millisecond)
time.Sleep(7 * time.Second)
mockService.AssertExpectations(t)
mockService.AssertNotCalled(t, "PutRetentionPolicy")
assertCacheLen(t, manager, 1)
@@ -344,6 +349,189 @@
})
}

func TestDescribeLogGroupsBatching(t *testing.T) {
logger := testutil.NewNopLogger()

t.Run("ProcessBatchOnLimit", func(t *testing.T) {
mockService := new(mockLogsService)

// Setup mock to expect a batch of 50 log groups
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
return len(input.LogGroupIdentifiers) == logGroupIdentifierLimit
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{},
}, nil).Once()

manager := NewTargetManager(logger, mockService)
tm := manager.(*targetManager)

for i := 0; i < logGroupIdentifierLimit; i++ {
target := Target{
Group: fmt.Sprintf("group-%d", i),
Stream: "stream",
Retention: 7,
}
tm.PutRetentionPolicy(target)
}

time.Sleep(100 * time.Millisecond)

mockService.AssertExpectations(t)
})

t.Run("ProcessBatchOverLimit", func(t *testing.T) {
t.Parallel()

mockService := new(mockLogsService)

// Setup mock to expect a batch of 125 (2.5x the limit) log groups
// DescribeLogGroups will be called twice due to reaching batch limit on the first 100, then the other 25 on the timer
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
return len(input.LogGroupIdentifiers) == logGroupIdentifierLimit
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{},
}, nil).Twice()
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
return len(input.LogGroupIdentifiers) == 25
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{},
}, nil).Once()

manager := NewTargetManager(logger, mockService)
tm := manager.(*targetManager)

for i := 0; i < 125; i++ {
target := Target{
Group: fmt.Sprintf("group-%d", i),
Stream: "stream",
Retention: 7,
}
tm.PutRetentionPolicy(target)
}

time.Sleep(7 * time.Second)

mockService.AssertExpectations(t)
})

t.Run("ProcessBatchOnTimer", func(t *testing.T) {
t.Parallel()

mockService := new(mockLogsService)

// Setup mock to expect a batch of less than 50 log groups
mockService.On("DescribeLogGroups", mock.MatchedBy(func(input *cloudwatchlogs.DescribeLogGroupsInput) bool {
return len(input.LogGroupIdentifiers) == 5
})).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{},
}, nil).Once()

manager := NewTargetManager(logger, mockService)
tm := manager.(*targetManager)

for i := 0; i < 5; i++ {
target := Target{
Group: fmt.Sprintf("group-%d", i),
Stream: "stream",
Retention: 7,
}
tm.PutRetentionPolicy(target)
}

// Wait for ticker to fire (slightly longer than 5 seconds)
time.Sleep(5100 * time.Millisecond)

mockService.AssertExpectations(t)
})

t.Run("ProcessBatchInvalidGroups", func(t *testing.T) {
t.Parallel()

mockService := new(mockLogsService)

// Return empty result
mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{},
}, nil).Once()

manager := NewTargetManager(logger, mockService)
tm := manager.(*targetManager)

batch := make(map[string]Target)
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}
tm.updateTargetBatch(batch)

// Wait for ticker to fire (slightly longer than 5 seconds)
time.Sleep(5100 * time.Millisecond)

mockService.AssertNotCalled(t, "PutRetentionPolicy")
})

t.Run("RetentionPolicyUpdate", func(t *testing.T) {
mockService := new(mockLogsService)

mockService.On("DescribeLogGroups", mock.Anything).Return(&cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{
{
LogGroupName: aws.String("group-1"),
RetentionInDays: aws.Int64(1),
},
{
LogGroupName: aws.String("group-2"),
RetentionInDays: aws.Int64(7),
},
},
}, nil).Once()

// Setup mock for PutRetentionPolicy (should only be called for group-1)
mockService.On("PutRetentionPolicy", mock.MatchedBy(func(input *cloudwatchlogs.PutRetentionPolicyInput) bool {
return *input.LogGroupName == "group-1" && *input.RetentionInDays == 7
})).Return(&cloudwatchlogs.PutRetentionPolicyOutput{}, nil).Once()

manager := NewTargetManager(logger, mockService)
tm := manager.(*targetManager)

// Create a batch with two targets, one needing retention update
batch := make(map[string]Target)
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}
batch["group-2"] = Target{Group: "group-2", Stream: "stream", Retention: 7}

tm.updateTargetBatch(batch)
time.Sleep(100 * time.Millisecond)

mockService.AssertExpectations(t)
})

t.Run("BatchRetryOnError", func(t *testing.T) {
t.Parallel()

mockService := new(mockLogsService)

// Setup mock to fail once then succeed
mockService.On("DescribeLogGroups", mock.Anything).
Return(&cloudwatchlogs.DescribeLogGroupsOutput{}, fmt.Errorf("internal error")).Once()
mockService.On("DescribeLogGroups", mock.Anything).
Return(&cloudwatchlogs.DescribeLogGroupsOutput{
LogGroups: []*cloudwatchlogs.LogGroup{},
}, nil).Once()

manager := NewTargetManager(logger, mockService)
tm := manager.(*targetManager)

// Create a batch with one target
batch := make(map[string]Target)
batch["group-1"] = Target{Group: "group-1", Stream: "stream", Retention: 7}

tm.updateTargetBatch(batch)
// Sleep enough for retry
time.Sleep(2 * time.Second)

mockService.AssertExpectations(t)
})

}

func TestCalculateBackoff(t *testing.T) {
manager := &targetManager{}
// should never exceed 30sec of total wait time