Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support specification of a consumer group #283

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ interface IPartitionState
/// <param name="localPartition">The partition.</param>
/// <param name="errorHandler">An error handler to initiate and/or indicate termination of this partition.</param>
/// <param name="inputQueueFingerprint">A fingerprint for the input queue.</param>
/// <param name="initialOffset">Initial offset for the input queue, if this partition is being created.</param>
/// <returns>the input queue position from which to resume input processing</returns>
/// <exception cref="OperationCanceledException">Indicates that termination was signaled before the operation completed.</exception>
Task<(long,int)> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint);
Task<(long,int)> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint, long initialOffset);

/// <summary>
/// Starts processing, after creating or restoring the partition state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ public interface IPartition
/// </summary>
/// <param name="termination">A termination object for initiating and/or detecting termination of the partition.</param>
/// <param name="inputQueueFingerprint">Fingerprint for the input queue.</param>
/// <param name="initialOffset">Initial offset for the input queue, if this partition is being created.</param>
/// <returns>The input queue position of the next message to receive.</returns>
/// <remarks>
/// The termination token source can be used for immediately terminating the partition.
/// Also, it can be used to detect that the partition has terminated for any other reason,
/// be it cleanly (after StopAsync) or uncleanly (after losing a lease or hitting a fatal error).
/// </remarks>
Task<(long,int)> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint);
Task<(long,int)> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint, long initialOffset = 0);

