Skip to content

Commit

Permalink
Merge pull request #44 from Particular/master-to-develop
Browse files Browse the repository at this point in the history
Master to develop
  • Loading branch information
Scooletz authored Nov 13, 2017
2 parents 3821d5f + 67560a7 commit 6a019bf
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
[TestFixture]
public class IntegrationTests
{
const string EndpointName = "PerfCountersIntegrationTests";
static ManualResetEvent ManualResetEvent = new ManualResetEvent(false);

[Test]
public async Task Ensure_counters_are_written()
{
string message = null;

var endpointName = "PerfCountersIntegrationTests";
var endpointConfiguration = EndpointConfigBuilder.BuildEndpoint(endpointName);
var endpointConfiguration = EndpointConfigBuilder.BuildEndpoint(EndpointName);
endpointConfiguration.DefineCriticalErrorAction(
context =>
{
Expand All @@ -31,6 +31,17 @@ public async Task Ensure_counters_are_written()
var endpoint = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);

var criticalTime = GetCounter(PerformanceCountersFeature.CriticalTimeCounterName);
var processingTime = GetCounter(PerformanceCountersFeature.ProcessingTimeCounterName);

Assert.AreEqual(0, criticalTime.RawValue);
Assert.AreEqual(0, processingTime.RawValue);

var cancellation = new CancellationTokenSource();

var criticalTimeReading = ReadNonZero(criticalTime, cancellation);
var processingTimeReading = ReadNonZero(processingTime, cancellation);

await endpoint.SendLocal(new MyMessage())
.ConfigureAwait(false);

Expand All @@ -40,14 +51,13 @@ await Task.Delay(1500)
await endpoint.Stop()
.ConfigureAwait(false);

var criticalTimePerfCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.CriticalTimeCounterName, endpointName, true);
//var processingTimePerfCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.ProcessingTimeCounterName, endpointName, true);
var slaPerCounter = new PerformanceCounter("NServiceBus", SLAMonitoringFeature.CounterName, endpointName, true);
var messagesFailuresPerSecondCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.MessagesFailuresPerSecondCounterName, endpointName, true);
var messagesProcessedPerSecondCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.MessagesProcessedPerSecondCounterName, endpointName, true);
var messagesPulledPerSecondCounter = new PerformanceCounter("NServiceBus", PerformanceCountersFeature.MessagesPulledPerSecondCounterName, endpointName, true);
Assert.AreNotEqual(0, criticalTimePerfCounter.RawValue);
//Assert.AreNotEqual(0, processingTimePerfCounter.RawValue);
cancellation.Cancel();
var slaPerCounter = GetCounter(SLAMonitoringFeature.CounterName);
var messagesFailuresPerSecondCounter = GetCounter(PerformanceCountersFeature.MessagesFailuresPerSecondCounterName);
var messagesProcessedPerSecondCounter = GetCounter(PerformanceCountersFeature.MessagesProcessedPerSecondCounterName);
var messagesPulledPerSecondCounter = GetCounter(PerformanceCountersFeature.MessagesPulledPerSecondCounterName);
Assert.True(await criticalTimeReading);
Assert.True(await processingTimeReading);
Assert.AreNotEqual(0, slaPerCounter.RawValue);
Assert.AreEqual(0, messagesFailuresPerSecondCounter.RawValue);
Assert.AreNotEqual(0, messagesProcessedPerSecondCounter.RawValue);
Expand All @@ -56,6 +66,23 @@ await endpoint.Stop()
Assert.IsNull(message);
}

static async Task<bool> ReadNonZero(PerformanceCounter counter, CancellationTokenSource cancellation)
{
while (counter.RawValue == 0)
{
if (cancellation.IsCancellationRequested)
{
return false;
}

await Task.Delay(TimeSpan.FromMilliseconds(10));
}

return true;
}

static PerformanceCounter GetCounter(string counterName) => new PerformanceCounter("NServiceBus", counterName, EndpointName, true);

