Skip to content

Commit b85ddcc

Browse files
philasmarjkwpappin
andauthored
Issue-147: Add feature to pause/resume SQSMessagePoller (#197)
* Allow pausing and resumption of message consumption (cherry picked from commit 4a442fb) * Add changelog (cherry picked from commit fba5d5b) * Remove redundant parenthesis (cherry picked from commit 1946aec) * Fix whitespace (cherry picked from commit b86353c) * Add a ConfigureAwait as we're trying to do something and verify a Moq during the async execution (cherry picked from commit 6c3f0cc) * Use SpinWait to hold the thread (cherry picked from commit 1b4ac7a) * fix unit and integ tests * address norm's comments --------- Co-authored-by: jkwpappin <[email protected]>
1 parent 468b71d commit b85ddcc

File tree

11 files changed

+258
-19
lines changed

11 files changed

+258
-19
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"Projects": [
3+
{
4+
"Name": "AWS.Messaging",
5+
"Type": "Minor",
6+
"ChangelogMessages": [
7+
"Implement a start/stop mechanism for message consumption. (ISSUE 147)"
8+
]
9+
}
10+
]
11+
}

sampleapps/SubscriberService/Program.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
using System.Text.Json;
5+
using AWS.Messaging.Configuration;
56
using AWS.Messaging.Telemetry.OpenTelemetry;
67
using Microsoft.Extensions.Configuration;
78
using Microsoft.Extensions.DependencyInjection;
@@ -49,6 +50,13 @@ await Host.CreateDefaultBuilder(args)
4950
});
5051
});
5152

53+
// Optional: Configure a PollingControlToken, you can call Start()/Stop() to start and stop message processing, by default it will be started
54+
builder.ConfigurePollingControlToken(new PollingControlToken
55+
{
56+
// Optional: Set how frequently it will check for changes to the state of the PollingControlToken
57+
PollingWaitTime = TimeSpan.FromMilliseconds(200)
58+
});
59+
5260
// Logging data messages is disabled by default to protect sensitive user data. If you want this enabled, uncomment the line below.
5361
// builder.EnableMessageContentLogging();
5462
})

src/AWS.Messaging/Configuration/IMessageBusBuilder.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,9 @@ public interface IMessageBusBuilder
112112
/// Configures the backoff policy used by <see cref="BackoffHandler"/> and its available options.
113113
/// </summary>
114114
IMessageBusBuilder ConfigureBackoffPolicy(Action<BackoffPolicyBuilder> configure);
115+
116+
/// <summary>
117+
/// Configures the <see cref="PollingControlToken"/>, which can be used to start and stop the SQS Message Poller.
118+
/// </summary>
119+
IMessageBusBuilder ConfigurePollingControlToken(PollingControlToken pollingControlToken);
115120
}

src/AWS.Messaging/Configuration/IMessageConfiguration.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using AWS.Messaging.Configuration.Internal;
55
using AWS.Messaging.Serialization;
6+
using AWS.Messaging.Services;
67
using AWS.Messaging.Services.Backoff;
78
using AWS.Messaging.Services.Backoff.Policies;
89
using AWS.Messaging.Services.Backoff.Policies.Options;
@@ -93,4 +94,9 @@ public interface IMessageConfiguration
9394
/// Holds an instance of <see cref="CappedExponentialBackoffOptions"/> to control the behavior of <see cref="CappedExponentialBackoffPolicy"/>.
9495
/// </summary>
9596
CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; }
97+
98+
/// <summary>
99+
/// Holds an instance of <see cref="PollingControlToken"/> to control behaviour of <see cref="IMessagePoller"/>
100+
/// </summary>
101+
PollingControlToken PollingControlToken { get; }
96102
}

