Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support specification of a consumer group #283

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions src/DurableTask.Netherite/Connections/EventHubsUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static class EventHubsUtil
/// <returns>true if the event hub was created.</returns>
public static async Task<bool> EnsureEventHubExistsAsync(ConnectionInfo info, string eventHubName, int partitionCount, CancellationToken cancellationToken)
{
var response = await SendHttpRequest(info, eventHubName, partitionCount, cancellationToken);
var response = await SendEventHubRequest(info, eventHubName, partitionCount, cancellationToken);
if (response.StatusCode != System.Net.HttpStatusCode.Conflict)
{
response.EnsureSuccessStatusCode();
Expand All @@ -42,20 +42,32 @@ public static async Task<bool> EnsureEventHubExistsAsync(ConnectionInfo info, st
/// <returns>true if the event hub was deleted.</returns>
public static async Task<bool> DeleteEventHubIfExistsAsync(ConnectionInfo info, string eventHubName, CancellationToken cancellationToken)
{
var response = await SendHttpRequest(info, eventHubName, null, cancellationToken);
var response = await SendEventHubRequest(info, eventHubName, null, cancellationToken);
if (response.StatusCode != System.Net.HttpStatusCode.NotFound)
{
response.EnsureSuccessStatusCode();
}
return response.StatusCode == System.Net.HttpStatusCode.OK;
}

static async Task<HttpResponseMessage> SendHttpRequest(ConnectionInfo info, string eventHubName, int? partitionCount, CancellationToken cancellationToken)
public static async Task<bool> EnsureConsumerGroupExistsAsync(ConnectionInfo info, string eventHubName, string consumerGroup, CancellationToken cancellationToken)
{
var response = await SendConsumerGroupRequest(info, eventHubName, consumerGroup, delete: false, cancellationToken);
if (response.StatusCode != System.Net.HttpStatusCode.Conflict)
{
response.EnsureSuccessStatusCode();
}
return response.StatusCode == System.Net.HttpStatusCode.Created;
}

// for documentation of these APIs, see https://learn.microsoft.com/en-us/rest/api/eventhub/event-hubs-management-rest

static async Task<HttpResponseMessage> SendEventHubRequest(ConnectionInfo info, string eventHubPath, int? partitionCount, CancellationToken cancellationToken)
{
// send an http request to create or delete the eventhub
HttpClient client = new HttpClient();
var request = new HttpRequestMessage();
request.RequestUri = new Uri($"https://{info.HostName}/{eventHubName}?timeout=60&api-version=2014-01");
request.RequestUri = new Uri($"https://{info.HostName}/{eventHubPath}?timeout=60&api-version=2014-01");
request.Method = partitionCount.HasValue ? HttpMethod.Put : HttpMethod.Delete;
if (partitionCount.HasValue)
{
Expand All @@ -78,5 +90,30 @@ static async Task<HttpResponseMessage> SendHttpRequest(ConnectionInfo info, stri

return await client.SendAsync(request);
}

static async Task<HttpResponseMessage> SendConsumerGroupRequest(ConnectionInfo info, string eventHubPath, string consumerGroupName, bool delete, CancellationToken cancellationToken)
{
// send an http request to create or delete the eventhub
HttpClient client = new HttpClient();
var request = new HttpRequestMessage();
request.RequestUri = new Uri($"https://{info.HostName}/{eventHubPath}/consumerGroups/{consumerGroupName}?timeout=60&api-version=2014-01");
request.Method = delete ? HttpMethod.Delete : HttpMethod.Put;
request.Content = new StringContent(@"
<entry xmlns='http://www.w3.org/2005/Atom'>
<content type='application/xml'>
<ConsumerGroupDescription xmlns:i='http://www.w3.org/2001/XMLSchema-instance' xmlns='http://schemas.microsoft.com/netservices/2010/10/servicebus/connect'>
</ConsumerGroupDescription>
</content>
</entry>",
Encoding.UTF8,
"application/xml");

request.Headers.Add("Host", info.HostName);

// add an authorization header to the request
await info.AuthorizeHttpRequestMessage(request, cancellationToken);

return await client.SendAsync(request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions";

/// <summary>
/// The consumer group to use. By specifying different consumer groups, two task hubs can use
/// the same event hubs namespace at the same time. However, note that this can waste bandwidth since messages
/// are always delivered to all consumer groups (even if only meaningfully processed by one of them). Also,
/// for some event hub plan the number of consumer groups is limited.
/// </summary>
public string EventHubsConsumerGroup { get; set; } = "$Default";

/// <summary>
/// Tuning parameters for the FASTER logs
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class EventHubsConnections
readonly string partitionHub;
readonly string loadMonitorHub;
readonly CancellationToken shutdownToken;
readonly string consumerGroup;

EventHubClient partitionClient;
List<EventHubClient> clientClients;
Expand All @@ -46,15 +47,19 @@ public EventHubsConnections(
string partitionHub,
string[] clientHubs,
string loadMonitorHub,
string consumerGroup,
CancellationToken shutdownToken)
{
this.connectionInfo = connectionInfo;
this.partitionHub = partitionHub;
this.clientHubs = clientHubs;
this.loadMonitorHub = loadMonitorHub;
this.shutdownToken = shutdownToken;
this.consumerGroup = consumerGroup;
}

const string defaultConsumerGroup = "$Default";

public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}";

public async Task StartAsync(TaskhubParameters parameters)
Expand All @@ -63,6 +68,11 @@ await Task.WhenAll(
this.EnsurePartitionsAsync(parameters.PartitionCount),
this.EnsureClientsAsync(),
this.EnsureLoadMonitorAsync());

if (this.consumerGroup != defaultConsumerGroup)
{
await this.EnsureConsumerGroupsExistAsync();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider doing the Consumer Group Creation in the OrchestrationServiceCreateAsync call? since its a resource being created?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would not work beause the event hub itself is dynamically created (and sometimes deleted and recreated for certain recovery scenarios).

}
}

public Task StopAsync()
Expand Down Expand Up @@ -121,6 +131,31 @@ async Task EnsureEventHubExistsAsync(string eventHubName, int partitionCount)
}
}

public Task EnsureConsumerGroupsExistAsync()
{
return Task.WhenAll(
EnsureExistsAsync(this.partitionHub),
EnsureExistsAsync(this.loadMonitorHub),
Task.WhenAll(this.clientHubs.Select(clientHub => EnsureExistsAsync(clientHub)).ToList())
);

async Task EnsureExistsAsync(string eventHubName)
{
this.TraceHelper.LogDebug("Creating ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup);
bool success = await EventHubsUtil.EnsureConsumerGroupExistsAsync(this.connectionInfo, eventHubName, this.consumerGroup, CancellationToken.None);
if (success)
{
this.TraceHelper.LogInformation("Created ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup, CancellationToken.None);
}
else
{
this.TraceHelper.LogDebug("Conflict on ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup, CancellationToken.None);
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
}


internal async Task DeletePartitions()
{
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionInfo, this.partitionHub, CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class EventHubsTransport :
readonly ILogger logger;
readonly EventHubsTraceHelper traceHelper;
readonly IStorageLayer storage;
readonly string shortClientId;
readonly string consumerGroup;

string shortClientId;

EventProcessorHost eventProcessorHost;
EventProcessorHost loadMonitorHost;
Expand Down Expand Up @@ -70,6 +72,7 @@ public EventHubsTransport(TransportAbstraction.IHost host, NetheriteOrchestratio
string namespaceName = settings.EventHubsConnection.ResourceName;
this.logger = EventHubsTraceHelper.CreateLogger(loggerFactory);
this.traceHelper = new EventHubsTraceHelper(this.logger, settings.TransportLogLevelLimit, null, settings.StorageAccountName, settings.HubName, namespaceName);
this.consumerGroup = settings.EventHubsConsumerGroup;
this.ClientId = Guid.NewGuid();
this.shortClientId = Client.GetShortId(this.ClientId);
}
Expand All @@ -78,9 +81,6 @@ public EventHubsTransport(TransportAbstraction.IHost host, NetheriteOrchestratio
public static string PartitionHub = "partitions";
public static string[] ClientHubs = { "clients0", "clients1", "clients2", "clients3" };
public static string LoadMonitorHub = "loadmonitor";
public static string PartitionConsumerGroup = "$Default";
public static string ClientConsumerGroup = "$Default";
public static string LoadMonitorConsumerGroup = "$Default";

async Task<TaskhubParameters> ITransportLayer.StartAsync()
{
Expand All @@ -106,44 +106,91 @@ async Task<TaskhubParameters> ITransportLayer.StartAsync()
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);

this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.consumerGroup, this.shutdownSource.Token)
{
Host = host,
TraceHelper = this.traceHelper,
};

await this.connections.StartAsync(this.parameters);

this.traceHelper.LogInformation("EventHubsTransport is connecting to {fingerPrint} via consumer group {consumerGroupName}", this.connections.Fingerprint, this.consumerGroup);

return this.parameters;
}

async Task ITransportLayer.StartClientAsync()
{
this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this);

var channel = Channel.CreateBounded<ClientEvent>(new BoundedChannelOptions(500)
{
SingleReader = true,
AllowSynchronousContinuations = true
});

var clientReceivers = this.connections.CreateClientReceivers(this.ClientId, EventHubsTransport.ClientConsumerGroup);
PartitionReceiver[] clientReceivers = null;

int attempt = 0;
int maxAttempts = 8;

while (attempt++ < maxAttempts)
{
this.ClientId = Guid.NewGuid();
this.shortClientId = Client.GetShortId(this.ClientId);

clientReceivers = this.connections.CreateClientReceivers(this.ClientId, this.consumerGroup);

try
{
this.clientConnectionsEstablished = Enumerable
.Range(0, EventHubsConnections.NumClientChannels)
.Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i]))
.ToArray();

// we must wait for the client receive connections to be established before continuing
// otherwise, we may miss messages that are sent before the client receiver establishes the receive position
await Task.WhenAll(this.clientConnectionsEstablished);

break; // was successful, so we exit retry loop
}
catch (Microsoft.Azure.EventHubs.QuotaExceededException) when (attempt < maxAttempts)
{
this.traceHelper.LogWarning("EventHubsTransport encountered quota-exceeded exception");
}
catch (Exception exception) when (attempt < maxAttempts)
{
this.traceHelper.LogInformation("EventHubsTransport failed with exception {exception}", exception);
}

TimeSpan retryDelay = TimeSpan.FromSeconds(1 + attempt * 10);
this.traceHelper.LogDebug("EventHubsTransport retrying client connection in {retryDelay}", retryDelay);
Task retryDelayTask = Task.Delay(retryDelay);

foreach (var clientReceiver in clientReceivers)
{
try
{
await clientReceiver.CloseAsync();
}

catch (Exception exception)
{
this.traceHelper.LogWarning("EventHubsTransport failed to close partition receiver {clientReceiver} during retry: {exception}", clientReceiver, exception);
}
}

await retryDelayTask;
}

this.traceHelper.LogInformation("EventHubsTransport connected to {fingerPrint} via consumer group {consumerGroup}", this.connections.Fingerprint, this.consumerGroup);

this.clientConnectionsEstablished = Enumerable
.Range(0, EventHubsConnections.NumClientChannels)
.Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i]))
.ToArray();

this.clientReceiveLoops = Enumerable
.Range(0, EventHubsConnections.NumClientChannels)
.Select(i => this.ClientReceiveLoopAsync(i, clientReceivers[i], channel.Writer))
.ToArray();

this.clientProcessTask = this.ClientProcessLoopAsync(channel.Reader);

// we must wait for the client receive connections to be established before continuing
// otherwise, we may miss messages that are sent before the client receiver establishes the receive position
await Task.WhenAll(this.clientConnectionsEstablished);
this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this);
}