/// <summary>
/// Clean shutdown: stop processing, save partition state to storage, and release ownership.
Expand Down
45 changes: 41 additions & 4 deletions src/DurableTask.Netherite/Connections/EventHubsUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static class EventHubsUtil
/// <returns>true if the event hub was created.</returns>
public static async Task<bool> EnsureEventHubExistsAsync(ConnectionInfo info, string eventHubName, int partitionCount, CancellationToken cancellationToken)
{
var response = await SendHttpRequest(info, eventHubName, partitionCount, cancellationToken);
var response = await SendEventHubRequest(info, eventHubName, partitionCount, cancellationToken);
if (response.StatusCode != System.Net.HttpStatusCode.Conflict)
{
response.EnsureSuccessStatusCode();
Expand All @@ -42,20 +42,32 @@ public static async Task<bool> EnsureEventHubExistsAsync(ConnectionInfo info, st
/// <returns>true if the event hub was deleted.</returns>
public static async Task<bool> DeleteEventHubIfExistsAsync(ConnectionInfo info, string eventHubName, CancellationToken cancellationToken)
{
var response = await SendHttpRequest(info, eventHubName, null, cancellationToken);
var response = await SendEventHubRequest(info, eventHubName, null, cancellationToken);
if (response.StatusCode != System.Net.HttpStatusCode.NotFound)
{
response.EnsureSuccessStatusCode();
}
return response.StatusCode == System.Net.HttpStatusCode.OK;
}

static async Task<HttpResponseMessage> SendHttpRequest(ConnectionInfo info, string eventHubName, int? partitionCount, CancellationToken cancellationToken)
/// <summary>
/// Ensures that a consumer group exists on the given event hub, by sending a create request for it.
/// </summary>
/// <param name="info">The connection info for the event hubs namespace.</param>
/// <param name="eventHubName">The name of the event hub.</param>
/// <param name="consumerGroup">The name of the consumer group.</param>
/// <param name="cancellationToken">A cancellation token that is forwarded to the request helper.</param>
/// <returns>true if the response status was Created; false if it was Conflict or any other success status.</returns>
public static async Task<bool> EnsureConsumerGroupExistsAsync(ConnectionInfo info, string eventHubName, string consumerGroup, CancellationToken cancellationToken)
{
    HttpResponseMessage result = await SendConsumerGroupRequest(info, eventHubName, consumerGroup, delete: false, cancellationToken);
    System.Net.HttpStatusCode status = result.StatusCode;

    // a conflict response is tolerated and reported as "not created by this call"
    if (status == System.Net.HttpStatusCode.Conflict)
    {
        return false;
    }

    // any other non-success status is surfaced as an exception
    result.EnsureSuccessStatusCode();
    return status == System.Net.HttpStatusCode.Created;
}

// for documentation of these APIs, see https://learn.microsoft.com/en-us/rest/api/eventhub/event-hubs-management-rest

static async Task<HttpResponseMessage> SendEventHubRequest(ConnectionInfo info, string eventHubPath, int? partitionCount, CancellationToken cancellationToken)
{
// send an http request to create or delete the eventhub
HttpClient client = new HttpClient();
var request = new HttpRequestMessage();
request.RequestUri = new Uri($"https://{info.HostName}/{eventHubName}?timeout=60&api-version=2014-01");
request.RequestUri = new Uri($"https://{info.HostName}/{eventHubPath}?timeout=60&api-version=2014-01");
request.Method = partitionCount.HasValue ? HttpMethod.Put : HttpMethod.Delete;
if (partitionCount.HasValue)
{
Expand All @@ -78,5 +90,30 @@ static async Task<HttpResponseMessage> SendHttpRequest(ConnectionInfo info, stri

return await client.SendAsync(request);
}

/// <summary>
/// Sends an HTTP request that creates (PUT) or deletes (DELETE) a consumer group on an event hub.
/// </summary>
/// <param name="info">The connection info; supplies the host name and authorizes the request.</param>
/// <param name="eventHubPath">The path of the event hub within the namespace.</param>
/// <param name="consumerGroupName">The name of the consumer group.</param>
/// <param name="delete">If true, a DELETE request is sent; otherwise a PUT request is sent.</param>
/// <param name="cancellationToken">A token for cancelling both the authorization step and the HTTP request.</param>
/// <returns>The HTTP response; interpreting the status code is left to the caller.</returns>
static async Task<HttpResponseMessage> SendConsumerGroupRequest(ConnectionInfo info, string eventHubPath, string consumerGroupName, bool delete, CancellationToken cancellationToken)
{
    // send an http request to create or delete the consumer group.
    // The client and request are released once the response has been received; with the default
    // completion option the response content is buffered, so the returned response stays usable.
    using (HttpClient client = new HttpClient())
    using (var request = new HttpRequestMessage())
    {
        request.RequestUri = new Uri($"https://{info.HostName}/{eventHubPath}/consumerGroups/{consumerGroupName}?timeout=60&api-version=2014-01");
        request.Method = delete ? HttpMethod.Delete : HttpMethod.Put;
        request.Content = new StringContent(@"
<entry xmlns='http://www.w3.org/2005/Atom'>
<content type='application/xml'>
<ConsumerGroupDescription xmlns:i='http://www.w3.org/2001/XMLSchema-instance' xmlns='http://schemas.microsoft.com/netservices/2010/10/servicebus/connect'>
</ConsumerGroupDescription>
</content>
</entry>",
            Encoding.UTF8,
            "application/xml");

        request.Headers.Add("Host", info.HostName);

        // add an authorization header to the request
        await info.AuthorizeHttpRequestMessage(request, cancellationToken);

        // pass the token along so that the caller can actually cancel the network call
        return await client.SendAsync(request, cancellationToken);
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ public class NetheriteOrchestrationServiceSettings
/// </summary>
public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions";

/// <summary>
/// If true, use a consumer group with the same name as this task hub, for all event hubs in the namespace. This allows connecting multiple task hubs to the same namespace.
/// </summary>
/// <remarks>
/// This feature is recommended only for small message volumes and a small number of hubs: because all messages are delivered to all connected task hubs, the total
/// message volume grows quadratically.
/// All task hubs connecting to the same namespace must have different names.
/// Note that even though the consumer groups are automatically created for each task hub, they are not automatically deleted when the task hub is deleted.
/// </remarks>
public bool UseSeparateConsumerGroups { get; set; }

/// <summary>
/// Tuning parameters for the FASTER logs
/// </summary>
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/Partition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Partition(
this.LastTransition = this.CurrentTimeMs;
}

public async Task<(long, int)> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint)
public async Task<(long, int)> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint, long initialOffset)
{
EventTraceContext.Clear();

Expand Down Expand Up @@ -130,7 +130,7 @@ public Partition(
this.PendingTimers = new BatchTimer<PartitionEvent>(this.ErrorHandler.Token, this.TimersFired);

// goes to storage to create or restore the partition state
inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false);
inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint, initialOffset).ConfigureAwait(false);

// start processing the timers
this.PendingTimers.Start($"Timer{this.PartitionId:D2}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ async Task<bool> IStorageLayer.CreateTaskhubIfNotExistsAsync()
this.traceHelper.TraceProgress("Created new taskhub");

// zap the partition hub so we start from zero queue positions
if (this.settings.TransportChoice == TransportChoices.EventHubs)
if (this.settings.TransportChoice == TransportChoices.EventHubs && !this.settings.UseSeparateConsumerGroups)
{
this.traceHelper.TraceProgress("Deleting partition event hub, to ensure all partitions start at sequence number zero");
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, CancellationToken.None);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async Task TerminationWrapper(Task what)
await what;
}

public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint)
public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint, long initialOffset)
{
this.partition = partition;
this.terminationToken = errorHandler.Token;
Expand Down Expand Up @@ -146,7 +146,7 @@ async Task TerminationWrapper(Task what)
this.TraceHelper.FasterProgress("Creating store");

// this is a fresh partition
await this.TerminationWrapper(this.storeWorker.Initialize(this.log.BeginAddress, inputQueueFingerprint));
await this.TerminationWrapper(this.storeWorker.Initialize(this.log.BeginAddress, inputQueueFingerprint, initialOffset));

await this.TerminationWrapper(this.storeWorker.TakeFullCheckpointAsync("initial checkpoint").AsTask());
this.TraceHelper.FasterStoreCreated(this.storeWorker.InputQueuePosition, stopwatch.ElapsedMilliseconds);
Expand Down
4 changes: 2 additions & 2 deletions src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHel
this.effectTracker = new TrackedObjectStoreEffectTracker(this.partition, this, store);
}

public async Task Initialize(long initialCommitLogPosition, string fingerprint)
public async Task Initialize(long initialCommitLogPosition, string fingerprint, long initialOffset)
{
this.partition.ErrorHandler.Token.ThrowIfCancellationRequested();

this.InputQueueFingerprint = fingerprint;
this.InputQueuePosition = (0,0);
this.InputQueuePosition = (initialOffset, 0);
this.CommitLogPosition = initialCommitLogPosition;

this.store.InitMainSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void SubmitEvents(IList<PartitionEvent> entries)
base.SubmitBatch(entries);
}

public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler termination, string fingerprint)
public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler termination, string fingerprint, long initialOffset)
{
await Task.Yield();
this.partition = partition;
Expand All @@ -68,6 +68,11 @@ public void SubmitEvents(IList<PartitionEvent> entries)
}
}