src/AWS.Messaging/Configuration/MessageBusBuilder.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
using System.Collections.Concurrent;
45
using AWS.Messaging.Configuration.Internal;
56
using AWS.Messaging.Publishers;
67
using AWS.Messaging.Publishers.EventBridge;
@@ -26,7 +27,7 @@ namespace AWS.Messaging.Configuration;
2627
/// </summary>
2728
public class MessageBusBuilder : IMessageBusBuilder
2829
{
29-
private static readonly Dictionary<IServiceCollection, MessageConfiguration> _messageConfigurations = new();
30+
private static readonly ConcurrentDictionary<IServiceCollection, MessageConfiguration> _messageConfigurations = new();
3031
private readonly MessageConfiguration _messageConfiguration;
3132
private readonly IList<ServiceDescriptor> _additionalServices = new List<ServiceDescriptor>();
3233
private readonly IServiceCollection _serviceCollection;
@@ -164,6 +165,13 @@ public IMessageBusBuilder AddMessageSourceSuffix(string suffix)
164165
return this;
165166
}
166167

168+
/// <inheritdoc/>
169+
public IMessageBusBuilder ConfigurePollingControlToken(PollingControlToken pollingControlToken)
170+
{
171+
_messageConfiguration.PollingControlToken = pollingControlToken;
172+
return this;
173+
}
174+
167175
/// <inheritdoc/>
168176
[RequiresDynamicCode("This method requires loading types dynamically as defined in the configuration system.")]
169177
[RequiresUnreferencedCode("This method requires loading types dynamically as defined in the configuration system.")]
@@ -327,6 +335,7 @@ internal void Build()
327335
_serviceCollection.TryAdd(ServiceDescriptor.Singleton<ILoggerFactory, NullLoggerFactory>());
328336
_serviceCollection.TryAdd(ServiceDescriptor.Singleton(typeof(ILogger<>), typeof(NullLogger<>)));
329337

338+
_serviceCollection.TryAddSingleton(_messageConfiguration.PollingControlToken);
330339
_serviceCollection.TryAddSingleton<IMessageConfiguration>(_messageConfiguration);
331340
_serviceCollection.TryAddSingleton<IMessageSerializer, MessageSerializer>();
332341
_serviceCollection.TryAddSingleton<IEnvelopeSerializer, EnvelopeSerializer>();

src/AWS.Messaging/Configuration/MessageConfiguration.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,7 @@ public class MessageConfiguration : IMessageConfiguration
6565

6666
/// <inheritdoc/>
6767
public CappedExponentialBackoffOptions CappedExponentialBackoffOptions { get; set; } = new();
68+
69+
/// <inheritdoc/>
70+
public PollingControlToken PollingControlToken { get; set; } = new();
6871
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
namespace AWS.Messaging.Configuration
5+
{
6+
/// <summary>
7+
/// Control token to start and stop message polling for a service.
8+
/// </summary>
9+
public class PollingControlToken
10+
{
11+
/// <summary>
12+
/// Indicates if polling is enabled.
13+
/// </summary>
14+
internal bool IsPollingEnabled { get; private set; } = true;
15+
16+
/// <summary>
17+
/// Start polling of the SQS Queue.
18+
/// </summary>
19+
public void StartPolling() => IsPollingEnabled = true;
20+
21+
/// <summary>
22+
/// Stop polling of the SQS Queue.
23+
/// </summary>
24+
public void StopPolling() => IsPollingEnabled = false;
25+
26+
/// <summary>
27+
/// Configurable amount of time to wait between polling for a change in status
28+
/// </summary>
29+
public TimeSpan PollingWaitTime { get; init; } = TimeSpan.FromMilliseconds(200);
30+
}
31+
}

src/AWS.Messaging/SQS/SQSMessagePoller.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication
2222
private readonly SQSMessagePollerConfiguration _configuration;
2323
private readonly IEnvelopeSerializer _envelopeSerializer;
2424
private readonly IBackoffHandler _backoffHandler;
25+
private readonly PollingControlToken _pollingControlToken;
2526
private readonly bool _isFifoEndpoint;
2627

