Skip to content

Commit

Permalink
Report cause for Akka/IO TCP CommandFailed events
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Nov 1, 2022
1 parent cdc2b85 commit efc6069
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3311,6 +3311,11 @@ namespace Akka.IO
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
}
[Akka.Annotations.InternalApiAttribute()]
public class ConnectException : System.Exception
{
public ConnectException(string message) { }
}
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
{
public static readonly Akka.IO.Dns Instance;
Expand Down Expand Up @@ -3525,8 +3530,13 @@ namespace Akka.IO
public sealed class CommandFailed : Akka.IO.Tcp.Event
{
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
public Akka.Util.Option<System.Exception> Cause { get; }
[Akka.Annotations.InternalApiAttribute()]
public string CauseString { get; }
public Akka.IO.Tcp.Command Cmd { get; }
public override string ToString() { }
[Akka.Annotations.InternalApiAttribute()]
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
}
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3318,6 +3318,11 @@ namespace Akka.IO
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
}
[Akka.Annotations.InternalApiAttribute()]
public class ConnectException : System.Exception
{
public ConnectException(string message) { }
}
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
{
public static readonly Akka.IO.Dns Instance;
Expand Down Expand Up @@ -3532,8 +3537,13 @@ namespace Akka.IO
public sealed class CommandFailed : Akka.IO.Tcp.Event
{
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
public Akka.Util.Option<System.Exception> Cause { get; }
[Akka.Annotations.InternalApiAttribute()]
public string CauseString { get; }
public Akka.IO.Tcp.Command Cmd { get; }
public override string ToString() { }
[Akka.Annotations.InternalApiAttribute()]
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
}
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3311,6 +3311,11 @@ namespace Akka.IO
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
}
[Akka.Annotations.InternalApiAttribute()]
public class ConnectException : System.Exception
{
public ConnectException(string message) { }
}
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
{
public static readonly Akka.IO.Dns Instance;
Expand Down Expand Up @@ -3525,8 +3530,13 @@ namespace Akka.IO
public sealed class CommandFailed : Akka.IO.Tcp.Event
{
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
public Akka.Util.Option<System.Exception> Cause { get; }
[Akka.Annotations.InternalApiAttribute()]
public string CauseString { get; }
public Akka.IO.Tcp.Command Cmd { get; }
public override string ToString() { }
[Akka.Annotations.InternalApiAttribute()]
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
}
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
{
Expand Down
32 changes: 24 additions & 8 deletions src/core/Akka/IO/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
using System.Linq;
using System.Net;
using Akka.Actor;
using Akka.Annotations;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Event;
using Akka.IO.Buffers;
using Akka.Util;

namespace Akka.IO
{
Expand Down Expand Up @@ -85,7 +87,7 @@ private SocketConnected() { }
public class Message : INoSerializationVerificationNeeded { }

#region user commands

// COMMANDS
/// <summary>
/// TBD
Expand Down Expand Up @@ -733,7 +735,7 @@ public override string ToString() =>
#endregion

#region user events

/// <summary>
/// Common interface for all events generated by the TCP layer actors.
/// </summary>
Expand Down Expand Up @@ -808,18 +810,32 @@ public sealed class CommandFailed : Event
/// TBD
/// </summary>
/// <param name="cmd">TBD</param>
public CommandFailed(Command cmd)
{
Cmd = cmd;
}
public CommandFailed(Command cmd) => Cmd = cmd;

/// <summary>
/// TBD
/// </summary>
public Command Cmd { get; }

public override string ToString() =>
$"CommandFailed({Cmd})";
/// <summary>
/// Optionally contains the cause why the command failed.
/// </summary>
public Option<Exception> Cause { get; private set; } = Option<Exception>.None;

/// <summary>
/// Creates a copy of this object with a new cause set.
/// </summary>
[InternalApi]
public CommandFailed WithCause(Exception cause)
{
// Needs to be added with a mutable property for compatibility reasons
return new CommandFailed(Cmd) { Cause = cause };
}

[InternalApi]
public string CauseString => Cause.HasValue ? $" because of {Cause.Value.Message}" : "";

public override string ToString() => $"CommandFailed({Cmd}){CauseString}";
}

/// <summary>
Expand Down
10 changes: 8 additions & 2 deletions src/core/Akka/IO/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ enum ConnectionStatus

private IActorRef _watchedActor = Context.System.DeadLetters;

private readonly IOException droppingWriteBecauseWritingIsSuspendedException =
new IOException("Dropping write because writing is suspended");

private readonly IOException droppingWriteBecauseQueueIsFullException =
new IOException("Dropping write because queue is full");

protected TcpConnection(TcpExt tcp, Socket socket, bool pullMode, Option<int> writeCommandsBufferMaxSize)
{
if (socket == null) throw new ArgumentNullException(nameof(socket));
Expand Down Expand Up @@ -328,7 +334,7 @@ private Receive HandleWriteMessages(ConnectionInfo info)
if (HasStatus(ConnectionStatus.WritingSuspended))
{
if (_traceLogging) Log.Debug("Dropping write because writing is suspended");
Sender.Tell(write.FailureMessage);
Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseWritingIsSuspendedException));
}

if (HasStatus(ConnectionStatus.Sending))
Expand Down Expand Up @@ -405,7 +411,7 @@ private Receive HandleWriteMessages(ConnectionInfo info)
private void DropWrite(ConnectionInfo info, WriteCommand write)
{
if (_traceLogging) Log.Debug("Dropping write because queue is full");
Sender.Tell(write.FailureMessage);
Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseQueueIsFullException));
if (info.UseResumeWriting) SetStatus(ConnectionStatus.WritingSuspended);
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/IO/TcpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private Receive Initializing() => message =>
return true;

