Skip to content

Commit

Permalink
Better Scheduling/Load Balancing for activities (#52)
Browse files Browse the repository at this point in the history
* Add random scheduler

* add loadmonitor component

* add scheduling options & filehash tweaks

* keep partition prefix consistent with all the other tracing

* fix bug (failed to update estimated load when remote activities complete)

* Add aggresive scheduler mode

* fixed a typo

* basic load monitor

* checkpoint

* fix bug in ETW tracing

* change conditions for load monitor reporting

* use full tracing

* trace more information for OffloadCommandReceived

* change tracing of task messages to provide more consistent results

* several package updates, including DurableTask.Core 2.5.6

* update to account for tracing changes

* remove load monitor interval and add idle message

* add latency field to ActivityCompleted and RemoteActivityResultReceived

* added offload algorithms based on waiting time

* fix overflow in filehash

* fixes to loadmonitor logic, autoscaler, and temporary info sender

* fix missing file and update scale tracing

* change loadmonitor hosting so it moves less often

* implement overlay of pending commands on offload estimation

* improve precision of load estimation and distinguish between stationary and mobile.

* remove unnecessary left over line of code.

* replace push based algo with pull based

* fix so we don't pull more than mobile

* fix dumb mistake

* fix formula

* need more conservative default estimate for completion time, otherwise offload is too aggressive

* remove unnecessary constants.

* add tracing for RTT

* tweak parameters for RTT and smoothing

* batchworker tracing for senders

* use array instead of dictionary, fix concurrent modification exception

* revise sender for load monitor events to send only latest state, and to do rate limiting

* fix tracing

* turn off all loadmonitor activity when the parameter ActivityScheduler is not set LoadMonitor

* implemented "Static" setting for ActivityScheduler

* reduce severity of tracing in loadmonitor

* make filehash scale configurable

* update names, remove old algorithms

* improve terminology and implement solicitation

* fixes and simplifications

* minor updates

* simplify data structure for local activities

* use smarter concurrency defaults, same as for default backend

* fix tracing in LoadMonitor

* update filehash a bit to make it more uniform

* add test series for comparing local vs locavore

Co-authored-by: Sebastian Burckhardt <sburckha@microsoft.com>
  • Loading branch information
Romero027 and sebastianburckhardt authored Sep 1, 2021
1 parent 9c8e0d3 commit 6b3b372
Show file tree
Hide file tree
Showing 57 changed files with 1,765 additions and 513 deletions.
4 changes: 0 additions & 4 deletions src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,6 @@ ScaleStatus GetScaleStatusCore(int workerCount, NetheriteScaleMetrics[] metrics)
break;
}

this.scalingMonitor.Logger.LogInformation(
"Netherite autoscaler recommends: {scaleRecommendation} from: {workerCount} because: {reason}",
scaleStatus.Vote.ToString(), workerCount, recommendation.Reason);

return scaleStatus;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ public class NetheriteProviderFactory : IDurabilityProviderFactory
readonly ConcurrentDictionary<DurableClientAttribute, NetheriteProvider> cachedProviders
= new ConcurrentDictionary<DurableClientAttribute, NetheriteProvider>();

readonly DurableTaskOptions extensionOptions;
readonly DurableTaskOptions options;
readonly IConnectionStringResolver connectionStringResolver;
readonly IHostIdProvider hostIdProvider;

readonly bool inConsumption;

// the following are boolean options that can be specified in host.json,
// but are not passed on to the backend
public bool TraceToConsole { get; }
Expand All @@ -42,14 +44,18 @@ public NetheriteProviderFactory(
IOptions<DurableTaskOptions> extensionOptions,
ILoggerFactory loggerFactory,
IConnectionStringResolver connectionStringResolver,
IHostIdProvider hostIdProvider)
IHostIdProvider hostIdProvider,
#pragma warning disable CS0612 // Type or member is obsolete
IPlatformInformationService platformInfo)
#pragma warning restore CS0612 // Type or member is obsolete
{
this.extensionOptions = extensionOptions?.Value ?? throw new ArgumentNullException(nameof(extensionOptions));
this.options = extensionOptions?.Value ?? throw new ArgumentNullException(nameof(extensionOptions));
this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
this.connectionStringResolver = connectionStringResolver ?? throw new ArgumentNullException(nameof(connectionStringResolver));
this.hostIdProvider = hostIdProvider;
this.inConsumption = platformInfo.InConsumption();

bool ReadBooleanSetting(string name) => this.extensionOptions.StorageProvider.TryGetValue(name, out object objValue)
bool ReadBooleanSetting(string name) => this.options.StorageProvider.TryGetValue(name, out object objValue)
&& objValue is string stringValue && bool.TryParse(stringValue, out bool boolValue) && boolValue;

this.TraceToConsole = ReadBooleanSetting(nameof(this.TraceToConsole));
Expand All @@ -63,9 +69,18 @@ NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(s
// override DTFx defaults to the defaults we want to use in DF
eventSourcedSettings.ThrowExceptionOnInvalidDedupeStatus = true;

// The consumption plan has different performance characteristics so we provide
// different defaults for key configuration values.
int maxConcurrentOrchestratorsDefault = this.inConsumption ? 5 : 10 * Environment.ProcessorCount;
int maxConcurrentActivitiesDefault = this.inConsumption ? 10 : 10 * Environment.ProcessorCount;

// The following defaults are only applied if the customer did not explicitely set them on `host.json`
this.options.MaxConcurrentOrchestratorFunctions = this.options.MaxConcurrentOrchestratorFunctions ?? maxConcurrentOrchestratorsDefault;
this.options.MaxConcurrentActivityFunctions = this.options.MaxConcurrentActivityFunctions ?? maxConcurrentActivitiesDefault;

// copy all applicable fields from both the options and the storageProvider options
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.extensionOptions), eventSourcedSettings);
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.extensionOptions.StorageProvider), eventSourcedSettings);
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options), eventSourcedSettings);
JsonConvert.PopulateObject(JsonConvert.SerializeObject(this.options.StorageProvider), eventSourcedSettings);