if (initialOffset != 0)
{
throw new NetheriteConfigurationException("memory storage cannot be started with a non-zero initial offset");
}

this.commitPosition = 1;
this.inputQueuePosition = (0,0);
return this.inputQueuePosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class BlobBatchReceiver<TEvent> where TEvent : Event
readonly BlobContainerClient containerClient;
readonly bool keepUntilConfirmed;
readonly bool isClientReceiver;
readonly bool useSeparateConsumerGroups;

// Event Hubs discards messages after 24h, so we can throw away batches that are older than that
readonly static TimeSpan expirationTimeSpan = TimeSpan.FromHours(24) + TimeSpan.FromMinutes(1);
Expand All @@ -45,6 +46,7 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,
this.keepUntilConfirmed = keepUntilConfirmed;
this.blobDeletions = this.keepUntilConfirmed ? new BlobDeletions(this) : null;
this.isClientReceiver = typeof(TEvent) == typeof(ClientEvent);
this.useSeparateConsumerGroups = settings.UseSeparateConsumerGroups;
}

public async IAsyncEnumerable<(EventData eventData, TEvent[] events, long)> ReceiveEventsAsync(
Expand Down Expand Up @@ -195,6 +197,11 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,
// Ignored packets are very common for clients because multiple clients may share the same partition. We log this only for debug purposes.
this.traceHelper.LogDebug("{context} ignored {count} packets for different client", this.traceContext, ignoredPacketCount);
}
else if (this.useSeparateConsumerGroups)
{
// Ignored packets are common when using multiple task hubs with separate consumer groups. We log this only for debug purposes.
this.traceHelper.LogDebug("{context} ignored {count} packets for different taskhub", this.traceContext, ignoredPacketCount);
}
else
{
// Ignored packets may indicate misconfiguration (multiple taskhubs using same EH namespace). We create a visible warning.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class EventHubsConnections
readonly string partitionHub;
readonly string loadMonitorHub;
readonly CancellationToken shutdownToken;
readonly string consumerGroup;

EventHubClient partitionClient;
List<EventHubClient> clientClients;
Expand All @@ -46,15 +47,19 @@ public EventHubsConnections(
string partitionHub,
string[] clientHubs,
string loadMonitorHub,
string consumerGroup,
CancellationToken shutdownToken)
{
this.connectionInfo = connectionInfo;
this.partitionHub = partitionHub;
this.clientHubs = clientHubs;
this.loadMonitorHub = loadMonitorHub;
this.shutdownToken = shutdownToken;
this.consumerGroup = consumerGroup;
}

const string defaultConsumerGroup = "$Default";

public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}";

