Skip to content

Commit

Permalink
add eventhub name into load balancer logs (#47271)
Browse files Browse the repository at this point in the history
  • Loading branch information
tovyhnal authored Nov 22, 2024
1 parent 42a6b1f commit f0a9a8d
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ protected PartitionLoadBalancerEventSource() : base(EventSourceName)
/// </summary>
///
/// <param name="count"> Minimum partitions per event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(1, Level = EventLevel.Verbose, Message = "Expected minimum partitions per event processor '{0}'.")]
public virtual void MinimumPartitionsPerEventProcessor(int count)
[Event(1, Level = EventLevel.Verbose, Message = "Expected minimum partitions per event processor '{0}' for Event Hub '{1}'.")]
public virtual void MinimumPartitionsPerEventProcessor(int count, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(1, count);
WriteEvent(1, count, eventHubName);
}
}

Expand All @@ -59,14 +60,16 @@ public virtual void MinimumPartitionsPerEventProcessor(int count)
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="count"> Current ownership count.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(2, Level = EventLevel.Informational, Message = "Current ownership count is {0}. (Identifier: '{1}')")]
[Event(2, Level = EventLevel.Informational, Message = "Current ownership count is {0} for Event Hub '{2}'. (Identifier: '{1}')")]
public virtual void CurrentOwnershipCount(int count,
string identifier)
string identifier,
string eventHubName)
{
if (IsEnabled())
{
WriteEvent(2, count, identifier ?? string.Empty);
WriteEvent(2, count, identifier ?? string.Empty, eventHubName);
}
}

Expand All @@ -75,13 +78,14 @@ public virtual void CurrentOwnershipCount(int count,
/// </summary>
///
/// <param name="unclaimedPartitions">List of unclaimed partitions.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(3, Level = EventLevel.Informational, Message = "Unclaimed partitions: '{0}'.")]
public virtual void UnclaimedPartitions(HashSet<string> unclaimedPartitions)
[Event(3, Level = EventLevel.Informational, Message = "Unclaimed partitions: '{0}' for Event Hub '{1}'.")]
public virtual void UnclaimedPartitions(HashSet<string> unclaimedPartitions, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(3, string.Join(", ", unclaimedPartitions));
WriteEvent(3, string.Join(", ", unclaimedPartitions), eventHubName);
}
}

Expand All @@ -90,13 +94,14 @@ public virtual void UnclaimedPartitions(HashSet<string> unclaimedPartitions)
/// </summary>
///
/// <param name="partitionId">The identifier of the Event Hub partition whose ownership claim attempt is starting.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(4, Level = EventLevel.Informational, Message = "Attempting to claim ownership of partition '{0}'.")]
public virtual void ClaimOwnershipStart(string partitionId)
[Event(4, Level = EventLevel.Informational, Message = "Attempting to claim ownership of partition '{0}' for Event Hub '{1}'.")]
public virtual void ClaimOwnershipStart(string partitionId, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(4, partitionId ?? string.Empty);
WriteEvent(4, partitionId ?? string.Empty, eventHubName);
}
}

Expand All @@ -106,14 +111,16 @@ public virtual void ClaimOwnershipStart(string partitionId)
///
/// <param name="partitionId">The identifier of the Event Hub partition.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(5, Level = EventLevel.Error, Message = "Failed to claim ownership of partition '{0}'. (ErrorMessage: '{1}')")]
[Event(5, Level = EventLevel.Error, Message = "Failed to claim ownership of partition '{0}' for Event Hub '{2}'. (ErrorMessage: '{1}')")]
public virtual void ClaimOwnershipError(string partitionId,
string errorMessage)
string errorMessage,
string eventHubName)
{
if (IsEnabled())
{
WriteEvent(5, partitionId ?? string.Empty, errorMessage ?? string.Empty);
WriteEvent(5, partitionId ?? string.Empty, errorMessage ?? string.Empty, eventHubName);
}
}

Expand All @@ -122,13 +129,14 @@ public virtual void ClaimOwnershipError(string partitionId,
/// </summary>
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(6, Level = EventLevel.Informational, Message = "Load is unbalanced and this load balancer should steal a partition. (Identifier: '{0}')")]
public virtual void ShouldStealPartition(string identifier)
[Event(6, Level = EventLevel.Informational, Message = "Load is unbalanced and this load balancer should steal a partition for Event Hub '{1}'. (Identifier: '{0}')")]
public virtual void ShouldStealPartition(string identifier, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(6, identifier ?? string.Empty);
WriteEvent(6, identifier ?? string.Empty, eventHubName);
}
}

