Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master to develop #44

Merged
merged 9 commits into from
Nov 13, 2017
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
[TestFixture]
public class IntegrationTests
{
const string EndpointName = "PerfCountersIntegrationTests";
static ManualResetEvent ManualResetEvent = new ManualResetEvent(false);

[Test]
public async Task Ensure_counters_are_written()
{
string message = null;

var endpointName = "PerfCountersIntegrationTests";
var endpointConfiguration = EndpointConfigBuilder.BuildEndpoint(endpointName);
var endpointConfiguration = EndpointConfigBuilder.BuildEndpoint(EndpointName);
endpointConfiguration.DefineCriticalErrorAction(
context =>
{
Expand All @@ -31,6 +31,17 @@ public async Task Ensure_counters_are_written()
var endpoint = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);

var criticalTime = GetCounter(PerformanceCountersFeature.CriticalTimeCounterName);
var processingTime = GetCounter(PerformanceCountersFeature.ProcessingTimeCounterName);

Assert.AreEqual(0, criticalTime.RawValue);
Assert.AreEqual(0, processingTime.RawValue);

var cancellation = new CancellationTokenSource();

var criticalTimeReading = ReadNonZero(criticalTime, cancellation);
var processingTimeReading = ReadNonZero(processingTime, cancellation);

await endpoint.SendLocal(new MyMessage())
.ConfigureAwait(false);

Expand All @@ -40,14 +51,13 @@ await Task.Delay(1500)
await endpoint.Stop()
.ConfigureAwait(false);

var criticalTimePerfCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.CriticalTimeCounterName, endpointName, true);
//var processingTimePerfCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.ProcessingTimeCounterName, endpointName, true);
var slaPerCounter = new PerformanceCounter("NServiceBus", SLAMonitoringFeature.CounterName, endpointName, true);
var messagesFailuresPerSecondCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.MessagesFailuresPerSecondCounterName, endpointName, true);
var messagesProcessedPerSecondCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.MessagesProcessedPerSecondCounterName, endpointName, true);
var messagesPulledPerSecondCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.MessagesPulledPerSecondCounterName, endpointName, true);
Assert.AreNotEqual(0, criticalTimePerfCounter.RawValue);
//Assert.AreNotEqual(0, processingTimePerfCounter.RawValue);
cancellation.Cancel();
var slaPerCounter = GetCounter(SLAMonitoringFeature.CounterName);
var messagesFailuresPerSecondCounter = GetCounter(PerformanceCountersFeature.MessagesFailuresPerSecondCounterName);
var messagesProcessedPerSecondCounter = GetCounter(PerformanceCountersFeature.MessagesProcessedPerSecondCounterName);
var messagesPulledPerSecondCounter = GetCounter(PerformanceCountersFeature.MessagesPulledPerSecondCounterName);
Assert.True(await criticalTimeReading);
Assert.True(await processingTimeReading);
Assert.AreNotEqual(0, slaPerCounter.RawValue);
Assert.AreEqual(0, messagesFailuresPerSecondCounter.RawValue);
Assert.AreNotEqual(0, messagesProcessedPerSecondCounter.RawValue);
Expand All @@ -56,6 +66,23 @@ await endpoint.Stop()
Assert.IsNull(message);
}

static async Task<bool> ReadNonZero(PerformanceCounter counter, CancellationTokenSource cancellation)
{
while (counter.RawValue == 0)
{
if (cancellation.IsCancellationRequested)
{
return false;
}

await Task.Delay(TimeSpan.FromMilliseconds(10));
}

return true;
}

static PerformanceCounter GetCounter(string counterName) => new PerformanceCounter("NServiceBus", counterName, EndpointName, true);

public class MyHandler : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ApprovalUtilities.Utilities;
using NServiceBus;
using NUnit.Framework;
Expand Down Expand Up @@ -156,6 +158,73 @@ static long CalculateAverageTimerCounterUpdate(TimeSpan d)
{
return d.Ticks * Stopwatch.Frequency / TimeSpan.TicksPerSecond;
}
[Test]
public async Task Durations_should_be_reported_as_zeros_after_specific_period_from_last_observed_pipeline_completion()
{
const string endpoint = "Sender@af016c07";
var resetTimersAfter = TimeSpan.FromMilliseconds(100);

var cache = new MockPerformanceCountersCache();
var updater = new PerformanceCounterUpdater(cache, new Dictionary<string, CounterInstanceName?>(), endpoint, resetTimersAfter);

var durationProbes = new[]
{
new MockDurationProbe("Critical Time"),
new MockDurationProbe("Processing Time")
};

// update before timeout
updater.Observe(new ProbeContext(durationProbes, new ISignalProbe[0]));

updater.Start();

durationProbes[0].Raise(TimeSpan.FromSeconds(11));
durationProbes[1].Raise(TimeSpan.FromSeconds(22));

await Task.Delay(TimeSpan.FromTicks(resetTimersAfter.Ticks * 2));

var performanceCounterOne = cache.Get(new CounterInstanceName("Critical Time", endpoint));
var performanceCounterTwo = cache.Get(new CounterInstanceName("Processing Time", endpoint));

await updater.Stop();

Assert.AreEqual(0, performanceCounterOne.RawValue);
Assert.AreEqual(0, performanceCounterTwo.RawValue);
}

