Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix autoscaler #108

Merged
merged 3 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public override bool TryGetScaleMonitor(
if (this.Service.TryGetScalingMonitor(out var monitor))
{
scaleMonitor = new ScaleMonitor(monitor);
monitor.InformationTracer($"ScaleMonitor Constructed Microsoft.Azure.WebJobs.Host.Scale.IScaleMonitor {scaleMonitor.Descriptor}");
return true;
}
else
Expand Down Expand Up @@ -158,6 +159,7 @@ public async Task<NetheriteScaleMetrics> GetMetricsAsync()
var cached = cachedMetrics;
if (cached != null && DateTime.UtcNow - cached.Item1 < TimeSpan.FromSeconds(1.5))
{
this.scalingMonitor.InformationTracer?.Invoke($"ScaleMonitor returned metrics cached previously, at {cached.Item2.Timestamp:o}");
return cached.Item2;
}

Expand All @@ -174,13 +176,12 @@ public async Task<NetheriteScaleMetrics> GetMetricsAsync()
this.serializer.WriteObject(stream, collectedMetrics);
metrics.Metrics = stream.ToArray();

this.scalingMonitor.Logger.LogInformation(
"Collected scale info for {partitionCount} partitions at {time:o} in {latencyMs:F2}ms.",
collectedMetrics.LoadInformation.Count, collectedMetrics.Timestamp, sw.Elapsed.TotalMilliseconds);
this.scalingMonitor.InformationTracer?.Invoke(
$"ScaleMonitor collected metrics for {collectedMetrics.LoadInformation.Count} partitions at {collectedMetrics.Timestamp:o} in {sw.Elapsed.TotalMilliseconds:F2}ms.");
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
this.scalingMonitor.Logger.LogError("IScaleMonitor.GetMetricsAsync() failed: {exception}", e);
this.scalingMonitor.ErrorTracer?.Invoke("ScaleMonitor failed to collect metrics", e);
}

cachedMetrics = new Tuple<DateTime, NetheriteScaleMetrics>(DateTime.UtcNow, metrics);
Expand All @@ -202,7 +203,7 @@ ScaleStatus GetScaleStatusCore(int workerCount, NetheriteScaleMetrics[] metrics)
ScaleRecommendation recommendation;
try
{
if (metrics.Length == 0)
if (metrics == null || metrics.Length == 0)
{
recommendation = new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: "missing metrics");
}
Expand All @@ -212,12 +213,14 @@ ScaleStatus GetScaleStatusCore(int workerCount, NetheriteScaleMetrics[] metrics)
var collectedMetrics = (ScalingMonitor.Metrics) this.serializer.ReadObject(stream);
recommendation = this.scalingMonitor.GetScaleRecommendation(workerCount, collectedMetrics);
}
}
}
catch (Exception e) when (!Utils.IsFatal(e))
{
this.scalingMonitor.Logger.LogError("IScaleMonitor.GetScaleStatus() failed: {exception}", e);
this.scalingMonitor.ErrorTracer?.Invoke("ScaleMonitor failed to compute scale recommendation", e);
recommendation = new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: "unexpected error");
}

this.scalingMonitor.RecommendationTracer?.Invoke(recommendation.Action.ToString(), workerCount, recommendation.Reason);

ScaleStatus scaleStatus = new ScaleStatus();
switch (recommendation?.Action)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,26 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor)
if (this.configuredStorage == TransportConnectionString.StorageChoices.Faster
&& this.configuredTransport == TransportConnectionString.TransportChoices.EventHubs)
{
monitor = new ScalingMonitor(
this.Settings.ResolvedStorageConnectionString,
this.Settings.ResolvedTransportConnectionString,
this.Settings.LoadInformationAzureTableName,
this.Settings.HubName,
this.TraceHelper.TraceScaleRecommendation,
this.TraceHelper.Logger);
return true;
}
else
{
monitor = null;
return false;
try
{
monitor = new ScalingMonitor(
this.Settings.ResolvedStorageConnectionString,
this.Settings.ResolvedTransportConnectionString,
this.Settings.LoadInformationAzureTableName,
this.Settings.HubName,
this.TraceHelper.TraceScaleRecommendation,
this.TraceHelper.TraceProgress,
this.TraceHelper.TraceError);
return true;
}
catch (Exception e)
{
this.TraceHelper.TraceError("ScaleMonitor failure during construction", e);
}
}

