Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HttpStress: Optimize LogHttpEventListener #56983

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,47 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.Buffers;

namespace HttpStress
{
public sealed class LogHttpEventListener : EventListener
{
private int _lastLogNumber = 0;
private StreamWriter _log;
private FileStream _log;
private Channel<string> _messagesChannel = Channel.CreateUnbounded<string>();
private Task _processMessages;
private CancellationTokenSource _stopProcessing;
private CancellationTokenSource _stopProcessing = new CancellationTokenSource();
private const string LogDirectory = "./clientlog";

private FileStream CreateNextLogFileStream()
{
string fn = Path.Combine(LogDirectory, $"client_{++_lastLogNumber:000}.log");
if (File.Exists(fn))
{
File.Delete(fn);
}
return new FileStream(fn, FileMode.CreateNew, FileAccess.Write);
}

public LogHttpEventListener()
{
foreach (var filename in Directory.GetFiles(".", "client*.log"))
if (!Directory.Exists(LogDirectory))
{
Directory.CreateDirectory(LogDirectory);
}

foreach (string filename in Directory.GetFiles(LogDirectory, "client*.log"))
{
try
{
File.Delete(filename);
} catch {}
}
_log = new StreamWriter("client.log", false) { AutoFlush = true };

_log = CreateNextLogFileStream();
_messagesChannel = Channel.CreateUnbounded<string>();
_processMessages = ProcessMessagesAsync();
_stopProcessing = new CancellationTokenSource();
}

protected override void OnEventSourceCreated(EventSource eventSource)
Expand All @@ -46,40 +62,51 @@ protected override void OnEventSourceCreated(EventSource eventSource)

private async Task ProcessMessagesAsync()
{
await Task.Yield();

try
{
byte[] buffer = new byte[8192];
var encoding = Encoding.ASCII;

int i = 0;
await foreach (string message in _messagesChannel.Reader.ReadAllAsync(_stopProcessing.Token))
{
if ((++i % 10_000) == 0)
{
RotateFiles();
await RotateFiles();
}

_log.WriteLine(message);
int maxLen = encoding.GetMaxByteCount(message.Length);
if (maxLen > buffer.Length)
{
buffer = new byte[maxLen];
}
int byteCount = encoding.GetBytes(message, buffer);

await _log.WriteAsync(buffer.AsMemory(0, byteCount), _stopProcessing.Token);
}
}
catch (OperationCanceledException)
{
return;
}

void RotateFiles()
async ValueTask RotateFiles()
{
await _log.FlushAsync(_stopProcessing.Token);
// Rotate the log if it reaches 50 MB size.
if (_log.BaseStream.Length > (50 << 20))
if (_log.Length > (100 << 20))
{
_log.Close();
_log = new StreamWriter($"client_{++_lastLogNumber:000}.log", false) { AutoFlush = true };
await _log.DisposeAsync();
_log = CreateNextLogFileStream();
}
}
}

private StringBuilder? _cachedStringBuilder;

protected override async void OnEventWritten(EventWrittenEventArgs eventData)
{
var sb = new StringBuilder().Append($"{eventData.TimeStamp:HH:mm:ss.fffffff}[{eventData.EventName}] ");
StringBuilder sb = Interlocked.Exchange(ref _cachedStringBuilder, null) ?? new StringBuilder();
sb.Append($"{eventData.TimeStamp:HH:mm:ss.fffffff}[{eventData.EventName}] ");
for (int i = 0; i < eventData.Payload?.Count; i++)
{
if (i > 0)
Expand All @@ -88,13 +115,18 @@ protected override async void OnEventWritten(EventWrittenEventArgs eventData)
}
sb.Append(eventData.PayloadNames?[i]).Append(": ").Append(eventData.Payload[i]);
}
await _messagesChannel.Writer.WriteAsync(sb.ToString());
sb.Append(Environment.NewLine);
string s = sb.ToString();
sb.Clear();
Interlocked.Exchange(ref _cachedStringBuilder, sb);
await _messagesChannel.Writer.WriteAsync(s, _stopProcessing.Token);

}

public override void Dispose()
{
base.Dispose();

_log.Flush();
if (!_processMessages.Wait(TimeSpan.FromSeconds(30)))
{
_stopProcessing.Cancel();
Expand Down