Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt committed Nov 17, 2021
2 parents 0390605 + 6207c0c commit 55d8b3f
Show file tree
Hide file tree
Showing 147 changed files with 5,378 additions and 981 deletions.
7 changes: 7 additions & 0 deletions DurableTask.Netherite.sln
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{AB95
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScaleMonitor", "test\ScaleMonitor\ScaleMonitor.csproj", "{2F4D331C-62E4-47E8-852E-163166944DF8}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LoadGeneratorApp", "test\LoadGeneratorApp\LoadGeneratorApp.csproj", "{78B360B8-3A41-4DC0-A300-94A0FABC1FB0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -66,6 +68,10 @@ Global
{2F4D331C-62E4-47E8-852E-163166944DF8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2F4D331C-62E4-47E8-852E-163166944DF8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2F4D331C-62E4-47E8-852E-163166944DF8}.Release|Any CPU.Build.0 = Release|Any CPU
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -78,6 +84,7 @@ Global
{DD1E1B3F-4FA2-4F3A-9AE1-6B2A0B864AAF} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
{654DA6B4-2E2F-4386-BB9F-7CE5A13998DE} = {AB958467-9236-402E-833C-B8DE4841AB9F}
{2F4D331C-62E4-47E8-852E-163166944DF8} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
{78B360B8-3A41-4DC0-A300-94A0FABC1FB0} = {4A7226CF-57BF-4CA3-A4AC-91A398A1D84B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {238A9613-5411-41CF-BDEC-168CCD5C03FB}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ -->
<PropertyGroup>
<MajorVersion>0</MajorVersion>
<MinorVersion>5</MinorVersion>
<MinorVersion>6</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix>alpha</VersionSuffix>
Expand All @@ -38,8 +38,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.5.0" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.6" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.6.0" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.6.0" />
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class NetheriteProvider : DurabilityProvider
public NetheriteProvider(
NetheriteOrchestrationService service,
NetheriteOrchestrationServiceSettings settings)
: base("Netherite", service, service, settings.ResolvedStorageConnectionString)
: base("Netherite", service, service, settings.StorageConnectionName)
{
this.Service = service;
this.Settings = settings;
Expand Down Expand Up @@ -233,10 +233,6 @@ ScaleStatus GetScaleStatusCore(int workerCount, NetheriteScaleMetrics[] metrics)
break;
}

this.scalingMonitor.Logger.LogInformation(
"Netherite autoscaler recommends: {scaleRecommendation} from: {workerCount} because: {reason}",
scaleStatus.Vote.ToString(), workerCount, recommendation.Reason);

return scaleStatus;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace DurableTask.Netherite.AzureFunctions
using System.Threading;
using DurableTask.Core;
using DurableTask.Netherite;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Extensions.Logging;
Expand All @@ -20,10 +21,12 @@ public class NetheriteProviderFactory : IDurabilityProviderFactory
readonly ConcurrentDictionary<DurableClientAttribute, NetheriteProvider> cachedProviders
= new ConcurrentDictionary<DurableClientAttribute, NetheriteProvider>();

readonly DurableTaskOptions extensionOptions;
readonly IConnectionStringResolver connectionStringResolver;
readonly DurableTaskOptions options;
readonly INameResolver nameResolver;
readonly IHostIdProvider hostIdProvider;

readonly bool inConsumption;

// the following are boolean options that can be specified in host.json,
// but are not passed on to the backend
public bool TraceToConsole { get; }
Expand All @@ -42,14 +45,20 @@ public NetheriteProviderFactory(
IOptions<DurableTaskOptions> extensionOptions,
ILoggerFactory loggerFactory,
IConnectionStringResolver connectionStringResolver,
IHostIdProvider hostIdProvider)
IHostIdProvider hostIdProvider,
INameResolver nameResolver,
#pragma warning disable CS0612 // Type or member is obsolete
IPlatformInformation platformInfo)
#pragma warning restore CS0612 // Type or member is obsolete
{
this.extensionOptions = extensionOptions?.Value ?? throw new ArgumentNullException(nameof(extensionOptions));
this.options = extensionOptions?.Value ?? throw new ArgumentNullException(nameof(extensionOptions));
this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
this.connectionStringResolver = connectionStringResolver ?? throw new ArgumentNullException(nameof(connectionStringResolver));
this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver));

this.hostIdProvider = hostIdProvider;
this.inConsumption = platformInfo.IsInConsumptionPlan();

bool ReadBooleanSetting(string name) => this.extensionOptions.StorageProvider.TryGetValue(name, out object objValue)
bool ReadBooleanSetting(string name) => this.options.StorageProvider.TryGetValue(name, out object objValue)
&& objValue is string stringValue && bool.TryParse(stringValue, out bool boolValue) && boolValue;

this.TraceToConsole = ReadBooleanSetting(nameof(this.TraceToConsole));
Expand All @@ -63,9 +72,18 @@ NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(s
// override DTFx defaults to the defaults we want to use in DF
eventSourcedSettings.ThrowExceptionOnInvalidDedupeStatus = true;

// The consumption plan has different performance characteristics so we provide
// different defaults for key configuration values.
int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount;
int maxConcurrentActivitiesDefault = this.inConsumption ? 20 : 25 * Environment.ProcessorCount;

// The following defaults are only applied if the customer did not explicitely set them on `host.json`
this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault;
this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault;

// copy all applicable fields from both the options and the storageProvider options
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.extensionOptions), eventSourcedSettings);
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.extensionOptions.StorageProvider), eventSourcedSettings);
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options), eventSourcedSettings);
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options.StorageProvider), eventSourcedSettings);