async Task ITransportLayer.StartWorkersAsync()
Expand All @@ -170,7 +217,7 @@ async Task StartPartitionHost()
this.eventProcessorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
Guid.NewGuid().ToString(),
EventHubsTransport.PartitionHub,
EventHubsTransport.PartitionConsumerGroup,
this.consumerGroup,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
$"{this.pathPrefix}eh-checkpoints/{(EventHubsTransport.PartitionHub)}/{formattedCreationDate}");
Expand All @@ -192,7 +239,7 @@ await this.eventProcessorHost.RegisterEventProcessorFactoryAsync(
this.traceHelper.LogInformation($"EventHubsTransport is starting scripted partition host");
this.scriptedEventProcessorHost = new ScriptedEventProcessorHost(
EventHubsTransport.PartitionHub,
EventHubsTransport.PartitionConsumerGroup,
this.consumerGroup,
this.settings.EventHubsConnection,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
Expand All @@ -216,7 +263,7 @@ async Task StartLoadMonitorHost()
this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
Guid.NewGuid().ToString(),
LoadMonitorHub,
LoadMonitorConsumerGroup,
this.consumerGroup,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
$"{this.pathPrefix}eh-checkpoints/{LoadMonitorHub}");
Expand Down
4 changes: 4 additions & 0 deletions test/PerformanceTests/host.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
// set this to control the max size of the orchestration instance cache
// "InstanceCacheSizeMB": "50",

// the consumer group to use. Two task hubs can use the same event hubs namespace
// if their consumer groups are different.
"EventHubsConsumerGroup": "$Default",

// set this to true to use the PSF support in Faster. Will soon be obsolete.
"UsePSFQueries": "false",

Expand Down