Fix autoscaler (#108)
* fix bug in scaling logic

* catch and trace more errors

* overhaul the tracing and logging to use a uniform prefix and include more detail
sebastianburckhardt authored Jan 12, 2022
1 parent 9ec62a7 commit 2f4bce0
Showing 3 changed files with 51 additions and 39 deletions.
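
The third commit-message bullet is the tracing overhaul: ScalingMonitor no longer takes an ILogger but instead three delegates (RecommendationTracer, InformationTracer, ErrorTracer), which the host wires to its own trace infrastructure. The snippet below is a minimal illustrative sketch of that wiring, not code from this commit: the connection strings, table name, and task hub name are placeholders, and the console lambdas stand in for the TraceHelper methods the commit actually passes in.

```csharp
// Illustrative sketch only; placeholders marked below. The commit itself passes
// TraceHelper.TraceScaleRecommendation / TraceProgress / TraceError rather than lambdas.
using System;
using DurableTask.Netherite.Scaling;

class ScalingMonitorWiringSketch
{
    static ScalingMonitor CreateMonitor()
    {
        return new ScalingMonitor(
            storageConnectionString: "<azure-storage-connection-string>",  // placeholder
            eventHubsConnectionString: "<event-hubs-connection-string>",   // placeholder
            partitionLoadTableName: "<partition-load-table>",              // placeholder
            taskHubName: "<task-hub-name>",                                // placeholder
            recommendationTracer: (action, workerCount, reason) =>
                Console.WriteLine($"ScaleMonitor recommendation: {action} (workerCount={workerCount}): {reason}"),
            informationTracer: message =>
                Console.WriteLine(message),   // messages already carry the "ScaleMonitor" prefix
            errorTracer: (message, exception) =>
                Console.Error.WriteLine($"{message}: {exception}"));
    }
}
```

Because the delegates are exposed as public properties, the ScaleMonitor wrapper in NetheriteProvider.cs reuses them for its own trace lines, which is what gives all scale-monitor output the uniform "ScaleMonitor" prefix.
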
19 changes: 11 additions & 8 deletions src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs
@@ -117,6 +117,7 @@ public override bool TryGetScaleMonitor(
if (this.Service.TryGetScalingMonitor(out var monitor))
{
scaleMonitor = new ScaleMonitor(monitor);
monitor.InformationTracer($"ScaleMonitor Constructed Microsoft.Azure.WebJobs.Host.Scale.IScaleMonitor {scaleMonitor.Descriptor}");
return true;
}
else
@@ -158,6 +159,7 @@ public async Task<NetheriteScaleMetrics> GetMetricsAsync()
var cached = cachedMetrics;
if (cached != null && DateTime.UtcNow - cached.Item1 < TimeSpan.FromSeconds(1.5))
{
this.scalingMonitor.InformationTracer?.Invoke($"ScaleMonitor returned metrics cached previously, at {cached.Item2.Timestamp:o}");
return cached.Item2;
}

@@ -174,13 +176,12 @@ public async Task<NetheriteScaleMetrics> GetMetricsAsync()
this.serializer.WriteObject(stream, collectedMetrics);
metrics.Metrics = stream.ToArray();

this.scalingMonitor.Logger.LogInformation(
"Collected scale info for {partitionCount} partitions at {time:o} in {latencyMs:F2}ms.",
collectedMetrics.LoadInformation.Count, collectedMetrics.Timestamp, sw.Elapsed.TotalMilliseconds);
this.scalingMonitor.InformationTracer?.Invoke(
$"ScaleMonitor collected metrics for {collectedMetrics.LoadInformation.Count} partitions at {collectedMetrics.Timestamp:o} in {sw.Elapsed.TotalMilliseconds:F2}ms.");
}
catch (Exception e) when (!Utils.IsFatal(e))
catch (Exception e)
{
this.scalingMonitor.Logger.LogError("IScaleMonitor.GetMetricsAsync() failed: {exception}", e);
this.scalingMonitor.ErrorTracer?.Invoke("ScaleMonitor failed to collect metrics", e);
}

cachedMetrics = new Tuple<DateTime, NetheriteScaleMetrics>(DateTime.UtcNow, metrics);
@@ -202,7 +203,7 @@ ScaleStatus GetScaleStatusCore(int workerCount, NetheriteScaleMetrics[] metrics)
ScaleRecommendation recommendation;
try
{
if (metrics.Length == 0)
if (metrics == null || metrics.Length == 0)
{
recommendation = new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: "missing metrics");
}
@@ -212,12 +213,14 @@ ScaleStatus GetScaleStatusCore(int workerCount, NetheriteScaleMetrics[] metrics)
var collectedMetrics = (ScalingMonitor.Metrics) this.serializer.ReadObject(stream);
recommendation = this.scalingMonitor.GetScaleRecommendation(workerCount, collectedMetrics);
}
}
}
catch (Exception e) when (!Utils.IsFatal(e))
{
this.scalingMonitor.Logger.LogError("IScaleMonitor.GetScaleStatus() failed: {exception}", e);
this.scalingMonitor.ErrorTracer?.Invoke("ScaleMonitor failed to compute scale recommendation", e);
recommendation = new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: "unexpected error");
}

this.scalingMonitor.RecommendationTracer?.Invoke(recommendation.Action.ToString(), workerCount, recommendation.Reason);

ScaleStatus scaleStatus = new ScaleStatus();
switch (recommendation?.Action)
@@ -170,20 +170,26 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor)
if (this.configuredStorage == TransportConnectionString.StorageChoices.Faster
&& this.configuredTransport == TransportConnectionString.TransportChoices.EventHubs)
{
monitor = new ScalingMonitor(
this.Settings.ResolvedStorageConnectionString,
this.Settings.ResolvedTransportConnectionString,
this.Settings.LoadInformationAzureTableName,
this.Settings.HubName,
this.TraceHelper.TraceScaleRecommendation,
this.TraceHelper.Logger);
return true;
}
else
{
monitor = null;
return false;
try
{
monitor = new ScalingMonitor(
this.Settings.ResolvedStorageConnectionString,
this.Settings.ResolvedTransportConnectionString,
this.Settings.LoadInformationAzureTableName,
this.Settings.HubName,
this.TraceHelper.TraceScaleRecommendation,
this.TraceHelper.TraceProgress,
this.TraceHelper.TraceError);
return true;
}
catch (Exception e)
{
this.TraceHelper.TraceError("ScaleMonitor failure during construction", e);
}
}

monitor = null;
return false;
}


39 changes: 21 additions & 18 deletions src/DurableTask.Netherite/Scaling/ScalingMonitor.cs
@@ -12,19 +12,22 @@ namespace DurableTask.Netherite.Scaling
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Netherite.EventHubs;
using Microsoft.Extensions.Logging;

/// <summary>
/// Monitors the performance of the Netherite backend and makes scaling decisions.
/// </summary>
public class ScalingMonitor
public class ScalingMonitor
{
readonly string storageConnectionString;
readonly string eventHubsConnectionString;
readonly string partitionLoadTableName;
readonly string taskHubName;
readonly TransportConnectionString.TransportChoices configuredTransport;
readonly Action<string, int, string> recommendationTracer;

// public logging actions to enable collection of scale-monitor-related logging within the Netherite infrastructure
public Action<string, int, string> RecommendationTracer { get; }
public Action<string> InformationTracer { get; }
public Action<string, Exception> ErrorTracer { get; }

readonly ILoadMonitorService loadMonitor;

@@ -33,11 +36,6 @@ public class ScalingMonitor
/// </summary>
public string TaskHubName => this.taskHubName;

/// <summary>
/// A logger for scaling events.
/// </summary>
public ILogger Logger { get; }

/// <summary>
/// Creates an instance of the scaling monitor, with the given parameters.
/// </summary>
@@ -46,19 +44,22 @@ public class ScalingMonitor
/// <param name="partitionLoadTableName">The name of the storage table with the partition load information.</param>
/// <param name="taskHubName">The name of the taskhub.</param>
public ScalingMonitor(
string storageConnectionString,
string eventHubsConnectionString,
string partitionLoadTableName,
string storageConnectionString,
string eventHubsConnectionString,
string partitionLoadTableName,
string taskHubName,
Action<string, int, string> recommendationTracer,
ILogger logger)
Action<string> informationTracer,
Action<string, Exception> errorTracer)
{
this.RecommendationTracer = recommendationTracer;
this.InformationTracer = informationTracer;
this.ErrorTracer = errorTracer;

this.storageConnectionString = storageConnectionString;
this.eventHubsConnectionString = eventHubsConnectionString;
this.partitionLoadTableName = partitionLoadTableName;
this.taskHubName = taskHubName;
this.recommendationTracer = recommendationTracer;
this.Logger = logger;

TransportConnectionString.Parse(eventHubsConnectionString, out _, out this.configuredTransport);

@@ -129,8 +130,6 @@ public ScaleRecommendation GetScaleRecommendation(int workerCount, Metrics metrics)
{
var recommendation = DetermineRecommendation();

this.recommendationTracer?.Invoke(recommendation.Action.ToString(), workerCount, recommendation.Reason);

return recommendation;

ScaleRecommendation DetermineRecommendation()
@@ -148,12 +147,16 @@ ScaleRecommendation DetermineRecommendation()
reason: "Task hub is idle");
}

int numberOfSlowPartitions = metrics.LoadInformation.Values.Count(info => info.LatencyTrend.Length > 1 && info.LatencyTrend.Last() == PartitionLoadInfo.MediumLatency);
bool isSlowPartition(PartitionLoadInfo info)
{
char mostRecent = info.LatencyTrend.Last();
return mostRecent == PartitionLoadInfo.HighLatency || mostRecent == PartitionLoadInfo.MediumLatency;
}
int numberOfSlowPartitions = metrics.LoadInformation.Values.Count(info => isSlowPartition(info));

if (workerCount < numberOfSlowPartitions)
{
// scale up to the number of busy partitions
var partition = metrics.LoadInformation.First(kvp => kvp.Value.LatencyTrend.Last() == PartitionLoadInfo.MediumLatency);
return new ScaleRecommendation(
ScaleAction.AddWorker,
keepWorkersAlive: true,
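The hunk above is the scaling-logic bug fix from the first commit-message bullet: the old code counted a partition as slow only when its most recent latency sample was PartitionLoadInfo.MediumLatency, so partitions already at high latency were never counted and could fail to trigger a scale-out. The new isSlowPartition treats a trailing medium or high latency sample as slow. Below is a small self-contained sketch of the difference, using hypothetical stand-in characters for the latency constants (the real values are defined on PartitionLoadInfo).

```csharp
// Self-contained illustration; 'M' and 'H' are hypothetical stand-ins for
// PartitionLoadInfo.MediumLatency and PartitionLoadInfo.HighLatency.
using System;
using System.Linq;

class SlowPartitionCountSketch
{
    const char MediumLatency = 'M'; // stand-in
    const char HighLatency = 'H';   // stand-in

    // old check: only a trailing medium-latency sample counted as slow
    static bool OldIsSlow(string latencyTrend) =>
        latencyTrend.Length > 1 && latencyTrend.Last() == MediumLatency;

    // new check (mirrors isSlowPartition): a trailing medium OR high latency sample counts
    static bool NewIsSlow(string latencyTrend)
    {
        char mostRecent = latencyTrend.Last();
        return mostRecent == HighLatency || mostRecent == MediumLatency;
    }

    static void Main()
    {
        string[] trends = { "LLMM", "LLHH", "LLLL" }; // one latency trend per partition
        Console.WriteLine($"old slow-partition count: {trends.Count(OldIsSlow)}"); // 1: "LLHH" is missed
        Console.WriteLine($"new slow-partition count: {trends.Count(NewIsSlow)}"); // 2: high latency now counts
    }
}
```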