public async Task StartAsync(TaskhubParameters parameters)
Expand All @@ -63,6 +68,11 @@ await Task.WhenAll(
this.EnsurePartitionsAsync(parameters.PartitionCount),
this.EnsureClientsAsync(),
this.EnsureLoadMonitorAsync());

if (this.consumerGroup != defaultConsumerGroup)
{
await this.EnsureConsumerGroupsExistAsync();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider doing the consumer group creation in the OrchestrationServiceCreateAsync call, since it's a resource being created?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would not work because the event hub itself is dynamically created (and sometimes deleted and recreated for certain recovery scenarios).

}
}

public Task StopAsync()
Expand Down Expand Up @@ -121,6 +131,31 @@ async Task EnsureEventHubExistsAsync(string eventHubName, int partitionCount)
}
}

/// <summary>
/// Ensures that the configured consumer group exists on the partition hub, the load monitor hub,
/// and every client hub. The requests for all hubs are issued concurrently.
/// </summary>
/// <returns>A task that completes when the requests for all hubs have completed.</returns>
public Task EnsureConsumerGroupsExistAsync()
{
    return Task.WhenAll(
        EnsureExistsAsync(this.partitionHub),
        EnsureExistsAsync(this.loadMonitorHub),
        Task.WhenAll(this.clientHubs.Select(clientHub => EnsureExistsAsync(clientHub)).ToList())
    );

    // issues the create request for a single event hub and traces the outcome
    async Task EnsureExistsAsync(string eventHubName)
    {
        this.TraceHelper.LogDebug("Creating ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup);
        bool success = await EventHubsUtil.EnsureConsumerGroupExistsAsync(this.connectionInfo, eventHubName, this.consumerGroup, CancellationToken.None);
        if (success)
        {
            // the message template has exactly two placeholders, so exactly two arguments are passed
            this.TraceHelper.LogInformation("Created ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup);
        }
        else
        {
            this.TraceHelper.LogDebug("Conflict on ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup);

            // NOTE(review): presumably this wait gives a concurrently created consumer group time to become usable - confirm
            await Task.Delay(TimeSpan.FromSeconds(5));
        }
    }
}


internal async Task DeletePartitions()
{
await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionInfo, this.partitionHub, CancellationToken.None);
Expand Down Expand Up @@ -225,6 +260,23 @@ async Task EnsureLoadMonitorAsync(int retries = EventHubCreationRetries)
await this.EnsureLoadMonitorAsync(retries - 1);
}

/// <summary>
/// Queries the runtime information of each entry in the partition list, concurrently, and
/// computes for each the sequence number that follows the last enqueued one.
/// </summary>
/// <returns>A list with one sequence number per partition, in partition order.</returns>
public async Task<List<long>> GetStartingSequenceNumbers()
{
    // start one query per partition, then wait for all of them
    long[] sequenceNumbers = await Task.WhenAll(
        Enumerable.Range(0, this.partitionPartitions.Count).Select(NextSequenceNumberAsync));

    return sequenceNumbers.ToList();

    // returns the sequence number immediately after the last enqueued one
    async Task<long> NextSequenceNumberAsync(int index)
    {
        var runtimeInfo = await this.partitionPartitions[index].client.GetPartitionRuntimeInformationAsync(this.partitionPartitions[index].id);
        return runtimeInfo.LastEnqueuedSequenceNumber + 1;
    }
}

public static async Task<List<long>> GetQueuePositionsAsync(ConnectionInfo connectionInfo, string partitionHub)
{
var client = connectionInfo.CreateEventHubClient(partitionHub);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async Task<PartitionIncarnation> StartPartitionAsync(PartitionIncarnation prior

// start this partition (which may include waiting for the lease to become available)
c.Partition = this.host.AddPartition(this.partitionId, this.sender);
c.NextPacketToReceive = await c.Partition.CreateOrRestoreAsync(c.ErrorHandler, this.parameters, this.eventHubsTransport.Fingerprint);
c.NextPacketToReceive = await c.Partition.CreateOrRestoreAsync(c.ErrorHandler, this.parameters, this.eventHubsTransport.Fingerprint, this.eventHubsTransport.GetInitialOffset((int)this.partitionId));

this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} started partition (incarnation {incarnation}), next expected packet is #{nextSeqno}", this.eventHubName, this.eventHubPartition, c.Incarnation, c.NextPacketToReceive);

Expand Down
Loading