Expand All @@ -139,15 +147,17 @@ public virtual void ShouldStealPartition(string identifier)
/// <param name="partitionId">The identifier of the partition that was selected to be stolen.</param>
/// <param name="stolenFrom">The identifier of the event processor that is being stolen from.</param>
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, attempting to steal partition '{0}' from event processor '{1}'. (Identifier: '{2}')")]
[Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, attempting to steal partition '{0}' from event processor '{1}' for Event Hub '{3}'. (Identifier: '{2}').")]
public virtual void StealPartition(string partitionId,
string stolenFrom,
string identifier)
string identifier,
string eventHubName)
{
if (IsEnabled())
{
WriteEvent(7, partitionId ?? string.Empty, stolenFrom ?? string.Empty, identifier ?? string.Empty);
WriteEvent(7, partitionId ?? string.Empty, stolenFrom ?? string.Empty, identifier ?? string.Empty, eventHubName);
}
}

Expand All @@ -156,13 +166,14 @@ public virtual void StealPartition(string partitionId,
/// </summary>
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(8, Level = EventLevel.Verbose, Message = "Attempting to renew ownership. (Identifier: '{0}')")]
public virtual void RenewOwnershipStart(string identifier)
[Event(8, Level = EventLevel.Verbose, Message = "Attempting to renew ownership for Event Hub '{1}'. (Identifier: '{0}')")]
public virtual void RenewOwnershipStart(string identifier, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(8, identifier ?? string.Empty);
WriteEvent(8, identifier ?? string.Empty, eventHubName);
}
}

Expand All @@ -172,14 +183,16 @@ public virtual void RenewOwnershipStart(string identifier)
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(9, Level = EventLevel.Error, Message = "Failed to renew ownership. (Identifier: '{0}'; ErrorMessage: '{0}')")]
[Event(9, Level = EventLevel.Error, Message = "Failed to renew ownership for Event Hub '{2}'. (Identifier: '{0}'; ErrorMessage: '{1}')")]
public virtual void RenewOwnershipError(string identifier,
string errorMessage)
string errorMessage,
string eventHubName)
{
if (IsEnabled())
{
WriteEvent(9, identifier ?? string.Empty, errorMessage ?? string.Empty);
WriteEvent(9, identifier ?? string.Empty, errorMessage ?? string.Empty, eventHubName);
}
}

Expand All @@ -188,13 +201,14 @@ public virtual void RenewOwnershipError(string identifier,
/// </summary>
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(10, Level = EventLevel.Verbose, Message = "Attempt to renew ownership has completed. (Identifier: '{0}')")]
public virtual void RenewOwnershipComplete(string identifier)
[Event(10, Level = EventLevel.Verbose, Message = "Attempt to renew ownership has completed for Event Hub '{1}'. (Identifier: '{0}')")]
public virtual void RenewOwnershipComplete(string identifier, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(10, identifier ?? string.Empty);
WriteEvent(10, identifier ?? string.Empty, eventHubName);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,10 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio

var unevenPartitionDistribution = (partitionCount % ActiveOwnershipWithDistribution.Keys.Count) > 0;
var minimumOwnedPartitionsCount = partitionCount / ActiveOwnershipWithDistribution.Keys.Count;
Logger.MinimumPartitionsPerEventProcessor(minimumOwnedPartitionsCount);
Logger.MinimumPartitionsPerEventProcessor(minimumOwnedPartitionsCount, EventHubName);

var ownedPartitionsCount = ActiveOwnershipWithDistribution[OwnerIdentifier].Count;
Logger.CurrentOwnershipCount(ownedPartitionsCount, OwnerIdentifier);
Logger.CurrentOwnershipCount(ownedPartitionsCount, OwnerIdentifier, EventHubName);

// There are two possible situations in which we may need to claim a partition ownership:
//
Expand All @@ -374,7 +374,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
{
// Look for unclaimed partitions. If any, randomly pick one of them to claim.

Logger.UnclaimedPartitions(unclaimedPartitions);
Logger.UnclaimedPartitions(unclaimedPartitions, EventHubName);

if (unclaimedPartitions.Count > 0)
{
Expand Down Expand Up @@ -423,7 +423,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
if ((ownedPartitionsCount < minimumOwnedPartitionsCount)
|| (ownedPartitionsCount < maximumOwnedPartitionsCount && partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount.Count > 0))
{
Logger.ShouldStealPartition(OwnerIdentifier);
Logger.ShouldStealPartition(OwnerIdentifier, EventHubName);

// Prefer stealing from a processor that owns more than the maximum number of partitions.

Expand All @@ -444,7 +444,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
}
}

Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier);
Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier, EventHubName);

