-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathRemoting.cs
463 lines (406 loc) · 17.3 KB
/
Remoting.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
//-----------------------------------------------------------------------
// <copyright file="Remoting.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.Remote.Transport;
using Akka.Util.Internal;
using Akka.Configuration;
namespace Akka.Remote
{
/// <summary>
/// INTERNAL API
///
/// Helper that turns an actor <see cref="Address"/> into a URL-safe string.
/// </summary>
internal static class AddressUrlEncoder
{
    /// <summary>
    /// URL-encodes the string form of an actor <see cref="Address"/>. Used when generating
    /// the names of some system remote actors.
    /// </summary>
    /// <param name="address">The address whose <c>ToString()</c> representation is encoded.</param>
    /// <returns>The output of <see cref="WebUtility.UrlEncode(string)"/> for the address string.</returns>
    public static string Encode(Address address) => WebUtility.UrlEncode(address.ToString());
}
/// <summary>
/// INTERNAL API
///
/// Extension that exposes the actor system's <see cref="IRemoteActorRefProvider"/>
/// (used for forcing all /system level remoting actors onto a dedicated dispatcher)
/// </summary>
// ReSharper disable once InconsistentNaming
internal sealed class RARP : ExtensionIdProvider<RARP>, IExtension
{
    // The wrapped provider is the reason this extension is called "RARP".
    private readonly IRemoteActorRefProvider _provider;

    /// <summary>
    /// Used as part of the <see cref="ExtensionIdProvider{RARP}"/>.
    /// An instance built through this constructor carries no provider.
    /// </summary>
    public RARP() { }

    private RARP(IRemoteActorRefProvider provider) => _provider = provider;

    /// <summary>
    /// Passes <paramref name="props"/> through the provider's remote settings so that
    /// the remoting dispatcher configuration is applied to it.
    /// </summary>
    /// <param name="props">The props to configure.</param>
    /// <returns>The props returned by <c>RemoteSettings.ConfigureDispatcher</c>.</returns>
    public Props ConfigureDispatcher(Props props) => _provider.RemoteSettings.ConfigureDispatcher(props);

    /// <summary>
    /// Builds the extension instance for <paramref name="system"/>, wrapping its provider.
    /// </summary>
    /// <param name="system">The actor system; its provider is cast to <see cref="IRemoteActorRefProvider"/>.</param>
    /// <returns>A new <see cref="RARP"/> bound to the system's provider.</returns>
    public override RARP CreateExtension(ExtendedActorSystem system)
    {
        var remoteProvider = (IRemoteActorRefProvider)system.Provider;
        return new RARP(remoteProvider);
    }

    /// <summary>
    /// The underlying remote actor reference provider.
    /// </summary>
    public IRemoteActorRefProvider Provider => _provider;

    #region Static methods

    /// <summary>
    /// Looks up (registering if needed, via <c>WithExtension</c>) the <see cref="RARP"/> extension of an actor system.
    /// </summary>
    /// <param name="system">The actor system to query.</param>
    /// <returns>The <see cref="RARP"/> extension for <paramref name="system"/>.</returns>
    public static RARP For(ActorSystem system) => system.WithExtension<RARP, RARP>();