// if worker id is specified in environment, it overrides the configured setting
string workerId = Environment.GetEnvironmentVariable("WorkerId");
Expand All @@ -78,14 +96,20 @@ NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(s
eventSourcedSettings.WorkerId = workerId;
}

eventSourcedSettings.HubName = this.extensionOptions.HubName;
eventSourcedSettings.HubName = this.options.HubName;

if (taskHubNameOverride != null)
{
eventSourcedSettings.HubName = taskHubNameOverride;
}

eventSourcedSettings.Validate((name) => this.connectionStringResolver.Resolve(name));
string runtimeLanguage = this.nameResolver.Resolve("FUNCTIONS_WORKER_RUNTIME");
if (runtimeLanguage != null && !string.Equals(runtimeLanguage, "dotnet", StringComparison.OrdinalIgnoreCase))
{
eventSourcedSettings.CacheOrchestrationCursors = false; // cannot resume orchestrations in the middle
}

eventSourcedSettings.Validate((name) => this.nameResolver.Resolve(name));

if (this.TraceToConsole || this.TraceToBlob)
{
Expand All @@ -111,7 +135,7 @@ public DurabilityProvider GetDurabilityProvider()
var key = new DurableClientAttribute()
{
TaskHub = settings.HubName,
ConnectionName = settings.ResolvedStorageConnectionString,
ConnectionName = settings.StorageConnectionName,
};

this.defaultProvider = this.cachedProviders.GetOrAdd(key, _ =>
Expand All @@ -130,15 +154,15 @@ public DurabilityProvider GetDurabilityProvider(DurableClientAttribute attribute
var settings = this.GetNetheriteOrchestrationServiceSettings(attribute.TaskHub);

if (string.Equals(this.defaultProvider.Settings.HubName, settings.HubName, StringComparison.OrdinalIgnoreCase) &&
string.Equals(this.defaultProvider.Settings.ResolvedStorageConnectionString, settings.ResolvedStorageConnectionString, StringComparison.OrdinalIgnoreCase))
string.Equals(this.defaultProvider.Settings.StorageConnectionName, settings.StorageConnectionName, StringComparison.OrdinalIgnoreCase))
{
return this.defaultProvider;
}

DurableClientAttribute key = new DurableClientAttribute()
{
TaskHub = settings.HubName,
ConnectionName = settings.ResolvedStorageConnectionString,
ConnectionName = settings.StorageConnectionName,
};

return this.cachedProviders.GetOrAdd(key, _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,25 @@ interface IPartitionState
Task CleanShutdown(bool takeFinalCheckpoint);

/// <summary>
/// Queues an internal event (originating on this same partition)
/// for processing on this partition.
/// Queues a read or update event
/// for in-order processing on this partition.
/// </summary>
/// <param name="evt">The event to process.</param>
void SubmitInternalEvent(PartitionEvent evt);
void SubmitEvent(PartitionEvent evt);

/// <summary>
/// Queues external events (originating on clients or other partitions)
/// for processing on this partition.
/// Queues a list of read or update events
/// for in-order processing on this partition.
/// </summary>
/// <param name="evt">The collection of events to process.</param>
void SubmitExternalEvents(IList<PartitionEvent> evt);
void SubmitEvents(IList<PartitionEvent> evt);

/// <summary>
/// Launches a read or query event
/// for immediate processing on this partition, bypassing the queue
/// </summary>
/// <param name="evt">The event to process.</param>
void SubmitParallelEvent(PartitionEvent evt);

/// <summary>
/// Prefetches the supplied keys.
Expand Down
32 changes: 29 additions & 3 deletions src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface IHost
/// <param name="clientId">A globally unique identifier for this client</param>
/// <param name="taskHubGuid">the unique identifier of the taskhub</param>
/// <param name="batchSender">A sender that can be used by the client for sending messages</param>
/// <returns>A sender for passing messages to the transport backend</returns>
/// <returns>A handle to the created client</returns>
IClient AddClient(Guid clientId, Guid taskHubGuid, ISender batchSender);

/// <summary>
Expand All @@ -47,6 +47,14 @@ public interface IHost
/// <returns></returns>
IPartition AddPartition(uint partitionId, ISender batchSender);

/// <summary>
/// Creates a client on this host.
/// </summary>
/// <param name="taskHubGuid">the unique identifier of the taskhub</param>
/// <param name="batchSender">A sender that can be used by the load monitor for sending messages</param>
/// <returns>A handle to the created load monitor</returns>
ILoadMonitor AddLoadMonitor(Guid taskHubGuid, ISender batchSender);

/// <summary>
/// Returns an error handler object for the given partition.
/// </summary>
Expand Down Expand Up @@ -88,13 +96,13 @@ public interface IPartition
/// Queues a single event for processing on this partition.
/// </summary>
/// <param name="partitionEvent">The event to process.</param>
void SubmitInternalEvent(PartitionUpdateEvent partitionEvent);
void SubmitEvent(PartitionUpdateEvent partitionEvent);

/// <summary>
/// Queues a batch of incoming external events for processing on this partition.
/// </summary>
/// <param name="partitionEvents">The events to process.</param>
void SubmitExternalEvents(IList<PartitionEvent> partitionEvents);
void SubmitEvents(IList<PartitionEvent> partitionEvents);

/// <summary>
/// The error handler for this partition.
Expand Down Expand Up @@ -138,6 +146,24 @@ public interface IClient
void ReportTransportError(string msg, Exception e);
}

/// <summary>
/// The load monitor functionality, as seen by the transport back-end.
/// </summary>
public interface ILoadMonitor
{
/// <summary>
/// Processes a single event on this client.
/// </summary>
/// <param name="loadMonitorEvent">The event to process.</param>
void Process(LoadMonitorEvent loadMonitorEvent);

/// <summary>
/// Stop processing events and shut down.
/// </summary>
/// <returns>When the load monitor is shut down.</returns>
Task StopAsync();
}

/// <summary>
/// A sender abstraction, passed to clients and partitions, for sending messages via the transport.
/// </summary>
Expand Down
8 changes: 4 additions & 4 deletions src/DurableTask.Netherite/DurableTask.Netherite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<!-- Version settings: https://andrewlock.net/version-vs-versionsuffix-vs-packageversion-what-do-they-all-mean/ -->
<PropertyGroup>
<MajorVersion>0</MajorVersion>
<MinorVersion>5</MinorVersion>
<MinorVersion>6</MinorVersion>
<PatchVersion>0</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix>alpha</VersionSuffix>
Expand All @@ -48,10 +48,10 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.5" />
<PackageReference Include="Microsoft.Azure.EventHubs.Processor" Version="4.3.2" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.3" />
<PackageReference Include="Microsoft.FASTER.Core" Version="1.8.0" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.5.6" />
<PackageReference Include="Microsoft.FASTER.Core" Version="1.9.6" />
<PackageReference Include="Microsoft.Azure.DurableTask.Core" Version="2.6.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.*" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
</ItemGroup>

Expand Down
Loading

0 comments on commit 55d8b3f

Please sign in to comment.