case Status.Failure fail:
_bindCommander.Tell(_bind.FailureMessage);
_bindCommander.Tell(_bind.FailureMessage.WithCause(fail.Cause));
_log.Error(fail.Cause, "Bind failed for TCP channel on endpoint [{0}]", _bind.LocalAddress);
Context.Stop(Self);
_binding = false;
Expand Down
44 changes: 27 additions & 17 deletions src/core/Akka/IO/TcpOutgoingConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using Akka.Actor;
using Akka.Annotations;
using Akka.Util;

namespace Akka.IO
{
/// <summary>
/// TBD
/// An actor handling the connection state machine for an outgoing connection
/// to be established.
/// </summary>
internal sealed class TcpOutgoingConnection : TcpConnection
{
Expand All @@ -26,6 +28,9 @@ internal sealed class TcpOutgoingConnection : TcpConnection

private SocketAsyncEventArgs _connectArgs;

private readonly ConnectException finishConnectNeverReturnedTrueException =
new ConnectException("Could not establish connection because finishConnect never returned true");

public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect)
: base(
tcp,
Expand Down Expand Up @@ -61,11 +66,11 @@ private void ReleaseConnectionSocketArgs()
}
}

private void Stop()
private void Stop(Exception cause)
{
ReleaseConnectionSocketArgs();

StopWith(new CloseInformation(new HashSet<IActorRef>(new[] {_commander}), _connect.FailureMessage));
StopWith(new CloseInformation(new HashSet<IActorRef>(new[] {_commander}), _connect.FailureMessage.WithCause(cause)));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -77,30 +82,29 @@ private void ReportConnectFailure(Action thunk)
}
catch (Exception e)
{
Log.Error(e, "Could not establish connection to [{0}].", _connect.RemoteAddress);
Stop();
Log.Debug(e, "Could not establish connection to [{0}] due to {1}", _connect.RemoteAddress, e.Message);
Stop(e);
}
}

protected override void PreStart()
{
ReportConnectFailure(() =>
{
if (_connect.RemoteAddress is DnsEndPoint)
if (_connect.RemoteAddress is DnsEndPoint remoteAddress)
{
var remoteAddress = (DnsEndPoint) _connect.RemoteAddress;
Log.Debug("Resolving {0} before connecting", remoteAddress.Host);
var resolved = Dns.ResolveName(remoteAddress.Host, Context.System, Self);
if (resolved == null)
Become(Resolving(remoteAddress));
else if(resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families
else if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families
Register(new IPEndPoint(resolved.Ipv4.FirstOrDefault(), remoteAddress.Port), new IPEndPoint(resolved.Ipv6.FirstOrDefault(), remoteAddress.Port));
else // one or the other
Register(new IPEndPoint(resolved.Addr, remoteAddress.Port), null);
}
else if(_connect.RemoteAddress is IPEndPoint)
else if (_connect.RemoteAddress is IPEndPoint point)
{
Register((IPEndPoint)_connect.RemoteAddress, null);
Register(point, null);
}
else throw new NotSupportedException($"Couldn't connect to [{_connect.RemoteAddress}]: only IP and DNS-based endpoints are supported");
});
Expand All @@ -123,8 +127,7 @@ private Receive Resolving(DnsEndPoint remoteAddress)
{
return message =>
{
var resolved = message as Dns.Resolved;
if (resolved != null)
if (message is Dns.Resolved resolved)
{
if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // multiple addresses
{
Expand All @@ -144,7 +147,6 @@ private Receive Resolving(DnsEndPoint remoteAddress)
};
}


private void Register(IPEndPoint address, IPEndPoint fallbackAddress)
{
ReportConnectFailure(() =>
Expand All @@ -165,7 +167,7 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr
{
return message =>
{
if (message is IO.Tcp.SocketConnected)
if (message is Tcp.SocketConnected)
{
if (args.SocketError == SocketError.Success)
{
Expand Down Expand Up @@ -202,19 +204,27 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr
else
{
Log.Debug("Could not establish connection because finishConnect never returned true (consider increasing akka.io.tcp.finish-connect-retries)");
Stop();
Stop(finishConnectNeverReturnedTrueException);
}
return true;
}
if (message is ReceiveTimeout)
{
if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); // Clear the timeout
Log.Error("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
Stop();
Log.Debug("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired"));
return true;
}
return false;
};
}
}

[InternalApi]
public class ConnectException : Exception
{
public ConnectException(string message)
: base(message)
{ }
}
}

0 comments on commit efc6069

Please sign in to comment.