Skip to content

Commit

Permalink
[sdk-metrics] Exemplar concurrency updates (#5465)
Browse files Browse the repository at this point in the history
Co-authored-by: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com>
  • Loading branch information
CodeBlanch and utpilla authored Mar 26, 2024
1 parent b10f84a commit 1a607c8
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 37 deletions.
1 change: 1 addition & 0 deletions OpenTelemetry.sln
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{A49299
src\Shared\StatusHelper.cs = src\Shared\StatusHelper.cs
src\Shared\TagTransformer.cs = src\Shared\TagTransformer.cs
src\Shared\TagTransformerJsonHelper.cs = src\Shared\TagTransformerJsonHelper.cs
src\Shared\ThreadSafeRandom.cs = src\Shared\ThreadSafeRandom.cs
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "DiagnosticSourceInstrumentation", "DiagnosticSourceInstrumentation", "{28F3EC79-660C-4659-8B73-F90DC1173316}"
Expand Down
47 changes: 47 additions & 0 deletions src/OpenTelemetry/Metrics/Exemplar/Exemplar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct Exemplar
private int tagCount;
private KeyValuePair<string, object?>[]? tagStorage;
private MetricPointValueStorage valueStorage;
private int isCriticalSectionOccupied;

/// <summary>
/// Gets the timestamp.
Expand Down Expand Up @@ -103,6 +104,17 @@ public readonly ReadOnlyFilteredTagCollection FilteredTags
internal void Update<T>(in ExemplarMeasurement<T> measurement)
where T : struct
{
if (Interlocked.Exchange(ref this.isCriticalSectionOccupied, 1) != 0)
{
// Note: If we reached here it means some other thread is already
// updating the exemplar. Instead of spinning, we abort. The idea is
// for two exemplars offered at more or less the same time there
// really isn't a difference which one is stored so it is an
// optimization to let the losing thread(s) get back to work instead
// of spinning.
return;
}

this.Timestamp = DateTimeOffset.UtcNow;

if (typeof(T) == typeof(long))
Expand Down Expand Up @@ -135,6 +147,8 @@ internal void Update<T>(in ExemplarMeasurement<T> measurement)
{
this.StoreRawTags(measurement.Tags);
}

Interlocked.Exchange(ref this.isCriticalSectionOccupied, 0);
}

internal void Reset()
Expand All @@ -147,6 +161,29 @@ internal readonly bool IsUpdated()
return this.Timestamp != default;
}

internal void Collect(ref Exemplar destination, bool reset)
{
if (Interlocked.Exchange(ref this.isCriticalSectionOccupied, 1) != 0)
{
this.AcquireLockRare();
}

if (this.IsUpdated())
{
this.Copy(ref destination);
if (reset)
{
this.Reset();
}
}
else
{
destination.Reset();
}

Interlocked.Exchange(ref this.isCriticalSectionOccupied, 0);
}

internal readonly void Copy(ref Exemplar destination)
{
destination.Timestamp = this.Timestamp;
Expand Down Expand Up @@ -179,4 +216,14 @@ private void StoreRawTags(ReadOnlySpan<KeyValuePair<string, object?>> tags)

tags.CopyTo(this.tagStorage);
}

private void AcquireLockRare()
{
SpinWait spinWait = default;
do
{
spinWait.SpinOnce();
}
while (Interlocked.Exchange(ref this.isCriticalSectionOccupied, 1) != 0);
}
}
35 changes: 6 additions & 29 deletions src/OpenTelemetry/Metrics/Exemplar/FixedSizeExemplarReservoir.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,13 @@ public sealed override ReadOnlyExemplarCollection Collect()
{
var runningExemplars = this.runningExemplars;

if (this.ResetOnCollect)
for (int i = 0; i < runningExemplars.Length; i++)
{
for (int i = 0; i < runningExemplars.Length; i++)
{
ref var running = ref runningExemplars[i];
if (running.IsUpdated())
{
running.Copy(ref this.snapshotExemplars[i]);
running.Reset();
}
else
{
this.snapshotExemplars[i].Reset();
}
}
}
else
{
for (int i = 0; i < runningExemplars.Length; i++)
{
ref var running = ref runningExemplars[i];
if (running.IsUpdated())
{
running.Copy(ref this.snapshotExemplars[i]);
}
else
{
this.snapshotExemplars[i].Reset();
}
}
ref var running = ref runningExemplars[i];

running.Collect(
ref this.snapshotExemplars[i],
reset: this.ResetOnCollect);
}

this.OnCollected();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using OpenTelemetry.Internal;

namespace OpenTelemetry.Metrics;

/// <summary>
Expand All @@ -12,9 +14,8 @@ namespace OpenTelemetry.Metrics;
/// </remarks>
internal sealed class SimpleFixedSizeExemplarReservoir : FixedSizeExemplarReservoir
{
private readonly Random random = new();

private int measurementsSeen;
private const int DefaultMeasurementState = -1;
private int measurementState = DefaultMeasurementState;

public SimpleFixedSizeExemplarReservoir(int poolSize)
: base(poolSize)
Expand All @@ -36,21 +37,21 @@ protected override void OnCollected()
// Reset internal state irrespective of temporality.
// This ensures incoming measurements have fair chance
// of making it to the reservoir.
this.measurementsSeen = 0;
this.measurementState = DefaultMeasurementState;
}

private void Offer<T>(in ExemplarMeasurement<T> measurement)
where T : struct
{
var measurementNumber = this.measurementsSeen++;
var measurementState = Interlocked.Increment(ref this.measurementState);

if (measurementNumber < this.Capacity)
if (measurementState < this.Capacity)
{
this.UpdateExemplar(measurementNumber, in measurement);
this.UpdateExemplar(measurementState, in measurement);
}
else
{
var index = this.random.Next(0, measurementNumber);
int index = ThreadSafeRandom.Next(0, measurementState);
if (index < this.Capacity)
{
this.UpdateExemplar(index, in measurement);
Expand Down
1 change: 1 addition & 0 deletions src/OpenTelemetry/OpenTelemetry.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<Compile Include="$(RepoRoot)\src\Shared\Options\*.cs" Link="Includes\Options\%(Filename).cs" />
<Compile Include="$(RepoRoot)\src\Shared\ResourceSemanticConventions.cs" Link="Includes\ResourceSemanticConventions.cs" />
<Compile Include="$(RepoRoot)\src\Shared\Shims\NullableAttributes.cs" Link="Includes\Shims\NullableAttributes.cs" />
<Compile Include="$(RepoRoot)\src\Shared\ThreadSafeRandom.cs" Link="Includes\ThreadSafeRandom.cs" />
</ItemGroup>

</Project>
37 changes: 37 additions & 0 deletions src/Shared/ThreadSafeRandom.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

namespace OpenTelemetry.Internal;

// Note: Inspired by https://devblogs.microsoft.com/pfxteam/getting-random-numbers-in-a-thread-safe-way/
internal static class ThreadSafeRandom
{
#if NET6_0_OR_GREATER
public static int Next(int min, int max)
{
return Random.Shared.Next(min, max);
}
#else
private static readonly Random GlobalRandom = new();

[ThreadStatic]
private static Random? localRandom;

public static int Next(int min, int max)
{
var local = localRandom;
if (local == null)
{
int seed;
lock (GlobalRandom)
{
seed = GlobalRandom.Next();
}

localRandom = local = new Random(seed);
}

return local.Next(min, max);
}
#endif
}

0 comments on commit 1a607c8

Please sign in to comment.