// if worker id is specified in environment, it overrides the configured setting
string workerId = Environment.GetEnvironmentVariable("WorkerId");
Expand All @@ -78,7 +93,7 @@ NetheriteOrchestrationServiceSettings GetNetheriteOrchestrationServiceSettings(s
eventSourcedSettings.WorkerId = workerId;
}

eventSourcedSettings.HubName = this.extensionOptions.HubName;
eventSourcedSettings.HubName = this.options.HubName;

if (taskHubNameOverride != null)
{
Expand Down
28 changes: 27 additions & 1 deletion src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface IHost
/// <param name="clientId">A globally unique identifier for this client</param>
/// <param name="taskHubGuid">the unique identifier of the taskhub</param>
/// <param name="batchSender">A sender that can be used by the client for sending messages</param>
/// <returns>A sender for passing messages to the transport backend</returns>
/// <returns>A handle to the created client</returns>
IClient AddClient(Guid clientId, Guid taskHubGuid, ISender batchSender);

/// <summary>
Expand All @@ -47,6 +47,14 @@ public interface IHost
/// <returns></returns>
IPartition AddPartition(uint partitionId, ISender batchSender);

/// <summary>
/// Creates a client on this host.
/// </summary>
/// <param name="taskHubGuid">the unique identifier of the taskhub</param>
/// <param name="batchSender">A sender that can be used by the load monitor for sending messages</param>
/// <returns>A handle to the created load monitor</returns>
ILoadMonitor AddLoadMonitor(Guid taskHubGuid, ISender batchSender);

/// <summary>
/// Returns an error handler object for the given partition.
/// </summary>
Expand Down Expand Up @@ -138,6 +146,24 @@ public interface IClient
void ReportTransportError(string msg, Exception e);
}

/// <summary>
/// The load monitor functionality, as seen by the transport back-end.
/// </summary>
public interface ILoadMonitor
{
/// <summary>
/// Processes a single event on this client.
/// </summary>
/// <param name="loadMonitorEvent">The event to process.</param>
void Process(LoadMonitorEvent loadMonitorEvent);

/// <summary>
/// Stop processing events and shut down.
/// </summary>
/// <returns>When the load monitor is shut down.</returns>
Task StopAsync();
}

/// <summary>
/// A sender abstraction, passed to clients and partitions, for sending messages via the transport.
/// </summary>
Expand Down
17 changes: 10 additions & 7 deletions src/DurableTask.Netherite/Events/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ protected virtual void ExtraTraceInformation(StringBuilder s)

public static IEnumerable<Type> KnownTypes()
{
yield return typeof(ClientEventFragment);
yield return typeof(CreationResponseReceived);
yield return typeof(DeletionResponseReceived);
yield return typeof(HistoryResponseReceived);
yield return typeof(PurgeResponseReceived);
yield return typeof(QueryResponseReceived);
yield return typeof(StateResponseReceived);
yield return typeof(WaitResponseReceived);
yield return typeof(ClientEventFragment);
yield return typeof(PartitionEventFragment);
yield return typeof(LoadInformationReceived);
yield return typeof(ClientTaskMessagesReceived);
yield return typeof(CreationRequestReceived);
yield return typeof(DeletionRequestReceived);
Expand All @@ -63,16 +65,17 @@ public static IEnumerable<Type> KnownTypes()
yield return typeof(PurgeRequestReceived);
yield return typeof(StateRequestReceived);
yield return typeof(WaitRequestReceived);
yield return typeof(ActivityCompleted);
yield return typeof(BatchProcessed);
yield return typeof(SendConfirmed);
yield return typeof(TimerFired);
yield return typeof(ActivityOffloadReceived);
yield return typeof(TransferCommandReceived);
yield return typeof(SolicitationReceived);
yield return typeof(ActivityTransferReceived);
yield return typeof(RemoteActivityResultReceived);
yield return typeof(TaskMessagesReceived);
yield return typeof(ActivityCompleted);
yield return typeof(BatchProcessed);
yield return typeof(OffloadDecision);
yield return typeof(PurgeBatchIssued);
yield return typeof(PartitionEventFragment);
yield return typeof(SendConfirmed);
yield return typeof(TimerFired);
}