    #endregion
}
/// <summary>
/// INTERNAL API
///
/// Marker interface (it declares no members). Messages marked with this interface will be sent
/// before other messages when buffering is active, which means that these messages don't obey
/// normal message ordering.
/// It is used for failure detector heartbeat messages.
/// </summary>
internal interface IPriorityMessage { }
/// <summary>
/// INTERNAL API
/// </summary>
internal sealed class Remoting : RemoteTransport
{
// Logger used for remoting lifecycle messages.
private readonly ILoggingAdapter _log;
// Transport/address pairs grouped by transport scheme identifier. Assigned by Start() and
// consulted by LocalAddressForRemote.
private volatile IDictionary<string, HashSet<ProtocolTransportAddressPair>> _transportMapping;
// The endpoint manager actor. Null while remoting is not running: Start() assigns it and
// the Shutdown() continuation resets it to null.
private volatile IActorRef _endpointManager;
// This is effectively a write-once variable similar to a lazy val. The reason for not using a lazy val is exception
// handling.
private volatile HashSet<Address> _addresses;
// This variable has the same semantics as the addresses variable, in the sense it is written once, and emulates
// a lazy val
private volatile Address _defaultAddress;
// Supervisor actor for transports, created in the constructor under the name "transports".
private readonly IActorRef _transportSupervisor;
// Publishes remoting lifecycle events (listen, shutdown, error) to listeners.
private readonly EventPublisher _eventPublisher;
/// <summary>
/// Creates the remoting transport: sets up the "remoting" logger, the lifecycle event publisher
/// and the "transports" supervisor actor. Remoting itself is not started here.
/// </summary>
/// <param name="system">The actor system this transport belongs to.</param>
/// <param name="provider">The remote actor ref provider; its remote settings supply the lifecycle-event log level.</param>
public Remoting(ExtendedActorSystem system, RemoteActorRefProvider provider)
    : base(system, provider)
{
    _log = Logging.GetLogger(system, "remoting");

    var lifecycleLogLevel = Logging.LogLevelFor(provider.RemoteSettings.RemoteLifecycleEventsLogLevel);
    _eventPublisher = new EventPublisher(system, _log, lifecycleLogLevel);

    var supervisorProps = Props.Create<TransportSupervisor>();
    _transportSupervisor = system.SystemActorOf(supervisorProps, "transports");
}
#region RemoteTransport overrides
/// <summary>
/// The set of addresses remoting is listening on. The backing field is only assigned
/// by <see cref="Start"/>, so this is <c>null</c> before that happens.
/// </summary>
public override ISet<Address> Addresses => _addresses;
/// <summary>
/// The address of the first transport reported at startup. The backing field is only
/// assigned by <see cref="Start"/>, so this is <c>null</c> before that happens.
/// </summary>
public override Address DefaultAddress => _defaultAddress;
/// <summary>
/// Starts remoting: creates the endpoint manager, asks it to start and bind all transports, and
/// blocks until the bound addresses are reported or the configured startup timeout elapses.
/// Start assumes that it cannot be followed by another Start() without having a Shutdown() first;
/// calling it while the endpoint manager already exists only logs a warning.
/// </summary>
/// <exception cref="ConfigurationException">
/// This exception is thrown when no transports are enabled under the "akka.remote.enabled-transports" configuration setting.
/// </exception>
/// <exception cref="TaskCanceledException">
/// This exception is thrown when startup is canceled due to a timeout.
/// </exception>
/// <exception cref="TimeoutException">
/// This exception is thrown when startup times out.
/// </exception>
/// <exception cref="Exception">
/// This exception is thrown when a general error occurs during startup.
/// </exception>
public override void Start()
{
    if (_endpointManager != null)
    {
        _log.Warning("Remoting was already started. Ignoring start attempt.");
        return;
    }

    _log.Info("Starting remoting");
    _endpointManager =
        System.SystemActorOf(RARP.For(System).ConfigureDispatcher(
            Props.Create(() => new EndpointManager(System.Settings.Config, _log)).WithDeploy(Deploy.Local)),
            EndpointManagerName);

    try
    {
        var addressPromise = new TaskCompletionSource<IList<ProtocolTransportAddressPair>>();

        // tells the EndpointManager to start all transports and bind them to listenable addresses, and then set the results
        // of this promise to include them.
        _endpointManager.Tell(new EndpointManager.Listen(addressPromise));

        // Task.Wait(timeout) returns false on timeout instead of throwing. Without this check the
        // Result access below would block with no time limit and the documented TimeoutException
        // would never be raised. A faulted or canceled task makes Wait throw an AggregateException,
        // which is handled by the general catch clause below.
        var startupTimeout = Provider.RemoteSettings.StartupTimeout;
        if (!addressPromise.Task.Wait(startupTimeout))
        {
            throw new TimeoutException(
                $"Remoting startup did not complete within the configured startup timeout [{startupTimeout}].");
        }

        var akkaProtocolTransports = addressPromise.Task.Result;
        if (akkaProtocolTransports.Count == 0)
        {
            throw new ConfigurationException(@"No transports enabled under ""akka.remote.enabled-transports""");
        }

        // Build the mapping completely before publishing it through the volatile field, so that
        // readers never observe a partially populated dictionary.
        _transportMapping = akkaProtocolTransports
            .GroupBy(t => t.ProtocolTransport.SchemeIdentifier)
            .ToDictionary(g => g.Key, g => new HashSet<ProtocolTransportAddressPair>(g));

        _defaultAddress = akkaProtocolTransports.Head().Address;
        _addresses = new HashSet<Address>(akkaProtocolTransports.Select(x => x.Address));

        _log.Info("Remoting started; listening on addresses : [{0}]", string.Join(",", _addresses.Select(x => x.ToString())));

        _endpointManager.Tell(new EndpointManager.StartupFinished());
        _eventPublisher.NotifyListeners(new RemotingListenEvent(_addresses.ToList()));
    }
    catch (TaskCanceledException ex)
    {
        NotifyError("Startup was canceled due to timeout", ex);
        throw;
    }
    catch (TimeoutException ex)
    {
        NotifyError("Startup timed out", ex);
        throw;
    }
    catch (Exception ex)
    {
        NotifyError("Startup failed", ex);
        throw;
    }
}
/// <summary>
/// Asks the endpoint manager to flush and shut down, waiting at most the configured shutdown timeout.
/// Whatever the outcome, listeners receive a <see cref="RemotingShutdownEvent"/> and the
/// endpoint manager reference is cleared. A failed or canceled request is additionally reported as an error.
/// </summary>
/// <returns>
/// A task that completes once the shutdown bookkeeping has run; an already-completed task when
/// remoting is not running.
/// </returns>
public override Task Shutdown()
{
    if (_endpointManager == null)
    {
        _log.Warning("Remoting is not running. Ignoring shutdown attempt");
        return Task.FromResult(true);
    }

    var timeout = Provider.RemoteSettings.ShutdownTimeout;

    // Common final step for every outcome of the shutdown request.
    void FinishShutdown()
    {
        _eventPublisher.NotifyListeners(new RemotingShutdownEvent());
        _endpointManager = null;
    }

    return _endpointManager
        .Ask<bool>(new EndpointManager.ShutdownAndFlush(), timeout)
        .ContinueWith(flushTask =>
        {
            var unsuccessful = flushTask.IsFaulted || flushTask.IsCanceled;
            if (unsuccessful) //Shutdown was not successful
            {
                NotifyError("Failure during shutdown of remoting", flushTask.Exception);
            }
            else if (!flushTask.Result)
            {
                _log.Warning("Shutdown finished, but flushing might not have been successful and some messages might have been dropped. " +
                             "Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this.");
            }

            FinishShutdown();
        }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
/// <summary>
/// Hands a message for a remote recipient over to the endpoint manager.
/// </summary>
/// <param name="message">The message to deliver.</param>
/// <param name="sender">The sending actor; <see cref="ActorRefs.NoSender"/> is used for the tell when this is <c>null</c>.</param>
/// <param name="recipient">The remote actor the message is addressed to.</param>
/// <exception cref="RemoteTransportException">Thrown when remoting is not running.</exception>
public override void Send(object message, IActorRef sender, RemoteActorRef recipient)
{
    var manager = _endpointManager;
    if (manager == null)
        throw new RemoteTransportException("Attempted to send remote message but Remoting is not running.", null);

    var envelope = new EndpointManager.Send(message, recipient, sender);
    var tellSender = sender ?? ActorRefs.NoSender;
    manager.Tell(envelope, tellSender);
}
/// <summary>
/// Sends a management command to the endpoint manager without a cancellation token.
/// Delegates to the overload that takes a token, passing <see cref="CancellationToken.None"/>.
/// </summary>
/// <param name="cmd">The management command payload.</param>
/// <exception cref="RemoteTransportException">Thrown when remoting is not running.</exception>
/// <returns>The status carried by the endpoint manager's acknowledgement.</returns>
public override async Task<bool> ManagementCommand(object cmd)
{
    return await ManagementCommand(cmd, CancellationToken.None);
}
/// <summary>
/// Sends a management command to the endpoint manager and awaits its acknowledgement,
/// using the configured command-ack timeout.
/// </summary>
/// <param name="cmd">The management command payload.</param>
/// <param name="cancellationToken">Token forwarded to the ask operation.</param>
/// <exception cref="RemoteTransportException">Thrown when remoting is not running.</exception>
/// <returns>The <c>Status</c> of the received acknowledgement.</returns>
public override async Task<bool> ManagementCommand(object cmd, CancellationToken cancellationToken)
{
    if (_endpointManager == null)
    {
        throw new RemoteTransportException("Attempted to send management command but Remoting is not running.", null);
    }

    var command = new EndpointManager.ManagementCommand(cmd);
    var ackTimeout = Provider.RemoteSettings.CommandAckTimeout;

    var ack = await _endpointManager
        .Ask<EndpointManager.ManagementCommandAck>(command, ackTimeout, cancellationToken)
        .ConfigureAwait(false);

    return ack.Status;
}
/// <summary>
/// Resolves which local address should be used to reach <paramref name="remote"/>, using the
/// transport mapping recorded at startup.
/// </summary>
/// <param name="remote">The remote address to reach.</param>
/// <returns>The address of the single transport responsible for <paramref name="remote"/>.</returns>
public override Address LocalAddressForRemote(Address remote)
    => Remoting.LocalAddressForRemote(_transportMapping, remote);
/// <summary>
/// Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses.
/// The request is forwarded to the endpoint manager.
/// </summary>
/// <param name="address">The address of the remote system to be quarantined</param>
/// <param name="uid">The UID of the remote system; if the uid is not defined it will not be a strong quarantine but the current
/// endpoint writer will be stopped (dropping system messages) and the address will be gated.</param>
/// <exception cref="RemoteTransportException">
/// This exception is thrown when trying to quarantine a system but remoting is not running.
/// </exception>
public override void Quarantine(Address address, int? uid)
{
    var manager = _endpointManager;
    if (manager == null)
        throw new RemoteTransportException($"Attempted to quarantine address {address} with uid {uid} but Remoting is not running");

    var request = new EndpointManager.Quarantine(address, uid);
    manager.Tell(request);
}
#endregion
#region Internal methods
/// <summary>
/// Wraps the failure in a <see cref="RemoteTransportException"/> and publishes it to
/// listeners as a <see cref="RemotingErrorEvent"/>.
/// </summary>
/// <param name="msg">Description of what failed.</param>
/// <param name="cause">The underlying exception, passed on as the cause.</param>
private void NotifyError(string msg, Exception cause)
{
    var failure = new RemoteTransportException(msg, cause);
    var errorEvent = new RemotingErrorEvent(failure);
    _eventPublisher.NotifyListeners(errorEvent);
}
#endregion
#region Static methods
/// <summary>
/// The actor name under which <see cref="Start"/> creates the endpoint manager system actor.
/// </summary>
public const string EndpointManagerName = "endpointManager";
/// <summary>
/// Picks the local address to use for reaching <paramref name="remote"/>: looks up the transports
/// registered for the remote's protocol and requires exactly one of them to be responsible for it.
/// </summary>
/// <param name="transportMapping">Transport/address pairs keyed by protocol.</param>
/// <param name="remote">The remote address to reach.</param>
/// <exception cref="RemoteTransportException">
/// Thrown when no transport is loaded for the remote's protocol, when none of the loaded transports
/// is responsible for the address, or when more than one is.
/// </exception>
/// <returns>The address of the single responsible transport.</returns>
internal static Address LocalAddressForRemote(
    IDictionary<string, HashSet<ProtocolTransportAddressPair>> transportMapping, Address remote)
{
    // Unknown protocol: nothing to choose from.
    if (!transportMapping.TryGetValue(remote.Protocol, out var candidates))
    {
        throw new RemoteTransportException(
            "No transport is loaded for protocol: [" + remote.Protocol + "], available protocols: [" +
            string.Join(",", transportMapping.Keys.Select(key => key.ToString())) + "]", null);
    }

    ProtocolTransportAddressPair[] responsible =
        candidates.Where(pair => pair.ProtocolTransport.IsResponsibleFor(remote)).ToArray();

    switch (responsible.Length)
    {
        case 0:
            throw new RemoteTransportException(
                "No transport is responsible for address:[" + remote + "] although protocol [" + remote.Protocol +
                "] is available." +
                " Make sure at least one transport is configured to be responsible for the address.",
                null);

        case 1:
            return responsible[0].Address;

        default:
            // Ambiguous: several transports claim the address.
            throw new RemoteTransportException(
                "Multiple transports are available for [" + remote + ": " +
                string.Join(",", responsible.Select(pair => pair.ToString())) + "] " +
                "Remoting cannot decide which transport to use to reach the remote system. Change your configuration " +
                "so that only one transport is responsible for the address.",
                null);
    }
}
#endregion
}
/// <summary>
/// Message type used to provide both <see cref="Props"/> and a name for a new transport actor.
/// Instances are immutable: both values are fixed at construction.
/// </summary>
internal sealed class RegisterTransportActor : INoSerializationVerificationNeeded
{
    /// <summary>
    /// Creates a registration request for a transport actor.
    /// </summary>
    /// <param name="props">The props the transport actor is created from.</param>
    /// <param name="name">The name to give the transport actor.</param>
    public RegisterTransportActor(Props props, string name)
    {
        Props = props;
        Name = name;
    }

    /// <summary>
    /// The props the transport actor is created from.
    /// </summary>
    public Props Props { get; }

    /// <summary>
    /// The name to give the transport actor.
    /// </summary>
    public string Name { get; }
}
/// <summary>
/// Actor responsible for supervising the creation of all transport actors
/// </summary>
internal sealed class TransportSupervisor : ReceiveActor
{
    // One-for-one strategy whose decider answers Restart for every failure.
    private readonly SupervisorStrategy _strategy = new OneForOneStrategy(_ => Directive.Restart);

    /// <summary>
    /// Supplies the supervision strategy applied to the children of this actor.
    /// </summary>
    /// <returns>The one-for-one strategy held by this actor.</returns>
    protected override SupervisorStrategy SupervisorStrategy() => _strategy;

    /// <summary>
    /// Registers the handler for <see cref="RegisterTransportActor"/>: the requested actor is created
    /// as a child (locally deployed, with the remoting dispatcher configuration applied) and its
    /// reference is sent back to the requester.
    /// </summary>
    public TransportSupervisor()
    {
        Receive<RegisterTransportActor>(registration =>
        {
            var requester = Sender;
            var rarp = RARP.For(Context.System);
            var childProps = rarp.ConfigureDispatcher(registration.Props.WithDeploy(Deploy.Local));
            var transportActor = Context.ActorOf(childProps, registration.Name);
            requester.Tell(transportActor);
        });
    }
}
}