Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest event hubs client SDK #385

Merged
merged 19 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e2b305d
update to latest event hubs client SDK.
sebastianburckhardt Apr 19, 2024
49204de
minor editing and renaming, catch startup cancellation, and ignore ca…
sebastianburckhardt Apr 23, 2024
4348f77
Merge branch 'main' into pr/update-eh-sdk
sebastianburckhardt Jun 4, 2024
9a93cd4
address PR feedback
sebastianburckhardt Sep 19, 2024
037f525
Apply suggestions from code review
sebastianburckhardt Sep 19, 2024
db2f694
Merge branch 'main' into pr/update-eh-sdk
sebastianburckhardt Sep 19, 2024
75061cb
optimization: cancel the wait when shutting down
sebastianburckhardt Sep 19, 2024
b6f40db
avoid race when opening partitions that are canceled immediately
sebastianburckhardt Sep 20, 2024
2637262
add test timeouts and fix handling of instant cancellation
sebastianburckhardt Sep 20, 2024
1d67e08
Merge commit '263726265beff84ea4216e32bdd0f3ee278a73aa' into pr/updat…
sebastianburckhardt Sep 23, 2024
1360ab4
better logging of exceptions in partition shutdown
sebastianburckhardt Sep 23, 2024
cb5f474
adding more timeouts
sebastianburckhardt Sep 23, 2024
1c5b85e
suggestion from PR review
sebastianburckhardt Sep 23, 2024
022704b
add more timeout wrappers
sebastianburckhardt Sep 23, 2024
a885503
Merge branch 'pr/update-eh-sdk' of https://github.com/microsoft/durab…
sebastianburckhardt Sep 23, 2024
d57120b
remove dead code
sebastianburckhardt Sep 23, 2024
d7a18c9
undo addition of redundant, coarse timeouts for tests that already ha…
sebastianburckhardt Sep 23, 2024
44dfe7e
simplify code that seemed to be hanging in unit tests
sebastianburckhardt Sep 23, 2024
c91349b
add statements to print progress to some of the unit tests, to facili…
sebastianburckhardt Sep 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 16 additions & 68 deletions src/DurableTask.Netherite/Connections/ConnectionInfoExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ namespace DurableTask.Netherite
using System.Security.Cryptography;
using System.Web;
using DurableTask.Netherite.EventHubsTransport;
using Microsoft.Azure.EventHubs;
using Azure.Core;
using System.Runtime.CompilerServices;
using Microsoft.Azure.EventHubs.Processor;
using Newtonsoft.Json.Serialization;
using DurableTask.Netherite.Faster;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;

/// <summary>
/// Utilities for constructing various SDK objects from a connection information.
Expand Down Expand Up @@ -106,96 +105,45 @@ public static Azure.Storage.Blobs.BlobServiceClient GetAzureStorageV12BlobServic
}
}


/// <summary>
/// Creates an Event Hub client for the given connection info.
/// Creates an Event Hub connection.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <param name="eventHub">The event hub name.</param>
/// <returns></returns>
public static EventHubClient CreateEventHubClient(this ConnectionInfo connectionInfo, string eventHub)
public static EventHubConnection CreateEventHubConnection(this ConnectionInfo connectionInfo, string eventHub)
{
if (connectionInfo.ConnectionString != null)
{
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionInfo.ConnectionString)
{
EntityPath = eventHub
};
return EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
return new Azure.Messaging.EventHubs.EventHubConnection(connectionInfo.ConnectionString, eventHub);
}
else
{
Uri uri = new Uri($"sb://{connectionInfo.HostName}");
var tokenProvider = new EventHubsTokenProvider(connectionInfo);
return EventHubClient.CreateWithTokenProvider(uri, eventHub, tokenProvider);
return new Azure.Messaging.EventHubs.EventHubConnection(
fullyQualifiedNamespace: connectionInfo.HostName,
eventHubName: eventHub,
credential: connectionInfo.TokenCredential,
connectionOptions: null);
}
}

