Skip to content

Commit

Permalink
* Stop using amq.fanout in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed May 1, 2024
1 parent 0e88862 commit 0079ee7
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 18 deletions.
3 changes: 2 additions & 1 deletion build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ if ($RunTests)
dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
if ($LASTEXITCODE -ne 0)
{
Write-Host "[WARNING] tests errored, exiting" -Foreground "Red"
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
Exit 1
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,29 @@ public async Task TestExchangeRecoveryTest()
public async Task TestExchangeToExchangeBindingRecovery()
{
string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName;
string x1 = "amq.fanout";
string x2 = GenerateExchangeName();

await _channel.ExchangeDeclareAsync(x2, "fanout");
await _channel.ExchangeBindAsync(x1, x2, "");
await _channel.QueueBindAsync(q, x1, "");
string ex_source = GenerateExchangeName();
string ex_destination = GenerateExchangeName();

await _channel.ExchangeDeclareAsync(ex_source, ExchangeType.Fanout);
await _channel.ExchangeDeclareAsync(ex_destination, ExchangeType.Fanout);

await _channel.ExchangeBindAsync(destination: ex_destination, source: ex_source, "");
await _channel.QueueBindAsync(q, ex_destination, "");

try
{
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);
await _channel.BasicPublishAsync(x2, "", _encoding.GetBytes("msg"));
await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"));
await AssertMessageCountAsync(q, 1);
}
finally
{
await WithTemporaryChannelAsync(async ch =>
{
await ch.ExchangeDeleteAsync(x2);
await ch.ExchangeDeleteAsync(ex_source);
await ch.ExchangeDeleteAsync(ex_destination);
await ch.QueueDeleteAsync(q);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,30 @@ public TestRecoveryWithDeletedEntities(ITestOutputHelper output) : base(output)
public async Task TestThatDeletedExchangeBindingsDontReappearOnRecovery()
{
string q = (await _channel.QueueDeclareAsync("", false, false, false)).QueueName;
string x1 = "amq.fanout";
string x2 = GenerateExchangeName();

await _channel.ExchangeDeclareAsync(x2, "fanout");
await _channel.ExchangeBindAsync(x1, x2, "");
await _channel.QueueBindAsync(q, x1, "");
await _channel.ExchangeUnbindAsync(x1, x2, "");
string ex_source = GenerateExchangeName();
string ex_destination = GenerateExchangeName();

await _channel.ExchangeDeclareAsync(ex_source, ExchangeType.Fanout);
await _channel.ExchangeDeclareAsync(ex_destination, ExchangeType.Fanout);

await _channel.ExchangeBindAsync(destination: ex_destination, source: ex_source, "");
await _channel.QueueBindAsync(q, ex_destination, "");
await _channel.ExchangeUnbindAsync(ex_destination, ex_source, "");

try
{
await CloseAndWaitForRecoveryAsync();
Assert.True(_channel.IsOpen);
await _channel.BasicPublishAsync(x2, "", _encoding.GetBytes("msg"));
await _channel.BasicPublishAsync(ex_source, "", _encoding.GetBytes("msg"));
await AssertMessageCountAsync(q, 0);
}
finally
{
await WithTemporaryChannelAsync(async ch =>
{
await ch.ExchangeDeleteAsync(x2);
await ch.ExchangeDeleteAsync(ex_source);
await ch.ExchangeDeleteAsync(ex_destination);
await ch.QueueDeleteAsync(q);
});
}
Expand Down
9 changes: 7 additions & 2 deletions projects/Test/Integration/TestExchangeDeclare.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public async Task TestConcurrentExchangeDeclareAndBind()
{
var exchangeNames = new ConcurrentBag<string>();
var tasks = new List<Task>();

string ex_destination = GenerateExchangeName();
await _channel.ExchangeDeclareAsync(exchange: ex_destination, type: "fanout", false, false);

NotSupportedException nse = null;
for (int i = 0; i < 256; i++)
{
Expand All @@ -61,7 +65,7 @@ async Task f()
await Task.Delay(S_Random.Next(5, 50));
string exchangeName = GenerateExchangeName();
await _channel.ExchangeDeclareAsync(exchange: exchangeName, type: "fanout", false, false);
await _channel.ExchangeBindAsync(destination: "amq.fanout", source: exchangeName, routingKey: "unused");
await _channel.ExchangeBindAsync(destination: ex_destination, source: exchangeName, routingKey: "unused");
exchangeNames.Add(exchangeName);
}
catch (NotSupportedException e)
Expand All @@ -84,7 +88,7 @@ async Task f()
try
{
await Task.Delay(S_Random.Next(5, 50));
await _channel.ExchangeUnbindAsync(destination: "amq.fanout", source: exchangeName, routingKey: "unused",
await _channel.ExchangeUnbindAsync(destination: ex_destination, source: exchangeName, routingKey: "unused",
noWait: false, arguments: null);
await _channel.ExchangeDeleteAsync(exchange: exchangeName, ifUnused: false);
}
Expand All @@ -99,6 +103,7 @@ await _channel.ExchangeUnbindAsync(destination: "amq.fanout", source: exchangeNa

await AssertRanToCompletion(tasks);
Assert.Null(nse);
await _channel.ExchangeDeleteAsync(exchange: ex_destination);
}

[Fact]
Expand Down

0 comments on commit 0079ee7

Please sign in to comment.