public bool SafeToRetryFailedSend()
Expand Down
29 changes: 29 additions & 0 deletions src/DurableTask.Netherite/Events/EventId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public enum EventCategory
/// An event that is sent from a partition to another partition.
/// </summary>
PartitionToPartition,

/// <summary>
/// An event that is sent from a partition to the load monitor.
/// </summary>
ToLoadMonitor,

/// <summary>
/// An event that is sent from the load monitor to a partition
/// </summary>
LoadMonitorToPartition,
}

/// <summary>
Expand Down Expand Up @@ -90,6 +100,12 @@ public enum EventCategory
Category = EventCategory.ClientResponse
};

internal static EventId MakeLoadMonitorEventId(Guid RequestId) => new EventId()
{
WorkItemId = RequestId.ToString("N"),
Category = EventCategory.ToLoadMonitor
};

internal static EventId MakePartitionInternalEventId(string workItemId) => new EventId()
{
WorkItemId = workItemId,
Expand All @@ -103,6 +119,13 @@ public enum EventCategory
Category = EventCategory.PartitionToPartition
};

internal static EventId MakeLoadMonitorToPartitionEventId(Guid RequestId, uint destinationPartition) => new EventId()
{
WorkItemId = RequestId.ToString("N"),
PartitionId = destinationPartition,
Category = EventCategory.LoadMonitorToPartition
};

internal static EventId MakeSubEventId(EventId id, int fragment)
{
id.Index = fragment;
Expand All @@ -126,6 +149,12 @@ public override string ToString()
case EventCategory.PartitionToPartition:
return $"{this.WorkItemId}P{this.PartitionId:D2}{this.IndexSuffix}";

case EventCategory.ToLoadMonitor:
return $"{this.WorkItemId}{this.IndexSuffix}";

case EventCategory.LoadMonitorToPartition:
return $"{this.WorkItemId}P{this.PartitionId:D2}{this.IndexSuffix}";

default:
throw new InvalidOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;

[DataContract]
class LoadInformationReceived : LoadMonitorEvent
{
[DataMember]
public uint PartitionId { get; set; } // The partition that sent the load information

[DataMember]
public int Stationary { get; set; } // The number of queued activities that can only execute on this partition

[DataMember]
public int Mobile { get; set; } // The number of queued activities that are available for transfer

[DataMember]
public double? AverageActCompletionTime { get; set; }

[DataMember]
public DateTime[] TransfersReceived { get; set; }

public bool ConfirmsSource(TransferCommandReceived cmd)
{
uint source = cmd.PartitionId;
DateTime id = cmd.Timestamp;

return
source == this.PartitionId
&& this.TransfersReceived != null
&& this.TransfersReceived[source] >= id;
}

public bool ConfirmsDestination(TransferCommandReceived cmd)
{
uint source = cmd.PartitionId;
uint destination = cmd.TransferDestination;
DateTime id = cmd.Timestamp;

return
destination == this.PartitionId
&& this.TransfersReceived != null
&& this.TransfersReceived[source] >= id;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite
{
using System;
using System.Runtime.Serialization;

[DataContract]
abstract class LoadMonitorEvent : Event
{
[DataMember]
public Guid RequestId { get; set; }

public override EventId EventId => EventId.MakeLoadMonitorEventId(this.RequestId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using DurableTask.Core;

[DataContract]
class SolicitationReceived : PartitionMessageEvent
{
[DataMember]
public Guid RequestId { get; set; }

[DataMember]
public DateTime Timestamp { get; set; }

public override EventId EventId => EventId.MakeLoadMonitorToPartitionEventId(this.RequestId, this.PartitionId);

public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages => throw new NotImplementedException();

public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Activities);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite
{
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Text;
using DurableTask.Core;

[DataContract]
class TransferCommandReceived : PartitionMessageEvent
{
[DataMember]
public Guid RequestId { get; set; }

[DataMember]
public int NumActivitiesToSend { get; set; }

[DataMember]
public uint TransferDestination { get; set; }

[DataMember]
public DateTime Timestamp { get; set; }

[IgnoreDataMember]
public List<(TaskMessage, string)> TransferredActivities { get; set; }

public override EventId EventId => EventId.MakeLoadMonitorToPartitionEventId(this.RequestId, this.PartitionId);

public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages => throw new NotImplementedException();

public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Activities);
}

protected override void ExtraTraceInformation(StringBuilder s)
{
base.ExtraTraceInformation(s);

s.Append(this.NumActivitiesToSend);
s.Append("->Part");
s.Append(this.TransferDestination.ToString("D2"));
}
}
}
Loading

0 comments on commit 6b3b372

Please sign in to comment.