From b050e468d26195efb4f8d142580330de6773c5e3 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Wed, 13 Apr 2022 15:07:51 -0700 Subject: [PATCH] Combine position and load publishing, and make sure position publishing waits for persistence (#137) --- .../StorageProviders/Faster/StoreWorker.cs | 86 +++++++++++-------- .../EventHubs/LoadMonitorSender.cs | 30 +++++-- 2 files changed, 73 insertions(+), 43 deletions(-) diff --git a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs index f48aa4e3..75a19aca 100644 --- a/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs @@ -42,11 +42,9 @@ class StoreWorker : BatchWorker // periodic load publishing PartitionLoadInfo loadInfo; - DateTime lastLoadPublished; - DateTime lastPositionsPublished; + DateTime lastPublished; string lastPublishedLatencyTrend; - public static TimeSpan LoadPublishInterval = TimeSpan.FromSeconds(8); - public static TimeSpan PositionsPublishInterval = TimeSpan.FromSeconds(8); + public static TimeSpan PublishInterval = TimeSpan.FromSeconds(8); public static TimeSpan PokePeriod = TimeSpan.FromSeconds(3); // allows storeworker to checkpoint and publish load even while idle @@ -63,8 +61,7 @@ public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHel this.replayChecker = this.partition.Settings.TestHooks?.ReplayChecker; this.loadInfo = PartitionLoadInfo.FirstFrame(this.partition.Settings.WorkerId); - this.lastLoadPublished = DateTime.MinValue; - this.lastPositionsPublished = DateTime.MinValue; + this.lastPublished = DateTime.MinValue; this.lastPublishedLatencyTrend = ""; // construct an effect tracker that we use to apply effects to the store @@ -167,7 +164,40 @@ public Task RunPrefetchSession(IAsyncEnumerable keys) return this.store.RunPrefetchSession(keys); } - async Task PublishPartitionLoad() + async ValueTask PublishLoadAndPositions() + { + bool publishLoadInfo = await this.UpdateLoadInfo(); + + // take the current load info and put the next frame in its place + var loadInfoToPublish = this.loadInfo; + this.loadInfo = loadInfoToPublish.NextFrame(); + + this.lastPublishedLatencyTrend = loadInfoToPublish.LatencyTrend; + + this.partition.TraceHelper.TracePartitionLoad(loadInfoToPublish); + + var positionsReceived = await this.CollectSendAndReceivePositions(); + + // to avoid publishing not-yet committed state, publish + // only after the current log is persisted. + if (publishLoadInfo || positionsReceived != null) + { + var task = this.LogWorker.WaitForCompletionAsync() + .ContinueWith((t) => + { + if (publishLoadInfo) + { + this.partition.LoadPublisher?.Submit((this.partition.PartitionId, loadInfoToPublish)); + } + if (positionsReceived != null) + { + this.partition.Send(positionsReceived); + } + }); + } + } + + async ValueTask UpdateLoadInfo() { foreach (var k in TrackedObjectKey.GetSingletons()) { @@ -200,24 +230,10 @@ async Task PublishPartitionLoad() publish = true; } - if (publish) - { - // take the current load info and put the next frame in its place - var loadInfoToPublish = this.loadInfo; - this.loadInfo = loadInfoToPublish.NextFrame(); - this.lastLoadPublished = DateTime.UtcNow; - this.lastPublishedLatencyTrend = loadInfoToPublish.LatencyTrend; - - this.partition.TraceHelper.TracePartitionLoad(loadInfoToPublish); - - // to avoid publishing not-yet committed state, publish - // only after the current log is persisted. - var task = this.LogWorker.WaitForCompletionAsync() - .ContinueWith((t) => this.partition.LoadPublisher?.Submit((this.partition.PartitionId, loadInfoToPublish))); - } + return publish; } - async Task PublishSendAndReceivePositions() + async ValueTask CollectSendAndReceivePositions() { int numberPartitions = (int)this.partition.NumberPartitions(); if (numberPartitions > 1) @@ -232,9 +248,11 @@ async Task PublishSendAndReceivePositions() ((DedupState)await this.store.ReadAsync(TrackedObjectKey.Dedup, this.effectTracker)).RecordPositions(positionsReceived); ((OutboxState)await this.store.ReadAsync(TrackedObjectKey.Outbox, this.effectTracker)).RecordPositions(positionsReceived); - this.partition.Send(positionsReceived); - - this.lastPositionsPublished = DateTime.UtcNow; + return positionsReceived; + } + else + { + return null; } } @@ -420,18 +438,12 @@ protected override async Task Process(IList batch) this.pendingCheckpointTrigger = trigger; this.pendingCompaction = this.RunCompactionAsync(compactUntil); } - - - // periodically publish the partition load information - if (this.lastLoadPublished + LoadPublishInterval < DateTime.UtcNow) - { - await this.PublishPartitionLoad(); - } - - // periodically publish the send and receive positions - if (this.lastPositionsPublished + PositionsPublishInterval < DateTime.UtcNow) + + // periodically publish the partition load information and the send/receive positions + if (this.lastPublished + PublishInterval < DateTime.UtcNow) { - await this.PublishSendAndReceivePositions(); + this.lastPublished = DateTime.UtcNow; + await this.PublishLoadAndPositions(); } if (this.partition.NumberPartitions() > 1) diff --git a/src/DurableTask.Netherite/TransportProviders/EventHubs/LoadMonitorSender.cs b/src/DurableTask.Netherite/TransportProviders/EventHubs/LoadMonitorSender.cs index c38446bd..af88a296 100644 --- a/src/DurableTask.Netherite/TransportProviders/EventHubs/LoadMonitorSender.cs +++ b/src/DurableTask.Netherite/TransportProviders/EventHubs/LoadMonitorSender.cs @@ -51,8 +51,11 @@ protected override async Task Process(IList toSend) try { - bool[] sent = new bool[32]; + bool[] sentLoadInformationReceived = new bool[32]; + bool[] sentPositionsReceived = new bool[32]; + this.stopwatch.Restart(); + int numEvents = 0; for (int i = toSend.Count - 1; i >= 0; i--) { @@ -61,13 +64,24 @@ protected override async Task Process(IList toSend) // send only the most recent packet from each partition if (evt is LoadInformationReceived loadInformationReceived) { - if (sent[loadInformationReceived.PartitionId]) + if (sentLoadInformationReceived[loadInformationReceived.PartitionId]) + { + continue; + } + else + { + sentLoadInformationReceived[loadInformationReceived.PartitionId] = true; + } + } + else if (evt is PositionsReceived positionsReceived) + { + if (sentPositionsReceived[positionsReceived.PartitionId]) { continue; } else { - sent[loadInformationReceived.PartitionId] = true; + sentPositionsReceived[positionsReceived.PartitionId] = true; } } @@ -76,16 +90,20 @@ protected override async Task Process(IList toSend) var arraySegment = new ArraySegment(this.stream.GetBuffer(), 0, length); var eventData = new EventData(arraySegment); await this.sender.SendAsync(eventData); - this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString); + this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString); this.stream.Seek(0, SeekOrigin.Begin); + numEvents++; } + long elapsed = this.stopwatch.ElapsedMilliseconds; + this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} sent info numEvents={numEvents} latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, numEvents, elapsed); + // rate limit this sender by making each iteration last at least 100ms duration - long toSpare = 100 - this.stopwatch.ElapsedMilliseconds; + long toSpare = 100 - elapsed; if (toSpare > 10) { await Task.Delay(TimeSpan.FromMilliseconds(toSpare)); - this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds); + this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds); } } catch (Exception e)