var returnTask = ClaimOwnershipAsync(
partitionToSteal,
Expand Down Expand Up @@ -474,7 +474,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
}
}

Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier);
Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier, EventHubName);

var returnTask = ClaimOwnershipAsync(
partitionToSteal,
Expand All @@ -501,7 +501,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

Logger.RenewOwnershipStart(OwnerIdentifier);
Logger.RenewOwnershipStart(OwnerIdentifier, EventHubName);

var utcNow = GetDateTimeOffsetNow();

Expand Down Expand Up @@ -543,7 +543,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
// If ownership renewal fails just give up and try again in the next cycle. The processor may
// end up losing some of its ownership.

Logger.RenewOwnershipError(OwnerIdentifier, ex.Message);
Logger.RenewOwnershipError(OwnerIdentifier, ex.Message, EventHubName);

// Set the EventHubName to null so it doesn't modify the exception message. This exception message is
// used so the processor can retrieve the raw Operation string, and adding the EventHubName would append
Expand All @@ -553,7 +553,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
}
finally
{
Logger.RenewOwnershipComplete(OwnerIdentifier);
Logger.RenewOwnershipComplete(OwnerIdentifier, EventHubName);
}
}

Expand All @@ -572,7 +572,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Logger.ClaimOwnershipStart(partitionId);
Logger.ClaimOwnershipStart(partitionId, EventHubName);

// We need the eTag from the most recent ownership of this partition, even if it's expired. We want to keep the offset and
// the sequence number as well.
Expand Down Expand Up @@ -602,7 +602,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)

// If ownership claim fails, just treat it as a usual ownership claim failure.

Logger.ClaimOwnershipError(partitionId, ex.Message);
Logger.ClaimOwnershipError(partitionId, ex.Message, EventHubName);

// Set the EventHubName to null so it doesn't modify the exception message. This exception message is
// used so the processor can retrieve the raw Operation string, and adding the EventHubName would append
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealFromItself()

// Verify that no attempts to steal were logged.

mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>()), Times.Never);
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>(), It.IsAny<string>()), Times.Never);
}

/// <summary>
Expand Down Expand Up @@ -597,7 +597,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealWhenLessPartitionsThanProcess

// Verify that no attempts to steal were logged.

mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>()), Times.Never);
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>(), It.IsAny<string>()), Times.Never);
}

/// <summary>
Expand Down Expand Up @@ -678,7 +678,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealWhenTheLoadIsBalanced(int[] a

// Verify that no attempts to steal were logged.

mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>()), Times.Never);
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>(), It.IsAny<string>()), Times.Never);
}

/// <summary>
Expand Down Expand Up @@ -902,14 +902,14 @@ public async Task VerifiesEventProcessorLogs()

await loadbalancer.RelinquishOwnershipAsync(CancellationToken.None);

mockLog.Verify(m => m.RenewOwnershipStart(loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.RenewOwnershipComplete(loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.ClaimOwnershipStart(It.Is<string>(p => partitionIds.Contains(p))));
mockLog.Verify(m => m.MinimumPartitionsPerEventProcessor(MinimumpartitionCount));
mockLog.Verify(m => m.CurrentOwnershipCount(MinimumpartitionCount, loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.StealPartition(It.IsAny<string>(), It.IsAny<string>(), loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.ShouldStealPartition(loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.UnclaimedPartitions(It.Is<HashSet<string>>(set => set.Count == 0 || set.All(item => partitionIds.Contains(item)))));
mockLog.Verify(m => m.RenewOwnershipStart(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.RenewOwnershipComplete(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.ClaimOwnershipStart(It.Is<string>(p => partitionIds.Contains(p)), loadbalancer.EventHubName));
mockLog.Verify(m => m.MinimumPartitionsPerEventProcessor(MinimumpartitionCount, loadbalancer.EventHubName));
mockLog.Verify(m => m.CurrentOwnershipCount(MinimumpartitionCount, loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.StealPartition(It.IsAny<string>(), It.IsAny<string>(), loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.ShouldStealPartition(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.UnclaimedPartitions(It.Is<HashSet<string>>(set => set.Count == 0 || set.All(item => partitionIds.Contains(item))), loadbalancer.EventHubName));
}

/// <summary>
Expand Down

0 comments on commit f0a9a8d

Please sign in to comment.