Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ScaleMonitor and support blobs #42

Merged
merged 1 commit into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DurableTask.Netherite.sln
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventProducer", "test\Event
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventConsumer", "test\EventConsumer\EventConsumer.csproj", "{1D16A6FD-944C-49A1-8727-6236861F437A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScalingTests", "ScalingTests\ScalingTests.csproj", "{2F4D331C-62E4-47E8-852E-163166944DF8}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScaleMonitor", "test\ScaleMonitor\ScaleMonitor.csproj", "{2F4D331C-62E4-47E8-852E-163166944DF8}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
96 changes: 56 additions & 40 deletions src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ namespace DurableTask.Netherite.AzureFunctions
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
Expand Down Expand Up @@ -124,10 +126,17 @@ public override bool TryGetScaleMonitor(
}
}

class ScaleMonitor : IScaleMonitor<ScaleMonitor.NetheriteScaleMetrics>
/// <summary>
/// The metrics payload exchanged with the Functions scale controller.
/// Wraps the DataContract-serialized <c>ScalingMonitor.Metrics</c> as an opaque byte array
/// (serialized/deserialized by the ScaleMonitor via its DataContractSerializer).
/// </summary>
public class NetheriteScaleMetrics : ScaleMetrics
{
// The serialized ScalingMonitor.Metrics bytes.
public byte[] Metrics { get; set; }
}

