Skip to content

Commit

Permalink
Combine position and load publishing, and make sure position publishi…
Browse files Browse the repository at this point in the history
…ng waits for persistence (#137)
  • Loading branch information
sebastianburckhardt authored Apr 13, 2022
1 parent 43b5532 commit b050e46
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 43 deletions.
86 changes: 49 additions & 37 deletions src/DurableTask.Netherite/StorageProviders/Faster/StoreWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ class StoreWorker : BatchWorker<PartitionEvent>

// periodic load publishing
PartitionLoadInfo loadInfo;
DateTime lastLoadPublished;
DateTime lastPositionsPublished;
DateTime lastPublished;
string lastPublishedLatencyTrend;
public static TimeSpan LoadPublishInterval = TimeSpan.FromSeconds(8);
public static TimeSpan PositionsPublishInterval = TimeSpan.FromSeconds(8);
public static TimeSpan PublishInterval = TimeSpan.FromSeconds(8);
public static TimeSpan PokePeriod = TimeSpan.FromSeconds(3); // allows storeworker to checkpoint and publish load even while idle


Expand All @@ -63,8 +61,7 @@ public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHel
this.replayChecker = this.partition.Settings.TestHooks?.ReplayChecker;

this.loadInfo = PartitionLoadInfo.FirstFrame(this.partition.Settings.WorkerId);
this.lastLoadPublished = DateTime.MinValue;
this.lastPositionsPublished = DateTime.MinValue;
this.lastPublished = DateTime.MinValue;
this.lastPublishedLatencyTrend = "";

// construct an effect tracker that we use to apply effects to the store
Expand Down Expand Up @@ -167,7 +164,40 @@ public Task RunPrefetchSession(IAsyncEnumerable<TrackedObjectKey> keys)
return this.store.RunPrefetchSession(keys);
}

async Task PublishPartitionLoad()
async ValueTask PublishLoadAndPositions()
{
bool publishLoadInfo = await this.UpdateLoadInfo();

// take the current load info and put the next frame in its place
var loadInfoToPublish = this.loadInfo;
this.loadInfo = loadInfoToPublish.NextFrame();

this.lastPublishedLatencyTrend = loadInfoToPublish.LatencyTrend;

this.partition.TraceHelper.TracePartitionLoad(loadInfoToPublish);

var positionsReceived = await this.CollectSendAndReceivePositions();

// to avoid publishing not-yet committed state, publish
// only after the current log is persisted.
if (publishLoadInfo || positionsReceived != null)
{
var task = this.LogWorker.WaitForCompletionAsync()
.ContinueWith((t) =>
{
if (publishLoadInfo)
{
this.partition.LoadPublisher?.Submit((this.partition.PartitionId, loadInfoToPublish));
}
if (positionsReceived != null)
{
this.partition.Send(positionsReceived);
}
});
}
}

async ValueTask<bool> UpdateLoadInfo()
{
foreach (var k in TrackedObjectKey.GetSingletons())
{
Expand Down Expand Up @@ -200,24 +230,10 @@ async Task PublishPartitionLoad()
publish = true;
}

if (publish)
{
// take the current load info and put the next frame in its place
var loadInfoToPublish = this.loadInfo;
this.loadInfo = loadInfoToPublish.NextFrame();
this.lastLoadPublished = DateTime.UtcNow;
this.lastPublishedLatencyTrend = loadInfoToPublish.LatencyTrend;

this.partition.TraceHelper.TracePartitionLoad(loadInfoToPublish);

// to avoid publishing not-yet committed state, publish
// only after the current log is persisted.
var task = this.LogWorker.WaitForCompletionAsync()
.ContinueWith((t) => this.partition.LoadPublisher?.Submit((this.partition.PartitionId, loadInfoToPublish)));
}
return publish;
}

