From fae606f47d51fa4c4871a9bbe0a7b18657a3233d Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Wed, 20 Oct 2021 08:11:51 -0700 Subject: [PATCH] various fixes to tracing (#84) --- .../FromClients/ClientTaskMessagesReceived.cs | 3 + .../FromClients/CreationRequestReceived.cs | 4 +- .../FromClients/DeletionRequestReceived.cs | 7 +- .../FromClients/HistoryRequestReceived.cs | 7 +- .../FromClients/StateRequestReceived.cs | 3 + .../FromClients/WaitRequestReceived.cs | 4 + .../Internal/BatchProcessed.cs | 6 +- .../PartitionEvents/Internal/TimerFired.cs | 4 + .../Events/PartitionEvents/PartitionEvent.cs | 6 + .../LoadMonitor/LoadMonitor.cs | 2 +- .../OrchestrationService/Client.cs | 2 +- .../NetheriteOrchestrationService.cs | 175 ++++++++++-------- .../OrchestrationService/Partition.cs | 2 +- .../PartitionState/HistoryState.cs | 6 +- .../Scaling/LoadPublisher.cs | 8 +- .../Scaling/ScalingMonitor.cs | 10 +- .../Tracing/EtwSource.cs | 98 ++++++---- .../Tracing/EventTraceHelper.cs | 23 +-- .../OrchestrationServiceTraceHelper.cs | 122 ++++++++++++ .../Tracing/WorkItemTraceHelper.cs | 20 +- 20 files changed, 339 insertions(+), 173 deletions(-) create mode 100644 src/DurableTask.Netherite/Tracing/OrchestrationServiceTraceHelper.cs diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/ClientTaskMessagesReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/ClientTaskMessagesReceived.cs index 9b1d5f64..286d9e91 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/ClientTaskMessagesReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/ClientTaskMessagesReceived.cs @@ -13,6 +13,9 @@ class ClientTaskMessagesReceived : ClientRequestEvent [DataMember] public TaskMessage[] TaskMessages { get; set; } + [IgnoreDataMember] + public override string TracedInstanceId => this.TaskMessages[0].OrchestrationInstance.InstanceId; + public override void 
DetermineEffects(EffectTracker effects) { effects.Add(TrackedObjectKey.Sessions); diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs index 2ae0b48f..91016047 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs @@ -32,11 +32,13 @@ class CreationRequestReceived : ClientRequestEventWithPrefetch [IgnoreDataMember] public string InstanceId => this.ExecutionStartedEvent.OrchestrationInstance.InstanceId; + + [IgnoreDataMember] + public override string TracedInstanceId => this.InstanceId; [IgnoreDataMember] public override TrackedObjectKey Target => TrackedObjectKey.Instance(this.InstanceId); - public override bool OnReadComplete(TrackedObject target, Partition partition) { // Use this moment of time as the creation timestamp, replacing the original timestamp taken on the client. diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/DeletionRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/DeletionRequestReceived.cs index 787c12bc..3beb267b 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/DeletionRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/DeletionRequestReceived.cs @@ -22,10 +22,7 @@ class DeletionRequestReceived : ClientRequestEventWithPrefetch [IgnoreDataMember] public override TrackedObjectKey? 
Prefetch => TrackedObjectKey.History(this.InstanceId); - protected override void ExtraTraceInformation(StringBuilder s) - { - s.Append(' '); - s.Append(this.InstanceId); - } + [IgnoreDataMember] + public override string TracedInstanceId => this.InstanceId; } } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/HistoryRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/HistoryRequestReceived.cs index b306c1fe..9006b267 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/HistoryRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/HistoryRequestReceived.cs @@ -16,11 +16,8 @@ class HistoryRequestReceived : ClientReadonlyRequestEvent [IgnoreDataMember] public override TrackedObjectKey ReadTarget => TrackedObjectKey.History(this.InstanceId); - protected override void ExtraTraceInformation(StringBuilder s) - { - s.Append(' '); - s.Append(this.InstanceId); - } + [IgnoreDataMember] + public override string TracedInstanceId => this.InstanceId; public override void OnReadComplete(TrackedObject target, Partition partition) { diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/StateRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/StateRequestReceived.cs index 447cb68a..25a2134c 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/StateRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/StateRequestReceived.cs @@ -21,6 +21,9 @@ class StateRequestReceived : ClientReadonlyRequestEvent [IgnoreDataMember] public override TrackedObjectKey ReadTarget => TrackedObjectKey.Instance(this.InstanceId); + [IgnoreDataMember] + public override string TracedInstanceId => this.InstanceId; + protected override void ExtraTraceInformation(StringBuilder s) { s.Append(' '); diff --git 
a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs index e8c93110..da0c7926 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs @@ -16,8 +16,12 @@ class WaitRequestReceived : ClientRequestEventWithPrefetch [DataMember] public string ExecutionId { get; set; } + [IgnoreDataMember] public override TrackedObjectKey Target => TrackedObjectKey.Instance(this.InstanceId); + [IgnoreDataMember] + public override string TracedInstanceId => this.InstanceId; + protected override void ExtraTraceInformation(StringBuilder s) { s.Append(' '); diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs index 1c0fedd1..7603b085 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs @@ -63,6 +63,9 @@ public enum PersistFirstStatus { NotRequired, Required, Done }; [IgnoreDataMember] public override EventId EventId => EventId.MakePartitionInternalEventId(this.PersistFirst == PersistFirstStatus.Done ? 
this.WorkItemId + "P" : this.WorkItemId); + [IgnoreDataMember] + public override string TracedInstanceId => this.InstanceId; + IEnumerable IRequiresPrefetch.KeysToPrefetch { get @@ -112,9 +115,6 @@ protected override void ExtraTraceInformation(StringBuilder s) s.Append(' '); s.Append(this.State.OrchestrationStatus); } - - s.Append(' '); - s.Append(this.InstanceId); } } } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/TimerFired.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/TimerFired.cs index 69ab1f2b..08b678d1 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/TimerFired.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/TimerFired.cs @@ -30,6 +30,10 @@ class TimerFired : PartitionUpdateEvent [IgnoreDataMember] public override EventId EventId => EventId.MakePartitionInternalEventId(this.WorkItemId); + [IgnoreDataMember] + public override string TracedInstanceId => this.TaskMessage.OrchestrationInstance.InstanceId; + + public override void DetermineEffects(EffectTracker effects) { effects.Add(TrackedObjectKey.Sessions); diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/PartitionEvent.cs b/src/DurableTask.Netherite/Events/PartitionEvents/PartitionEvent.cs index dbb40889..cf8e7450 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/PartitionEvent.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/PartitionEvent.cs @@ -30,6 +30,12 @@ abstract class PartitionEvent : Event [IgnoreDataMember] public double IssuedTimestamp { get; set; } + /// + /// For tracing purposes. Subclasses can override this to provide the instance id. + /// + [IgnoreDataMember] + public virtual string TracedInstanceId => string.Empty; + // some events trigger some processing immediately upon receive (e.g. 
prefetches or queries) public virtual void OnSubmit(Partition partition) { } diff --git a/src/DurableTask.Netherite/LoadMonitor/LoadMonitor.cs b/src/DurableTask.Netherite/LoadMonitor/LoadMonitor.cs index 32e6f773..dfca8888 100644 --- a/src/DurableTask.Netherite/LoadMonitor/LoadMonitor.cs +++ b/src/DurableTask.Netherite/LoadMonitor/LoadMonitor.cs @@ -69,7 +69,7 @@ public LoadMonitor( TransportAbstraction.ISender batchSender) { this.host = host; - this.traceHelper = new LoadMonitorTraceHelper(host.Logger, host.Settings.LogLevelLimit, host.StorageAccountName, host.Settings.HubName); + this.traceHelper = new LoadMonitorTraceHelper(host.TraceHelper.Logger, host.Settings.LogLevelLimit, host.StorageAccountName, host.Settings.HubName); this.BatchSender = batchSender; this.LoadInfo = new SortedDictionary(); this.PendingOnSource = new List(); diff --git a/src/DurableTask.Netherite/OrchestrationService/Client.cs b/src/DurableTask.Netherite/OrchestrationService/Client.cs index c8ea00b4..2bbde310 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Client.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Client.cs @@ -49,7 +49,7 @@ public Client( this.host = host; this.ClientId = clientId; this.taskHubGuid = taskHubGuid; - this.traceHelper = new ClientTraceHelper(host.Logger, host.Settings.LogLevelLimit, host.StorageAccountName, host.Settings.HubName, this.ClientId); + this.traceHelper = new ClientTraceHelper(host.TraceHelper.Logger, host.Settings.LogLevelLimit, host.StorageAccountName, host.Settings.HubName, this.ClientId); this.workItemTraceHelper = workItemTraceHelper; this.account = host.StorageAccountName; this.BatchSender = batchSender; diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index 20437097..ad302c02 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ 
b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -69,9 +69,8 @@ internal Client CheckedClient internal WorkItemQueue OrchestrationWorkItemQueue { get; private set; } internal LoadPublisher LoadPublisher { get; private set; } - internal Guid ServiceInstanceId { get; } = Guid.NewGuid(); - internal ILogger Logger { get; } internal ILoggerFactory LoggerFactory { get; } + internal OrchestrationServiceTraceHelper TraceHelper { get; private set; } /// public override string ToString() @@ -89,68 +88,76 @@ public override string ToString() /// public NetheriteOrchestrationService(NetheriteOrchestrationServiceSettings settings, ILoggerFactory loggerFactory) { - this.Settings = settings; - TransportConnectionString.Parse(this.Settings.ResolvedTransportConnectionString, out this.configuredStorage, out this.configuredTransport); - this.Logger = loggerFactory.CreateLogger(LoggerCategoryName); this.LoggerFactory = loggerFactory; - this.StorageAccountName = this.configuredStorage == TransportConnectionString.StorageChoices.Memory - ? 
"Memory" - : CloudStorageAccount.Parse(this.Settings.ResolvedStorageConnectionString).Credentials.AccountName; - - EtwSource.Log.OrchestrationServiceCreated(this.ServiceInstanceId, this.StorageAccountName, this.Settings.HubName, this.Settings.WorkerId, TraceUtils.AppName, TraceUtils.ExtensionVersion); - this.Logger.LogInformation("NetheriteOrchestrationService created, workerId={workerId}, processorCount={processorCount}, transport={transport}, storage={storage}", this.Settings.WorkerId, Environment.ProcessorCount, this.configuredTransport, this.configuredStorage); - - if (this.configuredStorage == TransportConnectionString.StorageChoices.Faster) + this.Settings = settings; + this.TraceHelper = new OrchestrationServiceTraceHelper(loggerFactory, settings.LogLevelLimit, settings.WorkerId, settings.HubName); + this.workItemTraceHelper = new WorkItemTraceHelper(loggerFactory, settings.WorkItemLogLevelLimit, settings.HubName); + + try { - // force dll load here so exceptions are observed early - var _ = System.Threading.Channels.Channel.CreateBounded(10); - - // throw descriptive exception if run on 32bit platform - if (!Environment.Is64BitProcess) + this.TraceHelper.TraceProgress("Reading configuration for transport and storage providers"); + TransportConnectionString.Parse(this.Settings.ResolvedTransportConnectionString, out this.configuredStorage, out this.configuredTransport); + this.StorageAccountName = this.configuredStorage == TransportConnectionString.StorageChoices.Memory + ? 
"Memory" + : CloudStorageAccount.Parse(this.Settings.ResolvedStorageConnectionString).Credentials.AccountName; + + // set the account name in the trace helpers + this.TraceHelper.StorageAccountName = this.workItemTraceHelper.StorageAccountName = this.StorageAccountName; + + this.TraceHelper.TraceCreated(Environment.ProcessorCount, this.configuredTransport, this.configuredStorage); + + if (this.configuredStorage == TransportConnectionString.StorageChoices.Faster) { - throw new NotSupportedException("Netherite backend requires 64bit, but current process is 32bit."); + // force dll load here so exceptions are observed early + var _ = System.Threading.Channels.Channel.CreateBounded(10); + + // throw descriptive exception if run on 32bit platform + if (!Environment.Is64BitProcess) + { + throw new NotSupportedException("Netherite backend requires 64bit, but current process is 32bit."); + } } - } - switch (this.configuredTransport) - { - case TransportConnectionString.TransportChoices.Memory: - this.taskHub = new Emulated.MemoryTransport(this, settings, this.Logger); - break; + switch (this.configuredTransport) + { + case TransportConnectionString.TransportChoices.Memory: + this.taskHub = new Emulated.MemoryTransport(this, settings, this.TraceHelper.Logger); + break; - case TransportConnectionString.TransportChoices.EventHubs: - this.taskHub = new EventHubs.EventHubsTransport(this, settings, loggerFactory); - break; + case TransportConnectionString.TransportChoices.EventHubs: + this.taskHub = new EventHubs.EventHubsTransport(this, settings, loggerFactory); + break; - default: - throw new NotImplementedException("no such transport choice"); - } + default: + throw new NotImplementedException("no such transport choice"); + } - this.workItemTraceHelper = new WorkItemTraceHelper(loggerFactory, settings.WorkItemLogLevelLimit, this.StorageAccountName, this.Settings.HubName); - if (this.configuredTransport != TransportConnectionString.TransportChoices.Memory) - { - if 
(!string.IsNullOrEmpty(settings.LoadInformationAzureTableName)) - { - this.LoadMonitorService = new AzureTableLoadMonitor(settings.ResolvedStorageConnectionString, settings.LoadInformationAzureTableName, settings.HubName); - } - else + if (this.configuredTransport != TransportConnectionString.TransportChoices.Memory) { - this.LoadMonitorService = new AzureBlobLoadMonitor(settings.ResolvedStorageConnectionString, settings.HubName); + this.TraceHelper.TraceProgress("Creating LoadMonitor Service"); + if (!string.IsNullOrEmpty(settings.LoadInformationAzureTableName)) + { + this.LoadMonitorService = new AzureTableLoadMonitor(settings.ResolvedStorageConnectionString, settings.LoadInformationAzureTableName, settings.HubName); + } + else + { + this.LoadMonitorService = new AzureBlobLoadMonitor(settings.ResolvedStorageConnectionString, settings.HubName); + } } - } - this.workItemStopwatch.Start(); + this.workItemStopwatch.Start(); - this.Logger.LogInformation( - "trace generation limits: general={general} , transport={transport}, storage={storage}, events={events}; workitems={workitems}; etwEnabled={etwEnabled}; core.IsTraceEnabled={core}", - settings.LogLevelLimit, - settings.TransportLogLevelLimit, - settings.StorageLogLevelLimit, - settings.EventLogLevelLimit, - settings.WorkItemLogLevelLimit, - EtwSource.Log.IsEnabled(), - DurableTask.Core.Tracing.DefaultEventSource.Log.IsTraceEnabled); + this.TraceHelper.TraceProgress( + $"Configured trace generation limits: general={settings.LogLevelLimit} , transport={settings.TransportLogLevelLimit}, storage={settings.StorageLogLevelLimit}, " + + $"events={settings.EventLogLevelLimit}; workitems={settings.WorkItemLogLevelLimit}; etwEnabled={EtwSource.Log.IsEnabled()}; " + + $"core.IsTraceEnabled={DurableTask.Core.Tracing.DefaultEventSource.Log.IsTraceEnabled}"); + } + catch (Exception e) when (!Utils.IsFatal(e)) + { + this.TraceHelper.TraceError("Could not create NetheriteOrchestrationService", e); + throw; + } } /// @@ -168,8 
+175,7 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor) this.Settings.ResolvedTransportConnectionString, this.Settings.LoadInformationAzureTableName, this.Settings.HubName, - this.TraceScaleRecommendation, - this.LoggerFactory.CreateLogger($"{LoggerCategoryName}.Scaling")); + this.TraceHelper.TraceScaleRecommendation); return true; } else @@ -179,10 +185,7 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor) } } - void TraceScaleRecommendation(string action, int workerCount, string details) - { - EtwSource.Log.OrchestrationServiceScaleRecommendation(this.StorageAccountName, this.Settings.HubName, action, workerCount, details, TraceUtils.AppName, TraceUtils.ExtensionVersion); - } + /******************************/ // storage provider @@ -193,7 +196,7 @@ IPartitionState IStorageProvider.CreatePartitionState() switch (this.configuredStorage) { case TransportConnectionString.StorageChoices.Memory: - return new MemoryStorage(this.Logger); + return new MemoryStorage(this.TraceHelper.Logger); case TransportConnectionString.StorageChoices.Faster: return new Faster.FasterStorage(this.Settings.ResolvedStorageConnectionString, this.Settings.ResolvedPageBlobStorageConnectionString, this.Settings.UseLocalDirectoryForPartitionStorage, this.Settings.HubName, this.LoggerFactory); @@ -277,10 +280,11 @@ public async Task StartAsync() if (this.serviceShutdownSource != null) { // we left the service running. No need to start it again. 
+ this.TraceHelper.TraceProgress("Reusing"); return; } - this.Logger.LogInformation("NetheriteOrchestrationService is starting on workerId={workerId}", this.Settings.WorkerId); + this.TraceHelper.TraceProgress("Starting"); this.serviceShutdownSource = new CancellationTokenSource(); @@ -288,30 +292,43 @@ public async Task StartAsync() this.OrchestrationWorkItemQueue = new WorkItemQueue(); LeaseTimer.Instance.DelayWarning = (int delay) => - this.Logger.LogWarning("NetheriteOrchestrationService lease timer on workerId={workerId} is running {delay}s behind schedule", this.Settings.WorkerId, delay); + this.TraceHelper.TraceWarning($"Lease timer is running {delay}s behind schedule"); if (!(this.LoadMonitorService is null)) - this.LoadPublisher = new LoadPublisher(this.LoadMonitorService, CancellationToken.None, this.Logger); + { + this.TraceHelper.TraceProgress("Starting Load Publisher"); + this.LoadPublisher = new LoadPublisher(this.LoadMonitorService, CancellationToken.None, this.TraceHelper); + } - await this.taskHub.StartAsync().ConfigureAwait(false); + this.TraceHelper.TraceProgress("Starting TaskHub"); + await this.taskHub.StartAsync(); if (this.Settings.PartitionCount != this.NumberPartitions) { - this.Logger.LogWarning("NetheriteOrchestrationService is ignoring configuration setting partitionCount={specifiedPartitions} because existing TaskHub has {actualPartitions} partitions", this.Settings.PartitionCount, this.NumberPartitions); + this.TraceHelper.TraceWarning($"Ignoring configuration setting partitionCount={this.Settings.PartitionCount} because existing TaskHub has {this.NumberPartitions} partitions"); } System.Diagnostics.Debug.Assert(this.client != null, "Backend should have added client"); + + this.TraceHelper.TraceProgress($"Started partitionCount={this.NumberPartitions}"); } catch (Exception e) when (!Utils.IsFatal(e)) { this.startupException = e; - string message = $"NetheriteOrchestrationService failed to start: {e.Message}"; - 
EtwSource.Log.OrchestrationServiceError(this.StorageAccountName, message, e.ToString(), this.Settings.HubName, this.Settings.WorkerId, TraceUtils.AppName, TraceUtils.ExtensionVersion); - this.Logger.LogError("NetheriteOrchestrationService failed to start: {exception}", e); - this.serviceShutdownSource.Cancel(); - this.serviceShutdownSource.Dispose(); - this.serviceShutdownSource = null; + this.TraceHelper.TraceError($"Failed to start: {e.Message}", e); + + // invoke cancellation so that any partially-started partitions and event loops are terminated + try + { + this.serviceShutdownSource.Cancel(); + this.serviceShutdownSource.Dispose(); + this.serviceShutdownSource = null; + } + catch(Exception shutdownException) + { + this.TraceHelper.TraceError($"Exception while shutting down service: {shutdownException.Message}", shutdownException); + } throw; } @@ -322,8 +339,7 @@ public async Task StopAsync(bool quickly) { try { - - this.Logger.LogInformation("NetheriteOrchestrationService stopping, workerId={workerId} quickly={quickly}", this.Settings.WorkerId, quickly); + this.TraceHelper.TraceProgress($"Stopping quickly={quickly}"); if (!this.Settings.KeepServiceRunning && this.serviceShutdownSource != null) { @@ -331,22 +347,23 @@ public async Task StopAsync(bool quickly) this.serviceShutdownSource.Dispose(); this.serviceShutdownSource = null; - await this.taskHub.StopAsync().ConfigureAwait(false); + await this.taskHub.StopAsync(); this.ActivityWorkItemQueue.Dispose(); this.OrchestrationWorkItemQueue.Dispose(); - - this.Logger.LogInformation("NetheriteOrchestrationService stopped, workerId={workerId}", this.Settings.WorkerId); - EtwSource.Log.OrchestrationServiceStopped(this.ServiceInstanceId, this.StorageAccountName, this.Settings.HubName, this.Settings.WorkerId, TraceUtils.AppName, TraceUtils.ExtensionVersion); } + + this.TraceHelper.TraceProgress("Stopped cleanly"); } catch (Exception e) when (!Utils.IsFatal(e)) { - string message = $"NetheriteOrchestrationService 
failed to shut down: {e.Message}"; - EtwSource.Log.OrchestrationServiceError(this.StorageAccountName, message, e.ToString(), this.Settings.HubName, this.Settings.WorkerId, TraceUtils.AppName, TraceUtils.ExtensionVersion); - this.Logger.LogError("NetheriteOrchestrationService failed to shut down: {exception}", e); + this.TraceHelper.TraceError($"Failed to stop cleanly: {e.Message}", e); throw; } + finally + { + this.TraceHelper.TraceStopped(); + } } /// @@ -408,7 +425,7 @@ TransportAbstraction.ILoadMonitor TransportAbstraction.IHost.AddLoadMonitor(Guid IPartitionErrorHandler TransportAbstraction.IHost.CreateErrorHandler(uint partitionId) { - return new PartitionErrorHandler((int) partitionId, this.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName); + return new PartitionErrorHandler((int) partitionId, this.TraceHelper.Logger, this.Settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName); } /******************************/ diff --git a/src/DurableTask.Netherite/OrchestrationService/Partition.cs b/src/DurableTask.Netherite/OrchestrationService/Partition.cs index 61326760..83bbe0ec 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Partition.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Partition.cs @@ -77,7 +77,7 @@ public Partition( this.ActivityWorkItemQueue = activityWorkItemQueue; this.OrchestrationWorkItemQueue = orchestrationWorkItemQueue; this.LoadPublisher = loadPublisher; - this.TraceHelper = new PartitionTraceHelper(host.Logger, settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this.PartitionId); + this.TraceHelper = new PartitionTraceHelper(host.TraceHelper.Logger, settings.LogLevelLimit, this.StorageAccountName, this.Settings.HubName, this.PartitionId); this.EventTraceHelper = new EventTraceHelper(host.LoggerFactory, settings.EventLogLevelLimit, this); this.WorkItemTraceHelper = workItemTraceHelper; this.stopwatch.Start(); diff --git 
a/src/DurableTask.Netherite/PartitionState/HistoryState.cs b/src/DurableTask.Netherite/PartitionState/HistoryState.cs index b133f034..b6c67701 100644 --- a/src/DurableTask.Netherite/PartitionState/HistoryState.cs +++ b/src/DurableTask.Netherite/PartitionState/HistoryState.cs @@ -74,11 +74,13 @@ public void Process(BatchProcessed evt, EffectTracker effects) if (!effects.IsReplaying) { this.Partition.EventTraceHelper.TraceInstanceUpdate( - evt.WorkItemId, + evt.EventIdString, evt.State.OrchestrationInstance.InstanceId, evt.State.OrchestrationInstance.ExecutionId, + evt.State.OrchestrationStatus, this.History.Count, - evt.NewEvents, this.Episode); + evt.NewEvents, + this.Episode); // if present, we keep the work item so we can reuse the execution cursor this.CachedOrchestrationWorkItem = evt.WorkItemForReuse; diff --git a/src/DurableTask.Netherite/Scaling/LoadPublisher.cs b/src/DurableTask.Netherite/Scaling/LoadPublisher.cs index 2919745d..7eb207c7 100644 --- a/src/DurableTask.Netherite/Scaling/LoadPublisher.cs +++ b/src/DurableTask.Netherite/Scaling/LoadPublisher.cs @@ -12,16 +12,16 @@ namespace DurableTask.Netherite.Scaling class LoadPublisher : BatchWorker<(uint, PartitionLoadInfo)> { readonly ILoadMonitorService service; - readonly ILogger logger; + readonly OrchestrationServiceTraceHelper traceHelper; // we are pushing the aggregated load information on a somewhat slower interval public static TimeSpan AggregatePublishInterval = TimeSpan.FromSeconds(2); readonly CancellationTokenSource cancelWait = new CancellationTokenSource(); - public LoadPublisher(ILoadMonitorService service, CancellationToken token, ILogger logger) : base(nameof(LoadPublisher), false, int.MaxValue, token, null) + public LoadPublisher(ILoadMonitorService service, CancellationToken token, OrchestrationServiceTraceHelper traceHelper) : base(nameof(LoadPublisher), false, int.MaxValue, token, null) { this.service = service; - this.logger = logger; + this.traceHelper = traceHelper; this.cancelWait 
= new CancellationTokenSource(); } @@ -53,7 +53,7 @@ protected override async Task Process(IList<(uint, PartitionLoadInfo)> batch) catch (Exception exception) { // we swallow exceptions so we can tolerate temporary Azure storage errors - this.logger.LogWarning("LoadPublisher failed: {exception}", exception); + this.traceHelper.TraceError("LoadPublisher failed", exception); } } diff --git a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs index ea1175b9..49834ff6 100644 --- a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs +++ b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs @@ -50,14 +50,12 @@ public ScalingMonitor( string eventHubsConnectionString, string partitionLoadTableName, string taskHubName, - Action recommendationTracer, - ILogger logger) + Action recommendationTracer) { this.storageConnectionString = storageConnectionString; this.eventHubsConnectionString = eventHubsConnectionString; this.partitionLoadTableName = partitionLoadTableName; this.taskHubName = taskHubName; - this.Logger = logger; this.recommendationTracer = recommendationTracer; TransportConnectionString.Parse(eventHubsConnectionString, out _, out this.configuredTransport); @@ -129,11 +127,7 @@ public ScaleRecommendation GetScaleRecommendation(int workerCount, Metrics metri { var recommendation = DetermineRecommendation(); - this.Logger.LogInformation( - "Netherite autoscaler recommends: {scaleRecommendation} from: {workerCount} because: {reason}", - recommendation.Action.ToString(), workerCount, recommendation.Reason); - - this.recommendationTracer(recommendation.Action.ToString(), workerCount, recommendation.Reason); + this.recommendationTracer?.Invoke(recommendation.Action.ToString(), workerCount, recommendation.Reason); return recommendation; diff --git a/src/DurableTask.Netherite/Tracing/EtwSource.cs b/src/DurableTask.Netherite/Tracing/EtwSource.cs index 49d51535..b473a91e 100644 --- 
a/src/DurableTask.Netherite/Tracing/EtwSource.cs +++ b/src/DurableTask.Netherite/Tracing/EtwSource.cs @@ -31,11 +31,11 @@ class EtwSource : EventSource // and since there is only one of these per machine, we can save its id in this static field. static Guid serviceInstanceId; - [Event(200, Level = EventLevel.Informational, Opcode = EventOpcode.Start, Version = 1)] - public void OrchestrationServiceCreated(Guid OrchestrationServiceInstanceId, string Account, string TaskHub, string WorkerName, string AppName, string ExtensionVersion) + [Event(200, Level = EventLevel.Informational, Opcode = EventOpcode.Start, Version = 2)] + public void OrchestrationServiceCreated(Guid OrchestrationServiceInstanceId, string Transport, string Storage, string Account, string TaskHub, string WorkerName, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(OrchestrationServiceInstanceId); - this.WriteEvent(200, OrchestrationServiceInstanceId, Account, TaskHub, WorkerName, AppName, ExtensionVersion); + this.WriteEvent(200, OrchestrationServiceInstanceId, Transport, Storage, Account, TaskHub, WorkerName, AppName, ExtensionVersion); EtwSource.serviceInstanceId = OrchestrationServiceInstanceId; } @@ -53,8 +53,24 @@ public void OrchestrationServiceError(string Account, string Message, string Det this.WriteEvent(202, Account, Message, Details, TaskHub, WorkerName, AppName, ExtensionVersion); } - [Event(203, Level = EventLevel.Informational, Version = 1)] - public void OrchestrationServiceScaleRecommendation(string Account, string TaskHub, string Action, int WorkerCount, string Details, string AppName, string ExtensionVersion) + [Event(204, Level = EventLevel.Verbose, Version = 2)] + public void OrchestrationServiceProgress(string Account, string Details, string TaskHub, string WorkerName, string AppName, string ExtensionVersion) + { + SetCurrentThreadActivityId(serviceInstanceId); + this.WriteEvent(204, Account, Details, TaskHub, WorkerName, AppName, ExtensionVersion); + } + + 
[Event(205, Level = EventLevel.Warning, Version = 1)] + public void OrchestrationServiceWarning(string Account, string Details, string TaskHub, string WorkerName, string AppName, string ExtensionVersion) + { + SetCurrentThreadActivityId(serviceInstanceId); + this.WriteEvent(205, Account, Details, TaskHub, WorkerName, AppName, ExtensionVersion); + } + + // ----- orchestration service components + + [Event(203, Level = EventLevel.Informational, Version = 2)] + public void NetheriteScaleRecommendation(string Account, string TaskHub, string Action, int WorkerCount, string Details, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); this.WriteEvent(203, Account, TaskHub, Action, WorkerCount, Details, AppName, ExtensionVersion); @@ -63,10 +79,10 @@ public void OrchestrationServiceScaleRecommendation(string Account, string TaskH // ----- partition and client lifecycles [Event(210, Level = EventLevel.Informational, Version = 1)] - public void PartitionProgress(string Account, string TaskHub, int PartitionId, string Transition, double LatencyMs, string Details, string AppName, string ExtensionVersion) + public void PartitionProgress(string Account, string TaskHub, int PartitionId, string Transition, double ElapsedMs, string Details, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(210, Account, TaskHub, PartitionId, Transition, LatencyMs, Details, AppName, ExtensionVersion); + this.WriteEvent(210, Account, TaskHub, PartitionId, Transition, ElapsedMs, Details, AppName, ExtensionVersion); } [Event(211, Level = EventLevel.Warning, Version = 1)] @@ -155,11 +171,11 @@ public void WorkItemStarted(string Account, string TaskHub, int PartitionId, str this.WriteEvent(224, Account, TaskHub, PartitionId, WorkItemType, WorkItemId, InstanceId, ExecutionType, ConsumedMessageIds, AppName, ExtensionVersion); } - [Event(225, Level = EventLevel.Informational, Version = 1)] - public void 
WorkItemCompleted(string Account, string TaskHub, int PartitionId, string WorkItemType, string WorkItemId, string InstanceId, string Status, double LatencyMs, long ProducedMessages, string AppName, string ExtensionVersion) + [Event(225, Level = EventLevel.Informational, Version = 3)] + public void WorkItemCompleted(string Account, string TaskHub, int PartitionId, string WorkItemType, string WorkItemId, string InstanceId, string RuntimeStatus, double ElapsedMs, long ProducedMessages, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(225, Account, TaskHub, PartitionId, WorkItemType, WorkItemId, InstanceId, Status, LatencyMs, ProducedMessages, AppName, ExtensionVersion); + this.WriteEvent(225, Account, TaskHub, PartitionId, WorkItemType, WorkItemId, InstanceId, RuntimeStatus, ElapsedMs, ProducedMessages, AppName, ExtensionVersion); } [Event(226, Level = EventLevel.Warning, Version = 1)] @@ -169,34 +185,34 @@ public void WorkItemDiscarded(string Account, string TaskHub, int PartitionId, s this.WriteEvent(226, Account, TaskHub, PartitionId, WorkItemType, WorkItemId, InstanceId, Details, ReplacedBy, AppName, ExtensionVersion); } - [Event(227, Level = EventLevel.Verbose, Version = 1)] - public void InstanceUpdated(string Account, string TaskHub, int PartitionId, string InstanceId, string ExecutionId, string WorkItemId, int NewEventCount, int TotalEventCount, string NewEvents, string EventType, int Episode, string AppName, string ExtensionVersion) + [Event(227, Level = EventLevel.Verbose, Version = 2)] + public void InstanceUpdated(string Account, string TaskHub, int PartitionId, string InstanceId, string ExecutionId, string PartitionEventId, string RuntimeStatus, int NewEventCount, int EventCount, string NewEvents, string EventType, int Episode, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(227, Account, TaskHub, PartitionId, InstanceId, ExecutionId, 
WorkItemId, NewEventCount, TotalEventCount, NewEvents, EventType, Episode, AppName, ExtensionVersion); + this.WriteEvent(227, Account, TaskHub, PartitionId, InstanceId, ExecutionId, PartitionEventId, RuntimeStatus, NewEventCount, EventCount, NewEvents, EventType, Episode, AppName, ExtensionVersion); } - [Event(228, Level = EventLevel.Verbose, Version = 1)] - public void InstanceStatusFetched(string Account, string TaskHub, int PartitionId, string InstanceId, string ExecutionId, string status, string PartitionEventId, double LatencyMs, string AppName, string ExtensionVersion) + [Event(228, Level = EventLevel.Verbose, Version = 2)] + public void InstanceStatusFetched(string Account, string TaskHub, int PartitionId, string InstanceId, string ExecutionId, string RuntimeStatus, string PartitionEventId, double ElapsedMs, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(228, Account, TaskHub, PartitionId, InstanceId, ExecutionId, status, PartitionEventId, LatencyMs, AppName, ExtensionVersion); + this.WriteEvent(228, Account, TaskHub, PartitionId, InstanceId, ExecutionId, RuntimeStatus, PartitionEventId, ElapsedMs, AppName, ExtensionVersion); } - [Event(229, Level = EventLevel.Verbose, Version = 1)] - public void InstanceHistoryFetched(string Account, string TaskHub, int PartitionId, string InstanceId, string ExecutionId, int EventCount, int Episode, string PartitionEventId, double LatencyMs, string AppName, string ExtensionVersion) + [Event(229, Level = EventLevel.Verbose, Version = 2)] + public void InstanceHistoryFetched(string Account, string TaskHub, int PartitionId, string InstanceId, string ExecutionId, int EventCount, int Episode, string PartitionEventId, double ElapsedMs, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(229, Account, TaskHub, PartitionId, InstanceId, ExecutionId, EventCount, Episode, PartitionEventId, LatencyMs, AppName, 
ExtensionVersion); + this.WriteEvent(229, Account, TaskHub, PartitionId, InstanceId, ExecutionId, EventCount, Episode, PartitionEventId, ElapsedMs, AppName, ExtensionVersion); } // ----- general event processing and statistics - [Event(240, Level = EventLevel.Informational, Version = 1)] - public void PartitionEventProcessed(string Account, string TaskHub, int PartitionId, long CommitLogPosition, string Category, string PartitionEventId, string EventInfo, long NextCommitLogPosition, long NextInputQueuePosition, double QueueLatencyMs, double FetchLatencyMs, double LatencyMs, bool IsReplaying, string AppName, string ExtensionVersion) + [Event(240, Level = EventLevel.Informational, Version = 3)] + public void PartitionEventProcessed(string Account, string TaskHub, int PartitionId, long CommitLogPosition, string Category, string PartitionEventId, string EventInfo, string InstanceId, long NextCommitLogPosition, long NextInputQueuePosition, double QueueElapsedMs, double FetchElapsedMs, double ElapsedMs, bool IsReplaying, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(240, Account, TaskHub, PartitionId, CommitLogPosition, Category, PartitionEventId, EventInfo, NextCommitLogPosition, NextInputQueuePosition, QueueLatencyMs, FetchLatencyMs, LatencyMs, IsReplaying, AppName, ExtensionVersion); + this.WriteEvent(240, Account, TaskHub, PartitionId, CommitLogPosition, Category, PartitionEventId, EventInfo, InstanceId, NextCommitLogPosition, NextInputQueuePosition, QueueElapsedMs, FetchElapsedMs, ElapsedMs, IsReplaying, AppName, ExtensionVersion); } [Event(241, Level = EventLevel.Verbose, Version = 1)] @@ -241,20 +257,20 @@ public void PartitionLoadPublished(string Account, string TaskHub, int Partition this.WriteEvent(246, Account, TaskHub, PartitionId, WorkItems, Activities, Timers, Requests, Outbox, NextTimer, WorkerId, LatencyTrend, MissRate, InputQueuePosition, CommitLogPosition, AppName, ExtensionVersion); } - 
[Event(247, Level = EventLevel.Verbose, Version = 1)] - public void BatchWorkerProgress(string Account, string TaskHub, string PartitionId, string Worker, int BatchSize, double ElapsedMilliseconds, string NextBatch, string AppName, string ExtensionVersion) + [Event(247, Level = EventLevel.Verbose, Version = 2)] + public void BatchWorkerProgress(string Account, string TaskHub, string PartitionId, string Worker, int BatchSize, double ElapsedMs, string NextBatch, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(247, Account, TaskHub, PartitionId, Worker, BatchSize, ElapsedMilliseconds, NextBatch, AppName, ExtensionVersion); + this.WriteEvent(247, Account, TaskHub, PartitionId, Worker, BatchSize, ElapsedMs, NextBatch, AppName, ExtensionVersion); } // ----- Faster Storage - [Event(250, Level = EventLevel.Informational, Version = 1)] - public void FasterStoreCreated(string Account, string TaskHub, int PartitionId, long InputQueuePosition, long LatencyMs, string AppName, string ExtensionVersion) + [Event(250, Level = EventLevel.Informational, Version = 2)] + public void FasterStoreCreated(string Account, string TaskHub, int PartitionId, long InputQueuePosition, long ElapsedMs, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(250, Account, TaskHub, PartitionId, InputQueuePosition, LatencyMs, AppName, ExtensionVersion); + this.WriteEvent(250, Account, TaskHub, PartitionId, InputQueuePosition, ElapsedMs, AppName, ExtensionVersion); } [Event(251, Level = EventLevel.Informational, Version = 1)] @@ -264,32 +280,32 @@ public void FasterCheckpointStarted(string Account, string TaskHub, int Partitio this.WriteEvent(251, Account, TaskHub, PartitionId, CheckpointId, Details, StoreStats, CommitLogPosition, InputQueuePosition, AppName, ExtensionVersion); } - [Event(252, Level = EventLevel.Informational, Version = 1)] - public void FasterCheckpointPersisted(string 
Account, string TaskHub, int PartitionId, Guid CheckpointId, string Details, long CommitLogPosition, long InputQueuePosition, long LatencyMs, string AppName, string ExtensionVersion) + [Event(252, Level = EventLevel.Informational, Version = 2)] + public void FasterCheckpointPersisted(string Account, string TaskHub, int PartitionId, Guid CheckpointId, string Details, long CommitLogPosition, long InputQueuePosition, long ElapsedMs, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(252, Account, TaskHub, PartitionId, CheckpointId, Details, CommitLogPosition, InputQueuePosition, LatencyMs, AppName, ExtensionVersion); + this.WriteEvent(252, Account, TaskHub, PartitionId, CheckpointId, Details, CommitLogPosition, InputQueuePosition, ElapsedMs, AppName, ExtensionVersion); } - [Event(253, Level = EventLevel.Verbose, Version = 1)] - public void FasterLogPersisted(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long NumberEvents, long SizeInBytes, long LatencyMs, string AppName, string ExtensionVersion) + [Event(253, Level = EventLevel.Verbose, Version = 2)] + public void FasterLogPersisted(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long NumberEvents, long SizeInBytes, long ElapsedMs, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(253, Account, TaskHub, PartitionId, CommitLogPosition, NumberEvents, SizeInBytes, LatencyMs, AppName, ExtensionVersion); + this.WriteEvent(253, Account, TaskHub, PartitionId, CommitLogPosition, NumberEvents, SizeInBytes, ElapsedMs, AppName, ExtensionVersion); } - [Event(254, Level = EventLevel.Informational, Version = 1)] - public void FasterCheckpointLoaded(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long InputQueuePosition, string StoreStats, long LatencyMs, string AppName, string ExtensionVersion) + [Event(254, Level = EventLevel.Informational, 
Version = 2)] + public void FasterCheckpointLoaded(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long InputQueuePosition, string StoreStats, long ElapsedMs, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(254, Account, TaskHub, PartitionId, CommitLogPosition, InputQueuePosition, StoreStats, LatencyMs, AppName, ExtensionVersion); + this.WriteEvent(254, Account, TaskHub, PartitionId, CommitLogPosition, InputQueuePosition, StoreStats, ElapsedMs, AppName, ExtensionVersion); } - [Event(255, Level = EventLevel.Informational, Version = 1)] - public void FasterLogReplayed(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long InputQueuePosition, long NumberEvents, long SizeInBytes, string StoreStats, long LatencyMs, string AppName, string ExtensionVersion) + [Event(255, Level = EventLevel.Informational, Version = 2)] + public void FasterLogReplayed(string Account, string TaskHub, int PartitionId, long CommitLogPosition, long InputQueuePosition, long NumberEvents, long SizeInBytes, string StoreStats, long ElapsedMs, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); - this.WriteEvent(255, Account, TaskHub, PartitionId, CommitLogPosition, InputQueuePosition, NumberEvents, SizeInBytes, StoreStats, LatencyMs, AppName, ExtensionVersion); + this.WriteEvent(255, Account, TaskHub, PartitionId, CommitLogPosition, InputQueuePosition, NumberEvents, SizeInBytes, StoreStats, ElapsedMs, AppName, ExtensionVersion); } [Event(256, Level = EventLevel.Error, Version = 1)] @@ -341,7 +357,7 @@ public void FasterLeaseProgress(string Account, string TaskHub, int PartitionId, this.WriteEvent(264, Account, TaskHub, PartitionId, Details, AppName, ExtensionVersion); } - [Event(265, Level = EventLevel.Verbose, Version = 1)] + [Event(265, Level = EventLevel.Verbose, Version = 2)] public void FasterStorageProgress(string Account, string TaskHub, int 
PartitionId, string Details, string AppName, string ExtensionVersion) { SetCurrentThreadActivityId(serviceInstanceId); diff --git a/src/DurableTask.Netherite/Tracing/EventTraceHelper.cs b/src/DurableTask.Netherite/Tracing/EventTraceHelper.cs index c56f0a56..c97eff8a 100644 --- a/src/DurableTask.Netherite/Tracing/EventTraceHelper.cs +++ b/src/DurableTask.Netherite/Tracing/EventTraceHelper.cs @@ -56,17 +56,18 @@ public void TraceEventProcessed(long commitLogPosition, PartitionEvent evt, Even if (this.logger.IsEnabled(LogLevel.Information)) { var details = string.Format($"{(replaying ? "Replayed" : "Processed")} {(evt.NextInputQueuePosition > 0 ? "external" : "internal")} {category}"); - this.logger.LogInformation("Part{partition:D2}.{commitLogPosition:D10} {details} {event} eventId={eventId} pos=({nextCommitLogPosition},{nextInputQueuePosition}) latency=({queueLatencyMs:F0}, {fetchLatencyMs:F0}, {latencyMs:F0})", this.partitionId, commitLogPosition, details, evt, evt.EventIdString, nextCommitLogPosition, evt.NextInputQueuePosition, queueLatencyMs, fetchLatencyMs, latencyMs); + this.logger.LogInformation("Part{partition:D2}.{commitLogPosition:D10} {details} {event} eventId={eventId} instanceId={instanceId} pos=({nextCommitLogPosition},{nextInputQueuePosition}) latency=({queueLatencyMs:F0}, {fetchLatencyMs:F0}, {latencyMs:F0})", this.partitionId, commitLogPosition, details, evt, evt.EventIdString, evt.TracedInstanceId, nextCommitLogPosition, evt.NextInputQueuePosition, queueLatencyMs, fetchLatencyMs, latencyMs); } - this.etw?.PartitionEventProcessed(this.account, this.taskHub, this.partitionId, commitLogPosition, category.ToString(), evt.EventIdString, evt.ToString(), nextCommitLogPosition, evt.NextInputQueuePosition, queueLatencyMs, fetchLatencyMs, latencyMs, replaying, TraceUtils.AppName, TraceUtils.ExtensionVersion) ; + this.etw?.PartitionEventProcessed(this.account, this.taskHub, this.partitionId, commitLogPosition, category.ToString(), evt.EventIdString, 
evt.ToString(), evt.TracedInstanceId ?? string.Empty, nextCommitLogPosition, evt.NextInputQueuePosition, queueLatencyMs, fetchLatencyMs, latencyMs, replaying, TraceUtils.AppName, TraceUtils.ExtensionVersion) ; } } public void TraceInstanceUpdate( - string workItemId, + string partitionEventId, string instanceId, - string executionId, + string executionId, + OrchestrationStatus runtimeStatus, int totalEventCount, List newEvents, int episode) @@ -120,15 +121,15 @@ public void TraceInstanceUpdate( (long commitLogPosition, string eventId) = EventTraceContext.Current; string prefix = commitLogPosition > 0 ? $".{commitLogPosition:D10} " : ""; - this.logger.LogDebug("Part{partition:D2}{prefix} Updated instance instanceId={instanceId} executionId={executionId} workItemId={workItemId} numNewEvents={numNewEvents} totalEventCount={totalEventCount} eventNames={eventNames} eventType={eventType} episode={episode}", - this.partitionId, prefix, instanceId, executionId, workItemId, numNewEvents, totalEventCount, eventNames, eventType, episode); + this.logger.LogDebug("Part{partition:D2}{prefix} Updated instance instanceId={instanceId} executionId={executionId} partitionEventId={partitionEventId} runtimeStatus={runtimeStatus} numNewEvents={numNewEvents} totalEventCount={totalEventCount} eventNames={eventNames} eventType={eventType} episode={episode}", + this.partitionId, prefix, instanceId, executionId, partitionEventId, runtimeStatus, numNewEvents, totalEventCount, eventNames, eventType, episode); } - this.etw?.InstanceUpdated(this.account, this.taskHub, this.partitionId, instanceId, executionId, workItemId, numNewEvents, totalEventCount, eventNames, eventType, episode, TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.etw?.InstanceUpdated(this.account, this.taskHub, this.partitionId, instanceId, executionId, partitionEventId, runtimeStatus.ToString(), numNewEvents, totalEventCount, eventNames, eventType, episode, TraceUtils.AppName, TraceUtils.ExtensionVersion); } } - 
public void TraceFetchedInstanceStatus(PartitionReadEvent evt, string instanceId, string executionId, string status, double latencyMs) + public void TraceFetchedInstanceStatus(PartitionReadEvent evt, string instanceId, string executionId, string runtimeStatus, double latencyMs) { if (this.logLevelLimit <= LogLevel.Debug) { @@ -137,11 +138,11 @@ public void TraceFetchedInstanceStatus(PartitionReadEvent evt, string instanceId (long commitLogPosition, string eventId) = EventTraceContext.Current; string prefix = commitLogPosition > 0 ? $".{commitLogPosition:D10} " : ""; - this.logger.LogDebug("Part{partition:D2}{prefix} Fetched instance status instanceId={instanceId} executionId={executionId} status={status} eventId={eventId} latencyMs={latencyMs:F0}", - this.partitionId, prefix, instanceId, executionId, status, evt.EventIdString, latencyMs); + this.logger.LogDebug("Part{partition:D2}{prefix} Fetched instance status instanceId={instanceId} executionId={executionId} runtimeStatus={runtimeStatus} eventId={eventId} latencyMs={latencyMs:F0}", + this.partitionId, prefix, instanceId, executionId, runtimeStatus, evt.EventIdString, latencyMs); } - this.etw?.InstanceStatusFetched(this.account, this.taskHub, this.partitionId, instanceId, executionId, status, evt.EventIdString, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.etw?.InstanceStatusFetched(this.account, this.taskHub, this.partitionId, instanceId, executionId, runtimeStatus, evt.EventIdString, latencyMs, TraceUtils.AppName, TraceUtils.ExtensionVersion); } } diff --git a/src/DurableTask.Netherite/Tracing/OrchestrationServiceTraceHelper.cs b/src/DurableTask.Netherite/Tracing/OrchestrationServiceTraceHelper.cs new file mode 100644 index 00000000..72c23daf --- /dev/null +++ b/src/DurableTask.Netherite/Tracing/OrchestrationServiceTraceHelper.cs @@ -0,0 +1,122 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +namespace DurableTask.Netherite +{ + using Microsoft.Extensions.Logging; + using System; + using System.Collections.Generic; + using System.Text; + + class OrchestrationServiceTraceHelper + { + public ILogger Logger { get; private set; } + + public string StorageAccountName { private get; set; } = string.Empty; + + readonly string taskHub; + readonly LogLevel logLevelLimit; + readonly string workerName; + readonly Guid serviceInstanceId; + + + public OrchestrationServiceTraceHelper(ILoggerFactory loggerFactory, LogLevel logLevelLimit, string workerName, string taskHubName) + { + this.Logger = loggerFactory.CreateLogger(NetheriteOrchestrationService.LoggerCategoryName); + this.taskHub = taskHubName; + this.logLevelLimit = logLevelLimit; + this.workerName = workerName; + this.serviceInstanceId = Guid.NewGuid(); + } + + public void TraceCreated(int processorCount, TransportConnectionString.TransportChoices transport, TransportConnectionString.StorageChoices storage) /* Records service creation to ILogger and, when the ETW source is enabled, to ETW; both are gated by logLevelLimit <= Information. */ + { + if (this.logLevelLimit <= LogLevel.Information) + { + if (this.Logger.IsEnabled(LogLevel.Information)) + { + this.Logger.LogInformation("NetheriteOrchestrationService created, workerId={workerId}, processorCount={processorCount}, transport={transport}, storage={storage}", this.workerName, processorCount, transport, storage); /* report the caller-supplied count instead of ignoring the parameter */ + } + if (EtwSource.Log.IsEnabled()) + { + EtwSource.Log.OrchestrationServiceCreated(this.serviceInstanceId, transport.ToString(), storage.ToString(), this.StorageAccountName, this.taskHub, this.workerName, TraceUtils.AppName, TraceUtils.ExtensionVersion); + } + } + } + + public void TraceStopped() + { + if (this.logLevelLimit <= LogLevel.Information) + { + if (this.Logger.IsEnabled(LogLevel.Information)) + { + this.Logger.LogInformation("NetheriteOrchestrationService stopped, workerId={workerId}", this.workerName); + } + if (EtwSource.Log.IsEnabled()) + { + EtwSource.Log.OrchestrationServiceStopped(this.serviceInstanceId, this.StorageAccountName, this.taskHub, 
this.workerName, TraceUtils.AppName, TraceUtils.ExtensionVersion); + } + } + } + + public void TraceProgress(string details) /* Emits a service-level progress message to ILogger at Information level and, when the ETW source is enabled, to ETW. */ + { + if (this.logLevelLimit <= LogLevel.Information) + { + if (this.Logger.IsEnabled(LogLevel.Information)) + { + this.Logger.LogInformation("NetheriteOrchestrationService {details}", details); + } + if (EtwSource.Log.IsEnabled()) + { + EtwSource.Log.OrchestrationServiceProgress(this.StorageAccountName, details, this.taskHub, this.workerName, TraceUtils.AppName, TraceUtils.ExtensionVersion); + } + } + } + + public void TraceWarning(string details) /* Emits a service-level warning to ILogger and, when the ETW source is enabled, to ETW; both are gated by logLevelLimit <= Warning. */ + { + if (this.logLevelLimit <= LogLevel.Warning) + { + if (this.Logger.IsEnabled(LogLevel.Warning)) + { + this.Logger.LogWarning("NetheriteOrchestrationService {details}", details); /* severity must match the Warning level checked on the line above */ + } + if (EtwSource.Log.IsEnabled()) + { + EtwSource.Log.OrchestrationServiceWarning(this.StorageAccountName, details, this.taskHub, this.workerName, TraceUtils.AppName, TraceUtils.ExtensionVersion); + } + } + } + + public void TraceError(string message, Exception exception) + { + if (this.logLevelLimit <= LogLevel.Error) + { + if (this.Logger.IsEnabled(LogLevel.Error)) + { + this.Logger.LogError("NetheriteOrchestrationService !!! 
{message}: {exception}", message, exception); + } + if (EtwSource.Log.IsEnabled()) + { + EtwSource.Log.OrchestrationServiceError(this.StorageAccountName, message, exception.ToString(), this.taskHub, this.workerName, TraceUtils.AppName, TraceUtils.ExtensionVersion); + } + } + } + + public void TraceScaleRecommendation(string scaleRecommendation, int workerCount, string reason) + { + if (this.logLevelLimit <= LogLevel.Information) + { + if (this.Logger.IsEnabled(LogLevel.Information)) + { + this.Logger.LogInformation("NetheriteOrchestrationService autoscaler recommends: {scaleRecommendation} from: {workerCount} because: {reason}", scaleRecommendation, workerCount, reason); + } + if (EtwSource.Log.IsEnabled()) + { + EtwSource.Log.NetheriteScaleRecommendation(this.StorageAccountName, this.taskHub, scaleRecommendation, workerCount, reason, TraceUtils.AppName, TraceUtils.ExtensionVersion); + } + } + } + } +} \ No newline at end of file diff --git a/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs b/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs index 160038fc..4fcc45ba 100644 --- a/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs +++ b/src/DurableTask.Netherite/Tracing/WorkItemTraceHelper.cs @@ -16,9 +16,9 @@ class WorkItemTraceHelper { readonly ILogger logger; readonly LogLevel logLevelLimit; - readonly string account; readonly string taskHub; readonly EtwSource etw; + public string StorageAccountName { private get; set; } = string.Empty; public static string FormatMessageId(TaskMessage message, string workItem) => $"{workItem}M{message.SequenceNumber}"; @@ -74,11 +74,10 @@ public enum ActivityStatus } - public WorkItemTraceHelper(ILoggerFactory loggerFactory, LogLevel logLevelLimit, string account, string taskHub) + public WorkItemTraceHelper(ILoggerFactory loggerFactory, LogLevel logLevelLimit, string taskHub) { this.logger = loggerFactory.CreateLogger($"{NetheriteOrchestrationService.LoggerCategoryName}.WorkItems"); this.logLevelLimit = 
logLevelLimit; - this.account = account; this.taskHub = taskHub; this.etw = EtwSource.Log.IsEnabled() ? EtwSource.Log : null; } @@ -93,7 +92,7 @@ public void TraceWorkItemQueued(uint partitionId, WorkItemType workItemType, str partitionId, workItemType, workItemId, instanceId, executionType, consumedMessageIds); } - this.etw?.WorkItemQueued(this.account, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, executionType, consumedMessageIds, TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.etw?.WorkItemQueued(this.StorageAccountName, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, executionType, consumedMessageIds, TraceUtils.AppName, TraceUtils.ExtensionVersion); } } @@ -107,7 +106,7 @@ public void TraceWorkItemStarted(uint partitionId, WorkItemType workItemType, st partitionId, workItemType, workItemId, instanceId, executionType, consumedMessageIds); } - this.etw?.WorkItemStarted(this.account, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, executionType, consumedMessageIds, TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.etw?.WorkItemStarted(this.StorageAccountName, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, executionType, consumedMessageIds, TraceUtils.AppName, TraceUtils.ExtensionVersion); } } @@ -124,7 +123,7 @@ public void TraceWorkItemDiscarded(uint partitionId, WorkItemType workItemType, partitionId, prefix, workItemType, workItemId, details, instanceId, replacedBy); } - this.etw?.WorkItemDiscarded(this.account, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, details, replacedBy ?? "", TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.etw?.WorkItemDiscarded(this.StorageAccountName, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, details, replacedBy ?? 
"", TraceUtils.AppName, TraceUtils.ExtensionVersion); } } @@ -138,7 +137,7 @@ public void TraceWorkItemCompleted(uint partitionId, WorkItemType workItemType, partitionId, workItemType, workItemId, instanceId, status, latencyMs, producedMessages); } - this.etw?.WorkItemCompleted(this.account, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, status.ToString(), latencyMs, producedMessages, TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.etw?.WorkItemCompleted(this.StorageAccountName, this.taskHub, (int)partitionId, workItemType.ToString(), workItemId, instanceId, status.ToString(), latencyMs, producedMessages, TraceUtils.AppName, TraceUtils.ExtensionVersion); } } @@ -158,7 +157,7 @@ public void TraceTaskMessageReceived(uint partitionId, TaskMessage message, stri partitionId, prefix, messageId, message.Event.EventType.ToString(), TraceUtils.GetTaskEventId(message.Event), message.OrchestrationInstance.InstanceId, message.OrchestrationInstance.ExecutionId, queuePosition); } - this.etw?.TaskMessageReceived(this.account, this.taskHub, (int)partitionId, commitLogPosition, messageId, message.Event.EventType.ToString(), TraceUtils.GetTaskEventId(message.Event), message.OrchestrationInstance.InstanceId, message.OrchestrationInstance.ExecutionId ?? "", queuePosition, TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.etw?.TaskMessageReceived(this.StorageAccountName, this.taskHub, (int)partitionId, commitLogPosition, messageId, message.Event.EventType.ToString(), TraceUtils.GetTaskEventId(message.Event), message.OrchestrationInstance.InstanceId, message.OrchestrationInstance.ExecutionId ?? "", queuePosition, TraceUtils.AppName, TraceUtils.ExtensionVersion); } } @@ -166,7 +165,6 @@ public void TraceTaskMessageSent(uint partitionId, TaskMessage message, string w { if (this.logLevelLimit <= LogLevel.Trace) { - string messageId = FormatMessageId(message, workItemId); string persistenceDelayMs = persistenceDelay.HasValue ? 
persistenceDelay.Value.ToString("F2") : string.Empty; string sendDelayMs = sendDelay.HasValue ? sendDelay.Value.ToString("F2") : string.Empty; @@ -180,7 +178,7 @@ public void TraceTaskMessageSent(uint partitionId, TaskMessage message, string w partitionId, prefix, messageId, message.Event.EventType.ToString(), TraceUtils.GetTaskEventId(message.Event), message.OrchestrationInstance.InstanceId, message.OrchestrationInstance.ExecutionId, persistenceDelayMs, sendDelayMs); } - this.etw?.TaskMessageSent(this.account, this.taskHub, (int)partitionId, messageId, message.Event.EventType.ToString(), TraceUtils.GetTaskEventId(message.Event), message.OrchestrationInstance.InstanceId, message.OrchestrationInstance.ExecutionId ?? "", persistenceDelayMs, sendDelayMs, TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.etw?.TaskMessageSent(this.StorageAccountName, this.taskHub, (int)partitionId, messageId, message.Event.EventType.ToString(), TraceUtils.GetTaskEventId(message.Event), message.OrchestrationInstance.InstanceId, message.OrchestrationInstance.ExecutionId ?? "", persistenceDelayMs, sendDelayMs, TraceUtils.AppName, TraceUtils.ExtensionVersion); } } @@ -199,7 +197,7 @@ public void TraceTaskMessageDiscarded(uint partitionId, TaskMessage message, str partitionId, prefix, messageId, details, message.Event.EventType.ToString(), TraceUtils.GetTaskEventId(message.Event), message.OrchestrationInstance.InstanceId, message.OrchestrationInstance.ExecutionId); } - this.etw?.TaskMessageDiscarded(this.account, this.taskHub, (int)partitionId, messageId, details, message.Event.EventType.ToString(), TraceUtils.GetTaskEventId(message.Event), message.OrchestrationInstance.InstanceId, message.OrchestrationInstance.ExecutionId ?? 
"", TraceUtils.AppName, TraceUtils.ExtensionVersion); + this.etw?.TaskMessageDiscarded(this.StorageAccountName, this.taskHub, (int)partitionId, messageId, details, message.Event.EventType.ToString(), TraceUtils.GetTaskEventId(message.Event), message.OrchestrationInstance.InstanceId, message.OrchestrationInstance.ExecutionId ?? "", TraceUtils.AppName, TraceUtils.ExtensionVersion); } } }