public class MyHandler : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ApprovalUtilities.Utilities;
using NServiceBus;
using NUnit.Framework;
Expand Down Expand Up @@ -156,6 +158,73 @@ static long CalculateAverageTimerCounterUpdate(TimeSpan d)
{
return d.Ticks * Stopwatch.Frequency / TimeSpan.TicksPerSecond;
}
[Test]
public async Task Durations_should_be_reported_as_zeros_after_specific_period_from_last_observed_pipeline_completion()
{
const string endpoint = "Sender@af016c07";
var resetTimersAfter = TimeSpan.FromMilliseconds(100);

var cache = new MockPerformanceCountersCache();
var updater = new PerformanceCounterUpdater(cache, new Dictionary<string, CounterInstanceName?>(), endpoint, resetTimersAfter);

var durationProbes = new[]
{
new MockDurationProbe("Critical Time"),
new MockDurationProbe("Processing Time")
};

// update before timeout
updater.Observe(new ProbeContext(durationProbes, new ISignalProbe[0]));

updater.Start();

durationProbes[0].Raise(TimeSpan.FromSeconds(11));
durationProbes[1].Raise(TimeSpan.FromSeconds(22));

await Task.Delay(TimeSpan.FromTicks(resetTimersAfter.Ticks * 2));

var performanceCounterOne = cache.Get(new CounterInstanceName("Critical Time", endpoint));
var performanceCounterTwo = cache.Get(new CounterInstanceName("Processing Time", endpoint));

await updater.Stop();

Assert.AreEqual(0, performanceCounterOne.RawValue);
Assert.AreEqual(0, performanceCounterTwo.RawValue);
}

[Test]
public void Durations_should_be_reported_properly_after_observed_pipeline_completion()
{
const string endpoint = "Sender@af016c07";
var resetTimersAfter = TimeSpan.FromMilliseconds(100);

var cache = new MockPerformanceCountersCache();
var updater = new PerformanceCounterUpdater(cache, new Dictionary<string, CounterInstanceName?>(), endpoint, resetTimersAfter);

var durationProbes = new[]
{
new MockDurationProbe("Critical Time"),
new MockDurationProbe("Processing Time")
};

// update before timeout
updater.Observe(new ProbeContext(durationProbes, new ISignalProbe[0]));

updater.Start();

Thread.Sleep(TimeSpan.FromTicks(resetTimersAfter.Ticks * 2));

updater.OnReceivePipelineCompleted();

durationProbes[0].Raise(TimeSpan.FromSeconds(11));
durationProbes[1].Raise(TimeSpan.FromSeconds(22));

var performanceCounterOne = cache.Get(new CounterInstanceName("Critical Time", endpoint));
var performanceCounterTwo = cache.Get(new CounterInstanceName("Processing Time", endpoint));

Assert.AreEqual(11, performanceCounterOne.RawValue);
Assert.AreEqual(22, performanceCounterTwo.RawValue);
}
}

class MockPerformanceCountersCache : PerformanceCountersCache
Expand Down Expand Up @@ -197,6 +266,12 @@ public void Register(OnEvent<DurationEvent> observer)
Observers += observer;
}

public void Raise(TimeSpan timespan, string messageType = null)
{
var duration = new DurationEvent(timespan, messageType);
Observers(ref duration);
}

public string Name { get; }
public string Description => string.Empty;
public OnEvent<DurationEvent> Observers = (ref DurationEvent d) => { };
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Metrics.PerformanceCounters;

