diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionState.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionState.cs
index 62f52716..98a48939 100644
--- a/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionState.cs
+++ b/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionState.cs
@@ -19,9 +19,10 @@ interface IPartitionState
/// The partition.
/// An error handler to initiate and/or indicate termination of this partition.
/// A fingerprint for the input queue.
+ /// Initial offset for the input queue, if this partition is being created.
/// the input queue position from which to resume input processing
/// Indicates that termination was signaled before the operation completed.
- Task<(long,int)> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint);
+ Task<(long,int)> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint, long initialOffset);
///
/// Starts processing, after creating or restoring the partition state.
diff --git a/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs b/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs
index ba66e667..28b1d203 100644
--- a/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs
+++ b/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs
@@ -84,13 +84,14 @@ public interface IPartition
///
/// A termination object for initiating and/or detecting termination of the partition.
/// Fingerprint for the intput queue.
+ /// Initial offset for the input queue, if this partition is being created.
/// The input queue position of the next message to receive.
///
/// The termination token source can be used for immediately terminating the partition.
/// Also, it can be used to detect that the partition has terminated for any other reason,
/// be it cleanly (after StopAsync) or uncleanly (after losing a lease or hitting a fatal error).
///
- Task<(long,int)> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint);
+ Task<(long,int)> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint, long initialOffset = 0);
///
/// Clean shutdown: stop processing, save partition state to storage, and release ownership.
diff --git a/src/DurableTask.Netherite/Connections/EventHubsUtil.cs b/src/DurableTask.Netherite/Connections/EventHubsUtil.cs
index 7786676d..a22dc9f9 100644
--- a/src/DurableTask.Netherite/Connections/EventHubsUtil.cs
+++ b/src/DurableTask.Netherite/Connections/EventHubsUtil.cs
@@ -26,7 +26,7 @@ public static class EventHubsUtil
/// true if the event hub was created.
public static async Task EnsureEventHubExistsAsync(ConnectionInfo info, string eventHubName, int partitionCount, CancellationToken cancellationToken)
{
- var response = await SendHttpRequest(info, eventHubName, partitionCount, cancellationToken);
+ var response = await SendEventHubRequest(info, eventHubName, partitionCount, cancellationToken);
if (response.StatusCode != System.Net.HttpStatusCode.Conflict)
{
response.EnsureSuccessStatusCode();
@@ -42,7 +42,7 @@ public static async Task EnsureEventHubExistsAsync(ConnectionInfo info, st
/// true if the event hub was deleted.
public static async Task DeleteEventHubIfExistsAsync(ConnectionInfo info, string eventHubName, CancellationToken cancellationToken)
{
- var response = await SendHttpRequest(info, eventHubName, null, cancellationToken);
+ var response = await SendEventHubRequest(info, eventHubName, null, cancellationToken);
if (response.StatusCode != System.Net.HttpStatusCode.NotFound)
{
response.EnsureSuccessStatusCode();
@@ -50,12 +50,24 @@ public static async Task DeleteEventHubIfExistsAsync(ConnectionInfo info,
return response.StatusCode == System.Net.HttpStatusCode.OK;
}
- static async Task SendHttpRequest(ConnectionInfo info, string eventHubName, int? partitionCount, CancellationToken cancellationToken)
+ public static async Task EnsureConsumerGroupExistsAsync(ConnectionInfo info, string eventHubName, string consumerGroup, CancellationToken cancellationToken)
+ {
+ var response = await SendConsumerGroupRequest(info, eventHubName, consumerGroup, delete: false, cancellationToken);
+ if (response.StatusCode != System.Net.HttpStatusCode.Conflict)
+ {
+ response.EnsureSuccessStatusCode();
+ }
+ return response.StatusCode == System.Net.HttpStatusCode.Created;
+ }
+
+ // for documentation of these APIs, see https://learn.microsoft.com/en-us/rest/api/eventhub/event-hubs-management-rest
+
+ static async Task SendEventHubRequest(ConnectionInfo info, string eventHubPath, int? partitionCount, CancellationToken cancellationToken)
{
// send an http request to create or delete the eventhub
HttpClient client = new HttpClient();
var request = new HttpRequestMessage();
- request.RequestUri = new Uri($"https://{info.HostName}/{eventHubName}?timeout=60&api-version=2014-01");
+ request.RequestUri = new Uri($"https://{info.HostName}/{eventHubPath}?timeout=60&api-version=2014-01");
request.Method = partitionCount.HasValue ? HttpMethod.Put : HttpMethod.Delete;
if (partitionCount.HasValue)
{
@@ -78,5 +90,30 @@ static async Task SendHttpRequest(ConnectionInfo info, stri
return await client.SendAsync(request);
}
+
+ static async Task SendConsumerGroupRequest(ConnectionInfo info, string eventHubPath, string consumerGroupName, bool delete, CancellationToken cancellationToken)
+ {
+ // send an http request to create or delete the eventhub
+ HttpClient client = new HttpClient();
+ var request = new HttpRequestMessage();
+ request.RequestUri = new Uri($"https://{info.HostName}/{eventHubPath}/consumerGroups/{consumerGroupName}?timeout=60&api-version=2014-01");
+ request.Method = delete ? HttpMethod.Delete : HttpMethod.Put;
+ request.Content = new StringContent(@"
+
+
+
+
+
+ ",
+ Encoding.UTF8,
+ "application/xml");
+
+ request.Headers.Add("Host", info.HostName);
+
+ // add an authorization header to the request
+ await info.AuthorizeHttpRequestMessage(request, cancellationToken);
+
+ return await client.SendAsync(request);
+ }
}
}
diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
index 687a7670..1c1ae038 100644
--- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
@@ -51,6 +51,16 @@ public class NetheriteOrchestrationServiceSettings
///
public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions";
+ ///
+ /// If true, use a consumer group with the same name as this task hub, for all event hubs in the namespace. This allows connecting multiple task hubs to the same namespace.
+ ///
+ ///
+ /// This feature is recommended only for small message volumes and a small number of hubs - because all messages are delivered to all connected task hubs, the total
+ /// message volume grows quadratically.
+ /// All task hubs connecting to the same namespace must have different names.
+ /// Note that even though the consumer groups are automatically created for each task hub, they are not automatically deleted when the task hub is deleted.
+ public bool UseSeparateConsumerGroups { get; set; }
+
///
/// Tuning parameters for the FASTER logs
///
diff --git a/src/DurableTask.Netherite/OrchestrationService/Partition.cs b/src/DurableTask.Netherite/OrchestrationService/Partition.cs
index 7f641cce..03b704ef 100644
--- a/src/DurableTask.Netherite/OrchestrationService/Partition.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/Partition.cs
@@ -91,7 +91,7 @@ public Partition(
this.LastTransition = this.CurrentTimeMs;
}
- public async Task<(long, int)> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint)
+ public async Task<(long, int)> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint, long initialOffset)
{
EventTraceContext.Clear();
@@ -130,7 +130,7 @@ public Partition(
this.PendingTimers = new BatchTimer(this.ErrorHandler.Token, this.TimersFired);
// goes to storage to create or restore the partition state
- inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false);
+ inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint, initialOffset).ConfigureAwait(false);
// start processing the timers
this.PendingTimers.Start($"Timer{this.PartitionId:D2}");
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs
index 37ce09a3..0f172d7e 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs
@@ -159,8 +159,9 @@ async Task IStorageLayer.CreateTaskhubIfNotExistsAsync()
this.traceHelper.TraceProgress("Created new taskhub");
// zap the partition hub so we start from zero queue positions
- if (this.settings.TransportChoice == TransportChoices.EventHubs)
+ if (this.settings.TransportChoice == TransportChoices.EventHubs && !this.settings.UseSeparateConsumerGroups)
{
+ this.traceHelper.TraceProgress("Deleting partition event hub, to ensure all partitions start at sequence number zero");
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, CancellationToken.None);
}
}
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
index 18dddf85..edbb7511 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs
@@ -72,7 +72,7 @@ async Task TerminationWrapper(Task what)
await what;
}
- public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint)
+ public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint, long initialOffset)
{
this.partition = partition;
this.terminationToken = errorHandler.Token;
@@ -146,7 +146,7 @@ async Task TerminationWrapper(Task what)
this.TraceHelper.FasterProgress("Creating store");
// this is a fresh partition
- await this.TerminationWrapper(this.storeWorker.Initialize(this.log.BeginAddress, inputQueueFingerprint));
+ await this.TerminationWrapper(this.storeWorker.Initialize(this.log.BeginAddress, inputQueueFingerprint, initialOffset));
await this.TerminationWrapper(this.storeWorker.TakeFullCheckpointAsync("initial checkpoint").AsTask());
this.TraceHelper.FasterStoreCreated(this.storeWorker.InputQueuePosition, stopwatch.ElapsedMilliseconds);
diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
index 64e0231e..895e80d8 100644
--- a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
@@ -87,12 +87,12 @@ public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHel
this.effectTracker = new TrackedObjectStoreEffectTracker(this.partition, this, store);
}
- public async Task Initialize(long initialCommitLogPosition, string fingerprint)
+ public async Task Initialize(long initialCommitLogPosition, string fingerprint, long initialOffset)
{
this.partition.ErrorHandler.Token.ThrowIfCancellationRequested();
this.InputQueueFingerprint = fingerprint;
- this.InputQueuePosition = (0,0);
+ this.InputQueuePosition = (initialOffset, 0);
this.CommitLogPosition = initialCommitLogPosition;
this.store.InitMainSession();
diff --git a/src/DurableTask.Netherite/StorageLayer/Memory/MemoryStorage.cs b/src/DurableTask.Netherite/StorageLayer/Memory/MemoryStorage.cs
index 8e3ea367..c824b5c3 100644
--- a/src/DurableTask.Netherite/StorageLayer/Memory/MemoryStorage.cs
+++ b/src/DurableTask.Netherite/StorageLayer/Memory/MemoryStorage.cs
@@ -52,7 +52,7 @@ public void SubmitEvents(IList entries)
base.SubmitBatch(entries);
}
- public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler termination, string fingerprint)
+ public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler termination, string fingerprint, long initialOffset)
{
await Task.Yield();
this.partition = partition;
@@ -68,6 +68,11 @@ public void SubmitEvents(IList entries)
}
}
+ if (initialOffset != 0)
+ {
+ throw new NetheriteConfigurationException("memory storage cannot be started with a non-zero initial offset");
+ }
+
this.commitPosition = 1;
this.inputQueuePosition = (0,0);
return this.inputQueuePosition;
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/BlobBatchReceiver.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/BlobBatchReceiver.cs
index 84acf8f6..256a005e 100644
--- a/src/DurableTask.Netherite/TransportLayer/EventHubs/BlobBatchReceiver.cs
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/BlobBatchReceiver.cs
@@ -28,6 +28,7 @@ class BlobBatchReceiver where TEvent : Event
readonly BlobContainerClient containerClient;
readonly bool keepUntilConfirmed;
readonly bool isClientReceiver;
+ readonly bool useSeparateConsumerGroups;
// Event Hubs discards messages after 24h, so we can throw away batches that are older than that
readonly static TimeSpan expirationTimeSpan = TimeSpan.FromHours(24) + TimeSpan.FromMinutes(1);
@@ -45,6 +46,7 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,
this.keepUntilConfirmed = keepUntilConfirmed;
this.blobDeletions = this.keepUntilConfirmed ? new BlobDeletions(this) : null;
this.isClientReceiver = typeof(TEvent) == typeof(ClientEvent);
+ this.useSeparateConsumerGroups = settings.UseSeparateConsumerGroups;
}
public async IAsyncEnumerable<(EventData eventData, TEvent[] events, long)> ReceiveEventsAsync(
@@ -195,6 +197,11 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,
// Ignored packets are very common for clients because multiple clients may share the same partition. We log this only for debug purposes.
this.traceHelper.LogDebug("{context} ignored {count} packets for different client", this.traceContext, ignoredPacketCount);
}
+ else if (this.useSeparateConsumerGroups)
+ {
+ // Ignored packets are common when using multiple task hubs with separate consumer groups. We log this only for debug purposes.
+ this.traceHelper.LogDebug("{context} ignored {count} packets for different taskhub", this.traceContext, ignoredPacketCount);
+ }
else
{
// Ignored packets may indicate misconfiguration (multiple taskhubs using same EH namespace). We create a visible warning.
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs
index 32598e5d..0b89678f 100644
--- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs
@@ -20,6 +20,7 @@ class EventHubsConnections
readonly string partitionHub;
readonly string loadMonitorHub;
readonly CancellationToken shutdownToken;
+ readonly string consumerGroup;
EventHubClient partitionClient;
List clientClients;
@@ -46,6 +47,7 @@ public EventHubsConnections(
string partitionHub,
string[] clientHubs,
string loadMonitorHub,
+ string consumerGroup,
CancellationToken shutdownToken)
{
this.connectionInfo = connectionInfo;
@@ -53,8 +55,11 @@ public EventHubsConnections(
this.clientHubs = clientHubs;
this.loadMonitorHub = loadMonitorHub;
this.shutdownToken = shutdownToken;
+ this.consumerGroup = consumerGroup;
}
+ const string defaultConsumerGroup = "$Default";
+
public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}";
public async Task StartAsync(TaskhubParameters parameters)
@@ -63,6 +68,11 @@ await Task.WhenAll(
this.EnsurePartitionsAsync(parameters.PartitionCount),
this.EnsureClientsAsync(),
this.EnsureLoadMonitorAsync());
+
+ if (this.consumerGroup != defaultConsumerGroup)
+ {
+ await this.EnsureConsumerGroupsExistAsync();
+ }
}
public Task StopAsync()
@@ -121,6 +131,31 @@ async Task EnsureEventHubExistsAsync(string eventHubName, int partitionCount)
}
}
+ public Task EnsureConsumerGroupsExistAsync()
+ {
+ return Task.WhenAll(
+ EnsureExistsAsync(this.partitionHub),
+ EnsureExistsAsync(this.loadMonitorHub),
+ Task.WhenAll(this.clientHubs.Select(clientHub => EnsureExistsAsync(clientHub)).ToList())
+ );
+
+ async Task EnsureExistsAsync(string eventHubName)
+ {
+ this.TraceHelper.LogDebug("Creating ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup);
+ bool success = await EventHubsUtil.EnsureConsumerGroupExistsAsync(this.connectionInfo, eventHubName, this.consumerGroup, CancellationToken.None);
+ if (success)
+ {
+ this.TraceHelper.LogInformation("Created ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup, CancellationToken.None);
+ }
+ else
+ {
+ this.TraceHelper.LogDebug("Conflict on ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup, CancellationToken.None);
+ await Task.Delay(TimeSpan.FromSeconds(5));
+ }
+ }
+ }
+
+
internal async Task DeletePartitions()
{
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionInfo, this.partitionHub, CancellationToken.None);
@@ -225,6 +260,23 @@ async Task EnsureLoadMonitorAsync(int retries = EventHubCreationRetries)
await this.EnsureLoadMonitorAsync(retries - 1);
}
+ public async Task> GetStartingSequenceNumbers()
+ {
+ Task[] tasks = new Task[this.partitionPartitions.Count];
+ for (int i = 0; i < this.partitionPartitions.Count; i++)
+ {
+ tasks[i] = GetLastEnqueuedSequenceNumber(i);
+ }
+ await Task.WhenAll(tasks);
+ return tasks.Select(t => t.Result).ToList();
+
+ async Task GetLastEnqueuedSequenceNumber(int i)
+ {
+ var info = await this.partitionPartitions[i].client.GetPartitionRuntimeInformationAsync(this.partitionPartitions[i].id);
+ return info.LastEnqueuedSequenceNumber + 1;
+ }
+ }
+
public static async Task> GetQueuePositionsAsync(ConnectionInfo connectionInfo, string partitionHub)
{
var client = connectionInfo.CreateEventHubClient(partitionHub);
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs
index 78acf837..464caf0f 100644
--- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs
@@ -221,7 +221,7 @@ async Task StartPartitionAsync(PartitionIncarnation prior
// start this partition (which may include waiting for the lease to become available)
c.Partition = this.host.AddPartition(this.partitionId, this.sender);
- c.NextPacketToReceive = await c.Partition.CreateOrRestoreAsync(c.ErrorHandler, this.parameters, this.eventHubsTransport.Fingerprint);
+ c.NextPacketToReceive = await c.Partition.CreateOrRestoreAsync(c.ErrorHandler, this.parameters, this.eventHubsTransport.Fingerprint, this.eventHubsTransport.GetInitialOffset((int)this.partitionId));
this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} started partition (incarnation {incarnation}), next expected packet is #{nextSeqno}", this.eventHubName, this.eventHubPartition, c.Incarnation, c.NextPacketToReceive);
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs
index ad2ac872..3a723ca3 100644
--- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs
@@ -32,7 +32,9 @@ class EventHubsTransport :
readonly ILogger logger;
readonly EventHubsTraceHelper traceHelper;
readonly IStorageLayer storage;
- readonly string shortClientId;
+ readonly string consumerGroup;
+
+ string shortClientId;
EventProcessorHost eventProcessorHost;
EventProcessorHost loadMonitorHost;
@@ -53,11 +55,13 @@ class EventHubsTransport :
CloudBlockBlob partitionScript;
ScriptedEventProcessorHost scriptedEventProcessorHost;
+ Offsets offsets;
+
int shutdownTriggered;
public Guid ClientId { get; private set; }
- public string Fingerprint => this.connections.Fingerprint;
+ public string Fingerprint { get; private set; }
public bool FatalExceptionObserved { get; private set; }
@@ -74,6 +78,7 @@ public EventHubsTransport(TransportAbstraction.IHost host, NetheriteOrchestratio
string namespaceName = settings.EventHubsConnection.ResourceName;
this.logger = EventHubsTraceHelper.CreateLogger(loggerFactory);
this.traceHelper = new EventHubsTraceHelper(this.logger, settings.TransportLogLevelLimit, null, settings.StorageAccountName, settings.HubName, namespaceName);
+ this.consumerGroup = settings.UseSeparateConsumerGroups ? settings.HubName : "$Default";
this.ClientId = Guid.NewGuid();
this.shortClientId = Client.GetShortId(this.ClientId);
}
@@ -82,9 +87,6 @@ public EventHubsTransport(TransportAbstraction.IHost host, NetheriteOrchestratio
public static string PartitionHub = "partitions";
public static string[] ClientHubs = { "clients0", "clients1", "clients2", "clients3" };
public static string LoadMonitorHub = "loadmonitor";
- public static string PartitionConsumerGroup = "$Default";
- public static string ClientConsumerGroup = "$Default";
- public static string LoadMonitorConsumerGroup = "$Default";
async Task ITransportLayer.StartAsync()
{
@@ -110,34 +112,146 @@ async Task ITransportLayer.StartAsync()
// check that the storage format is supported, and load the relevant FASTER tuning parameters
BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning);
- this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token)
+ this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.consumerGroup, this.shutdownSource.Token)
{
- Host = host,
+ Host = this.host,
TraceHelper = this.traceHelper,
};
await this.connections.StartAsync(this.parameters);
+ if (this.settings.UseSeparateConsumerGroups)
+ {
+ this.traceHelper.LogInformation($"Determining initial partition offsets for consumer group '{this.consumerGroup}'");
+ string formattedFingerprint = this.connections.CreationTimestamp.ToString("o").Replace("/", "-");
+ var offsetsBlob = this.cloudBlobContainer.GetBlockBlobReference($"{this.pathPrefix}offsets/{formattedFingerprint}");
+
+ try
+ {
+ var jsonText = await offsetsBlob.DownloadTextAsync(this.shutdownSource.Token);
+ this.offsets = JsonConvert.DeserializeObject(jsonText);
+ this.traceHelper.LogInformation($"Loaded initial partition offsets [{string.Join(", ", this.offsets.InitialOffsets)}]");
+ }
+ catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == (int)System.Net.HttpStatusCode.NotFound)
+ {
+ try
+ {
+ this.traceHelper.LogDebug("Creating offsets");
+ this.offsets = new Offsets()
+ {
+ Guid = Guid.NewGuid(),
+ InitialOffsets = await this.connections.GetStartingSequenceNumbers(),
+ };
+ var jsonText = JsonConvert.SerializeObject(this.offsets, Formatting.Indented, new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.None });
+ var noOverwrite = AccessCondition.GenerateIfNoneMatchCondition("*");
+ await offsetsBlob.UploadTextAsync(jsonText, null, noOverwrite, null, null);
+ this.traceHelper.LogInformation($"Created initial partition offsets [{string.Join(", ", this.offsets.InitialOffsets)}]");
+ }
+ catch (StorageException) when (ex.RequestInformation.HttpStatusCode == (int)System.Net.HttpStatusCode.Conflict)
+ {
+ this.traceHelper.LogDebug("Lost creation race, reloading");
+ var jsonText = await offsetsBlob.DownloadTextAsync(this.shutdownSource.Token);
+ this.offsets = JsonConvert.DeserializeObject(jsonText);
+ this.traceHelper.LogInformation($"Loaded initial partition offsets on second attempt [{string.Join(", ", this.offsets.InitialOffsets)}]");
+ }
+ }
+
+ this.Fingerprint = $"{this.connections.Fingerprint}-{this.offsets.Guid}";
+ }
+ else
+ {
+ this.Fingerprint = this.connections.Fingerprint;
+ }
+
+ this.traceHelper.LogInformation($"EventHubs transport connected to partition event hubs with fingerprint {this.Fingerprint}");
+
return this.parameters;
}
- async Task ITransportLayer.StartClientAsync()
+ class Offsets
{
- this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this);
+ public Guid Guid { get; set; }
+
+ public List InitialOffsets { get; set; }
+ }
+
+ internal long GetInitialOffset(int partitionId)
+ {
+ if (this.offsets != null)
+ {
+ return this.offsets.InitialOffsets[partitionId];
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ async Task ITransportLayer.StartClientAsync()
+ {
var channel = Channel.CreateBounded(new BoundedChannelOptions(500)
{
SingleReader = true,
AllowSynchronousContinuations = true
});
- var clientReceivers = this.connections.CreateClientReceivers(this.ClientId, EventHubsTransport.ClientConsumerGroup);
+ PartitionReceiver[] clientReceivers = null;
+
+ int attempt = 0;
+ int maxAttempts = 8;
+
+ while (attempt++ < maxAttempts)
+ {
+ this.ClientId = Guid.NewGuid();
+ this.shortClientId = Client.GetShortId(this.ClientId);
+
+ clientReceivers = this.connections.CreateClientReceivers(this.ClientId, this.consumerGroup);
+
+ try
+ {
+ this.clientConnectionsEstablished = Enumerable
+ .Range(0, EventHubsConnections.NumClientChannels)
+ .Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i]))
+ .ToArray();
+
+ // we must wait for the client receive connections to be established before continuing
+ // otherwise, we may miss messages that are sent before the client receiver establishes the receive position
+ await Task.WhenAll(this.clientConnectionsEstablished);
+
+ break; // was successful, so we exit retry loop
+ }
+ catch (Microsoft.Azure.EventHubs.QuotaExceededException) when (attempt < maxAttempts)
+ {
+ this.traceHelper.LogWarning("EventHubsTransport encountered quota-exceeded exception");
+ }
+ catch (Exception exception) when (attempt < maxAttempts)
+ {
+ this.traceHelper.LogInformation("EventHubsTransport failed with exception {exception}", exception);
+ }
+
+ TimeSpan retryDelay = TimeSpan.FromSeconds(1 + attempt * 10);
+ this.traceHelper.LogDebug("EventHubsTransport retrying client connection in {retryDelay}", retryDelay);
+ Task retryDelayTask = Task.Delay(retryDelay);
+
+ foreach (var clientReceiver in clientReceivers)
+ {
+ try
+ {
+ await clientReceiver.CloseAsync();
+ }
+
+ catch (Exception exception)
+ {
+ this.traceHelper.LogWarning("EventHubsTransport failed to close partition receiver {clientReceiver} during retry: {exception}", clientReceiver, exception);
+ }
+ }
+
+ await retryDelayTask;
+ }
+
+ this.traceHelper.LogInformation("EventHubsTransport sucessfully established client connection with {fingerPrint} via {consumerGroup}", this.Fingerprint, this.consumerGroup);
- this.clientConnectionsEstablished = Enumerable
- .Range(0, EventHubsConnections.NumClientChannels)
- .Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i]))
- .ToArray();
-
this.clientReceiveLoops = Enumerable
.Range(0, EventHubsConnections.NumClientChannels)
.Select(i => this.ClientReceiveLoopAsync(i, clientReceivers[i], channel.Writer))
@@ -145,9 +259,7 @@ async Task ITransportLayer.StartClientAsync()
this.clientProcessTask = this.ClientProcessLoopAsync(channel.Reader);
- // we must wait for the client receive connections to be established before continuing
- // otherwise, we may miss messages that are sent before the client receiver establishes the receive position
- await Task.WhenAll(this.clientConnectionsEstablished);
+ this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this);
}
async Task ITransportLayer.StartWorkersAsync()
@@ -174,7 +286,7 @@ async Task StartPartitionHost()
this.eventProcessorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
Guid.NewGuid().ToString(),
EventHubsTransport.PartitionHub,
- EventHubsTransport.PartitionConsumerGroup,
+ this.consumerGroup,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
$"{this.pathPrefix}eh-checkpoints/{(EventHubsTransport.PartitionHub)}/{formattedCreationDate}");
@@ -196,7 +308,7 @@ await this.eventProcessorHost.RegisterEventProcessorFactoryAsync(
this.traceHelper.LogInformation($"EventHubsTransport is starting scripted partition host");
this.scriptedEventProcessorHost = new ScriptedEventProcessorHost(
EventHubsTransport.PartitionHub,
- EventHubsTransport.PartitionConsumerGroup,
+ this.consumerGroup,
this.settings.EventHubsConnection,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
@@ -220,7 +332,7 @@ async Task StartLoadMonitorHost()
this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync(
Guid.NewGuid().ToString(),
LoadMonitorHub,
- LoadMonitorConsumerGroup,
+ this.consumerGroup,
this.settings.BlobStorageConnection,
this.cloudBlobContainer.Name,
$"{this.pathPrefix}eh-checkpoints/{LoadMonitorHub}");
diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorProcessor.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorProcessor.cs
index 42d92b00..80fff3be 100644
--- a/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorProcessor.cs
+++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorProcessor.cs
@@ -142,7 +142,7 @@ async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumera
}
finally
{
- this.traceHelper.LogInformation("LoadMonitor exits receive loop");
+ this.traceHelper.LogTrace("LoadMonitor finished processing packets");
}
}
diff --git a/test/PerformanceTests/host.json b/test/PerformanceTests/host.json
index dc2cb594..f6f019b8 100644
--- a/test/PerformanceTests/host.json
+++ b/test/PerformanceTests/host.json
@@ -74,6 +74,10 @@
// set this to control the max size of the orchestration instance cache
// "InstanceCacheSizeMB": "50",
+ // the consumer group to use. Two task hubs can use the same event hubs namespace
+ // if their consumer groups are different.
+ "EventHubsConsumerGroup": "$Default",
+
// set this to true to use the PSF support in Faster. Will soon be obsolete.
"UsePSFQueries": "false",