From c8b46b44ee0f2f1b4f1f88e624d5b611aa10912a Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 27 Jul 2022 21:29:37 +0700 Subject: [PATCH] Fix faulty AddLogger in LoggingBus (#6028) * Fix faulty AddLogger in LoggingBus * Ignore logger async start setting, not relevant anymore * Simplify unit test * Send Stop instead of PoisonPill * Make loggers load asynchronously by default * Change logging back to exceptions * Remove _startupState, use closure instead Co-authored-by: Aaron Stannard --- .../Akka.Tests/Loggers/LoggerStartupSpec.cs | 224 ++++++++++++++++-- src/core/Akka/Event/LoggingBus.cs | 135 ++++++----- 2 files changed, 285 insertions(+), 74 deletions(-) diff --git a/src/core/Akka.Tests/Loggers/LoggerStartupSpec.cs b/src/core/Akka.Tests/Loggers/LoggerStartupSpec.cs index 0afa7b3aa48..08fae4e2dc1 100644 --- a/src/core/Akka.Tests/Loggers/LoggerStartupSpec.cs +++ b/src/core/Akka.Tests/Loggers/LoggerStartupSpec.cs @@ -9,47 +9,229 @@ using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; +using Akka.Dispatch; using Akka.Event; +using Akka.TestKit; using FluentAssertions; using Xunit; +using Xunit.Abstractions; namespace Akka.Tests.Loggers { public class LoggerStartupSpec : TestKit.Xunit2.TestKit { - private const int LoggerResponseDelayMs = 10_000; + private const int LoggerResponseDelayMs = 1_000; + + public LoggerStartupSpec(ITestOutputHelper helper) : base(nameof(LoggerStartupSpec), helper) + { + XUnitOutLogger.Helper = helper; + } - [Fact] - public async Task Logger_async_start_configuration_helps_to_ignore_hanging_loggers() + [Theory(DisplayName = "ActorSystem should start with loggers timing out")] + [InlineData(false)] + [InlineData(true)] + public async Task ActorSystem_should_start_with_loggers_timing_out(bool useAsync) { - var loggerAsyncStartDisabledConfig = ConfigurationFactory.ParseString($"akka.logger-async-start = false"); - var loggerAsyncStartEnabledConfig = ConfigurationFactory.ParseString($"akka.logger-async-start = true"); - - var slowLoggerConfig = ConfigurationFactory.ParseString($"akka.loggers = [\"{typeof(SlowLoggerActor).FullName}, {typeof(SlowLoggerActor).Assembly.GetName().Name}\"]"); + var slowLoggerConfig = ConfigurationFactory.ParseString($@" +akka.loglevel = DEBUG +akka.stdout-logger-class = ""{typeof(XUnitOutLogger).FullName}, {typeof(XUnitOutLogger).Assembly.GetName().Name}"" +akka.loggers = [ + ""Akka.Event.DefaultLogger"", + ""{typeof(SlowLoggerActor).FullName}, {typeof(SlowLoggerActor).Assembly.GetName().Name}"" +] +akka.logger-startup-timeout = 100ms").WithFallback(DefaultConfig); - // Without logger async start, ActorSystem creation will hang - this.Invoking(_ => ActorSystem.Create("handing", slowLoggerConfig.WithFallback(loggerAsyncStartDisabledConfig))).Should() - .Throw("System can not start - logger timed out"); - - // With logger async start, ActorSystem is created without issues - // Created without timeouts - var system = ActorSystem.Create("working", slowLoggerConfig.WithFallback(loggerAsyncStartEnabledConfig)); + var config = ConfigurationFactory.ParseString($"akka.logger-async-start = {(useAsync ? "true" : "false")}") + .WithFallback(slowLoggerConfig); + + ActorSystem sys = null; + try + { + this.Invoking(_ => sys = ActorSystem.Create("test", config)).Should() + .NotThrow("System should not fail to start when a logger timed out"); + var probe = CreateTestProbe(sys); + SlowLoggerActor.Probe = probe; + var logProbe = CreateTestProbe(sys); + sys.EventStream.Subscribe(logProbe, typeof(LogEvent)); + + // Logger actor should eventually initialize + await AwaitAssertAsync(() => + { + var dbg = logProbe.ExpectMsg(); + dbg.Message.ToString().Should().Contain(nameof(SlowLoggerActor)).And.Contain("started"); + }); + + var logger = Logging.GetLogger(sys, this); + logger.Error("TEST"); + await AwaitAssertAsync(() => + { + probe.ExpectMsg().Should().Be("TEST"); + }); + } + finally + { + if(sys != null) + Shutdown(sys); + } + } + + [Theory(DisplayName = "ActorSystem should start with loggers throwing exceptions")] + [InlineData("ctor")] + [InlineData("PreStart")] + [InlineData("Receive")] + public void ActorSystem_should_start_with_logger_exception(string when) + { + var config = ConfigurationFactory.ParseString($@" +akka.loglevel = DEBUG +akka.stdout-logger-class = ""{typeof(XUnitOutLogger).FullName}, {typeof(XUnitOutLogger).Assembly.GetName().Name}"" +akka.loggers = [ + ""Akka.Event.DefaultLogger"", + ""{typeof(ThrowingLogger).FullName}, {typeof(ThrowingLogger).Assembly.GetName().Name}"" +] +akka.logger-startup-timeout = 100ms").WithFallback(DefaultConfig); + + ThrowingLogger.ThrowWhen = when; + ActorSystem sys = null; + try + { + this.Invoking(_ => sys = ActorSystem.Create("test", config)).Should() + .NotThrow("System should not fail to start when a logger throws an exception"); + } + finally + { + if(sys != null) + Shutdown(sys); + } } - public class SlowLoggerActor : ReceiveActor + [Fact(DisplayName = "ActorSystem should throw and fail to start on invalid logger FQCN entry")] + public void ActorSystem_should_fail_on_invalid_logger() { - public SlowLoggerActor() + var config = ConfigurationFactory.ParseString($@" +akka.loglevel = DEBUG +akka.stdout-logger-class = ""{typeof(XUnitOutLogger).FullName}, {typeof(XUnitOutLogger).Assembly.GetName().Name}"" +akka.loggers = [ + ""Akka.Event.InvalidLogger, NonExistantAssembly"" +] +akka.logger-startup-timeout = 100ms").WithFallback(DefaultConfig); + + ActorSystem sys = null; + try + { + this.Invoking(_ => sys = ActorSystem.Create("test", config)).Should() + .ThrowExactly("System should fail to start with invalid logger FQCN"); + } + finally + { + if(sys != null) + Shutdown(sys); + } + } + + private class TestException: Exception + { + public TestException(string message) : base(message) + { } + } + + private class ThrowingLogger : ActorBase, IRequiresMessageQueue + { + public static string ThrowWhen = "Receive"; + + public ThrowingLogger() { - ReceiveAsync(async _ => + if(ThrowWhen == "ctor") + throw new TestException(".ctor BOOM!"); + } + + protected override void PreStart() + { + base.PreStart(); + if(ThrowWhen == "PreStart") + throw new TestException("PreStart BOOM!"); + } + + protected override bool Receive(object message) + { + if(message is InitializeLogger) { - // Ooops... Logger is responding too slow - await Task.Delay(LoggerResponseDelayMs); + if(ThrowWhen == "Receive") + throw new TestException("Receive BOOM!"); + Sender.Tell(new LoggerInitialized()); - }); + return true; + } + + return false; + } + } + + private class SlowLoggerActor : ActorBase, IWithUnboundedStash, IRequiresMessageQueue + { + public static TestProbe Probe; + + public SlowLoggerActor() + { + Become(Starting); } - private void Log(LogLevel level, string str) + private bool Starting(object message) + { + switch (message) + { + case InitializeLogger _: + var sender = Sender; + Task.Delay(LoggerResponseDelayMs).PipeTo(Self, sender, success: () => Done.Instance); + Become(Initializing); + return true; + default: + Stash.Stash(); + return true; + } + } + + private bool Initializing(object message) + { + switch (message) + { + case Done _: + Sender.Tell(new LoggerInitialized()); + Become(Initialized); + Stash.UnstashAll(); + return true; + default: + Stash.Stash(); + return true; + } + } + + private bool Initialized(object message) + { + switch (message) + { + case LogEvent evt: + Probe.Tell(evt.Message.ToString()); + return true; + default: + return false; + } + } + + protected override bool Receive(object message) + { + throw new NotImplementedException(); + } + + public IStash Stash { get; set; } + } + + public class XUnitOutLogger : MinimalLogger + { + public static ITestOutputHelper Helper; + + protected override void Log(object message) { + Helper.WriteLine(message.ToString()); } } } diff --git a/src/core/Akka/Event/LoggingBus.cs b/src/core/Akka/Event/LoggingBus.cs index fbe8e70149b..b5f9ab8d8c1 100644 --- a/src/core/Akka/Event/LoggingBus.cs +++ b/src/core/Akka/Event/LoggingBus.cs @@ -9,7 +9,6 @@ using System.Collections.Generic; using System.Globalization; using System.Linq; -using System.Reflection; using System.Threading; using System.Threading.Tasks; using Akka.Actor; @@ -23,10 +22,28 @@ namespace Akka.Event /// public class LoggingBus : ActorEventBus { + private sealed class LoggerStartInfo + { + private readonly string _name; + + public LoggerStartInfo(string loggerName, Type loggerType, IActorRef actorRef) + { + _name = $"{loggerName} [{loggerType.FullName}]"; + ActorRef = actorRef; + } + + public IActorRef ActorRef { get; } + + public override string ToString() => _name; + } + private static readonly LogLevel[] AllLogLevels = Enum.GetValues(typeof(LogLevel)).Cast().ToArray(); private static int _loggerId; private readonly List _loggers = new List(); + + // Async load support + private readonly CancellationTokenSource _shutdownCts = new CancellationTokenSource(); /// /// The minimum log level that this bus will subscribe to, any LogEvents with a log level below will not be subscribed to. @@ -88,12 +105,13 @@ protected override Type GetClassifier(object @event) internal void StartDefaultLoggers(ActorSystemImpl system) { var logName = SimpleName(this) + "(" + system.Name + ")"; - var logLevel = Logging.LogLevelFor(system.Settings.LogLevel); var loggerTypes = system.Settings.Loggers; var timeout = system.Settings.LoggerStartTimeout; - var asyncStart = system.Settings.LoggerAsyncStart; var shouldRemoveStandardOutLogger = true; + LogLevel = Logging.LogLevelFor(system.Settings.LogLevel); + + var taskInfos = new Dictionary(); foreach (var strLoggerType in loggerTypes) { var loggerType = Type.GetType(strLoggerType); @@ -108,33 +126,24 @@ internal void StartDefaultLoggers(ActorSystemImpl system) continue; } - if (asyncStart) - { - // Not awaiting for result, and not depending on current thread context - Task.Run(() => AddLogger(system, loggerType, logLevel, logName, timeout)) - .ContinueWith(t => - { - if (t.Exception != null) - { - Console.WriteLine($"Logger [{strLoggerType}] specified in config cannot be loaded: {t.Exception}"); - } - }); - } - else + var (task, name) = AddLogger(system, loggerType, logName); + taskInfos[task] = name; + } + + if (!Task.WaitAll(taskInfos.Keys.ToArray(), timeout)) + { + foreach (var kvp in taskInfos) { - try - { - AddLogger(system, loggerType, logLevel, logName, timeout); - } - catch (Exception ex) + if (!kvp.Key.IsCompleted) { - throw new ConfigurationException($"Logger [{strLoggerType}] specified in config cannot be loaded: {ex}", ex); + Publish(new Warning(logName, GetType(), + $"Logger {kvp.Value} did not respond within {timeout} to InitializeLogger(bus), " + + $"{nameof(LoggingBus)} will try and wait until it is ready. Since it start up is delayed, " + + "this logger may not capture all startup events correctly.")); } } } - LogLevel = logLevel; - if (system.Settings.DebugUnhandledMessage) { var forwarder = system.SystemActorOf(Props.Create(typeof(UnhandledMessageForwarder)), "UnhandledMessageForwarder"); @@ -164,46 +173,66 @@ internal void StopDefaultLoggers(ActorSystem system) Publish(new Debug(SimpleName(this), GetType(), $"Shutting down: {Logging.SimpleName(system.Settings.StdoutLogger)} started")); } + // Cancel all pending logger initialization tasks + _shutdownCts.Cancel(false); + _shutdownCts.Dispose(); + + // Stop all currently running loggers foreach (var logger in _loggers) { - if (!(logger is MinimalLogger)) - { - Unsubscribe(logger); - if (logger is IInternalActorRef internalActorRef) - { - internalActorRef.Stop(); - } - } + RemoveLogger(logger); } Publish(new Debug(SimpleName(this), GetType(), "All default loggers stopped")); } - private void AddLogger(ActorSystemImpl system, Type loggerType, LogLevel logLevel, string loggingBusName, TimeSpan timeout) + private void RemoveLogger(IActorRef logger) { - var loggerName = CreateLoggerName(loggerType); - var logger = system.SystemActorOf(Props.Create(loggerType).WithDispatcher(system.Settings.LoggersDispatcher), loggerName); - var askTask = logger.Ask(new InitializeLogger(this), timeout); - - object response = null; - try + if (!(logger is MinimalLogger)) { - response = askTask.Result; - } - catch (Exception ex) when (ex is TaskCanceledException || ex is AskTimeoutException) - { - Publish(new Warning(loggingBusName, GetType(), - string.Format("Logger {0} [{2}] did not respond within {1} to InitializeLogger(bus)", loggerName, timeout, loggerType.FullName))); + Unsubscribe(logger); + if (logger is IInternalActorRef internalActorRef) + { + internalActorRef.Stop(); + } } - - if (!(response is LoggerInitialized)) - throw new LoggerInitializationException($"Logger {loggerName} [{loggerType.FullName}] did not respond with LoggerInitialized, sent instead {response}"); - + } - _loggers.Add(logger); - SubscribeLogLevelAndAbove(logLevel, logger); - Publish(new Debug(loggingBusName, GetType(), $"Logger {loggerName} [{loggerType.Name}] started")); + private (Task task, string name) AddLogger(ActorSystemImpl system, Type loggerType, string loggingBusName) + { + var loggerName = CreateLoggerName(loggerType); + var fullLoggerName = $"{loggerName} [{loggerType.FullName}]"; + var logger = system.SystemActorOf(Props.Create(loggerType).WithDispatcher(system.Settings.LoggersDispatcher), loggerName); + var askTask = logger.Ask(new InitializeLogger(this), Timeout.InfiniteTimeSpan, _shutdownCts.Token); + askTask.ContinueWith(t => + { + // _shutdownCts was cancelled while this logger is still loading + if (t.IsCanceled) + { + Publish(new Warning(loggingBusName, GetType(), + $"Logger {fullLoggerName} startup have been cancelled because of system shutdown. Stopping logger.")); + RemoveLogger(logger); + return; + } + + // Task ran to completion successfully + var response = t.Result; + if (!(response is LoggerInitialized)) + { + // Malformed logger, logger did not send a proper ack. + Publish(new Error(null, loggingBusName, GetType(), + $"Logger {fullLoggerName} did not respond with {nameof(LoggerInitialized)}, sent instead {response.GetType()}. Stopping logger.")); + RemoveLogger(logger); + return; + } + + // Logger initialized successfully + _loggers.Add(logger); + SubscribeLogLevelAndAbove(LogLevel, logger); + Publish(new Debug(loggingBusName, GetType(), $"Logger {fullLoggerName} started")); + }); + return (askTask, fullLoggerName); } private string CreateLoggerName(Type actorClass) @@ -240,7 +269,7 @@ public void SetLogLevel(LogLevel logLevel) foreach (var logger in _loggers) { //subscribe to given log level and above - SubscribeLogLevelAndAbove(logLevel, logger); + SubscribeLogLevelAndAbove(LogLevel, logger); //unsubscribe to all levels below log level foreach (var level in AllLogLevels.Where(l => l < logLevel))