async Task PublishSendAndReceivePositions()
async ValueTask<PositionsReceived> CollectSendAndReceivePositions()
{
int numberPartitions = (int)this.partition.NumberPartitions();
if (numberPartitions > 1)
Expand All @@ -232,9 +248,11 @@ async Task PublishSendAndReceivePositions()
((DedupState)await this.store.ReadAsync(TrackedObjectKey.Dedup, this.effectTracker)).RecordPositions(positionsReceived);
((OutboxState)await this.store.ReadAsync(TrackedObjectKey.Outbox, this.effectTracker)).RecordPositions(positionsReceived);

this.partition.Send(positionsReceived);

this.lastPositionsPublished = DateTime.UtcNow;
return positionsReceived;
}
else
{
return null;
}
}

Expand Down Expand Up @@ -420,18 +438,12 @@ protected override async Task Process(IList<PartitionEvent> batch)
this.pendingCheckpointTrigger = trigger;
this.pendingCompaction = this.RunCompactionAsync(compactUntil);
}


// periodically publish the partition load information
if (this.lastLoadPublished + LoadPublishInterval < DateTime.UtcNow)
{
await this.PublishPartitionLoad();
}

// periodically publish the send and receive positions
if (this.lastPositionsPublished + PositionsPublishInterval < DateTime.UtcNow)

// periodically publish the partition load information and the send/receive positions
if (this.lastPublished + PublishInterval < DateTime.UtcNow)
{
await this.PublishSendAndReceivePositions();
this.lastPublished = DateTime.UtcNow;
await this.PublishLoadAndPositions();
}

if (this.partition.NumberPartitions() > 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ protected override async Task Process(IList<LoadMonitorEvent> toSend)
try
{

bool[] sent = new bool[32];
bool[] sentLoadInformationReceived = new bool[32];
bool[] sentPositionsReceived = new bool[32];

this.stopwatch.Restart();
int numEvents = 0;

for (int i = toSend.Count - 1; i >= 0; i--)
{
Expand All @@ -61,13 +64,24 @@ protected override async Task Process(IList<LoadMonitorEvent> toSend)
// send only the most recent packet from each partition
if (evt is LoadInformationReceived loadInformationReceived)
{
if (sent[loadInformationReceived.PartitionId])
if (sentLoadInformationReceived[loadInformationReceived.PartitionId])
{
continue;
}
else
{
sentLoadInformationReceived[loadInformationReceived.PartitionId] = true;
}
}
else if (evt is PositionsReceived positionsReceived)
{
if (sentPositionsReceived[positionsReceived.PartitionId])
{
continue;
}
else
{
sent[loadInformationReceived.PartitionId] = true;
sentPositionsReceived[positionsReceived.PartitionId] = true;
}
}

Expand All @@ -76,16 +90,20 @@ protected override async Task Process(IList<LoadMonitorEvent> toSend)
var arraySegment = new ArraySegment<byte>(this.stream.GetBuffer(), 0, length);
var eventData = new EventData(arraySegment);
await this.sender.SendAsync(eventData);
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} sent packet ({size} bytes) id={eventId}", this.eventHubName, this.eventHubPartition, length, evt.EventIdString);
this.stream.Seek(0, SeekOrigin.Begin);
numEvents++;
}

long elapsed = this.stopwatch.ElapsedMilliseconds;
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} sent info numEvents={numEvents} latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, numEvents, elapsed);

// rate limit this sender by making each iteration last at least 100ms duration
long toSpare = 100 - this.stopwatch.ElapsedMilliseconds;
long toSpare = 100 - elapsed;
if (toSpare > 10)
{
await Task.Delay(TimeSpan.FromMilliseconds(toSpare));
this.traceHelper.LogDebug("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds);
this.traceHelper.LogTrace("EventHubsSender {eventHubName}/{eventHubPartitionId} iteration padded to latencyMs={latencyMs}", this.eventHubName, this.eventHubPartition, this.stopwatch.ElapsedMilliseconds);
}
}
catch (Exception e)
Expand Down

0 comments on commit b050e46

Please sign in to comment.