Skip to content

Commit 3596df7

Browse files
committed
* Add queue arguments to test to simulate nacks.
1 parent f4ba70e commit 3596df7

File tree

1 file changed

+61
-12
lines changed

1 file changed

+61
-12
lines changed

projects/Test/Integration/TestFloodPublishing.cs

+61-12
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Collections.Concurrent;
3334
using System.Collections.Generic;
3435
using System.Diagnostics;
3536
using System.Threading;
@@ -43,7 +44,7 @@ namespace Test.Integration
4344
{
4445
public class TestFloodPublishing : IntegrationFixture
4546
{
46-
private static readonly TimeSpan FiveSeconds = TimeSpan.FromSeconds(5);
47+
private static readonly TimeSpan ElapsedMax = TimeSpan.FromSeconds(10);
4748
private readonly byte[] _body = GetRandomBody(2048);
4849

4950
public TestFloodPublishing(ITestOutputHelper output) : base(output)
@@ -92,36 +93,82 @@ public async Task TestUnthrottledFloodPublishing()
9293
return Task.CompletedTask;
9394
};
9495

96+
var queueArguments = new Dictionary<string, object>
97+
{
98+
["x-max-length"] = 131072,
99+
["x-overflow"] = "reject-publish"
100+
};
101+
102+
QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty,
103+
passive: false, durable: false, exclusive: true, autoDelete: true, arguments: queueArguments);
104+
string queueName = q.QueueName;
105+
106+
var exceptions = new ConcurrentBag<Exception>();
95107
var stopwatch = Stopwatch.StartNew();
96108
int publishCount = 0;
97109
try
98110
{
99111
var tasks = new List<Task>();
100-
for (int j = 0; j < 64; j++)
112+
for (int j = 0; j < 8; j++)
101113
{
102114
tasks.Add(Task.Run(async () =>
103115
{
104-
var publishTasks = new List<Task>();
105-
for (int i = 0; i < 65536 * 2; i++)
116+
var publishTasks = new List<ValueTask>();
117+
for (int i = 0; i < 65536; i++)
106118
{
107-
if (stopwatch.Elapsed > FiveSeconds)
119+
if (stopwatch.Elapsed > ElapsedMax)
108120
{
109-
await Task.WhenAll(publishTasks).WaitAsync(ShortSpan);
121+
foreach (ValueTask pt in publishTasks)
122+
{
123+
try
124+
{
125+
await pt;
126+
}
127+
catch (Exception ex)
128+
{
129+
exceptions.Add(ex);
130+
}
131+
}
110132
publishTasks.Clear();
111-
break;
133+
return;
112134
}
113135

114136
Interlocked.Increment(ref publishCount);
115-
publishTasks.Add(_channel.BasicPublishAsync(CachedString.Empty, CachedString.Empty, _body).AsTask());
137+
publishTasks.Add(_channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, mandatory: true,
138+
body: _body));
116139

117-
if (i % 500 == 0)
140+
if (i % 128 == 0)
118141
{
119-
await Task.WhenAll(publishTasks).WaitAsync(ShortSpan);
142+
foreach (ValueTask pt in publishTasks)
143+
{
144+
try
145+
{
146+
await pt;
147+
}
148+
catch (Exception ex)
149+
{
150+
exceptions.Add(ex);
151+
}
152+
}
120153
publishTasks.Clear();
121154
}
122155
}
156+
157+
foreach (ValueTask pt in publishTasks)
158+
{
159+
try
160+
{
161+
await pt;
162+
}
163+
catch (Exception ex)
164+
{
165+
exceptions.Add(ex);
166+
}
167+
}
168+
publishTasks.Clear();
123169
}));
124170
}
171+
125172
await Task.WhenAll(tasks).WaitAsync(WaitSpan);
126173
}
127174
finally
@@ -131,9 +178,11 @@ public async Task TestUnthrottledFloodPublishing()
131178

132179
Assert.True(_conn.IsOpen);
133180
Assert.False(sawUnexpectedShutdown);
134-
if (IsVerbose)
181+
// if (IsVerbose)
182+
if (true)
135183
{
136-
_output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed);
184+
_output.WriteLine("[INFO] published {0} messages in {1}, exceptions: {2}",
185+
publishCount, stopwatch.Elapsed, exceptions.Count);
137186
}
138187
}
139188

0 commit comments

Comments
 (0)