/// <summary>
/// Creates an event processor host for the given connection info.
/// </summary>
/// <param name="connectionInfo">The connection info.</param>
/// <param name="hostName">The host name.</param>
/// <param name="eventHubPath">The event hub name.</param>
/// <param name="consumerGroupName">The consumer group name.</param>
/// <param name="checkpointStorage">A connection info for the checkpoint storage.</param>
/// <param name="leaseContainerName">The name of the lease container.</param>
/// <param name="storageBlobPrefix">A prefix for storing the blobs.</param>
/// <param name="args">The constructor arguments.</param>
/// <returns>An event processor host.</returns>
public static async Task<EventProcessorHost> GetEventProcessorHostAsync(
this ConnectionInfo connectionInfo,
string hostName,
string eventHubPath,
string consumerGroupName,
ConnectionInfo checkpointStorage,
string leaseContainerName,
string storageBlobPrefix)
{
public static EventProcessorHost CreateEventProcessorHost(
this ConnectionInfo connectionInfo,
EventProcessorHost.ConstructorArguments args)
{
if (connectionInfo.ConnectionString != null)
{
return new EventProcessorHost(
hostName,
eventHubPath,
consumerGroupName,
connectionInfo.ConnectionString,
checkpointStorage.ConnectionString,
leaseContainerName,
storageBlobPrefix);
return new EventProcessorHost(args, connectionInfo.ConnectionString);
}
else
{
var storageAccount = await checkpointStorage.GetAzureStorageV11AccountAsync();
return new EventProcessorHost(
new Uri($"sb://{connectionInfo.HostName}"),
eventHubPath,
consumerGroupName,
(ITokenProvider) (new EventHubsTokenProvider(connectionInfo)),
storageAccount,
leaseContainerName,
storageBlobPrefix);
}
}

class EventHubsTokenProvider : Microsoft.Azure.EventHubs.ITokenProvider
{
readonly ConnectionInfo info;

public EventHubsTokenProvider(ConnectionInfo info)
{
this.info = info;
}

static TimeSpan NextRefresh(AccessToken token)
{
DateTimeOffset now = DateTimeOffset.UtcNow;
return token.ExpiresOn - now - TimeSpan.FromMinutes(1); // refresh it a bit early.
}

async Task<SecurityToken> ITokenProvider.GetTokenAsync(string appliesTo, TimeSpan timeout)
{
TokenRequestContext request = new(this.info.Scopes);
AccessToken accessToken = await this.info.TokenCredential.GetTokenAsync(request, CancellationToken.None);
return new JsonSecurityToken(accessToken.Token, appliesTo);
return new EventProcessorHost(args, connectionInfo.HostName, connectionInfo.TokenCredential);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/DurableTask.Netherite/DurableTask.Netherite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.33.0" />
<PackageReference Include="Azure.Core" Version="1.38.0" />
<PackageReference Include="Azure.Data.Tables" Version="12.8.0" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.9.2" />
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.11.2" />
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.11.2" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.16.0" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.16.2" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace DurableTask.Netherite
using System.Collections.Generic;
using System.Runtime;
using System.Text;
using Microsoft.Azure.EventHubs;
using Azure.Messaging.EventHubs;

/// <summary>
/// Encapsulates how the transport connection string setting is interpreted.
Expand Down Expand Up @@ -63,8 +63,8 @@ public static string EventHubsNamespaceName(string transportConnectionString)
{
try
{
var builder = new EventHubsConnectionStringBuilder(transportConnectionString);
var host = builder.Endpoint.Host;
var properties = EventHubsConnectionStringProperties.Parse(transportConnectionString);
var host = properties.Endpoint.Host;
return host.Substring(0, host.IndexOf('.'));
}
catch(Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ public AzureStorageDevice(string blobName, BlobUtilsV12.BlobDirectory blockBlobD
this.pageBlobDirectory = pageBlobDirectory;
this.blobName = blobName;
this.PartitionErrorHandler = blobManager.PartitionErrorHandler;
this.PartitionErrorHandler.Token.Register(this.CancelAllRequests);
this.BlobManager = blobManager;
this.underLease = underLease;
this.hangCheckTimer = new Timer(this.DetectHangs, null, 0, 20000);
this.singleWriterSemaphore = underLease ? new SemaphoreSlim(1) : null;
this.limit = TimeSpan.FromSeconds(90);
this.PartitionErrorHandler.Token.Register(this.CancelAllRequests);
}

/// <inheritdoc/>
Expand All @@ -106,7 +106,7 @@ public async Task StartAsync()
var prefix = $"{this.blockBlobDirectory}{this.blobName}.";

string continuationToken = null;
IEnumerable<BlobItem> pageResults = null;
List<BlobItem> pageResults = null;

do
{
Expand All @@ -124,25 +124,15 @@ await this.BlobManager.PerformWithRetriesAsync(
{
var client = this.pageBlobDirectory.Client.WithRetries;

var enumerator = client.GetBlobsAsync(
Azure.AsyncPageable<BlobItem> pageable = client.GetBlobsAsync(
prefix: prefix,
cancellationToken: this.PartitionErrorHandler.Token)
.AsPages(continuationToken, 100)
.GetAsyncEnumerator(cancellationToken: this.PartitionErrorHandler.Token);
cancellationToken: this.PartitionErrorHandler.Token);

if (await enumerator.MoveNextAsync())
{
var page = enumerator.Current;
pageResults = page.Values;
continuationToken = page.ContinuationToken;
return page.Values.Count; // not accurate, in terms of bytes, but still useful for tracing purposes
}
else
{
pageResults = Enumerable.Empty<BlobItem>();
continuationToken = null;
return 0;
};
IAsyncEnumerable<Azure.Page<BlobItem>> pages = pageable.AsPages(continuationToken, 100);
Azure.Page<BlobItem> firstPage = await pages.FirstAsync();
pageResults = firstPage.Values.ToList();
continuationToken = firstPage.ContinuationToken;
return pageResults.Count; // not accurate, in terms of bytes, but still useful for tracing purposes
});

foreach (var item in pageResults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using DurableTask.Netherite.Faster;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.Storage;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -53,7 +52,7 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,

foreach (var eventData in hubMessages)
{
var seqno = eventData.SystemProperties.SequenceNumber;
var seqno = eventData.SequenceNumber;

if (nextPacketToReceive != null)
{
Expand All @@ -75,19 +74,19 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper,

try
{
Packet.Deserialize(eventData.Body, out evt, out blobReference, guid);
Packet.Deserialize(eventData.EventBody.ToStream(), out evt, out blobReference, guid);
}
catch (Exception)
{
this.traceHelper.LogError("{context} could not deserialize packet #{seqno} ({size} bytes)", this.traceContext, seqno, eventData.Body.Count);
this.traceHelper.LogError("{context} could not deserialize packet #{seqno} ({size} bytes)", this.traceContext, seqno, eventData.EventBody.ToMemory().Length);
throw;
}

if (blobReference == null)
{
if (evt == null)
{
this.lowestTraceLevel?.LogTrace("{context} ignored packet #{seqno} ({size} bytes) because its guid does not match taskhub/client", this.traceContext, seqno, eventData.Body.Count);
this.lowestTraceLevel?.LogTrace("{context} ignored packet #{seqno} ({size} bytes) because its guid does not match taskhub/client", this.traceContext, seqno, eventData.EventBody.ToMemory().Length);
ignoredPacketCount++;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ namespace DurableTask.Netherite.EventHubsTransport
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using DurableTask.Netherite.Abstractions;
using DurableTask.Netherite.Faster;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;

class BlobBatchSender
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,8 @@

namespace DurableTask.Netherite.EventHubsTransport
{
using DurableTask.Core.Common;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
using Azure.Messaging.EventHubs;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -19,12 +14,12 @@ class EventHubsClientSender
readonly EventHubsSender<ClientEvent>[] channels;
int roundRobin;

public EventHubsClientSender(TransportAbstraction.IHost host, Guid clientId, PartitionSender[] senders, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings)
public EventHubsClientSender(TransportAbstraction.IHost host, Guid clientId, (EventHubConnection connection, string partitionId)[] partitions, CancellationToken shutdownToken, EventHubsTraceHelper traceHelper, NetheriteOrchestrationServiceSettings settings)
{
this.channels = new Netherite.EventHubsTransport.EventHubsSender<ClientEvent>[senders.Length];
for (int i = 0; i < senders.Length; i++)
this.channels = new Netherite.EventHubsTransport.EventHubsSender<ClientEvent>[partitions.Length];
for (int i = 0; i < partitions.Length; i++)
{
this.channels[i] = new EventHubsSender<ClientEvent>(host, clientId.ToByteArray(), senders[i], shutdownToken, traceHelper, settings);
this.channels[i] = new EventHubsSender<ClientEvent>(host, clientId.ToByteArray(), partitions[i].connection, partitions[i].partitionId, shutdownToken, traceHelper, settings);
}
}

Expand All @@ -44,7 +39,7 @@ public void Submit(ClientEvent toSend)

public Task WaitForShutdownAsync()
{
return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync()));
return Task.WhenAll(this.channels.Select(sender => sender.WaitForShutdownAsync()).ToList());
}
}
}
Loading
Loading