monitor = null;
return false;
}


Expand Down
39 changes: 21 additions & 18 deletions src/DurableTask.Netherite/Scaling/ScalingMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@ namespace DurableTask.Netherite.Scaling
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Netherite.EventHubs;
using Microsoft.Extensions.Logging;

/// <summary>
/// Monitors the performance of the Netherite backend and makes scaling decisions.
/// </summary>
public class ScalingMonitor
public class ScalingMonitor
{
readonly string storageConnectionString;
readonly string eventHubsConnectionString;
readonly string partitionLoadTableName;
readonly string taskHubName;
readonly TransportConnectionString.TransportChoices configuredTransport;
readonly Action<string, int, string> recommendationTracer;

// public logging actions to enable collection of scale-monitor-related logging within the Netherite infrastructure
public Action<string, int, string> RecommendationTracer { get; }
public Action<string> InformationTracer { get; }
public Action<string, Exception> ErrorTracer { get; }

readonly ILoadMonitorService loadMonitor;

Expand All @@ -33,11 +36,6 @@ public class ScalingMonitor
/// </summary>
public string TaskHubName => this.taskHubName;

/// <summary>
/// A logger for scaling events.
/// </summary>
public ILogger Logger { get; }

/// <summary>
/// Creates an instance of the scaling monitor, with the given parameters.
/// </summary>
Expand All @@ -46,19 +44,22 @@ public class ScalingMonitor
/// <param name="partitionLoadTableName">The name of the storage table with the partition load information.</param>
/// <param name="taskHubName">The name of the taskhub.</param>
public ScalingMonitor(
string storageConnectionString,
string eventHubsConnectionString,
string partitionLoadTableName,
string storageConnectionString,
string eventHubsConnectionString,
string partitionLoadTableName,
string taskHubName,
Action<string, int, string> recommendationTracer,
ILogger logger)
Action<string> informationTracer,
Action<string, Exception> errorTracer)
{
this.RecommendationTracer = recommendationTracer;
this.InformationTracer = informationTracer;
this.ErrorTracer = errorTracer;

this.storageConnectionString = storageConnectionString;
this.eventHubsConnectionString = eventHubsConnectionString;
this.partitionLoadTableName = partitionLoadTableName;
this.taskHubName = taskHubName;
this.recommendationTracer = recommendationTracer;
this.Logger = logger;

TransportConnectionString.Parse(eventHubsConnectionString, out _, out this.configuredTransport);

Expand Down Expand Up @@ -129,8 +130,6 @@ public ScaleRecommendation GetScaleRecommendation(int workerCount, Metrics metri
{
var recommendation = DetermineRecommendation();

this.recommendationTracer?.Invoke(recommendation.Action.ToString(), workerCount, recommendation.Reason);

return recommendation;

ScaleRecommendation DetermineRecommendation()
Expand All @@ -148,12 +147,16 @@ ScaleRecommendation DetermineRecommendation()
reason: "Task hub is idle");
}

int numberOfSlowPartitions = metrics.LoadInformation.Values.Count(info => info.LatencyTrend.Length > 1 && info.LatencyTrend.Last() == PartitionLoadInfo.MediumLatency);
bool isSlowPartition(PartitionLoadInfo info)
{
char mostRecent = info.LatencyTrend.Last();
return mostRecent == PartitionLoadInfo.HighLatency || mostRecent == PartitionLoadInfo.MediumLatency;
}
int numberOfSlowPartitions = metrics.LoadInformation.Values.Count(info => isSlowPartition(info));

if (workerCount < numberOfSlowPartitions)
{
// scale up to the number of busy partitions
var partition = metrics.LoadInformation.First(kvp => kvp.Value.LatencyTrend.Last() == PartitionLoadInfo.MediumLatency);
return new ScaleRecommendation(
ScaleAction.AddWorker,
keepWorkersAlive: true,
Expand Down