class PerformanceCounterUpdater
{
public PerformanceCounterUpdater(PerformanceCountersCache cache, Dictionary<string, CounterInstanceName?> legacyInstanceNameMap, string endpointName)
public PerformanceCounterUpdater(PerformanceCountersCache cache, Dictionary<string, CounterInstanceName?> legacyInstanceNameMap, string endpointName, TimeSpan? resetTimersAfter = null)
{
resetEvery = resetTimersAfter ?? TimeSpan.FromSeconds(2);
this.legacyInstanceNameMap = legacyInstanceNameMap;
this.endpointName = endpointName;
this.cache = cache;
cancellation = new CancellationTokenSource();
// initialize to an armed state
OnReceivePipelineCompleted();
}

public void Start()
{
cleaner = Task.Run(Cleanup);
}

public Task Stop()
{
cancellation.Cancel();
return cleaner;
}

public void Observe(ProbeContext context)
Expand All @@ -28,7 +46,9 @@ public void Observe(ProbeContext context)
{
if (durationProbe.Name == CounterNameConventions.ProcessingTime || durationProbe.Name == CounterNameConventions.CriticalTime)
{
var performanceCounterInstance = cache.Get(new CounterInstanceName(durationProbe.Name, endpointName));
var key = new CounterInstanceName(durationProbe.Name, endpointName);
var performanceCounterInstance = cache.Get(key);
resettable[key] = performanceCounterInstance;
durationProbe.Register((ref DurationEvent d) => performanceCounterInstance.RawValue = (long) d.Duration.TotalSeconds);
}

Expand All @@ -46,5 +66,34 @@ public void Observe(ProbeContext context)

readonly PerformanceCountersCache cache;
readonly Dictionary<string, CounterInstanceName?> legacyInstanceNameMap;
readonly ConcurrentDictionary<CounterInstanceName, IPerformanceCounterInstance> resettable = new ConcurrentDictionary<CounterInstanceName, IPerformanceCounterInstance>();
long lastCompleted;

public void OnReceivePipelineCompleted()
{
Volatile.Write(ref lastCompleted, NowTicks);
}

async Task Cleanup()
{
while (cancellation.IsCancellationRequested == false)
{
await Task.Delay(resetEvery).ConfigureAwait(false);

var idleFor = NowTicks - Volatile.Read(ref lastCompleted);
if (idleFor > resetEvery.Ticks)
{
foreach (var performanceCounter in resettable)
{
performanceCounter.Value.RawValue = 0;
}
}
}
}

static long NowTicks => DateTime.UtcNow.Ticks;
readonly TimeSpan resetEvery;
readonly string endpointName;
Task cleaner;
readonly CancellationTokenSource cancellation;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ protected override void Setup(FeatureConfigurationContext context)

context.RegisterStartupTask(new Cleanup(this));

context.Pipeline.OnReceivePipelineCompleted(_=>
{
updater.OnReceivePipelineCompleted();
return TaskExtensions.CompletedTask;
});

options.RegisterObservers(probeContext =>
{
updater.Observe(probeContext);
Expand Down Expand Up @@ -61,12 +67,13 @@ public void Dispose()

protected override Task OnStart(IMessageSession session)
{
feature.updater.Start();
return TaskExtensions.CompletedTask;
}

protected override Task OnStop(IMessageSession session)
{
return TaskExtensions.CompletedTask;
return feature.updater.Stop();
}

PerformanceCountersFeature feature;
Expand Down
88 changes: 88 additions & 0 deletions src/ScriptBuilder/CSharpCounterWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
namespace NServiceBus.Metrics.PerformanceCounters
{
using System.Collections.Generic;
using System.IO;
using System.Text;

static class CSharpCounterWriter
{
public static void WriteCode(string scriptPath, IEnumerable<TimerDefinition> timers, IEnumerable<MeterDefinition> meters, Dictionary<string, string> legacyInstanceNameMap)
{
var outputPath = Path.Combine(scriptPath, "Counters.g.cs");
using (var streamWriter = File.CreateText(outputPath))
{
var stringBuilder = new StringBuilder();

var slaCounterDefinition = @"new CounterCreationData(""SLA violation countdown"", ""Seconds until the SLA for this endpoint is breached."", PerformanceCounterType.NumberOfItems32),";
stringBuilder.AppendLine(slaCounterDefinition.PadLeft(slaCounterDefinition.Length + 8));

foreach (var timer in timers)
{
var timerDefinition = $@"new CounterCreationData(""{timer.Name}"", ""{timer.Description}"", PerformanceCounterType.NumberOfItems32),";
stringBuilder.AppendLine(timerDefinition.PadLeft(timerDefinition.Length + 8));
}

foreach (var meter in meters)
{
string instanceName;
legacyInstanceNameMap.TryGetValue(meter.Name, out instanceName);

var meterDefinition = $@"new CounterCreationData(""{instanceName ?? meter.Name}"", ""{meter.Description}"", PerformanceCounterType.RateOfCountsPerSecond32),";
stringBuilder.AppendLine(meterDefinition.PadLeft(meterDefinition.Length + 8));
}

streamWriter.Write(Template, stringBuilder);
}
}

const string Template = @"using System;
using System.Diagnostics;
using System.Security;
using System.Runtime.CompilerServices;
[CompilerGenerated]
public static class CounterCreator
{{
public static void Create()
{{
var counterCreationCollection = new CounterCreationDataCollection(Counters);
try
{{
var categoryName = ""NServiceBus"";
if (PerformanceCounterCategory.Exists(categoryName))
{{
foreach (CounterCreationData counter in counterCreationCollection)
{{
if (!PerformanceCounterCategory.CounterExists(counter.CounterName, categoryName))
{{
PerformanceCounterCategory.Delete(categoryName);
break;
}}
}}
}}
if (PerformanceCounterCategory.Exists(categoryName) == false)
{{
PerformanceCounterCategory.Create(
categoryName: categoryName,
categoryHelp: ""NServiceBus statistics"",
categoryType: PerformanceCounterCategoryType.MultiInstance,
counterData: counterCreationCollection);
}}
PerformanceCounter.CloseSharedResources();
}}
catch (Exception ex) when (ex is SecurityException || ex is UnauthorizedAccessException)
{{
throw new Exception(""Execution requires elevated permissions"", ex);
}}
}}
static CounterCreationData[] Counters = new CounterCreationData[]
{{
{0}
}};
}}";
}
}
Loading

0 comments on commit 6a019bf

Please sign in to comment.