From 733b989598fb64e79a046e083584a7282d53a0a1 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Tue, 11 May 2021 12:54:55 -0700 Subject: [PATCH] fix bug in scalemonitor, and add support for using blobs instead of tables --- DurableTask.Netherite.sln | 2 +- .../NetheriteProvider.cs | 96 +++++++++++-------- .../NetheriteOrchestrationService.cs | 11 ++- .../NetheriteOrchestrationServiceSettings.cs | 3 +- .../Scaling/AzureBlobLoadMonitor.cs | 86 +++++++++++++++++ ...nitorTable.cs => AzureTableLoadMonitor.cs} | 4 +- .../Scaling/PartitionLoadInfo.cs | 13 +++ .../Scaling/ScalingMonitor.cs | 35 ++++--- .../Faster/AzureBlobs/BlobManager.cs | 2 +- .../PerformanceTests/scripts/scalemonitor.ps1 | 11 +++ .../ScaleMonitor}/ConsoleLoggerProvider.cs | 0 .../ScaleMonitor}/Program.cs | 2 - .../ScaleMonitor/ScaleMonitor.csproj | 2 +- 13 files changed, 203 insertions(+), 64 deletions(-) create mode 100644 src/DurableTask.Netherite/Scaling/AzureBlobLoadMonitor.cs rename src/DurableTask.Netherite/Scaling/{AzureLoadMonitorTable.cs => AzureTableLoadMonitor.cs} (98%) create mode 100644 test/PerformanceTests/scripts/scalemonitor.ps1 rename {ScalingTests => test/ScaleMonitor}/ConsoleLoggerProvider.cs (100%) rename {ScalingTests => test/ScaleMonitor}/Program.cs (99%) rename ScalingTests/ScalingTests.csproj => test/ScaleMonitor/ScaleMonitor.csproj (65%) diff --git a/DurableTask.Netherite.sln b/DurableTask.Netherite.sln index 8526b909..150ea612 100644 --- a/DurableTask.Netherite.sln +++ b/DurableTask.Netherite.sln @@ -33,7 +33,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventProducer", "test\Event EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventConsumer", "test\EventConsumer\EventConsumer.csproj", "{1D16A6FD-944C-49A1-8727-6236861F437A}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ScalingTests", "ScalingTests\ScalingTests.csproj", "{2F4D331C-62E4-47E8-852E-163166944DF8}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = 
"ScaleMonitor", "test\ScaleMonitor\ScaleMonitor.csproj", "{2F4D331C-62E4-47E8-852E-163166944DF8}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs index 5ed556ca..7b42511f 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs @@ -6,7 +6,9 @@ namespace DurableTask.Netherite.AzureFunctions using System; using System.Collections.Generic; using System.Diagnostics; + using System.IO; using System.Linq; + using System.Runtime.Serialization; using System.Threading; using System.Threading.Tasks; using DurableTask.Core; @@ -124,10 +126,17 @@ public override bool TryGetScaleMonitor( } } - class ScaleMonitor : IScaleMonitor + public class NetheriteScaleMetrics : ScaleMetrics + { + public byte[] Metrics { get; set; } + } + + class ScaleMonitor : IScaleMonitor { readonly ScalingMonitor scalingMonitor; readonly ScaleMonitorDescriptor descriptor; + readonly DataContractSerializer serializer = new DataContractSerializer(typeof(ScalingMonitor.Metrics)); + static Tuple cachedMetrics; public ScaleMonitor(ScalingMonitor scalingMonitor) { @@ -137,10 +146,6 @@ public ScaleMonitor(ScalingMonitor scalingMonitor) public ScaleMonitorDescriptor Descriptor => this.descriptor; - public class NetheriteScaleMetrics : ScaleMetrics - { - public ScalingMonitor.Metrics Metrics { get; set; } - } async Task IScaleMonitor.GetMetricsAsync() { @@ -149,27 +154,37 @@ async Task IScaleMonitor.GetMetricsAsync() public async Task GetMetricsAsync() { + // if we recently collected the metrics, return the cached result now. 
+ var cached = cachedMetrics; + if (cached != null && DateTime.UtcNow - cached.Item1 < TimeSpan.FromSeconds(1.5)) + { + return cached.Item2; + } + + var metrics = new NetheriteScaleMetrics(); + try { Stopwatch sw = new Stopwatch(); sw.Start(); - var metrics = await this.scalingMonitor.CollectMetrics(); + var collectedMetrics = await this.scalingMonitor.CollectMetrics(); sw.Stop(); - this.scalingMonitor.Logger.LogInformation( - "Collected scale info for {partitionCount} partitions in {latencyMs:F2}ms.", - metrics.LoadInformation.Count, sw.Elapsed.TotalMilliseconds); + var stream = new MemoryStream(); + this.serializer.WriteObject(stream, collectedMetrics); + metrics.Metrics = stream.ToArray(); - return new NetheriteScaleMetrics() - { - Metrics = metrics - }; + this.scalingMonitor.Logger.LogInformation( + "Collected scale info for {partitionCount} partitions at {time:o} in {latencyMs:F2}ms.", + collectedMetrics.LoadInformation.Count, collectedMetrics.Timestamp, sw.Elapsed.TotalMilliseconds); } catch (Exception e) when (!Utils.IsFatal(e)) { this.scalingMonitor.Logger.LogError("IScaleMonitor.GetMetricsAsync() failed: {exception}", e); - throw; } + + cachedMetrics = new Tuple(DateTime.UtcNow, metrics); + return metrics; } ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) @@ -184,44 +199,45 @@ public ScaleStatus GetScaleStatus(ScaleStatusContext cont ScaleStatus GetScaleStatusCore(int workerCount, NetheriteScaleMetrics[] metrics) { + ScaleRecommendation recommendation; try - { - ScaleStatus scaleStatus = new ScaleStatus(); - ScaleRecommendation recommendation; - + { if (metrics.Length == 0) { recommendation = new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: "missing metrics"); } else { - recommendation = this.scalingMonitor.GetScaleRecommendation(workerCount, metrics[^1].Metrics); - } - - switch (recommendation.Action) - { - case ScaleAction.AddWorker: - scaleStatus.Vote = ScaleVote.ScaleOut; - break; - case 
ScaleAction.RemoveWorker: - scaleStatus.Vote = ScaleVote.ScaleIn; - break; - default: - scaleStatus.Vote = ScaleVote.None; - break; + var stream = new MemoryStream(metrics[^1].Metrics); + var collectedMetrics = (ScalingMonitor.Metrics) this.serializer.ReadObject(stream); + recommendation = this.scalingMonitor.GetScaleRecommendation(workerCount, collectedMetrics); } - - this.scalingMonitor.Logger.LogInformation( - "Autoscaler recommends: {scaleRecommendation} because: {reason}", - scaleStatus.Vote.ToString(), recommendation.Reason); - - return scaleStatus; - } + } catch (Exception e) when (!Utils.IsFatal(e)) { this.scalingMonitor.Logger.LogError("IScaleMonitor.GetScaleStatus() failed: {exception}", e); - throw; + recommendation = new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: "unexpected error"); + } + + ScaleStatus scaleStatus = new ScaleStatus(); + switch (recommendation?.Action) + { + case ScaleAction.AddWorker: + scaleStatus.Vote = ScaleVote.ScaleOut; + break; + case ScaleAction.RemoveWorker: + scaleStatus.Vote = ScaleVote.ScaleIn; + break; + default: + scaleStatus.Vote = ScaleVote.None; + break; } + + this.scalingMonitor.Logger.LogInformation( + "Netherite autoscaler recommends: {scaleRecommendation} from: {workerCount} because: {reason}", + scaleStatus.Vote.ToString(), workerCount, recommendation.Reason); + + return scaleStatus; } } } diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index 73e99525..01226a7b 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -117,7 +117,16 @@ public NetheriteOrchestrationService(NetheriteOrchestrationServiceSettings setti this.workItemTraceHelper = new WorkItemTraceHelper(loggerFactory, settings.WorkItemLogLevelLimit, this.StorageAccountName, 
this.Settings.HubName); if (this.configuredTransport != TransportConnectionString.TransportChoices.Memory) - this.LoadMonitorService = new AzureLoadMonitorTable(settings.ResolvedStorageConnectionString, settings.LoadInformationAzureTableName, settings.HubName); + { + if (!string.IsNullOrEmpty(settings.LoadInformationAzureTableName)) + { + this.LoadMonitorService = new AzureTableLoadMonitor(settings.ResolvedStorageConnectionString, settings.LoadInformationAzureTableName, settings.HubName); + } + else + { + this.LoadMonitorService = new AzureBlobLoadMonitor(settings.ResolvedStorageConnectionString, settings.HubName); + } + } this.workItemStopwatch.Start(); diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index 7d5a49fa..4488d0cc 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -55,7 +55,8 @@ public class NetheriteOrchestrationServiceSettings public int PartitionCount { get; set; } = 12; /// - /// The name to use for the Azure table with the load information + /// Optionally, a name for an Azure Table to use for publishing load information. If set to null or empty, + /// then Azure blobs are used instead. /// public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions"; diff --git a/src/DurableTask.Netherite/Scaling/AzureBlobLoadMonitor.cs b/src/DurableTask.Netherite/Scaling/AzureBlobLoadMonitor.cs new file mode 100644 index 00000000..e9dd083b --- /dev/null +++ b/src/DurableTask.Netherite/Scaling/AzureBlobLoadMonitor.cs @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +namespace DurableTask.Netherite.Scaling +{ + using DurableTask.Netherite.Faster; + using Microsoft.Azure.Storage; + using Microsoft.Azure.Storage.Blob; + using Newtonsoft.Json; + using System; + using System.Collections.Generic; + using System.Linq; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + + class AzureBlobLoadMonitor : ILoadMonitorService + { + readonly string taskHubName; + readonly CloudBlobContainer blobContainer; + + int? numPartitions; + + public AzureBlobLoadMonitor(string connectionString, string taskHubName) + { + var cloudStorageAccount = CloudStorageAccount.Parse(connectionString); + CloudBlobClient serviceClient = cloudStorageAccount.CreateCloudBlobClient(); + string containerName = BlobManager.GetContainerName(taskHubName); + this.blobContainer = serviceClient.GetContainerReference(containerName); + this.taskHubName = taskHubName; + } + + public TimeSpan PublishInterval => TimeSpan.FromSeconds(10); + + public Task DeleteIfExistsAsync(CancellationToken cancellationToken) + { + // not needed since this blob is stored together with the taskhub storage + return Task.CompletedTask; + } + + public Task CreateIfNotExistsAsync(CancellationToken cancellationToken) + { + // not needed since this blob is stored together with the taskhub storage + return Task.CompletedTask; + } + + public Task PublishAsync(Dictionary info, CancellationToken cancellationToken) + { + Task UploadPartitionInfo(uint partitionId, PartitionLoadInfo loadInfo) + { + var blobDirectory = this.blobContainer.GetDirectoryReference($"p{partitionId:D2}"); + var blob = blobDirectory.GetBlockBlobReference("loadinfo.json"); + var json = JsonConvert.SerializeObject(loadInfo, Formatting.Indented); + return blob.UploadTextAsync(json, cancellationToken); + } + + List tasks = info.Select(kvp => UploadPartitionInfo(kvp.Key, kvp.Value)).ToList(); + return Task.WhenAll(tasks); + } + + public async Task> QueryAsync(CancellationToken cancellationToken) + { + if 
(!this.numPartitions.HasValue) + { + // determine number of partitions of taskhub + var blob = this.blobContainer.GetBlockBlobReference("taskhubparameters.json"); + var jsonText = await blob.DownloadTextAsync().ConfigureAwait(false); + var info = JsonConvert.DeserializeObject(jsonText); + this.numPartitions = info.StartPositions.Length; + } + + async Task<(uint,PartitionLoadInfo)> DownloadPartitionInfo(uint partitionId) + { + var blobDirectory = this.blobContainer.GetDirectoryReference($"p{partitionId:D2}"); + var blob = blobDirectory.GetBlockBlobReference("loadinfo.json"); + string json = await blob.DownloadTextAsync(cancellationToken); + PartitionLoadInfo info = JsonConvert.DeserializeObject(json); + return (partitionId, info); + } + + var tasks = Enumerable.Range(0, this.numPartitions.Value).Select(partitionId => DownloadPartitionInfo((uint) partitionId)).ToList(); + await Task.WhenAll(tasks); + return tasks.Select(task => task.Result).ToDictionary(pair => pair.Item1, pair => pair.Item2); + } + } +} diff --git a/src/DurableTask.Netherite/Scaling/AzureLoadMonitorTable.cs b/src/DurableTask.Netherite/Scaling/AzureTableLoadMonitor.cs similarity index 98% rename from src/DurableTask.Netherite/Scaling/AzureLoadMonitorTable.cs rename to src/DurableTask.Netherite/Scaling/AzureTableLoadMonitor.cs index 8620c26b..14252577 100644 --- a/src/DurableTask.Netherite/Scaling/AzureLoadMonitorTable.cs +++ b/src/DurableTask.Netherite/Scaling/AzureTableLoadMonitor.cs @@ -11,12 +11,12 @@ namespace DurableTask.Netherite.Scaling using System.Threading; using System.Threading.Tasks; - class AzureLoadMonitorTable : ILoadMonitorService + class AzureTableLoadMonitor : ILoadMonitorService { readonly CloudTable table; readonly string taskHubName; - public AzureLoadMonitorTable(string connectionString, string tableName, string taskHubName) + public AzureTableLoadMonitor(string connectionString, string tableName, string taskHubName) { var account = CloudStorageAccount.Parse(connectionString); 
this.table = account.CreateCloudTableClient().GetTableReference(tableName); diff --git a/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs b/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs index 8c924d8e..9d146cb2 100644 --- a/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs +++ b/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs @@ -6,41 +6,49 @@ namespace DurableTask.Netherite.Scaling using System; using System.Collections.Generic; using System.Linq; + using System.Runtime.Serialization; using System.Text; /// /// Reported load information about a specific partition. /// + [DataContract] public class PartitionLoadInfo { /// /// The number of orchestration work items waiting to be processed. /// + [DataMember] public int WorkItems { get; set; } /// /// The number of activities that are waiting to be processed. /// + [DataMember] public int Activities { get; set; } /// /// The number of timers that are waiting to fire. /// + [DataMember] public int Timers { get; set; } /// /// The number of client requests waiting to be processed. /// + [DataMember] public int Requests { get; set; } /// /// The number of work items that have messages waiting to be sent. /// + [DataMember] public int Outbox { get; set; } /// /// The next time on which to wake up. /// + [DataMember] public DateTime? Wakeup { get; set; } /// @@ -80,26 +88,31 @@ public string IsBusy() /// /// The input queue position of this partition, which is the next expected EventHubs sequence number. /// + [DataMember] public long InputQueuePosition { get; set; } /// /// The commit log position of this partition. /// + [DataMember] public long CommitLogPosition { get; set; } /// /// The worker id of the host that is currently running this partition. /// + [DataMember] public string WorkerId { get; set; } /// /// A string encoding of the latency trend. /// + [DataMember] public string LatencyTrend { get; set; } /// /// Percentage of message batches that miss in the cache. 
/// + [DataMember] public double MissRate { get; set; } /// diff --git a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs index 6287d5ba..f0a03606 100644 --- a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs +++ b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs @@ -7,6 +7,7 @@ namespace DurableTask.Netherite.Scaling using System.Collections; using System.Collections.Generic; using System.Linq; + using System.Runtime.Serialization; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -24,9 +25,7 @@ public class ScalingMonitor readonly string taskHubName; readonly TransportConnectionString.TransportChoices configuredTransport; - readonly AzureLoadMonitorTable table; - - Metrics? CachedMetrics; + readonly ILoadMonitorService loadMonitor; /// /// The name of the taskhub. @@ -60,33 +59,45 @@ public ScalingMonitor( TransportConnectionString.Parse(eventHubsConnectionString, out _, out this.configuredTransport); - this.table = new AzureLoadMonitorTable(storageConnectionString, partitionLoadTableName, taskHubName); + if (!string.IsNullOrEmpty(partitionLoadTableName)) + { + this.loadMonitor = new AzureTableLoadMonitor(storageConnectionString, partitionLoadTableName, taskHubName); + } + else + { + this.loadMonitor = new AzureBlobLoadMonitor(storageConnectionString, taskHubName); + } } /// /// The metrics that are collected prior to making a scaling decision /// + [DataContract] public struct Metrics { /// /// the most recent load information published for each partition /// + [DataMember] public Dictionary LoadInformation { get; set; } /// /// A reason why the taskhub is not idle, or null if it is idle /// + [DataMember] public string Busy { get; set; } /// /// Whether the taskhub is idle /// + [IgnoreDataMember] public bool TaskHubIsIdle => string.IsNullOrEmpty(this.Busy); /// /// The time at which the metrics were collected /// - public DateTime Timestamp; + [DataMember] + public 
DateTime Timestamp { get; set; } } /// @@ -96,21 +107,15 @@ public struct Metrics public async Task CollectMetrics() { DateTime now = DateTime.UtcNow; - - if (this.CachedMetrics.HasValue && now - this.CachedMetrics.Value.Timestamp < TimeSpan.FromSeconds(1.5)) - { - return this.CachedMetrics.Value; - } - - var loadInformation = await this.table.QueryAsync(CancellationToken.None).ConfigureAwait(false); + var loadInformation = await this.loadMonitor.QueryAsync(CancellationToken.None).ConfigureAwait(false); var busy = await this.TaskHubIsIdleAsync(loadInformation).ConfigureAwait(false); - return (this.CachedMetrics = new Metrics() - { + return new Metrics() + { LoadInformation = loadInformation, Busy = busy, Timestamp = now, - }).Value; + }; } /// diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs index 3265ca7c..98e9b130 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/AzureBlobs/BlobManager.cs @@ -291,7 +291,7 @@ public BlobManager( Task LeaseMaintenanceLoopTask = Task.CompletedTask; volatile Task NextLeaseRenewalTask = Task.CompletedTask; - static string GetContainerName(string taskHubName) => taskHubName.ToLowerInvariant() + "-storage"; + public static string GetContainerName(string taskHubName) => taskHubName.ToLowerInvariant() + "-storage"; public async Task StartAsync() { diff --git a/test/PerformanceTests/scripts/scalemonitor.ps1 b/test/PerformanceTests/scripts/scalemonitor.ps1 new file mode 100644 index 00000000..0cd1e67f --- /dev/null +++ b/test/PerformanceTests/scripts/scalemonitor.ps1 @@ -0,0 +1,11 @@ +#!/usr/bin/pwsh + +# read the settings that are common to all scripts +. 
./settings.ps1 + +# look up the two connection strings and assign them to the respective environment variables +$Env:AzureWebJobsStorage = (az storage account show-connection-string --name $storageName --resource-group $groupName | ConvertFrom-Json).connectionString +$Env:EventHubsConnection = (az eventhubs namespace authorization-rule keys list --resource-group $groupName --namespace-name $namespaceName --name RootManageSharedAccessKey | ConvertFrom-Json).primaryConnectionString + +# run the scale monitor project +dotnet run -p ..\..\ScalingTests\ScalingTests.csproj diff --git a/ScalingTests/ConsoleLoggerProvider.cs b/test/ScaleMonitor/ConsoleLoggerProvider.cs similarity index 100% rename from ScalingTests/ConsoleLoggerProvider.cs rename to test/ScaleMonitor/ConsoleLoggerProvider.cs diff --git a/ScalingTests/Program.cs b/test/ScaleMonitor/Program.cs similarity index 99% rename from ScalingTests/Program.cs rename to test/ScaleMonitor/Program.cs index 8a1ecd19..ac02404c 100644 --- a/ScalingTests/Program.cs +++ b/test/ScaleMonitor/Program.cs @@ -7,7 +7,6 @@ namespace ScalingTests using System.Threading.Tasks; using DurableTask.Netherite; using DurableTask.Netherite.Scaling; - using Dynamitey; using Microsoft.Extensions.Logging; using Newtonsoft.Json; @@ -19,7 +18,6 @@ static async Task Main(string[] args) { HubName = "perftests", PartitionManagement = PartitionManagementOptions.ClientOnly, - LogLevelLimit = LogLevel.Trace, EventLogLevelLimit = LogLevel.Trace, StorageLogLevelLimit = LogLevel.Trace, diff --git a/ScalingTests/ScalingTests.csproj b/test/ScaleMonitor/ScaleMonitor.csproj similarity index 65% rename from ScalingTests/ScalingTests.csproj rename to test/ScaleMonitor/ScaleMonitor.csproj index ddea2006..489da27a 100644 --- a/ScalingTests/ScalingTests.csproj +++ b/test/ScaleMonitor/ScaleMonitor.csproj @@ -6,7 +6,7 @@ - +