[Test]
public void Durations_should_be_reported_properly_after_observed_pipeline_completion()
{
const string endpoint = "Sender@af016c07";
var resetTimersAfter = TimeSpan.FromMilliseconds(100);

var cache = new MockPerformanceCountersCache();
var updater = new PerformanceCounterUpdater(cache, new Dictionary<string, CounterInstanceName?>(), endpoint, resetTimersAfter);

var durationProbes = new[]
{
new MockDurationProbe("Critical Time"),
new MockDurationProbe("Processing Time")
};

// update before timeout
updater.Observe(new ProbeContext(durationProbes, new ISignalProbe[0]));

updater.Start();

Thread.Sleep(TimeSpan.FromTicks(resetTimersAfter.Ticks * 2));

updater.OnReceivePipelineCompleted();

durationProbes[0].Raise(TimeSpan.FromSeconds(11));
durationProbes[1].Raise(TimeSpan.FromSeconds(22));

var performanceCounterOne = cache.Get(new CounterInstanceName("Critical Time", endpoint));
var performanceCounterTwo = cache.Get(new CounterInstanceName("Processing Time", endpoint));

Assert.AreEqual(11, performanceCounterOne.RawValue);
Assert.AreEqual(22, performanceCounterTwo.RawValue);
}
}

class MockPerformanceCountersCache : PerformanceCountersCache
Expand Down Expand Up @@ -197,6 +266,12 @@ public void Register(OnEvent<DurationEvent> observer)
Observers += observer;
}

public void Raise(TimeSpan timespan, string messageType = null)
{
var duration = new DurationEvent(timespan, messageType);
Observers(ref duration);
}

public string Name { get; }
public string Description => string.Empty;
public OnEvent<DurationEvent> Observers = (ref DurationEvent d) => { };
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Metrics.PerformanceCounters;