2728
/// <summary>
@@ -49,19 +50,22 @@ internal class SQSMessagePoller : IMessagePoller, ISQSMessageCommunication
4950
/// <param name="configuration">The SQS message poller configuration.</param>
5051
/// <param name="envelopeSerializer">Serializer used to deserialize the SQS messages</param>
5152
/// <param name="backoffHandler">Backoff handler for performing back-offs if exceptions are thrown when polling SQS.</param>
53+
/// <param name="pollingControlToken">Control token to start and stop the poller.</param>
5254
public SQSMessagePoller(
5355
ILogger<SQSMessagePoller> logger,
5456
IMessageManagerFactory messageManagerFactory,
5557
IAWSClientProvider awsClientProvider,
5658
SQSMessagePollerConfiguration configuration,
5759
IEnvelopeSerializer envelopeSerializer,
58-
IBackoffHandler backoffHandler)
60+
IBackoffHandler backoffHandler,
61+
PollingControlToken pollingControlToken)
5962
{
6063
_logger = logger;
6164
_sqsClient = awsClientProvider.GetServiceClient<IAmazonSQS>();
6265
_configuration = configuration;
6366
_envelopeSerializer = envelopeSerializer;
6467
_backoffHandler = backoffHandler;
68+
_pollingControlToken = pollingControlToken;
6569
_isFifoEndpoint = configuration.SubscriberEndpoint.EndsWith(".fifo");
6670

6771
_messageManager = messageManagerFactory.CreateMessageManager(this, _configuration.ToMessageManagerConfiguration());
@@ -74,13 +78,18 @@ public async Task StartPollingAsync(CancellationToken token = default)
7478
}
7579