class ScaleMonitor : IScaleMonitor<NetheriteScaleMetrics>
{
readonly ScalingMonitor scalingMonitor;
readonly ScaleMonitorDescriptor descriptor;
readonly DataContractSerializer serializer = new DataContractSerializer(typeof(ScalingMonitor.Metrics));
static Tuple<DateTime, NetheriteScaleMetrics> cachedMetrics;

public ScaleMonitor(ScalingMonitor scalingMonitor)
{
Expand All @@ -137,10 +146,6 @@ public ScaleMonitor(ScalingMonitor scalingMonitor)

public ScaleMonitorDescriptor Descriptor => this.descriptor;

public class NetheriteScaleMetrics : ScaleMetrics
{
public ScalingMonitor.Metrics Metrics { get; set; }
}

async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
Expand All @@ -149,27 +154,37 @@ async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()

public async Task<NetheriteScaleMetrics> GetMetricsAsync()
{
// if we recently collected the metrics, return the cached result now.
var cached = cachedMetrics;
if (cached != null && DateTime.UtcNow - cached.Item1 < TimeSpan.FromSeconds(1.5))
{
return cached.Item2;
}

var metrics = new NetheriteScaleMetrics();

try
{
Stopwatch sw = new Stopwatch();
sw.Start();
var metrics = await this.scalingMonitor.CollectMetrics();
var collectedMetrics = await this.scalingMonitor.CollectMetrics();
sw.Stop();

this.scalingMonitor.Logger.LogInformation(
"Collected scale info for {partitionCount} partitions in {latencyMs:F2}ms.",
metrics.LoadInformation.Count, sw.Elapsed.TotalMilliseconds);
var stream = new MemoryStream();
this.serializer.WriteObject(stream, collectedMetrics);
metrics.Metrics = stream.ToArray();

return new NetheriteScaleMetrics()
{
Metrics = metrics
};
this.scalingMonitor.Logger.LogInformation(
"Collected scale info for {partitionCount} partitions at {time:o} in {latencyMs:F2}ms.",
collectedMetrics.LoadInformation.Count, collectedMetrics.Timestamp, sw.Elapsed.TotalMilliseconds);
}
catch (Exception e) when (!Utils.IsFatal(e))
{
this.scalingMonitor.Logger.LogError("IScaleMonitor.GetMetricsAsync() failed: {exception}", e);
throw;
}

cachedMetrics = new Tuple<DateTime, NetheriteScaleMetrics>(DateTime.UtcNow, metrics);
return metrics;
}

ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
Expand All @@ -184,44 +199,45 @@ public ScaleStatus GetScaleStatus(ScaleStatusContext<NetheriteScaleMetrics> cont

ScaleStatus GetScaleStatusCore(int workerCount, NetheriteScaleMetrics[] metrics)
{
ScaleRecommendation recommendation;
try
{
ScaleStatus scaleStatus = new ScaleStatus();
ScaleRecommendation recommendation;

{
if (metrics.Length == 0)
{
recommendation = new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: "missing metrics");
}
else
{
recommendation = this.scalingMonitor.GetScaleRecommendation(workerCount, metrics[^1].Metrics);
}

switch (recommendation.Action)
{
case ScaleAction.AddWorker:
scaleStatus.Vote = ScaleVote.ScaleOut;
break;
case ScaleAction.RemoveWorker:
scaleStatus.Vote = ScaleVote.ScaleIn;
break;
default:
scaleStatus.Vote = ScaleVote.None;
break;
var stream = new MemoryStream(metrics[^1].Metrics);
var collectedMetrics = (ScalingMonitor.Metrics) this.serializer.ReadObject(stream);
recommendation = this.scalingMonitor.GetScaleRecommendation(workerCount, collectedMetrics);
}

this.scalingMonitor.Logger.LogInformation(
"Autoscaler recommends: {scaleRecommendation} because: {reason}",
scaleStatus.Vote.ToString(), recommendation.Reason);

return scaleStatus;
}
}
catch (Exception e) when (!Utils.IsFatal(e))
{
this.scalingMonitor.Logger.LogError("IScaleMonitor.GetScaleStatus() failed: {exception}", e);
throw;
recommendation = new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: "unexpected error");
}

ScaleStatus scaleStatus = new ScaleStatus();
switch (recommendation?.Action)
{
case ScaleAction.AddWorker:
scaleStatus.Vote = ScaleVote.ScaleOut;
break;
case ScaleAction.RemoveWorker:
scaleStatus.Vote = ScaleVote.ScaleIn;
break;
default:
scaleStatus.Vote = ScaleVote.None;
break;
}

this.scalingMonitor.Logger.LogInformation(
"Netherite autoscaler recommends: {scaleRecommendation} from: {workerCount} because: {reason}",
scaleStatus.Vote.ToString(), workerCount, recommendation.Reason);

return scaleStatus;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,16 @@ public NetheriteOrchestrationService(NetheriteOrchestrationServiceSettings setti
this.workItemTraceHelper = new WorkItemTraceHelper(loggerFactory, settings.WorkItemLogLevelLimit, this.StorageAccountName, this.Settings.HubName);

if (this.configuredTransport != TransportConnectionString.TransportChoices.Memory)
this.LoadMonitorService = new AzureLoadMonitorTable(settings.ResolvedStorageConnectionString, settings.LoadInformationAzureTableName, settings.HubName);
{
if (!string.IsNullOrEmpty(settings.LoadInformationAzureTableName))
{
this.LoadMonitorService = new AzureTableLoadMonitor(settings.ResolvedStorageConnectionString, settings.LoadInformationAzureTableName, settings.HubName);
}
else
{
this.LoadMonitorService = new AzureBlobLoadMonitor(settings.ResolvedStorageConnectionString, settings.HubName);
}
}

this.workItemStopwatch.Start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public class NetheriteOrchestrationServiceSettings
public int PartitionCount { get; set; } = 12;

/// <summary>
/// The name to use for the Azure table with the load information
/// Optionally, a name for an Azure Table to use for publishing load information. If set to null or empty,
/// then Azure blobs are used instead.
/// </summary>
public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions";

Expand Down
86 changes: 86 additions & 0 deletions src/DurableTask.Netherite/Scaling/AzureBlobLoadMonitor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

namespace DurableTask.Netherite.Scaling
{
using DurableTask.Netherite.Faster;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

class AzureBlobLoadMonitor : ILoadMonitorService
{
    readonly string taskHubName;
    readonly CloudBlobContainer blobContainer;

    // number of partitions of the task hub; determined lazily on first query
    // by reading the taskhubparameters.json blob
    int? numPartitions;

    /// <summary>
    /// Creates a load monitor that publishes and queries partition load information
    /// as blobs stored inside the task hub's blob container.
    /// </summary>
    /// <param name="connectionString">The Azure Storage connection string.</param>
    /// <param name="taskHubName">The name of the task hub.</param>
    public AzureBlobLoadMonitor(string connectionString, string taskHubName)
    {
        var cloudStorageAccount = CloudStorageAccount.Parse(connectionString);
        CloudBlobClient serviceClient = cloudStorageAccount.CreateCloudBlobClient();
        string containerName = BlobManager.GetContainerName(taskHubName);
        this.blobContainer = serviceClient.GetContainerReference(containerName);
        this.taskHubName = taskHubName;
    }

    /// <summary>
    /// The interval at which partition load information is published.
    /// </summary>
    public TimeSpan PublishInterval => TimeSpan.FromSeconds(10);

    /// <summary>
    /// No-op: the load info blobs live inside the task hub container, so they are
    /// deleted together with the task hub storage.
    /// </summary>
    public Task DeleteIfExistsAsync(CancellationToken cancellationToken)
    {
        // not needed since this blob is stored together with the taskhub storage
        return Task.CompletedTask;
    }

    /// <summary>
    /// No-op: the load info blobs live inside the task hub container, so they are
    /// created together with the task hub storage.
    /// </summary>
    public Task CreateIfNotExistsAsync(CancellationToken cancellationToken)
    {
        // not needed since this blob is stored together with the taskhub storage
        return Task.CompletedTask;
    }

    /// <summary>
    /// Publishes the given load information, writing one blob (p{NN}/loadinfo.json)
    /// per partition, in parallel.
    /// </summary>
    /// <param name="info">The load information, keyed by partition id.</param>
    /// <param name="cancellationToken">A token for canceling the uploads.</param>
    public Task PublishAsync(Dictionary<uint, PartitionLoadInfo> info, CancellationToken cancellationToken)
    {
        Task UploadPartitionInfo(uint partitionId, PartitionLoadInfo loadInfo)
        {
            var blobDirectory = this.blobContainer.GetDirectoryReference($"p{partitionId:D2}");
            var blob = blobDirectory.GetBlockBlobReference("loadinfo.json");
            var json = JsonConvert.SerializeObject(loadInfo, Formatting.Indented);
            return blob.UploadTextAsync(json, cancellationToken);
        }

        List<Task> tasks = info.Select(kvp => UploadPartitionInfo(kvp.Key, kvp.Value)).ToList();
        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Reads the published load information for all partitions of the task hub, in parallel.
    /// </summary>
    /// <param name="cancellationToken">A token for canceling the downloads.</param>
    /// <returns>The load information, keyed by partition id.</returns>
    public async Task<Dictionary<uint, PartitionLoadInfo>> QueryAsync(CancellationToken cancellationToken)
    {
        if (!this.numPartitions.HasValue)
        {
            // determine number of partitions of taskhub
            var blob = this.blobContainer.GetBlockBlobReference("taskhubparameters.json");
            // fix: honor the caller's cancellation token here too (it was previously ignored
            // for this download, unlike the per-partition downloads below)
            var jsonText = await blob.DownloadTextAsync(cancellationToken).ConfigureAwait(false);
            var info = JsonConvert.DeserializeObject<EventHubs.TaskhubParameters>(jsonText);
            this.numPartitions = info.StartPositions.Length;
        }

        async Task<(uint, PartitionLoadInfo)> DownloadPartitionInfo(uint partitionId)
        {
            var blobDirectory = this.blobContainer.GetDirectoryReference($"p{partitionId:D2}");
            var blob = blobDirectory.GetBlockBlobReference("loadinfo.json");
            // library code: use ConfigureAwait(false) consistently on all awaits
            string json = await blob.DownloadTextAsync(cancellationToken).ConfigureAwait(false);
            PartitionLoadInfo info = JsonConvert.DeserializeObject<PartitionLoadInfo>(json);
            return (partitionId, info);
        }

        var tasks = Enumerable.Range(0, this.numPartitions.Value).Select(partitionId => DownloadPartitionInfo((uint)partitionId)).ToList();
        await Task.WhenAll(tasks).ConfigureAwait(false);
        return tasks.Select(task => task.Result).ToDictionary(pair => pair.Item1, pair => pair.Item2);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ namespace DurableTask.Netherite.Scaling
using System.Threading;
using System.Threading.Tasks;

class AzureLoadMonitorTable : ILoadMonitorService
class AzureTableLoadMonitor : ILoadMonitorService
{
readonly CloudTable table;
readonly string taskHubName;

public AzureLoadMonitorTable(string connectionString, string tableName, string taskHubName)
public AzureTableLoadMonitor(string connectionString, string tableName, string taskHubName)
{
var account = CloudStorageAccount.Parse(connectionString);
this.table = account.CreateCloudTableClient().GetTableReference(tableName);
Expand Down
13 changes: 13 additions & 0 deletions src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,49 @@ namespace DurableTask.Netherite.Scaling
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;

/// <summary>
/// Reported load information about a specific partition.
/// </summary>
[DataContract]
public class PartitionLoadInfo
{
/// <summary>
/// The number of orchestration work items waiting to be processed.
/// </summary>
[DataMember]
public int WorkItems { get; set; }

/// <summary>
/// The number of activities that are waiting to be processed.
/// </summary>
[DataMember]
public int Activities { get; set; }

/// <summary>
/// The number of timers that are waiting to fire.
/// </summary>
[DataMember]
public int Timers { get; set; }

/// <summary>
/// The number of client requests waiting to be processed.
/// </summary>
[DataMember]
public int Requests { get; set; }

/// <summary>
/// The number of work items that have messages waiting to be sent.
/// </summary>
[DataMember]
public int Outbox { get; set; }

/// <summary>
/// The next time on which to wake up.
/// </summary>
[DataMember]
public DateTime? Wakeup { get; set; }

/// <summary>
Expand Down Expand Up @@ -80,26 +88,31 @@ public string IsBusy()
/// <summary>
/// The input queue position of this partition, which is the next expected EventHubs sequence number.
/// </summary>
[DataMember]
public long InputQueuePosition { get; set; }

/// <summary>
/// The commit log position of this partition.
/// </summary>
[DataMember]
public long CommitLogPosition { get; set; }

/// <summary>
/// The worker id of the host that is currently running this partition.
/// </summary>
[DataMember]
public string WorkerId { get; set; }

/// <summary>
/// A string encoding of the latency trend.
/// </summary>
[DataMember]
public string LatencyTrend { get; set; }

/// <summary>
/// Percentage of message batches that miss in the cache.
/// </summary>
[DataMember]
public double MissRate { get; set; }

/// <summary>
Expand Down
Loading