Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make EventHubs creation more robust #136

Merged
merged 1 commit into from
Apr 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,29 @@ IEnumerable<EventHubClient> Clients()
await Task.WhenAll(Clients().Select(client => client.CloseAsync()).ToList());
}

public async Task EnsurePartitionsAsync(int partitionCount, int retries = 3)
const int EventHubCreationRetries = 5;

async Task EnsureEventHubExistsAsync(string eventHubName, int partitionCount)
{
this.TraceHelper.LogDebug("Creating EventHub {name}", eventHubName);
bool success = await EventHubsUtil.EnsureEventHubExistsAsync(this.connectionString, eventHubName, partitionCount);
if (success)
{
this.TraceHelper.LogInformation("Created EventHub {name}", eventHubName);
}
else
{
this.TraceHelper.LogDebug("Conflict on EventHub {name}", eventHubName);
await Task.Delay(TimeSpan.FromSeconds(5));
}
}

internal async Task DeletePartitions()
{
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionString, this.partitionHub);
}

public async Task EnsurePartitionsAsync(int partitionCount, int retries = EventHubCreationRetries)
{
var connectionStringBuilder = new EventHubsConnectionStringBuilder(this.connectionString)
{
Expand Down Expand Up @@ -121,8 +143,7 @@ public async Task EnsurePartitionsAsync(int partitionCount, int retries = 3)
}
catch (Microsoft.Azure.EventHubs.MessagingEntityNotFoundException) when (retries > 0)
{
this.TraceHelper.LogInformation("Creating EventHub {name}", this.partitionHub);
await EventHubsUtil.EnsureEventHubExistsAsync(this.connectionString, this.partitionHub, partitionCount);
await this.EnsureEventHubExistsAsync(this.partitionHub, partitionCount);
}

// try again
Expand All @@ -137,11 +158,6 @@ public async Task EnsurePartitionsAsync(int partitionCount, int retries = 3)
}
}

internal async Task DeletePartitions()
{
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionString, this.partitionHub);
}

async Task EnsureClientsAsync()
{
var clientTasks = new List<Task<(EventHubClient, EventHubRuntimeInformation)>>();
Expand All @@ -164,7 +180,7 @@ async Task EnsureClientsAsync()
}
}

async Task<(EventHubClient, EventHubRuntimeInformation)> EnsureClientAsync(int i, int retries = 2)
async Task<(EventHubClient, EventHubRuntimeInformation)> EnsureClientAsync(int i, int retries = EventHubCreationRetries)
{
var connectionStringBuilder = new EventHubsConnectionStringBuilder(this.connectionString)
{
Expand All @@ -178,15 +194,14 @@ async Task EnsureClientsAsync()
}
catch (Microsoft.Azure.EventHubs.MessagingEntityNotFoundException) when (retries > 0)
{
this.TraceHelper.LogInformation("Creating EventHub {name}", this.clientHubs[i]);
await EventHubsUtil.EnsureEventHubExistsAsync(this.connectionString, this.clientHubs[i], 32);
await this.EnsureEventHubExistsAsync(this.clientHubs[i], 32);
}
// try again
return await this.EnsureClientAsync(i, retries - 1);
}


async Task EnsureLoadMonitorAsync(int retries = 2)
async Task EnsureLoadMonitorAsync(int retries = EventHubCreationRetries)
{
// create loadmonitor client
var connectionStringBuilder = new EventHubsConnectionStringBuilder(this.connectionString)
Expand All @@ -201,8 +216,7 @@ async Task EnsureLoadMonitorAsync(int retries = 2)
}
catch (Microsoft.Azure.EventHubs.MessagingEntityNotFoundException) when (retries > 0)
{
this.TraceHelper.LogInformation("Creating EventHub {name}", this.loadMonitorHub);
await EventHubsUtil.EnsureEventHubExistsAsync(this.connectionString, this.loadMonitorHub, 1);
await this.EnsureEventHubExistsAsync(this.loadMonitorHub, 1);
}
// try again
await this.EnsureLoadMonitorAsync(retries - 1);
Expand Down