7680
/// <summary>
77-
/// Polls SQS indefinitely until cancelled
78-
/// </summary>
81+
/// Polls SQS indefinitely until cancelled. Message receipt can be stopped and started using <see cref="PollingControlToken"/></summary>
7982
/// <param name="token">Cancellation token to shutdown the poller.</param>
8083
private async Task PollQueue(CancellationToken token)
8184
{
8285
while (!token.IsCancellationRequested)
8386
{
87+
if (!_pollingControlToken.IsPollingEnabled)
88+
{
89+
await Task.Delay(_pollingControlToken.PollingWaitTime, token);
90+
continue;
91+
}
92+
8493
var numberOfMessagesToRead = _configuration.MaxNumberOfConcurrentMessages - _messageManager.ActiveMessageCount;
8594

8695
// If already processing the maximum number of messages, wait for at least one to complete and then try again

test/AWS.Messaging.IntegrationTests/SubscriberTests.cs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Threading.Tasks;
88
using Amazon.SQS;
99
using Amazon.SQS.Model;
10+
using AWS.Messaging.Configuration;
1011
using AWS.Messaging.IntegrationTests.Handlers;
1112
using AWS.Messaging.IntegrationTests.Models;
1213
using AWS.Messaging.Tests.Common.Services;
@@ -146,6 +147,88 @@ await publisher.PublishAsync(new ChatMessage
146147
}
147148
}
148149

150+
[Fact]
151+
public async Task ReceiveMultipleMessagesOnlyWhenPollingControlTokenStarted()
152+
{
153+
var pollingControlToken = new PollingControlToken();
154+
_serviceCollection.AddSingleton<TempStorage<ChatMessage>>();
155+
_serviceCollection.AddAWSMessageBus(builder =>
156+
{
157+
builder.ConfigurePollingControlToken(pollingControlToken);
158+
builder.AddSQSPublisher<ChatMessage>(_sqsQueueUrl);
159+
builder.AddSQSPoller(_sqsQueueUrl, options =>
160+
{
161+
options.VisibilityTimeoutExtensionThreshold = 3; // and a message is eligible for extension after it's been processing at least 3 seconds
162+
options.MaxNumberOfConcurrentMessages = 10;
163+
options.WaitTimeSeconds = 2;
164+
});
165+
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
166+
builder.AddMessageSource("/aws/messaging");
167+
builder.ConfigureBackoffPolicy(policyBuilder =>
168+
{
169+
policyBuilder.UseNoBackoff();
170+
});
171+
});
172+
var serviceProvider = _serviceCollection.BuildServiceProvider();
173+
174+
var publishStartTime = DateTime.UtcNow;
175+
var publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
176+
for (int i = 0; i < 5; i++)
177+
{
178+
await publisher.PublishAsync(new ChatMessage
179+
{
180+
MessageDescription = $"Test{i + 1}"
181+
});
182+
}
183+
var publishEndTime = DateTime.UtcNow;
184+
185+
var pump = serviceProvider.GetRequiredService<IHostedService>() as MessagePumpService;
186+
Assert.NotNull(pump);
187+
var source = new CancellationTokenSource();
188+
189+
await pump.StartAsync(source.Token);
190+
191+
// Wait for the pump to shut down after processing the expected number of messages,
192+
// with some padding to ensure messages aren't being processed more than once
193+
source.CancelAfter(30_000);
194+
195+
var tempStorage = serviceProvider.GetRequiredService<TempStorage<ChatMessage>>();
196+
while (tempStorage.Messages.Count < 5 && !source.IsCancellationRequested)
197+
{
198+
await Task.Delay(200, source.Token);
199+
}
200+
201+
// Stop polling and wait for the polling cycle to complete with a buffer
202+
pollingControlToken.StopPolling();
203+
204+
await Task.Delay(5_000);
205+
206+
// Publish the next 5 messages that should not be received due to stopping polling
207+
for (int i = 5; i < 10; i++)
208+
{
209+
await publisher.PublishAsync(new ChatMessage
210+
{
211+
MessageDescription = $"Test{i + 1}"
212+
});
213+
}
214+
215+
SpinWait.SpinUntil(() => source.IsCancellationRequested);
216+
217+
var inMemoryLogger = serviceProvider.GetRequiredService<InMemoryLogger>();
218+
219+
Assert.Empty(inMemoryLogger.Logs.Where(x => x.Exception is AmazonSQSException ex && ex.ErrorCode.Equals("AWS.SimpleQueueService.TooManyEntriesInBatchRequest")));
220+
Assert.Equal(5, tempStorage.Messages.Count);
221+
for (int i = 0; i < 5; i++)
222+
{
223+
var message = tempStorage.Messages.FirstOrDefault(x => x.Message.MessageDescription.Equals($"Test{i + 1}"));
224+
Assert.NotNull(message);
225+
Assert.False(string.IsNullOrEmpty(message.Id));
226+
Assert.Equal("/aws/messaging", message.Source.ToString());
227+
Assert.True(message.TimeStamp > publishStartTime);
228+
Assert.True(message.TimeStamp < publishEndTime);
229+
}
230+
}
231+
149232
[Theory]
150233
[InlineData(20)]
151234
public async Task SendMixOfMessageTypesToSameQueue(int numberOfMessages)

test/AWS.Messaging.UnitTests/MessagePublisherTests.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,7 @@ public async Task EventBridgePublisher_UnhappyPath()
764764
new DefaultTelemetryFactory(serviceProvider)
765765
);
766766

767-
var publishResponse = Assert.ThrowsAsync<FailedToPublishException>(async () => await messagePublisher.PublishAsync(_chatMessage));
767+
var publishResponse = await Assert.ThrowsAsync<FailedToPublishException>(async () => await messagePublisher.PublishAsync(_chatMessage));
768768

769769
_eventBridgeClient.Verify(x =>
770770
x.PutEventsAsync(
@@ -774,10 +774,9 @@ public async Task EventBridgePublisher_UnhappyPath()
774774
It.IsAny<CancellationToken>()),
775775
Times.Exactly(1));
776776

777-
var publishResponseResult = await publishResponse;
778-
Assert.Equal("Message failed to publish.", publishResponseResult.Message);
779-
Assert.Equal("ErrorMessage", publishResponseResult.InnerException!.Message);
780-
Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponseResult.InnerException).ErrorCode);
777+
Assert.Equal("Message failed to publish.", publishResponse.Message);
778+
Assert.Equal("ErrorMessage", publishResponse.InnerException!.Message);
779+
Assert.Equal("ErrorCode", ((EventBridgePutEventsException)publishResponse.InnerException).ErrorCode);
781780
}
782781

783782
[Fact]

0 commit comments

Comments
 (0)