class PerformanceCounterUpdater
{
public PerformanceCounterUpdater(PerformanceCountersCache cache, Dictionary<string, CounterInstanceName?> legacyInstanceNameMap, string endpointName)
public PerformanceCounterUpdater(PerformanceCountersCache cache, Dictionary<string, CounterInstanceName?> legacyInstanceNameMap, string endpointName, TimeSpan? resetTimersAfter = null)
{
resetEvery = resetTimersAfter ?? TimeSpan.FromSeconds(2);
this.legacyInstanceNameMap = legacyInstanceNameMap;
this.endpointName = endpointName;
this.cache = cache;
cancellation = new CancellationTokenSource();
// initialize to an armed state
OnReceivePipelineCompleted();
}

public void Start()
{
cleaner = Task.Run(Cleanup);
}

public Task Stop()
{
cancellation.Cancel();
return cleaner;
}

public void Observe(ProbeContext context)
Expand All @@ -28,7 +46,9 @@ public void Observe(ProbeContext context)
{
if (durationProbe.Name == CounterNameConventions.ProcessingTime || durationProbe.Name == CounterNameConventions.CriticalTime)
{
var performanceCounterInstance = cache.Get(new CounterInstanceName(durationProbe.Name, endpointName));
var key = new CounterInstanceName(durationProbe.Name, endpointName);
var performanceCounterInstance = cache.Get(key);
resettable[key] = performanceCounterInstance;
durationProbe.Register((ref DurationEvent d) => performanceCounterInstance.RawValue = (long) d.Duration.TotalSeconds);
}

Expand All @@ -46,5 +66,34 @@ public void Observe(ProbeContext context)

readonly PerformanceCountersCache cache;
readonly Dictionary<string, CounterInstanceName?> legacyInstanceNameMap;
readonly ConcurrentDictionary<CounterInstanceName, IPerformanceCounterInstance> resettable = new ConcurrentDictionary<CounterInstanceName, IPerformanceCounterInstance>();
long lastCompleted;

public void OnReceivePipelineCompleted()
{
Volatile.Write(ref lastCompleted, NowTicks);
}

async Task Cleanup()
{
while (cancellation.IsCancellationRequested == false)
{
await Task.Delay(resetEvery).ConfigureAwait(false);

var idleFor = NowTicks - Volatile.Read(ref lastCompleted);
if (idleFor > resetEvery.Ticks)
{
foreach (var performanceCounter in resettable)
{
performanceCounter.Value.RawValue = 0;
}
}
}
}

static long NowTicks => DateTime.UtcNow.Ticks;
readonly TimeSpan resetEvery;
readonly string endpointName;
Task cleaner;
readonly CancellationTokenSource cancellation;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ protected override void Setup(FeatureConfigurationContext context)

context.RegisterStartupTask(new Cleanup(this));

context.Pipeline.OnReceivePipelineCompleted(_=>
{
updater.OnReceivePipelineCompleted();
return TaskExtensions.CompletedTask;
});

options.RegisterObservers(probeContext =>
{
updater.Observe(probeContext);
Expand Down Expand Up @@ -61,12 +67,13 @@ public void Dispose()

protected override Task OnStart(IMessageSession session)
{
feature.updater.Start();
return TaskExtensions.CompletedTask;
}

protected override Task OnStop(IMessageSession session)
{
return TaskExtensions.CompletedTask;
return feature.updater.Stop();
}

PerformanceCountersFeature feature;
Expand Down
88 changes: 88 additions & 0 deletions src/ScriptBuilder/CSharpCounterWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
namespace NServiceBus.Metrics.PerformanceCounters
{
using System.Collections.Generic;
using System.IO;
using System.Text;

static class CSharpCounterWriter
{
public static void WriteCode(string scriptPath, IEnumerable<TimerDefinition> timers, IEnumerable<MeterDefinition> meters, Dictionary<string, string> legacyInstanceNameMap)
{
var outputPath = Path.Combine(scriptPath, "Counters.g.cs");
using (var streamWriter = File.CreateText(outputPath))
{
var stringBuilder = new StringBuilder();

var slaCounterDefinition = @"new CounterCreationData(""SLA violation countdown"", ""Seconds until the SLA for this endpoint is breached."", PerformanceCounterType.NumberOfItems32),";
stringBuilder.AppendLine(slaCounterDefinition.PadLeft(slaCounterDefinition.Length + 8));

foreach (var timer in timers)
{
var timerDefinition = $@"new CounterCreationData(""{timer.Name}"", ""{timer.Description}"", PerformanceCounterType.NumberOfItems32),";
stringBuilder.AppendLine(timerDefinition.PadLeft(timerDefinition.Length + 8));
}

foreach (var meter in meters)
{
string instanceName;
legacyInstanceNameMap.TryGetValue(meter.Name, out instanceName);

var meterDefinition = $@"new CounterCreationData(""{instanceName ?? meter.Name}"", ""{meter.Description}"", PerformanceCounterType.RateOfCountsPerSecond32),";
stringBuilder.AppendLine(meterDefinition.PadLeft(meterDefinition.Length + 8));
}

streamWriter.Write(Template, stringBuilder);
}
}

const string Template = @"using System;
using System.Diagnostics;
using System.Security;
using System.Runtime.CompilerServices;

[CompilerGenerated]
public static class CounterCreator
{{
public static void Create()
{{
var counterCreationCollection = new CounterCreationDataCollection(Counters);
try
{{
var categoryName = ""NServiceBus"";

if (PerformanceCounterCategory.Exists(categoryName))
{{
foreach (CounterCreationData counter in counterCreationCollection)
{{
if (!PerformanceCounterCategory.CounterExists(counter.CounterName, categoryName))
{{
PerformanceCounterCategory.Delete(categoryName);
break;
}}
}}
}}

if (PerformanceCounterCategory.Exists(categoryName) == false)
{{
PerformanceCounterCategory.Create(
categoryName: categoryName,
categoryHelp: ""NServiceBus statistics"",
categoryType: PerformanceCounterCategoryType.MultiInstance,
counterData: counterCreationCollection);
}}

PerformanceCounter.CloseSharedResources();
}}
catch (Exception ex) when (ex is SecurityException || ex is UnauthorizedAccessException)
{{
throw new Exception(""Execution requires elevated permissions"", ex);
}}
}}

static CounterCreationData[] Counters = new CounterCreationData[]
{{
{0}
}};
}}";
}
}
Loading