diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 00dde2d89d..c7761f5875 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -9,7 +9,7 @@ jobs: parameters: parts: 3 n: 2 - codecoverage: true + codecoverage: false - template: templates/build-template-window.yml parameters: parts: 3 diff --git a/protobuf/peer_service.proto b/protobuf/peer_service.proto index 8e5e45da20..3f614159e3 100644 --- a/protobuf/peer_service.proto +++ b/protobuf/peer_service.proto @@ -7,25 +7,27 @@ import "aelf/core.proto"; service PeerService { - rpc Ping (PingRequest) returns (PongReply) {} - rpc CheckHealth (HealthCheckRequest) returns (HealthCheckReply) {} - - rpc RequestBlock (BlockRequest) returns (BlockReply) {} - rpc RequestBlocks (BlocksRequest) returns (BlockList) {} + rpc Ping (PingRequest) returns (PongReply) {} + rpc CheckHealth (HealthCheckRequest) returns (HealthCheckReply) {} - rpc BlockBroadcastStream (stream BlockWithTransactions) returns (VoidReply) {} - - rpc TransactionBroadcastStream (stream aelf.Transaction) returns (VoidReply) {} - rpc AnnouncementBroadcastStream (stream BlockAnnouncement) returns (VoidReply) {} + rpc RequestBlock (BlockRequest) returns (BlockReply) {} + rpc RequestBlocks (BlocksRequest) returns (BlockList) {} - rpc LibAnnouncementBroadcastStream (stream LibAnnouncement) returns (VoidReply) {} + rpc BlockBroadcastStream (stream BlockWithTransactions) returns (VoidReply) {} - rpc GetNodes (NodesRequest) returns (NodeList) {} + rpc TransactionBroadcastStream (stream aelf.Transaction) returns (VoidReply) {} + rpc AnnouncementBroadcastStream (stream BlockAnnouncement) returns (VoidReply) {} - rpc DoHandshake (HandshakeRequest) returns (HandshakeReply) {} - rpc ConfirmHandshake (ConfirmHandshakeRequest) returns (VoidReply) {} + rpc LibAnnouncementBroadcastStream (stream LibAnnouncement) returns (VoidReply) {} - rpc Disconnect (DisconnectReason) returns (VoidReply) {} + rpc RequestByStream (stream StreamMessage) returns (stream StreamMessage) {} + + rpc GetNodes (NodesRequest) returns (NodeList) {} + + rpc DoHandshake (HandshakeRequest) returns (HandshakeReply) {} + rpc ConfirmHandshake (ConfirmHandshakeRequest) returns (VoidReply) {} + + rpc Disconnect (DisconnectReason) returns (VoidReply) {} } // **** No reply ***** @@ -45,3 +47,34 @@ message HealthCheckRequest { message HealthCheckReply { } +message StreamMessage { + StreamType stream_type = 1; + MessageType message_type = 2; + string request_id = 3; + bytes message = 4; + map meta = 5; +} + +enum StreamType { + UNKNOWN = 0; + REQUEST = 1; + REPLY = 2; +} + +enum MessageType { + ANY = 0; + + HAND_SHAKE = 1; + PING = 2; + CONFIRM_HAND_SHAKE = 3; + HEALTH_CHECK = 4; + REQUEST_BLOCK = 5; + REQUEST_BLOCKS = 6; + GET_NODES = 7; + + BLOCK_BROADCAST = 8; + TRANSACTION_BROADCAST = 9; + ANNOUNCEMENT_BROADCAST = 10; + LIB_ANNOUNCEMENT_BROADCAST = 11; + DISCONNECT = 12; +} diff --git a/src/AElf.Kernel.Core/Blockchain/Application/IBlockValidationProvider.cs b/src/AElf.Kernel.Core/Blockchain/Application/IBlockValidationProvider.cs index 17e72098c5..41d29c730e 100644 --- a/src/AElf.Kernel.Core/Blockchain/Application/IBlockValidationProvider.cs +++ b/src/AElf.Kernel.Core/Blockchain/Application/IBlockValidationProvider.cs @@ -157,7 +157,7 @@ await _transactionBlockIndexService.ValidateTransactionBlockIndexExistsInBranchA block.Header.PreviousBlockHash); if (!blockIndexExists) continue; - Logger.LogDebug("Transaction: {TransactionId} repackaged", transactionId); + Logger.LogDebug("Transaction: {TransactionId} repackaged", transactionId.ToHex()); return false; } diff --git a/src/AElf.Kernel.Core/SmartContract/Application/IBlockchainStateService.cs b/src/AElf.Kernel.Core/SmartContract/Application/IBlockchainStateService.cs index 4663df53c9..d9688d0a27 100644 --- a/src/AElf.Kernel.Core/SmartContract/Application/IBlockchainStateService.cs +++ b/src/AElf.Kernel.Core/SmartContract/Application/IBlockchainStateService.cs @@ -158,7 +158,7 @@ public async Task MergeBlockStateAsync(long lastIrreversibleBlockHeight, Hash la Logger.LogDebug( "Start merge lib height: {LastIrreversibleBlockHeight}, lib block hash: {LastIrreversibleBlockHash}, merge count: {BlockIndexesCount}", - lastIrreversibleBlockHeight, lastIrreversibleBlockHash, blockIndexes.Count); + lastIrreversibleBlockHeight, lastIrreversibleBlockHash.ToHex(), blockIndexes.Count); foreach (var blockIndex in blockIndexes) try diff --git a/src/AElf.Kernel.SmartContractExecution/Application/BlockchainExecutingService.cs b/src/AElf.Kernel.SmartContractExecution/Application/BlockchainExecutingService.cs index 219b1f4404..92d87bb47d 100644 --- a/src/AElf.Kernel.SmartContractExecution/Application/BlockchainExecutingService.cs +++ b/src/AElf.Kernel.SmartContractExecution/Application/BlockchainExecutingService.cs @@ -112,7 +112,7 @@ await _transactionResultService.GetTransactionResultAsync(transactionId, blockHa == null) { Logger.LogWarning( - $"Fail to load transaction result. block hash : {blockHash}, tx id: {transactionId}"); + "Fail to load transaction result. block hash : {blockHash}, tx id: {transactionId}", blockHash.ToHex(), transactionId.ToHex()); return null; } diff --git a/src/AElf.Kernel.Types/Block/BlockHeader.cs b/src/AElf.Kernel.Types/Block/BlockHeader.cs index fef0d25a95..770eb0eb7f 100644 --- a/src/AElf.Kernel.Types/Block/BlockHeader.cs +++ b/src/AElf.Kernel.Types/Block/BlockHeader.cs @@ -35,7 +35,7 @@ public byte[] GetHashBytes() private byte[] GetSignatureData() { if (!VerifyFields()) - throw new InvalidOperationException($"Invalid block header: {this}."); + throw new InvalidOperationException($"Invalid block header: PreviousBlockHash={PreviousBlockHash?.ToHex()}, mtr={MerkleTreeRootOfTransactions?.ToHex()}, ChainId={ChainId}, Height={Height}, Time={Time}."); if (Signature.IsEmpty) return this.ToByteArray(); diff --git a/src/AElf.Kernel.Types/KernelConstants.cs b/src/AElf.Kernel.Types/KernelConstants.cs index 4e2ad4eb64..0df91dd46e 100644 --- a/src/AElf.Kernel.Types/KernelConstants.cs +++ b/src/AElf.Kernel.Types/KernelConstants.cs @@ -1,3 +1,4 @@ +using System; using Google.Protobuf.WellKnownTypes; namespace AElf.Kernel; @@ -16,4 +17,5 @@ public static class KernelConstants public const string SignaturePlaceholder = "SignaturePlaceholder"; public const string BlockExecutedDataKey = "BlockExecutedData"; public static Duration AllowedFutureBlockTimeSpan = new() { Seconds = 4 }; + public static string SupportStreamMinVersion = "1.4.0.0"; } \ No newline at end of file diff --git a/src/AElf.Launcher/AElf.Launcher.csproj b/src/AElf.Launcher/AElf.Launcher.csproj index 0f4c8d7fd4..663c4536de 100644 --- a/src/AElf.Launcher/AElf.Launcher.csproj +++ b/src/AElf.Launcher/AElf.Launcher.csproj @@ -5,8 +5,8 @@ true - - + + Always @@ -15,9 +15,9 @@ - - - + + + diff --git a/src/AElf.OS.Core/AElf.OS.Core.csproj b/src/AElf.OS.Core/AElf.OS.Core.csproj index 794614116f..47ea5135a5 100644 --- a/src/AElf.OS.Core/AElf.OS.Core.csproj +++ b/src/AElf.OS.Core/AElf.OS.Core.csproj @@ -1,5 +1,5 @@ - + net6.0 AElf.OS @@ -8,8 +8,8 @@ Core module for the OS layer. - - + + diff --git a/src/AElf.OS.Core/Network/Application/INetworkService.cs b/src/AElf.OS.Core/Network/Application/INetworkService.cs index af015e10ee..ec80fdd542 100644 --- a/src/AElf.OS.Core/Network/Application/INetworkService.cs +++ b/src/AElf.OS.Core/Network/Application/INetworkService.cs @@ -1,6 +1,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using AElf.Kernel; +using AElf.OS.Network.Infrastructure; using AElf.OS.Network.Types; using AElf.Types; @@ -28,4 +29,5 @@ Task RemovePeerByPubkeyAsync(string peerPubkey, Task CheckPeersHealthAsync(); void CheckNtpDrift(); bool IsPeerPoolFull(); + Task> GetNodesAsync(IPeer peer); } \ No newline at end of file diff --git a/src/AElf.OS.Core/Network/Application/NetworkService.cs b/src/AElf.OS.Core/Network/Application/NetworkService.cs index deab564cbd..ea7ad05d70 100644 --- a/src/AElf.OS.Core/Network/Application/NetworkService.cs +++ b/src/AElf.OS.Core/Network/Application/NetworkService.cs @@ -142,7 +142,7 @@ public Task BroadcastAnnounceAsync(BlockHeader blockHeader) Logger.LogInformation(ex, $"Could not broadcast announcement to {peer} " + $"- status {peer.ConnectionStatus}."); - await HandleNetworkException(peer, ex); + await HandleNetworkExceptionAsync(peer, ex); } }); } @@ -171,7 +171,7 @@ public Task BroadcastTransactionAsync(Transaction transaction) Logger.LogWarning(ex, $"Could not broadcast transaction to {peer} " + $"- status {peer.ConnectionStatus}."); - await HandleNetworkException(peer, ex); + await HandleNetworkExceptionAsync(peer, ex); } }); } @@ -201,7 +201,7 @@ public Task BroadcastLibAnnounceAsync(Hash libHash, long libHeight) { Logger.LogWarning(ex, $"Could not broadcast lib announcement to {peer} " + $"- status {peer.ConnectionStatus}."); - await HandleNetworkException(peer, ex); + await HandleNetworkExceptionAsync(peer, ex); } }); } @@ -256,7 +256,6 @@ public async Task>> GetBlocksAsync(Hash pre if (peer == null) throw new InvalidOperationException($"Could not find peer {peerPubkey}."); - var response = await Request(peer, p => p.GetBlocksAsync(previousBlock, count)); if (response.Success && response.Payload != null @@ -343,7 +342,7 @@ private void EnqueueBlock(IPeer peer, BlockWithTransactions blockWithTransaction if (ex != null) { Logger.LogWarning(ex, $"Could not broadcast block to {peer} - status {peer.ConnectionStatus}."); - await HandleNetworkException(peer, ex); + await HandleNetworkExceptionAsync(peer, ex); } }); } @@ -372,13 +371,33 @@ private async Task> Request(IPeer peer, Func> func if (ex.ExceptionType == NetworkExceptionType.HandlerException) return new Response(default); - await HandleNetworkException(peer, ex); + await HandleNetworkExceptionAsync(peer, ex); } return new Response(); } - private async Task HandleNetworkException(IPeer peer, NetworkException exception) + public async Task> GetNodesAsync(IPeer peer) + { + try + { + var nodeList = await peer.GetNodesAsync(); + + if (nodeList?.Nodes == null) + return new List(); + + Logger.LogDebug("get nodes: {nodeList} from peer: {peer}.", nodeList, peer); + return nodeList.Nodes.ToList(); + } + catch (Exception e) + { + if (e is NetworkException exception) await HandleNetworkExceptionAsync(peer, exception); + Logger.LogWarning(e, "get nodes failed. peer={peer}", peer); + return new List(); + } + } + + private async Task HandleNetworkExceptionAsync(IPeer peer, NetworkException exception) { if (exception.ExceptionType == NetworkExceptionType.Unrecoverable) { @@ -398,8 +417,9 @@ private async Task RecoverPeerAsync(IPeer peer) return; var success = await peer.TryRecoverAsync(); - - if (!success) + if (success) + await _networkServer.BuildStreamForPeerAsync(peer); + else await _networkServer.TrySchedulePeerReconnectionAsync(peer); } diff --git a/src/AElf.OS.Core/Network/Events/StreamMessageReceivedEvent.cs b/src/AElf.OS.Core/Network/Events/StreamMessageReceivedEvent.cs new file mode 100644 index 0000000000..0851ca185f --- /dev/null +++ b/src/AElf.OS.Core/Network/Events/StreamMessageReceivedEvent.cs @@ -0,0 +1,19 @@ +using Google.Protobuf; + +namespace AElf.OS.Network.Events; + +public class StreamMessageReceivedEvent +{ + public StreamMessageReceivedEvent(ByteString message, string clientPubkey, string requestId) + { + Message = message; + ClientPubkey = clientPubkey; + RequestId = requestId; + } + + public ByteString Message { get; } + + public string ClientPubkey { get; } + + public string RequestId { get; } +} \ No newline at end of file diff --git a/src/AElf.OS.Core/Network/Events/StreamPeerExceptionEvent.cs b/src/AElf.OS.Core/Network/Events/StreamPeerExceptionEvent.cs new file mode 100644 index 0000000000..ce096fa496 --- /dev/null +++ b/src/AElf.OS.Core/Network/Events/StreamPeerExceptionEvent.cs @@ -0,0 +1,16 @@ +using AElf.OS.Network.Application; +using AElf.OS.Network.Infrastructure; + +namespace AElf.OS.Network.Events; + +public class StreamPeerExceptionEvent +{ + public NetworkException Exception { get; } + public IPeer Peer { get; } + + public StreamPeerExceptionEvent(NetworkException exception, IPeer peer) + { + Exception = exception; + Peer = peer; + } +} \ No newline at end of file diff --git a/src/AElf.OS.Core/Network/Infrastructure/IAElfNetworkServer.cs b/src/AElf.OS.Core/Network/Infrastructure/IAElfNetworkServer.cs index 3bf1afdd6d..d65a2353ab 100644 --- a/src/AElf.OS.Core/Network/Infrastructure/IAElfNetworkServer.cs +++ b/src/AElf.OS.Core/Network/Infrastructure/IAElfNetworkServer.cs @@ -12,4 +12,5 @@ public interface IAElfNetworkServer Task StopAsync(bool gracefulDisconnect = true); void CheckNtpDrift(); Task CheckEndpointAvailableAsync(DnsEndPoint endpoint); + Task BuildStreamForPeerAsync(IPeer peer); } \ No newline at end of file diff --git a/src/AElf.OS.Core/Network/Infrastructure/IPeerDiscoveryJobProcessor.cs b/src/AElf.OS.Core/Network/Infrastructure/IPeerDiscoveryJobProcessor.cs index cc1be3d022..46c06d16ec 100644 --- a/src/AElf.OS.Core/Network/Infrastructure/IPeerDiscoveryJobProcessor.cs +++ b/src/AElf.OS.Core/Network/Infrastructure/IPeerDiscoveryJobProcessor.cs @@ -1,9 +1,9 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using AElf.Kernel.Account.Application; +using AElf.OS.Network.Application; using AElf.OS.Network.Domain; using AElf.OS.Network.Extensions; using Microsoft.Extensions.Logging; @@ -28,18 +28,20 @@ public class PeerDiscoveryJobProcessor : IPeerDiscoveryJobProcessor, ISingletonD private readonly IDiscoveredNodeCacheProvider _discoveredNodeCacheProvider; private readonly IAElfNetworkServer _networkServer; private readonly INodeManager _nodeManager; + private readonly INetworkService _networkService; private TransformManyBlock _discoverNodesDataflow; private ActionBlock _processNodeDataflow; public PeerDiscoveryJobProcessor(INodeManager nodeManager, IDiscoveredNodeCacheProvider discoveredNodeCacheProvider, IAElfNetworkServer networkServer, - IAccountService accountService) + IAccountService accountService, INetworkService networkService) { _nodeManager = nodeManager; _discoveredNodeCacheProvider = discoveredNodeCacheProvider; _networkServer = networkServer; _accountService = accountService; + _networkService = networkService; CreatePeerDiscoveryDataflow(); Logger = NullLogger.Instance; @@ -80,21 +82,7 @@ private void CreatePeerDiscoveryDataflow() private async Task> DiscoverNodesAsync(IPeer peer) { - try - { - var nodeList = await peer.GetNodesAsync(); - - if (nodeList?.Nodes == null) - return new List(); - - Logger.LogDebug($"Discover nodes: {nodeList} from peer: {peer}."); - return nodeList.Nodes.ToList(); - } - catch (Exception e) - { - Logger.LogWarning(e, "Discover nodes failed."); - return new List(); - } + return await _networkService.GetNodesAsync(peer); } private async Task ProcessNodeAsync(NodeInfo node) diff --git a/src/AElf.OS.Core/Network/NetworkConstants.cs b/src/AElf.OS.Core/Network/NetworkConstants.cs index 21bd91d915..da366b3611 100644 --- a/src/AElf.OS.Core/Network/NetworkConstants.cs +++ b/src/AElf.OS.Core/Network/NetworkConstants.cs @@ -33,6 +33,7 @@ public static class NetworkConstants public const int DefaultMaxBufferedTransactionCount = 100; public const int DefaultMaxBufferedBlockCount = 50; public const int DefaultMaxBufferedAnnouncementCount = 200; + public const int DefaultMaxBufferedStreamCount = 200; public const int DefaultPeerReconnectionPeriod = 60_000; // 1 min public const int DefaultMaximumReconnectionTime = 60_000 * 60 * 24; // 1 day diff --git a/src/AElf.OS.Core/Network/NetworkOptions.cs b/src/AElf.OS.Core/Network/NetworkOptions.cs index a298f86376..9dd235d0ea 100644 --- a/src/AElf.OS.Core/Network/NetworkOptions.cs +++ b/src/AElf.OS.Core/Network/NetworkOptions.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using AElf.Kernel; namespace AElf.OS.Network; @@ -75,6 +76,8 @@ public class NetworkOptions public int PeerInvalidTransactionTimeout { get; set; } = NetworkConstants.DefaultPeerInvalidTransactionTimeout; public int PeerInvalidTransactionLimit { get; set; } = NetworkConstants.DefaultPeerInvalidTransactionLimit; + + public string SupportStreamMinVersion { get; set; } = KernelConstants.SupportStreamMinVersion; } [Flags] diff --git a/src/AElf.OS.Core/Network/Protocol/HandshakeProvider.cs b/src/AElf.OS.Core/Network/Protocol/HandshakeProvider.cs index bbb155670a..4da2d3597b 100644 --- a/src/AElf.OS.Core/Network/Protocol/HandshakeProvider.cs +++ b/src/AElf.OS.Core/Network/Protocol/HandshakeProvider.cs @@ -70,13 +70,13 @@ public async Task ValidateHandshakeAsync(Handshake ha var chainId = _blockchainService.GetChainId(); if (handshake.HandshakeData.ChainId != chainId) { - Logger.LogDebug($"Chain is is incorrect: {handshake.HandshakeData.ChainId}."); + Logger.LogDebug($"Chain is incorrect: {handshake.HandshakeData.ChainId}."); return HandshakeValidationResult.InvalidChainId; } if (handshake.HandshakeData.Version != KernelConstants.ProtocolVersion) { - Logger.LogDebug($"Version is is incorrect: {handshake.HandshakeData.Version}."); + Logger.LogDebug($"Version is incorrect: {handshake.HandshakeData.Version}."); return HandshakeValidationResult.InvalidVersion; } diff --git a/src/AElf.OS.Core/Network/Protocol/Types/PeerConnectionInfo.cs b/src/AElf.OS.Core/Network/Protocol/Types/PeerConnectionInfo.cs index 2e40d7b1d4..5dc30cf829 100644 --- a/src/AElf.OS.Core/Network/Protocol/Types/PeerConnectionInfo.cs +++ b/src/AElf.OS.Core/Network/Protocol/Types/PeerConnectionInfo.cs @@ -10,4 +10,9 @@ public class PeerConnectionInfo public bool IsInbound { get; set; } public byte[] SessionId { get; set; } public string NodeVersion { get; set; } + + public override string ToString() + { + return $"key: {Pubkey.Substring(0, 45)}..."; + } } \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/AElf.OS.Network.Grpc.csproj b/src/AElf.OS.Network.Grpc/AElf.OS.Network.Grpc.csproj index ae33d21245..e4a8dfcc99 100644 --- a/src/AElf.OS.Network.Grpc/AElf.OS.Network.Grpc.csproj +++ b/src/AElf.OS.Network.Grpc/AElf.OS.Network.Grpc.csproj @@ -1,5 +1,5 @@ - + net6.0 @@ -9,14 +9,14 @@ - - - - + + + + - + @@ -26,13 +26,13 @@ - - + + - + diff --git a/src/AElf.OS.Network.Grpc/Connection/ConnectionService.cs b/src/AElf.OS.Network.Grpc/Connection/ConnectionService.cs index bbd01152ac..8ef335c66d 100644 --- a/src/AElf.OS.Network.Grpc/Connection/ConnectionService.cs +++ b/src/AElf.OS.Network.Grpc/Connection/ConnectionService.cs @@ -2,11 +2,14 @@ using System.Linq; using System.Net; using System.Threading.Tasks; +using AElf.Kernel; using AElf.OS.Network.Application; using AElf.OS.Network.Events; +using AElf.OS.Network.Grpc.Helpers; using AElf.OS.Network.Infrastructure; using AElf.OS.Network.Protocol; using AElf.OS.Network.Types; +using Grpc.Core; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; @@ -167,7 +170,7 @@ so either P1s dial or P2s. */ return true; } - public async Task DoHandshakeAsync(DnsEndPoint endpoint, Handshake handshake) + private async Task ValidateHandshakeAsync(DnsEndPoint endpoint, Handshake handshake) { // validate the handshake (signature, chain id...) var handshakeValidationResult = await _handshakeProvider.ValidateHandshakeAsync(handshake); @@ -178,17 +181,22 @@ public async Task DoHandshakeAsync(DnsEndPoint endpoint, Handsha } Logger.LogDebug($"peer {endpoint} sent a valid handshake {handshake}"); + return null; + } - var pubkey = handshake.HandshakeData.Pubkey.ToHex(); + public async Task DoHandshakeAsync(DnsEndPoint endpoint, Handshake handshake) + { + var preCheckRes = await ValidateHandshakeAsync(endpoint, handshake); + if (preCheckRes != null) + return preCheckRes; - // keep the healthy peer. - var currentPeer = _peerPool.FindPeerByPublicKey(pubkey); - if (currentPeer != null) + if (handshake.HandshakeData.NodeVersion.GreaterThanSupportStreamMinVersion(NetworkOptions.SupportStreamMinVersion)) { - Logger.LogDebug($"Peer: {pubkey} already in peer pool, repeated connection {endpoint}"); - return new HandshakeReply { Error = HandshakeError.RepeatedConnection }; + Logger.LogDebug("receive nodeversion>={version} handshake, will upgrade to stream, remote={endpoint}", KernelConstants.SupportStreamMinVersion, endpoint.Host); + return new HandshakeReply { Handshake = await _handshakeProvider.GetHandshakeAsync(), Error = HandshakeError.HandshakeOk }; } + var pubkey = handshake.HandshakeData.Pubkey.ToHex(); try { // mark the (IP; pubkey) pair as currently handshaking @@ -205,8 +213,24 @@ public async Task DoHandshakeAsync(DnsEndPoint endpoint, Handsha return new HandshakeReply { Error = HandshakeError.InvalidConnection }; } + var oldPeer = _peerPool.FindPeerByPublicKey(pubkey); + if (oldPeer != null) + { + var oldPeerIsStream = oldPeer is GrpcStreamBackPeer; + if (oldPeerIsStream && oldPeer.IsInvalid && _peerPool.TryReplace(pubkey, oldPeer, grpcPeer)) + { + await oldPeer.DisconnectAsync(false); + Logger.LogDebug("replace connection, oldPeerIsP2P={oldPeerIsStream} IsInvalid={IsInvalid} {pubkey}.", oldPeerIsStream, oldPeer.IsInvalid, grpcPeer.Info.Pubkey); + } + else + { + Logger.LogDebug("Stopping connection, peer already in the pool oldPeerIsP2P={oldPeerIsStream} IsInvalid={IsInvalid} {pubkey}.", oldPeerIsStream, oldPeer.IsInvalid, grpcPeer.Info.Pubkey); + await grpcPeer.DisconnectAsync(false); + return new HandshakeReply { Error = HandshakeError.RepeatedConnection }; + } + } // add the new peer to the pool - if (!_peerPool.TryAddPeer(grpcPeer)) + else if (!_peerPool.TryAddPeer(grpcPeer)) { Logger.LogDebug($"Stopping connection, peer already in the pool {grpcPeer.Info.Pubkey}."); await grpcPeer.DisconnectAsync(false); @@ -230,6 +254,67 @@ public async Task DoHandshakeAsync(DnsEndPoint endpoint, Handsha } } + public async Task DoHandshakeByStreamAsync(DnsEndPoint endpoint, IAsyncStreamWriter responseStream, Handshake handshake) + { + var preCheckRes = await ValidateHandshakeAsync(endpoint, handshake); + if (preCheckRes != null) + return preCheckRes; + + var pubkey = handshake.HandshakeData.Pubkey.ToHex(); + try + { + // mark the (IP; pubkey) pair as currently handshaking + if (!_peerPool.AddHandshakingPeer(endpoint.Host, pubkey)) + return new HandshakeReply { Error = HandshakeError.ConnectionRefused }; + + // create the connection to the peer + var grpcPeer = await _peerDialer.DialBackPeerByStreamAsync(new AElfPeerEndpoint(endpoint.Host, handshake.HandshakeData.ListeningPort), responseStream, handshake); + + if (grpcPeer == null) + { + Logger.LogWarning("Could not dial back to stream {pubkey}.", pubkey); + return new HandshakeReply { Error = HandshakeError.InvalidConnection }; + } + + var oldPeer = _peerPool.FindPeerByPublicKey(pubkey); + if (oldPeer != null) + { + var oldPeerIsStream = oldPeer is GrpcStreamBackPeer; + if (_peerPool.TryReplace(pubkey, oldPeer, grpcPeer)) + { + await oldPeer.DisconnectAsync(false); + Logger.LogDebug("replace connection, oldPeerIsP2P={oldPeerIsStream} IsInvalid={IsInvalid} {pubkey}.", oldPeerIsStream, oldPeer.IsInvalid, grpcPeer.Info.Pubkey); + } + else + { + Logger.LogDebug("Stopping connection, peer already in the pool oldPeerIsP2P={oldPeerIsStream} IsInvalid={IsInvalid} {pubkey}.", oldPeerIsStream, oldPeer.IsInvalid, grpcPeer.Info.Pubkey); + await grpcPeer.DisconnectAsync(false); + return new HandshakeReply { Error = HandshakeError.RepeatedConnection }; + } + } + else if (!_peerPool.TryAddPeer(grpcPeer)) // add the new peer to the pool + { + Logger.LogDebug("Stopping connection, peer already in the pool {pubkey}.", grpcPeer.Info.Pubkey); + await grpcPeer.DisconnectAsync(false); + return new HandshakeReply { Error = HandshakeError.RepeatedConnection }; + } + + Logger.LogDebug($"Added to pool {grpcPeer.RemoteEndpoint} - {grpcPeer.Info.Pubkey}."); + + // send back our handshake + var replyHandshake = await _handshakeProvider.GetHandshakeAsync(); + grpcPeer.InboundSessionId = replyHandshake.SessionId.ToByteArray(); + grpcPeer.UpdateLastSentHandshake(replyHandshake); + + Logger.LogDebug("Sending back handshake to {pubkey}.", pubkey); + return new HandshakeReply { Handshake = replyHandshake, Error = HandshakeError.HandshakeOk }; + } + finally + { + _peerPool.RemoveHandshakingPeer(endpoint.Host, pubkey); + } + } + public void ConfirmHandshake(string peerPubkey) { var peer = _peerPool.FindPeerByPublicKey(peerPubkey) as GrpcPeer; @@ -263,6 +348,12 @@ public async Task RemovePeerAsync(string pubkey) await peer.DisconnectAsync(false); } + public async Task BuildStreamForPeerAsync(IPeer peer) + { + if (peer is GrpcStreamPeer streamPeer) return await _peerDialer.BuildStreamForPeerAsync(streamPeer); + return false; + } + public async Task CheckEndpointAvailableAsync(DnsEndPoint endpoint) { return await _peerDialer.CheckEndpointAvailableAsync(endpoint); diff --git a/src/AElf.OS.Network.Grpc/Connection/IConnectionService.cs b/src/AElf.OS.Network.Grpc/Connection/IConnectionService.cs index 0623af748a..82a6c85bd2 100644 --- a/src/AElf.OS.Network.Grpc/Connection/IConnectionService.cs +++ b/src/AElf.OS.Network.Grpc/Connection/IConnectionService.cs @@ -1,6 +1,7 @@ using System.Net; using System.Threading.Tasks; using AElf.OS.Network.Infrastructure; +using Grpc.Core; namespace AElf.OS.Network.Grpc; @@ -10,10 +11,12 @@ public interface IConnectionService Task DisconnectAsync(IPeer peer, bool sendDisconnect = false); Task SchedulePeerReconnection(DnsEndPoint endpoint); Task TrySchedulePeerReconnectionAsync(IPeer peer); - Task ConnectAsync(DnsEndPoint endpoint); + Task ConnectAsync(DnsEndPoint endpoint); Task DoHandshakeAsync(DnsEndPoint endpoint, Handshake handshake); + Task DoHandshakeByStreamAsync(DnsEndPoint endpoint, IAsyncStreamWriter responseStream, Handshake handshake); void ConfirmHandshake(string peerPubkey); Task DisconnectPeersAsync(bool gracefulDisconnect); Task CheckEndpointAvailableAsync(DnsEndPoint endpoint); Task RemovePeerAsync(string pubkey); + Task BuildStreamForPeerAsync(IPeer peer); } \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Connection/IPeerDialer.cs b/src/AElf.OS.Network.Grpc/Connection/IPeerDialer.cs index 7d3d70b755..49c086de1a 100644 --- a/src/AElf.OS.Network.Grpc/Connection/IPeerDialer.cs +++ b/src/AElf.OS.Network.Grpc/Connection/IPeerDialer.cs @@ -1,5 +1,6 @@ using System.Net; using System.Threading.Tasks; +using Grpc.Core; namespace AElf.OS.Network.Grpc; @@ -7,5 +8,7 @@ public interface IPeerDialer { Task DialPeerAsync(DnsEndPoint remoteEndpoint); Task DialBackPeerAsync(DnsEndPoint remoteEndpoint, Handshake handshake); + Task DialBackPeerByStreamAsync(DnsEndPoint remoteEndPoint, IAsyncStreamWriter responseStream, Handshake handshake); Task CheckEndpointAvailableAsync(DnsEndPoint remoteEndpoint); + Task BuildStreamForPeerAsync(GrpcStreamPeer streamPeer, AsyncDuplexStreamingCall call=null); } \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Connection/PeerDialer.cs b/src/AElf.OS.Network.Grpc/Connection/PeerDialer.cs index c5f5615f94..9faff908e6 100644 --- a/src/AElf.OS.Network.Grpc/Connection/PeerDialer.cs +++ b/src/AElf.OS.Network.Grpc/Connection/PeerDialer.cs @@ -8,15 +8,19 @@ using AElf.Kernel; using AElf.Kernel.Account.Application; using AElf.Kernel.SmartContract; +using AElf.OS.Network.Events; using AElf.OS.Network.Grpc.Helpers; using AElf.OS.Network.Protocol; using AElf.OS.Network.Protocol.Types; +using Google.Protobuf; using Grpc.Core; using Grpc.Core.Interceptors; +using Grpc.Core.Utils; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Org.BouncyCastle.X509; +using Volo.Abp.EventBus.Local; using Volo.Abp.Threading; namespace AElf.OS.Network.Grpc; @@ -29,18 +33,18 @@ public class PeerDialer : IPeerDialer { private readonly IAccountService _accountService; private readonly IHandshakeProvider _handshakeProvider; - - private KeyCertificatePair _clientKeyCertificatePair; + private readonly IStreamTaskResourcePool _streamTaskResourcePool; + public ILocalEventBus EventBus { get; set; } public PeerDialer(IAccountService accountService, - IHandshakeProvider handshakeProvider) + IHandshakeProvider handshakeProvider, IStreamTaskResourcePool streamTaskResourcePool) { _accountService = accountService; _handshakeProvider = handshakeProvider; + _streamTaskResourcePool = streamTaskResourcePool; + EventBus = NullLocalEventBus.Instance; Logger = NullLogger.Instance; - - CreateClientKeyCertificatePair(); } private NetworkOptions NetworkOptions => NetworkOptionsSnapshot.Value; @@ -63,23 +67,13 @@ public async Task DialPeerAsync(DnsEndPoint remoteEndpoint) var handshake = await _handshakeProvider.GetHandshakeAsync(); var handshakeReply = await CallDoHandshakeAsync(client, remoteEndpoint, handshake); - // verify handshake - if (handshakeReply.Error != HandshakeError.HandshakeOk) + if (!await ProcessHandshakeReplyAsync(handshakeReply, remoteEndpoint)) { - Logger.LogWarning($"Handshake error: {remoteEndpoint} {handshakeReply.Error}."); await client.Channel.ShutdownAsync(); return null; } - if (await _handshakeProvider.ValidateHandshakeAsync(handshakeReply.Handshake) != - HandshakeValidationResult.Ok) - { - Logger.LogWarning($"Connect error: {remoteEndpoint} {handshakeReply}."); - await client.Channel.ShutdownAsync(); - return null; - } - - var peer = new GrpcPeer(client, remoteEndpoint, new PeerConnectionInfo + var connectionInfo = new PeerConnectionInfo { Pubkey = handshakeReply.Handshake.HandshakeData.Pubkey.ToHex(), ConnectionTime = TimestampHelper.GetUtcNow(), @@ -87,12 +81,76 @@ public async Task DialPeerAsync(DnsEndPoint remoteEndpoint) SessionId = handshakeReply.Handshake.SessionId.ToByteArray(), IsInbound = false, NodeVersion = handshakeReply.Handshake.HandshakeData.NodeVersion - }); + }; + GrpcPeer peer; - peer.UpdateLastReceivedHandshake(handshakeReply.Handshake); + if (UpgradeToStream(handshake, handshakeReply.Handshake)) + { + peer = await DailStreamPeerAsync(client, remoteEndpoint, connectionInfo); + if (peer == null) return peer; + } + else + { + peer = new GrpcPeer(client, remoteEndpoint, connectionInfo); + peer.InboundSessionId = handshake.SessionId.ToByteArray(); + } - peer.InboundSessionId = handshake.SessionId.ToByteArray(); + + Logger.LogDebug("peer sessionId {InboundSessionId} {sessionId}", peer.InboundSessionId.ToHex(), connectionInfo.SessionId.ToHex()); + peer.UpdateLastReceivedHandshake(handshakeReply.Handshake); peer.UpdateLastSentHandshake(handshake); + return peer; + } + + private async Task ProcessHandshakeReplyAsync(HandshakeReply handshakeReply, DnsEndPoint remoteEndpoint) + { + // verify handshake + if (handshakeReply.Error != HandshakeError.HandshakeOk) + { + Logger.LogWarning("Handshake error: {remoteEndpoint} {Error}.", remoteEndpoint, handshakeReply.Error); + + return false; + } + + if (await _handshakeProvider.ValidateHandshakeAsync(handshakeReply.Handshake) == + HandshakeValidationResult.Ok) return true; + Logger.LogWarning("Connect error: {remoteEndpoint} {handshakeReply}.", remoteEndpoint, handshakeReply); + return false; + } + + + public async Task DialBackPeerByStreamAsync(DnsEndPoint remoteEndpoint, IAsyncStreamWriter responseStream, Handshake handshake) + { + Logger.LogWarning("receive stream ping reply"); + var info = new PeerConnectionInfo + { + Pubkey = handshake.HandshakeData.Pubkey.ToHex(), + ConnectionTime = TimestampHelper.GetUtcNow(), + SessionId = handshake.SessionId.ToByteArray(), + ProtocolVersion = handshake.HandshakeData.Version, + IsInbound = true, + NodeVersion = handshake.HandshakeData.NodeVersion + }; + var nodePubkey = (await _accountService.GetPublicKeyAsync()).ToHex(); + var meta = new Dictionary() + { + { GrpcConstants.PubkeyMetadataKey, nodePubkey }, + { GrpcConstants.PeerInfoMetadataKey, info.ToString() } + }; + Logger.LogWarning("DialBackPeerByStreamAsync meta={meta}", meta); + var peer = new GrpcStreamBackPeer(remoteEndpoint, info, responseStream, _streamTaskResourcePool, meta); + peer.SetStreamSendCallBack(async (ex, streamMessage, callTimes) => + { + if (ex == null) + Logger.LogDebug("streamRequest write success {times}-{requestId}-{messageType}-{this}-{latency}", callTimes, streamMessage.RequestId, streamMessage.MessageType, peer, + CommonHelper.GetRequestLatency(streamMessage.RequestId)); + else + { + Logger.LogError(ex, "streamRequest write fail, {requestId}-{messageType}-{this}", streamMessage.RequestId, streamMessage.MessageType, peer); + await EventBus.PublishAsync(new StreamPeerExceptionEvent(ex, peer), false); + } + }); + peer.UpdateLastReceivedHandshake(handshake); return peer; } @@ -103,7 +161,6 @@ public async Task CheckEndpointAvailableAsync(DnsEndPoint remoteEndpoint) if (client == null) return false; - try { await PingNodeAsync(client, remoteEndpoint); @@ -141,11 +198,6 @@ public async Task DialBackPeerAsync(DnsEndPoint remoteEndpoint, Handsh return peer; } - private void CreateClientKeyCertificatePair() - { - _clientKeyCertificatePair = TlsHelper.GenerateKeyCertificatePair(); - } - /// /// Calls the server side DoHandshake RPC method, in order to establish a 2-way connection. /// @@ -177,6 +229,85 @@ private async Task CallDoHandshakeAsync(GrpcClient client, DnsEn return handshakeReply; } + private bool UpgradeToStream(Handshake handshake, Handshake handshakeReply) + { + return handshake.HandshakeData.NodeVersion.GreaterThanSupportStreamMinVersion(NetworkOptions.SupportStreamMinVersion) && + handshakeReply.HandshakeData.NodeVersion.GreaterThanSupportStreamMinVersion(NetworkOptions.SupportStreamMinVersion); + } + + private async Task DailStreamPeerAsync(GrpcClient client, DnsEndPoint remoteEndpoint, PeerConnectionInfo connectionInfo) + { + try + { + var nodePubkey = (await _accountService.GetPublicKeyAsync()).ToHex(); + var call = client.Client.RequestByStream(new CallOptions().WithDeadline(DateTime.MaxValue)); + var streamPeer = new GrpcStreamPeer(client, remoteEndpoint, connectionInfo, call, null, _streamTaskResourcePool, + new Dictionary() + { + { GrpcConstants.PubkeyMetadataKey, nodePubkey }, + { GrpcConstants.PeerInfoMetadataKey, connectionInfo.ToString() } + }); + streamPeer.SetStreamSendCallBack(async (ex, streamMessage, callTimes) => + { + if (ex == null) + Logger.LogDebug("streamRequest write success {times}-{requestId}-{messageType}-{this}-{latency}", callTimes, streamMessage.RequestId, streamMessage.MessageType, streamPeer, + CommonHelper.GetRequestLatency(streamMessage.RequestId)); + else + { + Logger.LogError(ex, "streamRequest write fail, {requestId}-{messageType}-{this}", streamMessage.RequestId, streamMessage.MessageType, streamPeer); + await EventBus.PublishAsync(new StreamPeerExceptionEvent(ex, streamPeer), false); + } + }); + var success = await BuildStreamForPeerAsync(streamPeer, call); + return success ? streamPeer : null; + } + catch (Exception e) + { + Logger.LogError(e, "stream handle shake failed {remoteEndpoint}", remoteEndpoint); + if (client.Channel.State == ChannelState.Idle || client.Channel.State == ChannelState.Ready) + await client.Channel.ShutdownAsync(); + throw; + } + } + + public async Task BuildStreamForPeerAsync(GrpcStreamPeer streamPeer, AsyncDuplexStreamingCall call = null) + { + call ??= streamPeer.BuildCall(); + if (call == null) return false; + var tokenSource = new CancellationTokenSource(); + Task.Run(async () => + { + try + { + await call.ResponseStream.ForEachAsync(async req => + { + Logger.LogDebug("listenReceive request={requestId} {streamType}-{messageType} latency={latency}", req.RequestId, req.StreamType, req.MessageType, CommonHelper.GetRequestLatency(req.RequestId)); + await EventBus.PublishAsync(new StreamMessageReceivedEvent(req.ToByteString(), streamPeer.Info.Pubkey, req.RequestId), false); + }); + Logger.LogWarning("listen end and complete {remoteEndPoint}", streamPeer.RemoteEndpoint.ToString()); + } + catch (Exception e) + { + if (e is RpcException exception) + await EventBus.PublishAsync(new StreamPeerExceptionEvent(streamPeer.HandleRpcException(exception, "listen err {remoteEndPoint}"), streamPeer)); + Logger.LogError(e, "listen err {remoteEndPoint}", streamPeer.RemoteEndpoint.ToString()); + } + }, tokenSource.Token); + streamPeer.StartServe(tokenSource); + var handshake = await _handshakeProvider.GetHandshakeAsync(); + var handShakeReply = await streamPeer.HandShakeAsync(new HandshakeRequest { Handshake = handshake }); + if (!await ProcessHandshakeReplyAsync(handShakeReply, streamPeer.RemoteEndpoint)) + { + await streamPeer.DisconnectAsync(true); + return false; + } + + streamPeer.InboundSessionId = handshake.SessionId.ToByteArray(); + streamPeer.Info.SessionId = handShakeReply.Handshake.SessionId.ToByteArray(); + Logger.LogInformation("streaming Handshake to {remoteEndPoint} successful.sessionInfo {InboundSessionId} {SessionId}", streamPeer.RemoteEndpoint.ToString(), streamPeer.InboundSessionId.ToHex(), streamPeer.Info.SessionId.ToHex()); + return true; + } + /// /// Checks that the distant node is reachable by pinging it. @@ -216,14 +347,19 @@ private async Task CreateClientAsync(DnsEndPoint remoteEndpoint) return null; Logger.LogDebug($"Upgrading connection to TLS: {certificate}."); + var clientKeyCertificatePair = TlsHelper.GenerateKeyCertificatePair(); ChannelCredentials credentials = - new SslCredentials(TlsHelper.ObjectToPem(certificate), _clientKeyCertificatePair); + new SslCredentials(TlsHelper.ObjectToPem(certificate), clientKeyCertificatePair); var channel = new Channel(remoteEndpoint.ToString(), credentials, new List { new(ChannelOptions.MaxSendMessageLength, GrpcConstants.DefaultMaxSendMessageLength), new(ChannelOptions.MaxReceiveMessageLength, GrpcConstants.DefaultMaxReceiveMessageLength), - new(ChannelOptions.SslTargetNameOverride, GrpcConstants.DefaultTlsCommonName) + new(ChannelOptions.SslTargetNameOverride, GrpcConstants.DefaultTlsCommonName), + new(GrpcConstants.GrpcArgKeepalivePermitWithoutCalls, GrpcConstants.GrpcArgKeepalivePermitWithoutCallsOpen), + new(GrpcConstants.GrpcArgHttp2MaxPingsWithoutData, GrpcConstants.GrpcArgHttp2MaxPingsWithoutDataVal), + new(GrpcConstants.GrpcArgKeepaliveTimeoutMs, GrpcConstants.GrpcArgKeepaliveTimeoutMsVal), + new(GrpcConstants.GrpcArgKeepaliveTimeMs, GrpcConstants.GrpcArgKeepaliveTimeMsVal), }); var nodePubkey = AsyncHelper.RunSync(() => _accountService.GetPublicKeyAsync()).ToHex(); @@ -235,7 +371,6 @@ private async Task CreateClientAsync(DnsEndPoint remoteEndpoint) }).Intercept(new RetryInterceptor()); var client = new PeerService.PeerServiceClient(interceptedChannel); - return new GrpcClient(channel, client, certificate); } diff --git a/src/AElf.OS.Network.Grpc/GrpcConstants.cs b/src/AElf.OS.Network.Grpc/GrpcConstants.cs index f7f263d075..ceb29ab69a 100644 --- a/src/AElf.OS.Network.Grpc/GrpcConstants.cs +++ b/src/AElf.OS.Network.Grpc/GrpcConstants.cs @@ -9,6 +9,18 @@ public static class GrpcConstants public const string RetryCountMetadataKey = "retry-count"; public const string GrpcRequestCompressKey = "grpc-internal-encoding-request"; + public const string GrpcArgKeepalivePermitWithoutCalls = "grpc.keepalive_permit_without_calls"; + public const string GrpcArgHttp2MaxPingsWithoutData = "grpc.http2_max_pings_without_data"; + public const string GrpcArgKeepaliveTimeoutMs = "grpc.keepalive_timeout_ms"; + public const string GrpcArgKeepaliveTimeMs = "grpc.keepalive_time_ms"; + // public const string GrpcArgHttp2WriteBufferSize = "grpc.http2.write_buffer_size"; + + public const int GrpcArgKeepalivePermitWithoutCallsOpen = 1; + public const int GrpcArgHttp2MaxPingsWithoutDataVal = 0; + public const int GrpcArgKeepaliveTimeoutMsVal = 60 * 1000; + public const int GrpcArgKeepaliveTimeMsVal = 2 * 60 * 60 * 1000; + // public const int GrpcArgHttp2WriteBufferSizeVal = 6 * 1024; + public const string GrpcGzipConst = "gzip"; public const int DefaultRequestTimeout = 200; diff --git a/src/AElf.OS.Network.Grpc/GrpcNetworkModule.cs b/src/AElf.OS.Network.Grpc/GrpcNetworkModule.cs index a9aa0f7527..4afd933403 100644 --- a/src/AElf.OS.Network.Grpc/GrpcNetworkModule.cs +++ b/src/AElf.OS.Network.Grpc/GrpcNetworkModule.cs @@ -16,10 +16,27 @@ public override void ConfigureServices(ServiceConfigurationContext context) context.Services.AddSingleton(); // Internal dependencies - context.Services.AddTransient(); + context.Services.AddSingleton(); context.Services.AddSingleton(); context.Services.AddSingleton(); context.Services.AddSingleton(); + + ConfigureStreamMethods(context); + } + + private void ConfigureStreamMethods(ServiceConfigurationContext context) + { + context.Services.AddSingleton(); + context.Services.AddSingleton(); + context.Services.AddSingleton(); + context.Services.AddSingleton(); + context.Services.AddSingleton(); + context.Services.AddSingleton(); + context.Services.AddSingleton(); + context.Services.AddSingleton(); + context.Services.AddSingleton(); + context.Services.AddSingleton(); + context.Services.AddSingleton(); } } \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/GrpcNetworkServer.cs b/src/AElf.OS.Network.Grpc/GrpcNetworkServer.cs index f3ece8513d..4f7c66aec1 100644 --- a/src/AElf.OS.Network.Grpc/GrpcNetworkServer.cs +++ b/src/AElf.OS.Network.Grpc/GrpcNetworkServer.cs @@ -120,7 +120,11 @@ internal Task StartListeningAsync() var serverOptions = new List { new(ChannelOptions.MaxSendMessageLength, GrpcConstants.DefaultMaxSendMessageLength), - new(ChannelOptions.MaxReceiveMessageLength, GrpcConstants.DefaultMaxReceiveMessageLength) + new(ChannelOptions.MaxReceiveMessageLength, GrpcConstants.DefaultMaxReceiveMessageLength), + new(GrpcConstants.GrpcArgKeepalivePermitWithoutCalls, GrpcConstants.GrpcArgKeepalivePermitWithoutCallsOpen), + new(GrpcConstants.GrpcArgHttp2MaxPingsWithoutData, GrpcConstants.GrpcArgHttp2MaxPingsWithoutDataVal), + new(GrpcConstants.GrpcArgKeepaliveTimeoutMs, GrpcConstants.GrpcArgKeepaliveTimeoutMsVal), + new(GrpcConstants.GrpcArgKeepaliveTimeMs, GrpcConstants.GrpcArgKeepaliveTimeMsVal) }; // setup service @@ -184,4 +188,9 @@ private async Task DialBootNodesAsync() await Task.WhenAll(taskList.ToArray()); } + + public async Task BuildStreamForPeerAsync(IPeer peer) + { + return await _connectionService.BuildStreamForPeerAsync(peer); + } } \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Helpers/CommonHelper.cs b/src/AElf.OS.Network.Grpc/Helpers/CommonHelper.cs new file mode 100644 index 0000000000..5ed1c8f205 --- /dev/null +++ b/src/AElf.OS.Network.Grpc/Helpers/CommonHelper.cs @@ -0,0 +1,26 @@ +using System; +using AElf.Kernel; + +namespace AElf.OS.Network.Grpc.Helpers; + +public static class CommonHelper +{ + public static string GenerateRequestId() + { + var timeMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + var guid = Guid.NewGuid().ToString(); + return timeMs.ToString() + '_' + guid; + } + + public static long GetRequestLatency(string requestId) + { + var sp = requestId.Split("_"); + if (sp.Length != 2) return -1; + return long.TryParse(sp[0], out var start) ? DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start : -1; + } + + public static bool GreaterThanSupportStreamMinVersion(this string version, string minVersion) + { + return Version.Parse(version).CompareTo(Version.Parse(minVersion)) >= 0; + } +} \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Interceptors/AuthInterceptor.cs b/src/AElf.OS.Network.Grpc/Interceptors/AuthInterceptor.cs index 4ff85ff911..b0cbadead1 100644 --- a/src/AElf.OS.Network.Grpc/Interceptors/AuthInterceptor.cs +++ b/src/AElf.OS.Network.Grpc/Interceptors/AuthInterceptor.cs @@ -72,7 +72,8 @@ public override Task UnaryServerHandler(TRequest private bool IsNeedAuth(string methodName) { return methodName != GetFullMethodName(nameof(PeerService.PeerServiceBase.Ping)) && - methodName != GetFullMethodName(nameof(PeerService.PeerServiceBase.DoHandshake)); + methodName != GetFullMethodName(nameof(PeerService.PeerServiceBase.DoHandshake)) && + methodName != GetFullMethodName(nameof(PeerService.PeerServiceBase.RequestByStream));//we can not read stream sessionId here so we auth it in Stream service } private string GetFullMethodName(string methodName) diff --git a/src/AElf.OS.Network.Grpc/GrpcPeer.cs b/src/AElf.OS.Network.Grpc/Peer/GrpcPeer.cs similarity index 91% rename from src/AElf.OS.Network.Grpc/GrpcPeer.cs rename to src/AElf.OS.Network.Grpc/Peer/GrpcPeer.cs index c518bb690e..2987bed14f 100644 --- a/src/AElf.OS.Network.Grpc/GrpcPeer.cs +++ b/src/AElf.OS.Network.Grpc/Peer/GrpcPeer.cs @@ -25,12 +25,12 @@ namespace AElf.OS.Network.Grpc; public class GrpcPeer : IPeer { private const int MaxMetricsPerMethod = 100; - private const int BlockRequestTimeout = 700; - private const int CheckHealthTimeout = 1000; - private const int BlocksRequestTimeout = 5000; - private const int GetNodesTimeout = 500; - private const int UpdateHandshakeTimeout = 3000; - private const int StreamRecoveryWaitTime = 500; + protected const int BlockRequestTimeout = 2000; + protected const int CheckHealthTimeout = 2000; + protected const int BlocksRequestTimeout = 5000; + protected const int GetNodesTimeout = 2000; + protected const int UpdateHandshakeTimeout = 3000; + protected const int StreamRecoveryWaitTime = 500; private const int BlockCacheMaxItems = 1024; private const int TransactionCacheMaxItems = 10_000; @@ -38,8 +38,8 @@ public class GrpcPeer : IPeer private const int QueuedTransactionTimeout = 10_000; private const int QueuedBlockTimeout = 100_000; - private readonly Channel _channel; - private readonly PeerService.PeerServiceClient _client; + protected readonly Channel _channel; + protected readonly PeerService.PeerServiceClient _client; private readonly BoundedExpirationCache _knownBlockCache; private readonly BoundedExpirationCache _knownTransactionCache; @@ -56,8 +56,8 @@ public class GrpcPeer : IPeer public GrpcPeer(GrpcClient client, DnsEndPoint remoteEndpoint, PeerConnectionInfo peerConnectionInfo) { - _channel = client.Channel; - _client = client.Client; + _channel = client?.Channel; + _client = client?.Client; RemoteEndpoint = remoteEndpoint; Info = peerConnectionInfo; @@ -108,14 +108,14 @@ public GrpcPeer(GrpcClient client, DnsEndPoint remoteEndpoint, PeerConnectionInf /// Property that describes that describes if the peer is ready for send/request operations. It's based /// on the state of the underlying channel and the IsConnected. /// - public bool IsReady => (_channel.State == ChannelState.Idle || _channel.State == ChannelState.Ready) && IsConnected; + public bool IsReady => _channel != null ? (_channel.State == ChannelState.Idle || _channel.State == ChannelState.Ready) && IsConnected : IsConnected; public bool IsInvalid => !IsConnected && Info.ConnectionTime.AddMilliseconds(NetworkConstants.PeerConnectionTimeout) < TimestampHelper.GetUtcNow(); - public string ConnectionStatus => _channel.State.ToString(); + public virtual string ConnectionStatus => _channel != null ? _channel.State.ToString() : ""; public Hash LastKnownLibHash { get; private set; } public long LastKnownLibHeight { get; private set; } @@ -134,7 +134,7 @@ public GrpcPeer(GrpcClient client, DnsEndPoint remoteEndpoint, PeerConnectionInf public int BufferedAnnouncementsCount => _sendAnnouncementJobs.InputCount; public PeerConnectionInfo Info { get; } - + public Dictionary> GetRequestMetrics() { var metrics = new Dictionary>(); @@ -149,7 +149,7 @@ public Dictionary> GetRequestMetrics() return metrics; } - public Task GetNodesAsync(int count = NetworkConstants.DefaultDiscoveryMaxNodesToRequest) + public virtual Task GetNodesAsync(int count = NetworkConstants.DefaultDiscoveryMaxNodesToRequest) { var request = new GrpcRequest { ErrorMessage = "Request nodes failed." }; var data = new Metadata @@ -169,7 +169,7 @@ public void UpdateLastKnownLib(LibAnnouncement libAnnouncement) LastKnownLibHeight = libAnnouncement.LibHeight; } - public async Task CheckHealthAsync() + public virtual async Task CheckHealthAsync() { var request = new GrpcRequest { ErrorMessage = "Check health failed." }; @@ -182,7 +182,7 @@ public async Task CheckHealthAsync() await RequestAsync(() => _client.CheckHealthAsync(new HealthCheckRequest(), data), request); } - public async Task GetBlockByHashAsync(Hash hash) + public virtual async Task GetBlockByHashAsync(Hash hash) { var blockRequest = new BlockRequest { Hash = hash }; @@ -204,7 +204,7 @@ public async Task GetBlockByHashAsync(Hash hash) return blockReply?.Block; } - public async Task> GetBlocksAsync(Hash firstHash, int count) + public virtual async Task> GetBlocksAsync(Hash firstHash, int count) { var blockRequest = new BlocksRequest { PreviousBlockHash = firstHash, Count = count }; var blockInfo = $"{{ first: {firstHash}, count: {count} }}"; @@ -230,7 +230,7 @@ public async Task> GetBlocksAsync(Hash firstHash, in return list.Blocks.ToList(); } - public async Task TryRecoverAsync() + public virtual async Task TryRecoverAsync() { if (_channel.State == ChannelState.Shutdown) return false; @@ -268,7 +268,7 @@ public bool TryAddKnownTransaction(Hash transactionHash) return _knownTransactionCache.TryAdd(transactionHash); } - public async Task DisconnectAsync(bool gracefulDisconnect) + public virtual async Task DisconnectAsync(bool gracefulDisconnect) { IsConnected = false; IsShutdown = true; @@ -325,7 +325,7 @@ public void UpdateLastSentHandshake(Handshake handshake) LastSentHandshakeTime = handshake.HandshakeData.Time; } - public async Task ConfirmHandshakeAsync() + public virtual async Task ConfirmHandshakeAsync() { var request = new GrpcRequest { ErrorMessage = "Could not send confirm handshake." }; @@ -374,7 +374,7 @@ private async Task RequestAsync(Func> func, } } - private void RecordMetric(GrpcRequest grpcRequest, Timestamp requestStartTime, long elapsedMilliseconds) + protected virtual void RecordMetric(GrpcRequest grpcRequest, Timestamp requestStartTime, long elapsedMilliseconds) { var metrics = _recentRequestsRoundtripTimes[grpcRequest.MetricName]; @@ -394,7 +394,7 @@ private void RecordMetric(GrpcRequest grpcRequest, Timestamp requestStartTime, l /// This method handles the case where the peer is potentially down. If the Rpc call /// put the channel in TransientFailure or Connecting, we give the connection a certain time to recover. /// - private NetworkException HandleRpcException(RpcException exception, string errorMessage) + public virtual NetworkException HandleRpcException(RpcException exception, string errorMessage) { var message = $"Failed request to {this}: {errorMessage}"; var type = NetworkExceptionType.Rpc; @@ -443,7 +443,7 @@ public override string ToString() return $"{{ listening-port: {RemoteEndpoint}, key: {Info.Pubkey.Substring(0, 45)}... }}"; } - private enum MetricNames + protected enum MetricNames { GetBlocks, GetBlock @@ -522,7 +522,7 @@ private async Task SendStreamJobAsync(StreamJob job) job.SendCallback?.Invoke(null); } - private async Task BroadcastBlockAsync(BlockWithTransactions blockWithTransactions) + protected virtual async Task BroadcastBlockAsync(BlockWithTransactions blockWithTransactions) { if (_blockStreamCall == null) _blockStreamCall = _client.BlockBroadcastStream(new Metadata @@ -545,7 +545,7 @@ private async Task BroadcastBlockAsync(BlockWithTransactions blockWithTransactio /// Send a announcement to the peer using the stream call. /// Note: this method is not thread safe. /// - private async Task SendAnnouncementAsync(BlockAnnouncement header) + protected virtual async Task SendAnnouncementAsync(BlockAnnouncement header) { if (_announcementStreamCall == null) _announcementStreamCall = _client.AnnouncementBroadcastStream(new Metadata @@ -568,7 +568,7 @@ private async Task SendAnnouncementAsync(BlockAnnouncement header) /// Send a transaction to the peer using the stream call. /// Note: this method is not thread safe. /// - private async Task SendTransactionAsync(Transaction transaction) + protected virtual async Task SendTransactionAsync(Transaction transaction) { if (_transactionStreamCall == null) _transactionStreamCall = _client.TransactionBroadcastStream(new Metadata @@ -591,7 +591,7 @@ private async Task SendTransactionAsync(Transaction transaction) /// Send a lib announcement to the peer using the stream call. /// Note: this method is not thread safe. /// - public async Task SendLibAnnouncementAsync(LibAnnouncement libAnnouncement) + public virtual async Task SendLibAnnouncementAsync(LibAnnouncement libAnnouncement) { if (_libAnnouncementStreamCall == null) _libAnnouncementStreamCall = _client.LibAnnouncementBroadcastStream(new Metadata diff --git a/src/AElf.OS.Network.Grpc/Peer/GrpcStreamBackPeer.cs b/src/AElf.OS.Network.Grpc/Peer/GrpcStreamBackPeer.cs new file mode 100644 index 0000000000..9531141467 --- /dev/null +++ b/src/AElf.OS.Network.Grpc/Peer/GrpcStreamBackPeer.cs @@ -0,0 +1,104 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Threading.Tasks; +using AElf.OS.Network.Application; +using AElf.OS.Network.Grpc.Helpers; +using AElf.OS.Network.Protocol.Types; +using AElf.Types; +using Grpc.Core; + +namespace AElf.OS.Network.Grpc; + +public class GrpcStreamBackPeer : GrpcStreamPeer +{ + public GrpcStreamBackPeer(DnsEndPoint remoteEndpoint, PeerConnectionInfo peerConnectionInfo, + IAsyncStreamWriter clientStreamWriter, IStreamTaskResourcePool streamTaskResourcePool, + Dictionary peerMeta) + : base(null, remoteEndpoint, peerConnectionInfo, + null, clientStreamWriter, streamTaskResourcePool, peerMeta) + { + } + + public override string ConnectionStatus => IsConnected ? "Stream Ready" : "Stream Closed"; + + public override async Task CheckHealthAsync() + { + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest { ErrorMessage = $"Check health failed.requestId={requestId}" }; + + var data = new Metadata + { + { GrpcConstants.TimeoutMetadataKey, CheckHealthTimeout.ToString() }, + }; + await RequestAsync(() => StreamRequestAsync(MessageType.HealthCheck, new HealthCheckRequest(), data, requestId), request); + } + + public override async Task> GetBlocksAsync(Hash firstHash, int count) + { + var blockRequest = new BlocksRequest { PreviousBlockHash = firstHash, Count = count }; + var blockInfo = $"{{ first: {firstHash}, count: {count} }}"; + + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest + { + ErrorMessage = $"Get blocks for {blockInfo} failed.requestId={requestId}", + MetricName = nameof(MetricNames.GetBlocks), + MetricInfo = $"Get blocks for {blockInfo}" + }; + + var data = new Metadata + { + { GrpcConstants.TimeoutMetadataKey, BlocksRequestTimeout.ToString() }, + }; + var listMessage = await RequestAsync(() => StreamRequestAsync(MessageType.RequestBlocks, blockRequest, data, requestId), request); + return listMessage != null ? BlockList.Parser.ParseFrom(listMessage.Message).Blocks.ToList() : new List(); + } + + public override async Task DisconnectAsync(bool gracefulDisconnect) + { + if (!IsConnected) return; + IsConnected = false; + _sendStreamJobs.Complete(); + // send disconnect message if the peer is still connected and the connection + // is stable. + try + { + await RequestAsync(() => StreamRequestAsync(MessageType.Disconnect, + new DisconnectReason { Why = DisconnectReason.Types.Reason.Shutdown }, + new Metadata { { GrpcConstants.SessionIdMetadataKey, OutboundSessionId } }), + new GrpcRequest { ErrorMessage = "Could not send disconnect." }); + } + catch (Exception) + { + // swallow the exception, we don't care because we're disconnecting. + } + } + + public override Task TryRecoverAsync() + { + return Task.FromResult(true); + } + + + public override NetworkException HandleRpcException(RpcException exception, string errorMessage) + { + var message = $"Failed request to {this}: {errorMessage}"; + var type = NetworkExceptionType.Rpc; + if (exception.StatusCode == + // there was an exception, not related to connectivity. + StatusCode.Cancelled) + { + message = $"Request was cancelled {this}: {errorMessage}"; + type = NetworkExceptionType.Unrecoverable; + } + else if (exception.StatusCode == StatusCode.Unknown) + { + message = $"Exception in handler {this}: {errorMessage}"; + type = NetworkExceptionType.Unrecoverable; + } + + return new NetworkException(message, exception, type); + } +} \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Peer/GrpcStreamPeer.cs b/src/AElf.OS.Network.Grpc/Peer/GrpcStreamPeer.cs new file mode 100644 index 0000000000..abd366cecd --- /dev/null +++ b/src/AElf.OS.Network.Grpc/Peer/GrpcStreamPeer.cs @@ -0,0 +1,355 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using AElf.CSharp.Core.Extension; +using AElf.Kernel; +using AElf.OS.Network.Application; +using AElf.OS.Network.Grpc.Helpers; +using AElf.OS.Network.Protocol.Types; +using AElf.Types; +using Google.Protobuf; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace AElf.OS.Network.Grpc; + +public delegate void StreamSendCallBack(NetworkException ex, StreamMessage streamMessage, int callTimes = 0); + +public class GrpcStreamPeer : GrpcPeer +{ + private const int StreamWaitTime = 3000; + private const int TimeOutRetryTimes = 3; + + private AsyncDuplexStreamingCall _duplexStreamingCall; + private CancellationTokenSource _streamListenTaskTokenSource; + private IAsyncStreamWriter _clientStreamWriter; + + private readonly IStreamTaskResourcePool _streamTaskResourcePool; + private readonly Dictionary _peerMeta; + private StreamSendCallBack _streamSendCallBack; + + protected readonly ActionBlock _sendStreamJobs; + public ILogger Logger { get; set; } + + public GrpcStreamPeer(GrpcClient client, DnsEndPoint remoteEndpoint, PeerConnectionInfo peerConnectionInfo, + AsyncDuplexStreamingCall duplexStreamingCall, + IAsyncStreamWriter clientStreamWriter, + IStreamTaskResourcePool streamTaskResourcePool, Dictionary peerMeta) : base(client, + remoteEndpoint, peerConnectionInfo) + { + _duplexStreamingCall = duplexStreamingCall; + _clientStreamWriter = duplexStreamingCall?.RequestStream ?? clientStreamWriter; + _streamTaskResourcePool = streamTaskResourcePool; + _peerMeta = peerMeta; + _sendStreamJobs = new ActionBlock(WriteStreamJobAsync, new ExecutionDataflowBlockOptions + { + BoundedCapacity = NetworkConstants.DefaultMaxBufferedStreamCount + }); + Logger = NullLogger.Instance; + } + + public AsyncDuplexStreamingCall BuildCall() + { + _duplexStreamingCall = _client.RequestByStream(new CallOptions().WithDeadline(DateTime.MaxValue)); + _clientStreamWriter = _duplexStreamingCall.RequestStream; + return _duplexStreamingCall; + } + + public void StartServe(CancellationTokenSource listenTaskTokenSource) + { + _streamListenTaskTokenSource = listenTaskTokenSource; + } + + public void SetStreamSendCallBack(StreamSendCallBack callBack) + { + _streamSendCallBack = callBack; + } + + public async Task DisposeAsync() + { + _sendStreamJobs.Complete(); + await _duplexStreamingCall?.RequestStream?.CompleteAsync(); + _duplexStreamingCall?.Dispose(); + _streamListenTaskTokenSource?.Cancel(); + } + + public override async Task DisconnectAsync(bool gracefulDisconnect) + { + try + { + await RequestAsync(() => StreamRequestAsync(MessageType.Disconnect, + new DisconnectReason { Why = DisconnectReason.Types.Reason.Shutdown }, + new Metadata { { GrpcConstants.SessionIdMetadataKey, OutboundSessionId } }, null, true), + new GrpcRequest { ErrorMessage = "Could not send disconnect." }); + await DisposeAsync(); + } + catch (Exception) + { + // swallow the exception, we don't care because we're disconnecting. + } + + await base.DisconnectAsync(gracefulDisconnect); + } + + public async Task HandShakeAsync(HandshakeRequest request) + { + var metadata = new Metadata + { + { GrpcConstants.RetryCountMetadataKey, "0" }, + }; + var requestId = CommonHelper.GenerateRequestId(); + var grpcRequest = new GrpcRequest { ErrorMessage = $"handshake failed.requestId={requestId}" }; + var reply = await RequestAsync(() => StreamRequestAsync(MessageType.HandShake, request, metadata, requestId, true), grpcRequest); + return reply != null ? HandshakeReply.Parser.ParseFrom(reply.Message) : new HandshakeReply(); + } + + public override async Task ConfirmHandshakeAsync() + { + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest { ErrorMessage = $"Could not send confirm handshake.requestId={requestId}" }; + + var data = new Metadata + { + { GrpcConstants.TimeoutMetadataKey, UpdateHandshakeTimeout.ToString() }, + }; + var reply = await RequestAsync(() => StreamRequestAsync(MessageType.ConfirmHandShake, new ConfirmHandshakeRequest(), data, requestId, true), request); + if (reply == null) throw new Exception("confirm handshake failed"); + } + + protected override async Task BroadcastBlockAsync(BlockWithTransactions blockWithTransactions) + { + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest { ErrorMessage = $"broadcast block failed.requestId={requestId}" }; + await RequestAsync(() => StreamRequestAsync(MessageType.BlockBroadcast, blockWithTransactions, null, requestId), request); + } + + protected override async Task SendAnnouncementAsync(BlockAnnouncement header) + { + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest { ErrorMessage = $"broadcast block announcement failed.requestId={requestId}" }; + await RequestAsync(() => StreamRequestAsync(MessageType.AnnouncementBroadcast, header, null, requestId), request); + } + + protected override async Task SendTransactionAsync(Transaction transaction) + { + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest { ErrorMessage = $"broadcast transaction failed.requestId={requestId}" }; + await RequestAsync(() => StreamRequestAsync(MessageType.TransactionBroadcast, transaction, null, requestId), request); + } + + public override async Task SendLibAnnouncementAsync(LibAnnouncement libAnnouncement) + { + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest { ErrorMessage = $"broadcast lib announcement failed. requestId={requestId}" }; + await RequestAsync(() => StreamRequestAsync(MessageType.LibAnnouncementBroadcast, libAnnouncement, null, requestId), request); + } + + + public override async Task GetBlockByHashAsync(Hash hash) + { + var blockRequest = new BlockRequest { Hash = hash }; + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest + { + ErrorMessage = $"Block request for {hash} failed. requestId={requestId}", + MetricName = nameof(MetricNames.GetBlock), + MetricInfo = $"Block request for {hash}" + }; + + var data = new Metadata + { + { GrpcConstants.TimeoutMetadataKey, BlockRequestTimeout.ToString() }, + }; + var blockMessage = await RequestAsync(() => StreamRequestAsync(MessageType.RequestBlock, blockRequest, data, requestId), request); + return blockMessage != null ? BlockReply.Parser.ParseFrom(blockMessage.Message).Block : new BlockWithTransactions(); + } + + public override async Task GetNodesAsync(int count = NetworkConstants.DefaultDiscoveryMaxNodesToRequest) + { + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest { ErrorMessage = $"Request nodes failed.requestId={requestId}" }; + var data = new Metadata + { + { GrpcConstants.TimeoutMetadataKey, GetNodesTimeout.ToString() }, + }; + var listMessage = await RequestAsync(() => StreamRequestAsync(MessageType.GetNodes, new NodesRequest { MaxCount = count }, data, requestId), request); + return listMessage != null ? NodeList.Parser.ParseFrom(listMessage.Message) : null; + } + + public override async Task CheckHealthAsync() + { + await base.CheckHealthAsync(); + var requestId = CommonHelper.GenerateRequestId(); + var request = new GrpcRequest { ErrorMessage = $"Check health failed.requestId={requestId}" }; + + var data = new Metadata + { + { GrpcConstants.TimeoutMetadataKey, CheckHealthTimeout.ToString() }, + }; + await RequestAsync(() => StreamRequestAsync(MessageType.HealthCheck, new HealthCheckRequest(), data, requestId), request); + } + + public async Task WriteAsync(StreamMessage message, Action sendCallback) + { + await _sendStreamJobs.SendAsync(new StreamJob { StreamMessage = message, SendCallback = sendCallback }); + } + + private async Task WriteStreamJobAsync(StreamJob job) + { + //avoid write stream concurrency + try + { + if (job.StreamMessage == null) return; + Logger.LogDebug("write request={requestId} {streamType}-{messageType}", job.StreamMessage.RequestId, job.StreamMessage.StreamType, job.StreamMessage.MessageType); + + await _clientStreamWriter.WriteAsync(job.StreamMessage); + } + catch (RpcException ex) + { + job.SendCallback?.Invoke(HandleRpcException(ex, $"Could not write to stream to {this}, queueCount={_sendStreamJobs.InputCount}")); + return; + } + catch (Exception e) + { + var type = e is InvalidOperationException or TimeoutException ? NetworkExceptionType.PeerUnstable : NetworkExceptionType.HandlerException; + job.SendCallback?.Invoke( + new NetworkException($"{job.StreamMessage?.RequestId}{job.StreamMessage?.StreamType}-{job.StreamMessage?.MessageType} size={job.StreamMessage.ToByteArray().Length} queueCount={_sendStreamJobs.InputCount}", e, type)); + await Task.Delay(StreamRecoveryWaitTime); + return; + } + + job.SendCallback?.Invoke(null); + } + + protected async Task RequestAsync(Func> func, GrpcRequest request) + { + var recordRequestTime = !string.IsNullOrEmpty(request.MetricName); + var requestStartTime = TimestampHelper.GetUtcNow(); + var resp = await func(); + if (recordRequestTime) + RecordMetric(request, requestStartTime, (TimestampHelper.GetUtcNow() - requestStartTime).Milliseconds()); + return resp; + } + + protected async Task StreamRequestAsync(MessageType messageType, IMessage message, Metadata header = null, string requestId = null, bool graceful = false) + { + for (var i = 0; i < TimeOutRetryTimes; i++) + { + requestId = requestId == null || i > 0 ? CommonHelper.GenerateRequestId() : requestId; + var streamMessage = new StreamMessage { StreamType = StreamType.Request, MessageType = messageType, RequestId = requestId, Message = message.ToByteString() }; + AddAllHeaders(streamMessage, header); + TaskCompletionSource promise = new TaskCompletionSource(); + await _streamTaskResourcePool.RegistryTaskPromiseAsync(requestId, messageType, promise); + + await _sendStreamJobs.SendAsync(new StreamJob + { + StreamMessage = streamMessage, SendCallback = ex => + { + if (ex != null && !graceful) _streamSendCallBack?.Invoke(ex, streamMessage, i); + } + }); + var result = await _streamTaskResourcePool.GetResultAsync(promise, requestId, GetTimeOutFromHeader(header)); + if (result.Item1) + { + if (!graceful) _streamSendCallBack?.Invoke(null, streamMessage, i); + return result.Item2; + } + + if (i >= TimeOutRetryTimes - 1) + { + if (!graceful) _streamSendCallBack?.Invoke(new NetworkException($"streaming call time out requestId {requestId}-{messageType}-{this}", null, NetworkExceptionType.PeerUnstable), streamMessage, i); + return null; + } + + if (graceful) return null; + await Task.Delay(StreamRecoveryWaitTime); + } + + return null; + } + + private void AddAllHeaders(StreamMessage streamMessage, Metadata metadata = null) + { + foreach (var kv in _peerMeta) + { + streamMessage.Meta[kv.Key] = kv.Value; + } + + streamMessage.Meta[GrpcConstants.SessionIdMetadataKey] = OutboundSessionId.ToHex(); + if (metadata == null) return; + foreach (var e in metadata) + { + if (e.IsBinary) + { + streamMessage.Meta[e.Key] = e.ValueBytes.ToHex(); + continue; + } + + streamMessage.Meta[e.Key] = e.Value; + } + } + + private int GetTimeOutFromHeader(Metadata header) + { + if (header == null) return StreamWaitTime; + var t = header.Get(GrpcConstants.TimeoutMetadataKey)?.Value; + return t == null ? StreamWaitTime : int.Parse(t); + } + + public override NetworkException HandleRpcException(RpcException exception, string errorMessage) + { + var message = $"Failed request to {this}: {errorMessage}"; + NetworkExceptionType type; + + if (_channel.State != ChannelState.Ready) + { + // if channel has been shutdown (unrecoverable state) remove it. + if (_channel.State == ChannelState.Shutdown) + { + message = $"Peer is shutdown - {this}: {errorMessage}"; + type = NetworkExceptionType.Unrecoverable; + } + else if (_channel.State == ChannelState.TransientFailure || _channel.State == ChannelState.Connecting) + { + // from this we try to recover + message = $"Peer is unstable - {this}: {errorMessage}"; + type = NetworkExceptionType.PeerUnstable; + } + else + { + // if idle just after an exception, disconnect. + message = $"Peer idle, channel state {_channel.State} - {this}: {errorMessage}"; + type = NetworkExceptionType.Unrecoverable; + } + } + else + { + switch (exception.StatusCode) + { + case StatusCode.Unavailable: + case StatusCode.Cancelled: + message = $"Request was cancelled {this}: {errorMessage}"; + type = NetworkExceptionType.Unrecoverable; + break; + case StatusCode.Internal: + message = $"internal exception {this}: {errorMessage}"; + type = NetworkExceptionType.PeerUnstable; + break; + case StatusCode.DeadlineExceeded: + message = $"stream call timeout {this} : {errorMessage}"; + type = NetworkExceptionType.PeerUnstable; + break; + default: + message = $"Exception in handler {this} : {errorMessage}"; + type = NetworkExceptionType.HandlerException; + break; + } + } + + return new NetworkException(message, exception, type); + } +} \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Service/GrpcServerService.cs b/src/AElf.OS.Network.Grpc/Service/GrpcServerService.cs index c88affdddf..3c0b1d2c5a 100644 --- a/src/AElf.OS.Network.Grpc/Service/GrpcServerService.cs +++ b/src/AElf.OS.Network.Grpc/Service/GrpcServerService.cs @@ -1,14 +1,9 @@ using System; -using System.Collections.Generic; +using System.Net; using System.Threading.Tasks; -using AElf.Kernel.Blockchain.Application; -using AElf.Kernel.TransactionPool; -using AElf.OS.Network.Application; -using AElf.OS.Network.Domain; -using AElf.OS.Network.Events; -using AElf.OS.Network.Extensions; using AElf.OS.Network.Grpc.Helpers; using AElf.Types; +using Google.Protobuf; using Grpc.Core; using Grpc.Core.Utils; using Microsoft.Extensions.Logging; @@ -24,19 +19,15 @@ namespace AElf.OS.Network.Grpc; /// public class GrpcServerService : PeerService.PeerServiceBase { - private readonly IBlockchainService _blockchainService; private readonly IConnectionService _connectionService; - private readonly INodeManager _nodeManager; + private readonly IStreamService _streamService; + private readonly IGrpcRequestProcessor _grpcRequestProcessor; - private readonly ISyncStateService _syncStateService; - - public GrpcServerService(ISyncStateService syncStateService, IConnectionService connectionService, - IBlockchainService blockchainService, INodeManager nodeManager) + public GrpcServerService(IConnectionService connectionService, IStreamService streamService, IGrpcRequestProcessor grpcRequestProcessor) { - _syncStateService = syncStateService; _connectionService = connectionService; - _blockchainService = blockchainService; - _nodeManager = nodeManager; + _streamService = streamService; + _grpcRequestProcessor = grpcRequestProcessor; EventBus = NullLocalEventBus.Instance; Logger = NullLogger.Instance; @@ -44,7 +35,6 @@ public GrpcServerService(ISyncStateService syncStateService, IConnectionService private NetworkOptions NetworkOptions => NetworkOptionsSnapshot.Value; public IOptionsSnapshot NetworkOptionsSnapshot { get; set; } - public ILocalEventBus EventBus { get; set; } public ILogger Logger { get; set; } @@ -53,15 +43,8 @@ public override async Task DoHandshake(HandshakeRequest request, try { Logger.LogDebug($"Peer {context.Peer} has requested a handshake."); - - if (context.AuthContext?.Properties != null) - foreach (var authProperty in context.AuthContext.Properties) - Logger.LogDebug($"Auth property: {authProperty.Name} -> {authProperty.Value}"); - - if (!GrpcEndPointHelper.ParseDnsEndPoint(context.Peer, out var peerEndpoint)) - return new HandshakeReply { Error = HandshakeError.InvalidConnection }; - - return await _connectionService.DoHandshakeAsync(peerEndpoint, request.Handshake); + var authReply = AuthHandshake(request, context); + return authReply.Item1 ?? await _connectionService.DoHandshakeAsync(authReply.Item2, request.Handshake); } catch (Exception e) { @@ -70,33 +53,67 @@ public override async Task DoHandshake(HandshakeRequest request, } } - public override Task ConfirmHandshake(ConfirmHandshakeRequest request, - ServerCallContext context) + private Tuple AuthHandshake(HandshakeRequest request, ServerCallContext context) { + if (context.AuthContext?.Properties != null) + foreach (var authProperty in context.AuthContext.Properties) + Logger.LogDebug($"Auth property: {authProperty.Name} -> {authProperty.Value}"); + + return !GrpcEndPointHelper.ParseDnsEndPoint(context.Peer, out var peerEndpoint) + ? new Tuple(new HandshakeReply { Error = HandshakeError.InvalidConnection }, peerEndpoint) + : new Tuple(null, peerEndpoint); + } + + public override async Task RequestByStream(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + Logger.LogDebug("RequestByStream started with {peer}.", context.Peer); try { - Logger.LogDebug($"Peer {context.GetPeerInfo()} has requested a handshake confirmation."); - - _connectionService.ConfirmHandshake(context.GetPublicKey()); + await requestStream.ForEachAsync(async req => + { + Logger.LogDebug("receive request={requestId} {streamType}-{messageType}", req.RequestId, req.StreamType, req.MessageType); + if (req.MessageType == MessageType.HandShake) + { + var realRequest = HandshakeRequest.Parser.ParseFrom(req.Message); + var authReply = AuthHandshake(realRequest, context); + var handshakeReply = authReply.Item1 ?? await _connectionService.DoHandshakeByStreamAsync(authReply.Item2, responseStream, realRequest.Handshake); + Logger.LogDebug("request invoke success {reply}", handshakeReply.Handshake); + await responseStream.WriteAsync(new StreamMessage + { + // other stream message will come after handshake, so this write will not called concurrently with others + StreamType = StreamType.Reply, MessageType = req.MessageType, + RequestId = req.RequestId, Message = handshakeReply.ToByteString() + }); + } + else + { + _streamService.ProcessStreamRequestAsync(req, context); + } + }); } catch (Exception e) { - Logger.LogWarning(e, $"Confirm handshake error - {context.GetPeerInfo()}: "); + Logger.LogError(e, "RequestByStream error - {peer}: ", context.Peer); throw; } - return Task.FromResult(new VoidReply()); + Logger.LogDebug("RequestByStream finished with {peer}.", context.Peer); + } + + public override Task ConfirmHandshake(ConfirmHandshakeRequest request, + ServerCallContext context) + { + return _grpcRequestProcessor.ConfirmHandshakeAsync(context.GetPeerInfo(), context.GetPublicKey()); } public override async Task BlockBroadcastStream( IAsyncStreamReader requestStream, ServerCallContext context) { - Logger.LogDebug($"Block stream started with {context.GetPeerInfo()} - {context.Peer}."); - try { + Logger.LogDebug($"Block stream started with {context.GetPeerInfo()} - {context.Peer}."); var peerPubkey = context.GetPublicKey(); - await requestStream.ForEachAsync(async block => await ProcessBlockAsync(block, peerPubkey)); + await requestStream.ForEachAsync(async block => await _grpcRequestProcessor.ProcessBlockAsync(block, peerPubkey)); } catch (Exception e) { @@ -109,18 +126,6 @@ public override async Task BlockBroadcastStream( return new VoidReply(); } - private Task ProcessBlockAsync(BlockWithTransactions block, string peerPubkey) - { - var peer = TryGetPeerByPubkey(peerPubkey); - - if (peer.SyncState != SyncState.Finished) peer.SyncState = SyncState.Finished; - - if (!peer.TryAddKnownBlock(block.GetHash())) - return Task.CompletedTask; - - _ = EventBus.PublishAsync(new BlockReceivedEvent(block, peerPubkey)); - return Task.CompletedTask; - } public override async Task AnnouncementBroadcastStream( IAsyncStreamReader requestStream, ServerCallContext context) @@ -130,7 +135,7 @@ public override async Task AnnouncementBroadcastStream( try { var peerPubkey = context.GetPublicKey(); - await requestStream.ForEachAsync(async r => await ProcessAnnouncementAsync(r, peerPubkey)); + await requestStream.ForEachAsync(async r => await _grpcRequestProcessor.ProcessAnnouncementAsync(r, peerPubkey)); } catch (Exception e) { @@ -143,26 +148,6 @@ public override async Task AnnouncementBroadcastStream( return new VoidReply(); } - private Task ProcessAnnouncementAsync(BlockAnnouncement announcement, string peerPubkey) - { - if (announcement?.BlockHash == null) - { - Logger.LogWarning($"Received null announcement or header from {peerPubkey}."); - return Task.CompletedTask; - } - - var peer = TryGetPeerByPubkey(peerPubkey); - - if (!peer.TryAddKnownBlock(announcement.BlockHash)) - return Task.CompletedTask; - - if (peer.SyncState != SyncState.Finished) peer.SyncState = SyncState.Finished; - - _ = EventBus.PublishAsync(new AnnouncementReceivedEventData(announcement, peerPubkey)); - - return Task.CompletedTask; - } - public override async Task TransactionBroadcastStream(IAsyncStreamReader requestStream, ServerCallContext context) { @@ -171,7 +156,7 @@ public override async Task TransactionBroadcastStream(IAsyncStreamRea try { var peerPubkey = context.GetPublicKey(); - await requestStream.ForEachAsync(async tx => await ProcessTransactionAsync(tx, peerPubkey)); + await requestStream.ForEachAsync(async tx => await _grpcRequestProcessor.ProcessTransactionAsync(tx, peerPubkey)); } catch (Exception e) { @@ -184,23 +169,6 @@ public override async Task TransactionBroadcastStream(IAsyncStreamRea return new VoidReply(); } - private async Task ProcessTransactionAsync(Transaction tx, string peerPubkey) - { - var chain = await _blockchainService.GetChainAsync(); - - // if this transaction's ref block is a lot higher than our chain - // then don't participate in p2p network - if (tx.RefBlockNumber > chain.LongestChainHeight + NetworkConstants.DefaultInitialSyncOffset) - return; - - var peer = TryGetPeerByPubkey(peerPubkey); - - if (!peer.TryAddKnownTransaction(tx.GetHash())) - return; - - _ = EventBus.PublishAsync(new TransactionsReceivedEvent { Transactions = new List { tx } }); - } - public override async Task LibAnnouncementBroadcastStream( IAsyncStreamReader requestStream, ServerCallContext context) { @@ -209,7 +177,7 @@ public override async Task LibAnnouncementBroadcastStream( try { var peerPubkey = context.GetPublicKey(); - await requestStream.ForEachAsync(async r => await ProcessLibAnnouncementAsync(r, peerPubkey)); + await requestStream.ForEachAsync(async r => await _grpcRequestProcessor.ProcessLibAnnouncementAsync(r, peerPubkey)); } catch (Exception e) { @@ -222,25 +190,6 @@ public override async Task LibAnnouncementBroadcastStream( return new VoidReply(); } - public Task ProcessLibAnnouncementAsync(LibAnnouncement announcement, string peerPubkey) - { - if (announcement?.LibHash == null) - { - Logger.LogWarning($"Received null or empty announcement from {peerPubkey}."); - return Task.CompletedTask; - } - - Logger.LogDebug( - $"Received lib announce hash: {announcement.LibHash}, height {announcement.LibHeight} from {peerPubkey}."); - - var peer = TryGetPeerByPubkey(peerPubkey); - - peer.UpdateLastKnownLib(announcement); - - if (peer.SyncState != SyncState.Finished) peer.SyncState = SyncState.Finished; - - return Task.CompletedTask; - } /// /// This method returns a block. The parameter is a object, if the value @@ -249,97 +198,32 @@ public Task ProcessLibAnnouncementAsync(LibAnnouncement announcement, string pee /// public override async Task RequestBlock(BlockRequest request, ServerCallContext context) { - if (request == null || request.Hash == null || _syncStateService.SyncState != SyncState.Finished) - return new BlockReply(); - - Logger.LogDebug($"Peer {context.GetPeerInfo()} requested block {request.Hash}."); - - BlockWithTransactions block; - try - { - block = await _blockchainService.GetBlockWithTransactionsByHashAsync(request.Hash); - - if (block == null) - { - Logger.LogDebug($"Could not find block {request.Hash} for {context.GetPeerInfo()}."); - } - else - { - var peer = _connectionService.GetPeerByPubkey(context.GetPublicKey()); - peer.TryAddKnownBlock(block.GetHash()); - } - } - catch (Exception e) - { - Logger.LogWarning(e, $"Request block error: {context.GetPeerInfo()}"); - throw; - } - - return new BlockReply { Block = block }; + return await _grpcRequestProcessor.GetBlockAsync(request, context.GetPeerInfo(), context.GetPublicKey()); } public override async Task RequestBlocks(BlocksRequest request, ServerCallContext context) { - if (request == null || - request.PreviousBlockHash == null || - _syncStateService.SyncState != SyncState.Finished || - request.Count == 0 || - request.Count > GrpcConstants.MaxSendBlockCountLimit) - return new BlockList(); - - Logger.LogDebug( - $"Peer {context.GetPeerInfo()} requested {request.Count} blocks from {request.PreviousBlockHash}."); - - var blockList = new BlockList(); - + var blocks = await _grpcRequestProcessor.GetBlocksAsync(request, context.GetPeerInfo()); + if (!NetworkOptions.CompressBlocksOnRequest) return blocks; try { - var blocks = - await _blockchainService.GetBlocksWithTransactionsAsync(request.PreviousBlockHash, request.Count); - - blockList.Blocks.AddRange(blocks); - - if (NetworkOptions.CompressBlocksOnRequest) - { - var headers = new Metadata - { new(GrpcConstants.GrpcRequestCompressKey, GrpcConstants.GrpcGzipConst) }; - await context.WriteResponseHeadersAsync(headers); - } - + var headers = new Metadata { new(GrpcConstants.GrpcRequestCompressKey, GrpcConstants.GrpcGzipConst) }; + await context.WriteResponseHeadersAsync(headers); Logger.LogDebug( - $"Replied to {context.GetPeerInfo()} with {blockList.Blocks.Count}, request was {request}"); + "Replied to {peerInfo} with {count}, request was {request}.", context.GetPeerInfo(), blocks.Blocks.Count, request); } catch (Exception e) { - Logger.LogWarning(e, $"Request blocks error - {context.GetPeerInfo()} - request {request}: "); + Logger.LogWarning(e, "Request blocks error - {peerInfo} - request={request}. ", context.GetPeerInfo(), request); throw; } - return blockList; + return blocks; } public override async Task GetNodes(NodesRequest request, ServerCallContext context) { - if (request == null) - return new NodeList(); - - var nodesCount = Math.Min(request.MaxCount, GrpcConstants.DefaultDiscoveryMaxNodesToResponse); - Logger.LogDebug($"Peer {context.GetPeerInfo()} requested {nodesCount} nodes."); - - NodeList nodes; - try - { - nodes = await _nodeManager.GetRandomNodesAsync(nodesCount); - } - catch (Exception e) - { - Logger.LogWarning(e, "Get nodes error: "); - throw; - } - - Logger.LogDebug($"Sending {nodes.Nodes.Count} to {context.GetPeerInfo()}."); - - return nodes; + return await _grpcRequestProcessor.GetNodesAsync(request, context.GetPeerInfo()); } public override Task Ping(PingRequest request, ServerCallContext context) @@ -357,40 +241,6 @@ public override Task CheckHealth(HealthCheckRequest request, S /// public override async Task Disconnect(DisconnectReason request, ServerCallContext context) { - Logger.LogDebug($"Peer {context.GetPeerInfo()} has sent a disconnect request."); - - try - { - await _connectionService.RemovePeerAsync(context.GetPublicKey()); - } - catch (Exception e) - { - Logger.LogError(e, "Disconnect error: "); - throw; - } - - return new VoidReply(); - } - - /// - /// Try to get the peer based on pubkey. - /// - /// - /// - /// - /// If the peer does not exist, a cancelled RPC exception is thrown to tell the client. - /// Need to verify the existence of the peer here, - /// because when we start transferring data using the streaming RPC, - /// the request no longer goes through the . - /// - private GrpcPeer TryGetPeerByPubkey(string peerPubkey) - { - var peer = _connectionService.GetPeerByPubkey(peerPubkey); - - if (peer != null) - return peer; - - Logger.LogDebug($"Peer: {peerPubkey} already removed."); - throw new RpcException(Status.DefaultCancelled); + return await _grpcRequestProcessor.DisconnectAsync(request, context.GetPeerInfo(), context.GetPublicKey()); } } \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Service/IGrpcRequestProcessor.cs b/src/AElf.OS.Network.Grpc/Service/IGrpcRequestProcessor.cs new file mode 100644 index 0000000000..c98e05c16f --- /dev/null +++ b/src/AElf.OS.Network.Grpc/Service/IGrpcRequestProcessor.cs @@ -0,0 +1,267 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using AElf.Kernel.Blockchain.Application; +using AElf.Kernel.TransactionPool; +using AElf.OS.Network.Application; +using AElf.OS.Network.Domain; +using AElf.OS.Network.Events; +using AElf.OS.Network.Extensions; +using AElf.Types; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus.Local; + +namespace AElf.OS.Network.Grpc; + +public interface IGrpcRequestProcessor +{ + Task ProcessBlockAsync(BlockWithTransactions block, string peerPubkey); + Task ProcessAnnouncementAsync(BlockAnnouncement announcement, string peerPubkey); + Task ProcessTransactionAsync(Transaction tx, string peerPubkey); + Task ProcessLibAnnouncementAsync(LibAnnouncement announcement, string peerPubkey); + Task GetNodesAsync(NodesRequest request, string peerInfo); + Task GetBlockAsync(BlockRequest request, string peerInfo, string peerPubkey, string requestId = null); + Task GetBlocksAsync(BlocksRequest request, string peerInfo, string requestId = null); + Task ConfirmHandshakeAsync(string peerInfo, string peerPubkey, string requestId = null); + Task DisconnectAsync(DisconnectReason request, string peerInfo, string peerPubkey, string requestId = null); +} + +public class GrpcRequestProcessor : IGrpcRequestProcessor, ISingletonDependency +{ + private readonly IBlockchainService _blockchainService; + private readonly IConnectionService _connectionService; + private readonly ISyncStateService _syncStateService; + private readonly INodeManager _nodeManager; + + public GrpcRequestProcessor(IBlockchainService blockchainService, IConnectionService connectionService, ISyncStateService syncStateService, INodeManager nodeManager) + { + _blockchainService = blockchainService; + _connectionService = connectionService; + _syncStateService = syncStateService; + _nodeManager = nodeManager; + EventBus = NullLocalEventBus.Instance; + Logger = NullLogger.Instance; + } + + public ILocalEventBus EventBus { get; set; } + public ILogger Logger { get; set; } + + public Task ProcessBlockAsync(BlockWithTransactions block, string peerPubkey) + { + var peer = TryGetPeerByPubkey(peerPubkey); + + if (peer.SyncState != SyncState.Finished) peer.SyncState = SyncState.Finished; + + if (!peer.TryAddKnownBlock(block.GetHash())) + return Task.CompletedTask; + _ = EventBus.PublishAsync(new BlockReceivedEvent(block, peerPubkey)); + return Task.CompletedTask; + } + + public Task ProcessAnnouncementAsync(BlockAnnouncement announcement, string peerPubkey) + { + if (announcement?.BlockHash == null) + { + Logger.LogWarning($"Received null announcement or header from {peerPubkey}."); + return Task.CompletedTask; + } + + var peer = TryGetPeerByPubkey(peerPubkey); + + if (!peer.TryAddKnownBlock(announcement.BlockHash)) + return Task.CompletedTask; + + if (peer.SyncState != SyncState.Finished) peer.SyncState = SyncState.Finished; + + _ = EventBus.PublishAsync(new AnnouncementReceivedEventData(announcement, peerPubkey)); + + return Task.CompletedTask; + } + + public async Task ProcessTransactionAsync(Transaction tx, string peerPubkey) + { + var chain = await _blockchainService.GetChainAsync(); + + // if this transaction's ref block is a lot higher than our chain + // then don't participate in p2p network + if (tx.RefBlockNumber > chain.LongestChainHeight + NetworkConstants.DefaultInitialSyncOffset) + return; + + var peer = TryGetPeerByPubkey(peerPubkey); + + if (!peer.TryAddKnownTransaction(tx.GetHash())) + return; + + _ = EventBus.PublishAsync(new TransactionsReceivedEvent { Transactions = new List { tx } }); + } + + public Task ProcessLibAnnouncementAsync(LibAnnouncement announcement, string peerPubkey) + { + if (announcement?.LibHash == null) + { + Logger.LogWarning($"Received null or empty announcement from {peerPubkey}."); + return Task.CompletedTask; + } + + Logger.LogDebug( + $"Received lib announce hash: {announcement.LibHash}, height {announcement.LibHeight} from {peerPubkey}."); + + var peer = TryGetPeerByPubkey(peerPubkey); + + peer.UpdateLastKnownLib(announcement); + + if (peer.SyncState != SyncState.Finished) peer.SyncState = SyncState.Finished; + + return Task.CompletedTask; + } + + + /// + /// This method returns a block. The parameter is a object, if the value + /// of is not null, the request is by ID, otherwise it will be + /// by height. + /// + public async Task GetBlockAsync(BlockRequest request, string peerInfo, string peerPubkey, string requestId) + { + if (request == null || request.Hash == null || _syncStateService.SyncState != SyncState.Finished) + return new BlockReply(); + + Logger.LogDebug($"Peer {peerInfo} requested block {request.Hash}."); + + BlockWithTransactions block; + try + { + block = await _blockchainService.GetBlockWithTransactionsByHashAsync(request.Hash); + + if (block == null) + { + Logger.LogDebug($"Could not find block {request.Hash} for {peerInfo}."); + } + else + { + var peer = _connectionService.GetPeerByPubkey(peerPubkey); + peer.TryAddKnownBlock(block.GetHash()); + } + } + catch (Exception e) + { + Logger.LogWarning(e, $"Request block error: {peerPubkey}"); + throw; + } + + return new BlockReply { Block = block }; + } + + public async Task GetBlocksAsync(BlocksRequest request, string peerInfo, string requestId) + { + if (request == null || + request.PreviousBlockHash == null || + _syncStateService.SyncState != SyncState.Finished || + request.Count == 0 || + request.Count > GrpcConstants.MaxSendBlockCountLimit) + return new BlockList(); + + Logger.LogDebug( + "Peer {peerInfo} requested {count} blocks from {preHash}. requestId={requestId}", peerInfo, request.Count, request.PreviousBlockHash.ToHex(), requestId); + + var blockList = new BlockList(); + + try + { + var blocks = + await _blockchainService.GetBlocksWithTransactionsAsync(request.PreviousBlockHash, request.Count); + + blockList.Blocks.AddRange(blocks); + Logger.LogDebug( + "Replied to {peerInfo} with {count}, request was {request}. requestId={requestId}", peerInfo, blockList.Blocks.Count, request, requestId); + } + catch (Exception e) + { + Logger.LogWarning(e, "Request blocks error - {peerInfo} - request={request}. requestId={requestId} ", peerInfo, request, requestId); + throw; + } + + return blockList; + } + + public async Task GetNodesAsync(NodesRequest request, string peerInfo) + { + if (request == null) + return new NodeList(); + + var nodesCount = Math.Min(request.MaxCount, GrpcConstants.DefaultDiscoveryMaxNodesToResponse); + Logger.LogDebug("Peer {peerInfo} requested {nodesCount} nodes.", peerInfo, nodesCount); + + NodeList nodes; + try + { + nodes = await _nodeManager.GetRandomNodesAsync(nodesCount); + } + catch (Exception e) + { + Logger.LogWarning(e, "Get nodes error: "); + throw; + } + + Logger.LogDebug("Sending {Count} to {peerInfo}.", nodes.Nodes.Count, peerInfo); + return nodes; + } + + public Task ConfirmHandshakeAsync(string peerInfo, string peerPubkey, string requestId) + { + try + { + Logger.LogDebug("Peer {peerInfo} has requested a handshake confirmation. requestId={requestId}", peerInfo, requestId); + _connectionService.ConfirmHandshake(peerPubkey); + } + catch (Exception e) + { + Logger.LogWarning(e, "Confirm handshake error - {peerInfo}: requestId={requestId}", peerInfo, requestId); + throw; + } + + return Task.FromResult(new VoidReply()); + } + + public async Task DisconnectAsync(DisconnectReason request, string peerInfo, string peerPubkey, string requestId) + { + Logger.LogDebug("Peer {peerInfo} has sent a disconnect request. reason={reason} requestId={requestId}", peerInfo, request, requestId); + try + { + await _connectionService.RemovePeerAsync(peerPubkey); + } + catch (Exception e) + { + Logger.LogError(e, "requestId={requestId}, Disconnect error: ", requestId); + throw; + } + + return new VoidReply(); + } + + + /// + /// Try to get the peer based on peerPubkey. + /// + /// + /// + /// + /// If the peer does not exist, a cancelled RPC exception is thrown to tell the client. + /// Need to verify the existence of the peer here, + /// because when we start transferring data using the streaming RPC, + /// the request no longer goes through the . + /// + private GrpcPeer TryGetPeerByPubkey(string peerPubkey) + { + var peer = _connectionService.GetPeerByPubkey(peerPubkey); + + if (peer != null) + return peer; + + Logger.LogDebug("Peer: {peerPubkey} already removed.", peerPubkey); + throw new RpcException(Status.DefaultCancelled); + } +} \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Service/IStreamContext.cs b/src/AElf.OS.Network.Grpc/Service/IStreamContext.cs new file mode 100644 index 0000000000..580308f747 --- /dev/null +++ b/src/AElf.OS.Network.Grpc/Service/IStreamContext.cs @@ -0,0 +1,73 @@ +using System.Text; +using Google.Protobuf.Collections; +using Grpc.Core; + +namespace AElf.OS.Network.Grpc; + +public interface IStreamContext +{ + string GetPeerInfo(); + string GetPubKey(); + string GetSessionId(); + void SetPeerInfo(string peerInfo); +} + +public class ServiceStreamContext : IStreamContext +{ + public ServerCallContext Context; + + public ServiceStreamContext(ServerCallContext context) + { + Context = context; + } + + public string GetPeerInfo() + { + return Context.GetPeerInfo(); + } + + public string GetPubKey() + { + return Context.GetPublicKey(); + } + + public string GetSessionId() + { + return Context.GetSessionId()?.ToHex(); + } + + public void SetPeerInfo(string peerInfo) + { + Context.RequestHeaders.Add(new Metadata.Entry(GrpcConstants.PeerInfoMetadataKey, peerInfo)); + } +} + +public class StreamMessageMetaStreamContext : IStreamContext +{ + private MapField _meta; + + public StreamMessageMetaStreamContext(MapField meta) + { + _meta = meta; + } + + public string GetPeerInfo() + { + return _meta[GrpcConstants.PeerInfoMetadataKey]; + } + + public string GetPubKey() + { + return _meta[GrpcConstants.PubkeyMetadataKey]; + } + + public string GetSessionId() + { + return _meta[GrpcConstants.SessionIdMetadataKey]; + } + + public void SetPeerInfo(string peerInfo) + { + _meta[GrpcConstants.PeerInfoMetadataKey] = peerInfo; + } +} \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Service/IStreamMethod.cs b/src/AElf.OS.Network.Grpc/Service/IStreamMethod.cs new file mode 100644 index 0000000000..120dd0d164 --- /dev/null +++ b/src/AElf.OS.Network.Grpc/Service/IStreamMethod.cs @@ -0,0 +1,185 @@ +using System.Threading.Tasks; +using AElf.Types; +using Google.Protobuf; +using Volo.Abp.DependencyInjection; + +namespace AElf.OS.Network.Grpc; + +public interface IStreamMethod +{ + MessageType Method { get; } + Task InvokeAsync(StreamMessage request, IStreamContext streamContext); +} + +public abstract class StreamMethod : IStreamMethod +{ + public abstract MessageType Method { get; } + protected readonly IGrpcRequestProcessor GrpcRequestProcessor; + + protected StreamMethod(IGrpcRequestProcessor grpcRequestProcessor) + { + GrpcRequestProcessor = grpcRequestProcessor; + } + + public abstract Task InvokeAsync(StreamMessage request, IStreamContext streamContext); +} + +public class GetNodesMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.GetNodes; + + public GetNodesMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override async Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + return await GrpcRequestProcessor.GetNodesAsync(NodesRequest.Parser.ParseFrom(request.Message), streamContext.GetPeerInfo()); + } +} + +public class HealthCheckMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.HealthCheck; + + public HealthCheckMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + return Task.FromResult(new HealthCheckReply() as IMessage); + } +} + +public class PingMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.Ping; + + public PingMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + return Task.FromResult(new PongReply() as IMessage); + } +} + +public class DisconnectMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.Disconnect; + + public DisconnectMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override async Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + await GrpcRequestProcessor.DisconnectAsync(DisconnectReason.Parser.ParseFrom(request.Message), streamContext.GetPeerInfo(), streamContext.GetPubKey(), request.RequestId); + return new VoidReply(); + } +} + +public class ConfirmHandShakeMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.ConfirmHandShake; + + public ConfirmHandShakeMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override async Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + await GrpcRequestProcessor.ConfirmHandshakeAsync(streamContext.GetPeerInfo(), streamContext.GetPubKey(), request.RequestId); + return new VoidReply(); + } +} + +public class RequestBlockMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.RequestBlock; + + public RequestBlockMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override async Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + return await GrpcRequestProcessor.GetBlockAsync(BlockRequest.Parser.ParseFrom(request.Message), streamContext.GetPeerInfo(), streamContext.GetPubKey(), request.RequestId); + } +} + +public class RequestBlocksMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.RequestBlocks; + + public RequestBlocksMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override async Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + return await GrpcRequestProcessor.GetBlocksAsync(BlocksRequest.Parser.ParseFrom(request.Message), streamContext.GetPeerInfo(), request.RequestId); + } +} + +public class BlockBroadcastMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.BlockBroadcast; + + public BlockBroadcastMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override async Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + await GrpcRequestProcessor.ProcessBlockAsync(BlockWithTransactions.Parser.ParseFrom(request.Message), streamContext.GetPubKey()); + return new VoidReply(); + } +} + +public class AnnouncementBroadcastMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.AnnouncementBroadcast; + + public AnnouncementBroadcastMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override async Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + await GrpcRequestProcessor.ProcessAnnouncementAsync(BlockAnnouncement.Parser.ParseFrom(request.Message), streamContext.GetPubKey()); + return new VoidReply(); + } +} + +public class TransactionBroadcastMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.TransactionBroadcast; + + public TransactionBroadcastMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override async Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + await GrpcRequestProcessor.ProcessTransactionAsync(Transaction.Parser.ParseFrom(request.Message), streamContext.GetPubKey()); + return new VoidReply(); + } +} + +public class LibAnnouncementBroadcastMethod : StreamMethod, ISingletonDependency +{ + public override MessageType Method => MessageType.LibAnnouncementBroadcast; + + public LibAnnouncementBroadcastMethod(IGrpcRequestProcessor grpcRequestProcessor) : base(grpcRequestProcessor) + { + } + + public override async Task InvokeAsync(StreamMessage request, IStreamContext streamContext) + { + await GrpcRequestProcessor.ProcessLibAnnouncementAsync(LibAnnouncement.Parser.ParseFrom(request.Message), streamContext.GetPubKey()); + return new VoidReply(); + } +} \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Service/IStreamService.cs b/src/AElf.OS.Network.Grpc/Service/IStreamService.cs new file mode 100644 index 0000000000..96017a6e52 --- /dev/null +++ b/src/AElf.OS.Network.Grpc/Service/IStreamService.cs @@ -0,0 +1,202 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using AElf.OS.Network.Application; +using AElf.OS.Network.Grpc.Helpers; +using AElf.OS.Network.Infrastructure; +using Google.Protobuf; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Volo.Abp.DependencyInjection; + +namespace AElf.OS.Network.Grpc; + +public interface IStreamService +{ + Task ProcessStreamReplyAsync(ByteString reply, string clientPubKey); + Task ProcessStreamRequestAsync(StreamMessage request, ServerCallContext context); + Task ProcessStreamPeerException(NetworkException exception, IPeer peer); +} + +public class StreamService : IStreamService, ISingletonDependency +{ + public ILogger Logger { get; set; } + private readonly IConnectionService _connectionService; + private readonly IStreamTaskResourcePool _streamTaskResourcePool; + private readonly ITaskQueueManager _taskQueueManager; + private readonly Dictionary _streamMethods; + + + public StreamService(IConnectionService connectionService, IStreamTaskResourcePool streamTaskResourcePool, ITaskQueueManager taskQueueManager, + IEnumerable streamMethods) + { + Logger = NullLogger.Instance; + _connectionService = connectionService; + _streamTaskResourcePool = streamTaskResourcePool; + _taskQueueManager = taskQueueManager; + _streamMethods = streamMethods.ToDictionary(x => x.Method, y => y); + } + + public async Task ProcessStreamRequestAsync(StreamMessage message, ServerCallContext context) + { + var peer = _connectionService.GetPeerByPubkey(context.GetPublicKey()); + var streamPeer = peer as GrpcStreamPeer; + Logger.LogInformation("receive {requestId} {streamType} {meta}", message.RequestId, message.StreamType, message.Meta); + try + { + await DoProcessAsync(new StreamMessageMetaStreamContext(message.Meta), message, streamPeer); + Logger.LogInformation("handle stream call success, clientPubKey={clientPubKey} request={requestId} {streamType}-{messageType} latency={latency}", + context.GetPublicKey(), message.RequestId, message.StreamType, message.MessageType, CommonHelper.GetRequestLatency(message.RequestId)); + } + catch (Exception ex) + { + Logger.LogError(ex, "handle stream call failed, clientPubKey={clientPubKey} request={requestId} {streamType}-{messageType} latency={latency}", + context.GetPublicKey(), message.RequestId, message.StreamType, message.MessageType, CommonHelper.GetRequestLatency(message.RequestId)); + } + } + + public async Task ProcessStreamPeerException(NetworkException exception, IPeer peer) + { + await HandleNetworkExceptionAsync(peer, exception); + } + + public async Task ProcessStreamReplyAsync(ByteString reply, string clientPubKey) + { + var message = StreamMessage.Parser.ParseFrom(reply); + Logger.LogInformation("receive {requestId} {streamType} {meta}", message.RequestId, message.StreamType, message.Meta); + + var peer = _connectionService.GetPeerByPubkey(clientPubKey); + var streamPeer = peer as GrpcStreamPeer; + try + { + await DoProcessAsync(new StreamMessageMetaStreamContext(message.Meta), message, streamPeer); + Logger.LogInformation("handle stream call success, clientPubKey={clientPubKey} request={requestId} {streamType}-{messageType} latency={latency}", + clientPubKey, message.RequestId, message.StreamType, message.MessageType, CommonHelper.GetRequestLatency(message.RequestId)); + } + catch (Exception ex) + { + Logger.LogError(ex, "handle stream call failed, clientPubKey={clientPubKey} request={requestId} {streamType}-{messageType} latency={latency}", + clientPubKey, message.RequestId, message.StreamType, message.MessageType, CommonHelper.GetRequestLatency(message.RequestId)); + } + } + + private async Task DoProcessAsync(IStreamContext streamContext, StreamMessage request, GrpcStreamPeer responsePeer) + { + Logger.LogInformation("ProcessReceive {requestId} {streamType}-{messageType} latency={latency} messageSize={size}", request.RequestId, request.StreamType, request.MessageType, CommonHelper.GetRequestLatency(request.RequestId), + request.ToByteArray().Length); + if (!ValidContext(request, streamContext, responsePeer)) return; + switch (request.StreamType) + { + case StreamType.Reply: + _streamTaskResourcePool.TrySetResult(request.RequestId, request); + break; + + case StreamType.Request: + await ProcessRequestAsync(request, responsePeer, streamContext); + return; + case StreamType.Unknown: + default: + Logger.LogWarning("unhandled stream request: {requestId} {streamType}-{messageType}", request.RequestId, request.StreamType, request.MessageType); + return; + } + } + + + private async Task ProcessRequestAsync(StreamMessage request, GrpcStreamPeer responsePeer, IStreamContext streamContext) + { + try + { + if (!_streamMethods.TryGetValue(request.MessageType, out var method)) + Logger.LogWarning("unhandled stream request: {requestId} {streamType}-{messageType}", request.RequestId, request.StreamType, request.MessageType); + var reply = method == null ? new VoidReply() : await method.InvokeAsync(request, streamContext); + var message = new StreamMessage + { + StreamType = StreamType.Reply, MessageType = request.MessageType, + RequestId = request.RequestId, Message = reply == null ? new VoidReply().ToByteString() : reply.ToByteString() + }; + if (IsNeedAuth(request)) message.Meta.Add(GrpcConstants.SessionIdMetadataKey, responsePeer.Info.SessionId.ToHex()); + await responsePeer.WriteAsync(message, async ex => + { + if (ex != null) await HandleNetworkExceptionAsync(responsePeer, ex); + }); + } + catch (Exception e) + { + Logger.LogWarning(e, "request failed {requestId} {streamType}-{messageType}", request.RequestId, request.StreamType, request.MessageType); + throw; + } + } + + + private async Task HandleNetworkExceptionAsync(IPeer peer, NetworkException exception) + { + if (exception.ExceptionType == NetworkExceptionType.Unrecoverable) + { + Logger.LogInformation(exception, $"Removing unrecoverable {peer}."); + await _connectionService.TrySchedulePeerReconnectionAsync(peer); + } + else if (exception.ExceptionType == NetworkExceptionType.PeerUnstable) + { + Logger.LogDebug(exception, $"Queuing peer for reconnection {peer.RemoteEndpoint}."); + _taskQueueManager.Enqueue(async () => await RecoverPeerAsync(peer), NetworkConstants.PeerReconnectionQueueName); + } + } + + private async Task RecoverPeerAsync(IPeer peer) + { + if (peer.IsReady) // peer recovered already + return; + var success = await peer.TryRecoverAsync(); + if (success && peer is GrpcStreamPeer streamPeer) + { + await _connectionService.BuildStreamForPeerAsync(streamPeer); + } + + if (!success) await _connectionService.TrySchedulePeerReconnectionAsync(peer); + } + + private bool ValidContext(StreamMessage message, IStreamContext context, GrpcStreamPeer peer) + { + if (!IsNeedAuth(message)) return true; + if (peer == null) + { + Logger.LogWarning("Could not find peer {pubKey}", context.GetPubKey()); + return false; + } + + // check that the peers session is equal to one announced in the headers + var sessionId = context.GetSessionId(); + + if (peer.InboundSessionId.ToHex().Equals(sessionId)) + { + context.SetPeerInfo(peer.ToString()); + return true; + } + + if (peer.InboundSessionId == null) + { + Logger.LogWarning("Wrong inbound session id {peer}, {streamType}-{messageType}", context.GetPeerInfo(), message.StreamType, message.MessageType); + return false; + } + + if (sessionId == null) + { + Logger.LogWarning("Wrong inbound session id {peer}, {requestId}", peer, message.RequestId); + return false; + } + + Logger.LogWarning("Unequal session id, ({inboundSessionId} {infoSession} vs {sessionId}) {streamType}-{messageType} {pubkey} {peer}", peer.InboundSessionId.ToHex(), peer.Info.SessionId.ToHex(), + sessionId, message.StreamType, message.MessageType, peer.Info.Pubkey, peer); + return false; + } + + private bool IsNeedAuth(StreamMessage streamMessage) + { + if (streamMessage.StreamType == StreamType.Reply) return false; + return streamMessage.MessageType != MessageType.Ping && + streamMessage.MessageType != MessageType.HandShake && + streamMessage.MessageType != MessageType.HealthCheck; + } +} \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/Service/IStreamTaskResourcePool.cs b/src/AElf.OS.Network.Grpc/Service/IStreamTaskResourcePool.cs new file mode 100644 index 0000000000..bf05232c71 --- /dev/null +++ b/src/AElf.OS.Network.Grpc/Service/IStreamTaskResourcePool.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Volo.Abp.DependencyInjection; + +namespace AElf.OS.Network.Grpc; + +public interface IStreamTaskResourcePool +{ + Task RegistryTaskPromiseAsync(string requestId, MessageType messageType, TaskCompletionSource promise); + void TrySetResult(string requestId, StreamMessage reply); + Task> GetResultAsync(TaskCompletionSource promise, string requestId, int timeOut); +} + +public class StreamTaskResourcePool : IStreamTaskResourcePool, ISingletonDependency +{ + public ILogger Logger { get; set; } + private readonly ConcurrentDictionary>> _promisePool; + + public StreamTaskResourcePool() + { + Logger = NullLogger.Instance; + _promisePool = new ConcurrentDictionary>>(Environment.ProcessorCount, 1024); + } + + public Task RegistryTaskPromiseAsync(string requestId, MessageType messageType, TaskCompletionSource promise) + { + Logger.LogInformation("registry {requestId} {messageType}", requestId, messageType); + _promisePool[requestId] = new Tuple>(messageType, promise); + return Task.CompletedTask; + } + + public void TrySetResult(string requestId, StreamMessage reply) + { + AssertContains(requestId); + var promise = _promisePool[requestId]; + if (promise.Item1 != reply.MessageType) + { + throw new Exception($"invalid reply type set {reply.StreamType} expect {promise.Item1}"); + } + + promise.Item2.TrySetResult(reply); + } + + public async Task> GetResultAsync(TaskCompletionSource promise, string requestId, int timeOut) + { + try + { + var completed = await Task.WhenAny(promise.Task, Task.Delay(timeOut)); + if (completed != promise.Task) + return new Tuple(false, null); + + var message = await promise.Task; + return new Tuple(true, message); + } + finally + { + _promisePool.TryRemove(requestId, out _); + } + } + + private void AssertContains(string requestId) + { + if (!_promisePool.ContainsKey(requestId)) + { + throw new Exception($"{requestId} not found"); + } + } +} \ No newline at end of file diff --git a/src/AElf.OS.Network.Grpc/StreamJob.cs b/src/AElf.OS.Network.Grpc/StreamJob.cs index 02f3b8098a..831730f719 100644 --- a/src/AElf.OS.Network.Grpc/StreamJob.cs +++ b/src/AElf.OS.Network.Grpc/StreamJob.cs @@ -10,5 +10,6 @@ public class StreamJob public BlockAnnouncement BlockAnnouncement { get; set; } public BlockWithTransactions BlockWithTransactions { get; set; } public LibAnnouncement LibAnnouncement { get; set; } + public StreamMessage StreamMessage { get; set; } public Action SendCallback { get; set; } } \ No newline at end of file diff --git a/src/AElf.OS/AElf.OS.csproj b/src/AElf.OS/AElf.OS.csproj index c155aa5ca3..531d4f7d4d 100644 --- a/src/AElf.OS/AElf.OS.csproj +++ b/src/AElf.OS/AElf.OS.csproj @@ -1,5 +1,5 @@ - + net6.0 @@ -9,13 +9,13 @@ - - + + - - + + diff --git a/src/AElf.OS/Handlers/Network/BlockReceivedEventHandler.cs b/src/AElf.OS/Handlers/Network/BlockReceivedEventHandler.cs index 381a5dbec0..56e89d9b62 100644 --- a/src/AElf.OS/Handlers/Network/BlockReceivedEventHandler.cs +++ b/src/AElf.OS/Handlers/Network/BlockReceivedEventHandler.cs @@ -46,7 +46,7 @@ private async Task ProcessNewBlockAsync(BlockWithTransactions blockWithTransacti var chain = await _blockchainService.GetChainAsync(); Logger.LogDebug( - $"About to process new block: {blockWithTransactions.Header.GetHash()} of height {blockWithTransactions.Height}"); + $"About to process new block: {blockWithTransactions.Header.GetHash().ToHex()} of height {blockWithTransactions.Height}"); if (!await _blockSyncValidationService.ValidateBlockBeforeSyncAsync(chain, blockWithTransactions, senderPubkey)) diff --git a/src/AElf.OS/Handlers/Network/StreamMessageReceivedEventHandler.cs b/src/AElf.OS/Handlers/Network/StreamMessageReceivedEventHandler.cs new file mode 100644 index 0000000000..d6afc861fe --- /dev/null +++ b/src/AElf.OS/Handlers/Network/StreamMessageReceivedEventHandler.cs @@ -0,0 +1,37 @@ +using System; +using System.Threading.Tasks; +using AElf.OS.Network.Events; +using AElf.OS.Network.Grpc; +using Microsoft.Extensions.Logging; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus; + +namespace AElf.OS.Handlers; + +public class StreamMessageReceivedEventHandler : ILocalEventHandler, ITransientDependency +{ + private readonly IStreamService _streamService; + + public ILogger Logger { get; set; } + + public StreamMessageReceivedEventHandler(IStreamService streamService) + { + _streamService = streamService; + } + + public Task HandleEventAsync(StreamMessageReceivedEvent eventData) + { + //because our message do not have relation between each other, so we want it to be processed concurrency + if (eventData.RequestId != null) + Logger.LogDebug("handleReceive {requestId} latency={latency}", eventData.RequestId, GetLatency(eventData.RequestId)); + _streamService.ProcessStreamReplyAsync(eventData.Message, eventData.ClientPubkey); + return Task.CompletedTask; + } + + private long GetLatency(string requestId) + { + var sp = requestId.Split("_"); + if (sp.Length != 2) return -1; + return long.TryParse(sp[0], out var start) ? DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start : -1; + } +} \ No newline at end of file diff --git a/src/AElf.OS/Handlers/Network/StreamPeerExceptionEventHandler.cs b/src/AElf.OS/Handlers/Network/StreamPeerExceptionEventHandler.cs new file mode 100644 index 0000000000..5374b187d2 --- /dev/null +++ b/src/AElf.OS/Handlers/Network/StreamPeerExceptionEventHandler.cs @@ -0,0 +1,26 @@ +using System; +using System.Security.AccessControl; +using System.Threading.Tasks; +using AElf.OS.Network.Events; +using AElf.OS.Network.Grpc; +using Microsoft.Extensions.Logging; +using Volo.Abp.DependencyInjection; +using Volo.Abp.EventBus; + +namespace AElf.OS.Handlers; + +public class StreamPeerExceptionEventHandler : ILocalEventHandler, ITransientDependency +{ + private readonly IStreamService _streamService; + public ILogger Logger { get; set; } + + public StreamPeerExceptionEventHandler(IStreamService streamService) + { + _streamService = streamService; + } + + public async Task HandleEventAsync(StreamPeerExceptionEvent eventData) + { + await _streamService.ProcessStreamPeerException(eventData.Exception, eventData.Peer); + } +} \ No newline at end of file diff --git a/test/AElf.OS.Core.Tests/Network/Helpers/AElfPeerEndpointHelperTests.cs b/test/AElf.OS.Core.Tests/Network/Helpers/AElfPeerEndpointHelperTests.cs index 161e6ae695..46dc522ff8 100644 --- a/test/AElf.OS.Core.Tests/Network/Helpers/AElfPeerEndpointHelperTests.cs +++ b/test/AElf.OS.Core.Tests/Network/Helpers/AElfPeerEndpointHelperTests.cs @@ -30,4 +30,5 @@ public void ParsingTest(string endpointToParse, bool isValid, int expectedPort = if (isValid) endpoint.Port.ShouldBe(expectedPort); } + } \ No newline at end of file diff --git a/test/AElf.OS.Network.Grpc.Tests/AElf.OS.Network.Grpc.Tests.csproj b/test/AElf.OS.Network.Grpc.Tests/AElf.OS.Network.Grpc.Tests.csproj index 7e8368034e..4ff60eec84 100644 --- a/test/AElf.OS.Network.Grpc.Tests/AElf.OS.Network.Grpc.Tests.csproj +++ b/test/AElf.OS.Network.Grpc.Tests/AElf.OS.Network.Grpc.Tests.csproj @@ -5,14 +5,14 @@ AElf.OS.Network.Grpc - - + + all runtime; build; native; contentfiles; analyzers - + all @@ -24,9 +24,9 @@ - - + + - + diff --git a/test/AElf.OS.Network.Grpc.Tests/Connection/ConnectionServiceTests.cs b/test/AElf.OS.Network.Grpc.Tests/Connection/ConnectionServiceTests.cs index fe5268c8d4..55f0dde163 100644 --- a/test/AElf.OS.Network.Grpc.Tests/Connection/ConnectionServiceTests.cs +++ b/test/AElf.OS.Network.Grpc.Tests/Connection/ConnectionServiceTests.cs @@ -410,7 +410,7 @@ public void ConfirmHandshake_Test() eventData.ShouldNotBeNull(); } - private Handshake CreateHandshake(ECKeyPair initiatorPeer = null, int port = 0) + private Handshake CreateHandshake(ECKeyPair initiatorPeer = null, int port = 0, string nodeversion = "1.3.0.0") { if (initiatorPeer == null) initiatorPeer = CryptoHelper.GenerateKeyPair(); @@ -421,6 +421,7 @@ private Handshake CreateHandshake(ECKeyPair initiatorPeer = null, int port = 0) Version = KernelConstants.ProtocolVersion, Pubkey = ByteString.CopyFrom(initiatorPeer.PublicKey), Time = TimestampHelper.GetUtcNow(), + NodeVersion = nodeversion, ListeningPort = port }; diff --git a/test/AElf.OS.Network.Grpc.Tests/GrpcNetworkTestModule.cs b/test/AElf.OS.Network.Grpc.Tests/GrpcNetworkTestModule.cs index 21e6dab8d9..2a99e6e34c 100644 --- a/test/AElf.OS.Network.Grpc.Tests/GrpcNetworkTestModule.cs +++ b/test/AElf.OS.Network.Grpc.Tests/GrpcNetworkTestModule.cs @@ -90,7 +90,7 @@ public override void ConfigureServices(ServiceConfigurationContext context) NetworkTestConstants.FakePubkey, mockClient.Object); - return Task.FromResult(peer); + return Task.FromResult(peer as GrpcPeer); }); // Incorrect handshake @@ -116,7 +116,7 @@ public override void ConfigureServices(ServiceConfigurationContext context) var peer = GrpcTestPeerHelper.CreatePeerWithClient(NetworkTestConstants.GoodPeerEndpoint, NetworkTestConstants.FakePubkey, mockClient.Object); - return Task.FromResult(peer); + return Task.FromResult(peer as GrpcPeer); }); // Incorrect handshake signature diff --git a/test/AElf.OS.Network.Grpc.Tests/GrpcNetworkWithPeerTestModule.cs b/test/AElf.OS.Network.Grpc.Tests/GrpcNetworkWithPeerTestModule.cs index 0c667ae02a..2448bd3ee8 100644 --- a/test/AElf.OS.Network.Grpc.Tests/GrpcNetworkWithPeerTestModule.cs +++ b/test/AElf.OS.Network.Grpc.Tests/GrpcNetworkWithPeerTestModule.cs @@ -33,7 +33,7 @@ public override void OnApplicationInitialization(ApplicationInitializationContex if (!AElfPeerEndpointHelper.TryParse(NetworkTestConstants.FakeIpEndpoint, out var peerEndpoint)) throw new Exception($"Ip {NetworkTestConstants.FakeIpEndpoint} is invalid."); - pool.TryAddPeer(new GrpcPeer(new GrpcClient(channel, new PeerService.PeerServiceClient(channel)), peerEndpoint, - connectionInfo)); + var client = new GrpcClient(channel, new PeerService.PeerServiceClient(channel)); + pool.TryAddPeer(new GrpcPeer(client, peerEndpoint, connectionInfo)); } } \ No newline at end of file diff --git a/test/AElf.OS.Network.Grpc.Tests/GrpcPeerTests.cs b/test/AElf.OS.Network.Grpc.Tests/GrpcPeerTests.cs index 856a51e97f..138bfb5239 100644 --- a/test/AElf.OS.Network.Grpc.Tests/GrpcPeerTests.cs +++ b/test/AElf.OS.Network.Grpc.Tests/GrpcPeerTests.cs @@ -1,10 +1,12 @@ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using AElf.CSharp.Core.Extension; using AElf.Kernel; using AElf.OS.Network.Application; using AElf.OS.Network.Infrastructure; +using AElf.OS.Network.Protocol.Types; using AElf.Types; using Google.Protobuf; using Grpc.Core; @@ -404,6 +406,17 @@ public async Task HandleRpcException_Test() grpcPeer.CheckHealthAsync() .ShouldThrow().ExceptionType.ShouldBe(NetworkExceptionType.Unrecoverable); + grpcPeer.IsConnected.ShouldBe(false); + grpcPeer.IsReady.ShouldBe(false); + grpcPeer.BufferedTransactionsCount.ShouldBe(0); + grpcPeer.BufferedBlocksCount.ShouldBe(0); + grpcPeer.BufferedAnnouncementsCount.ShouldBe(0); + + var grpcClient = new GrpcClient(new("127.0.0.1:9999", ChannelCredentials.Insecure), mockClient.Object); + var peer = new GrpcStreamPeer(grpcClient, null, new PeerConnectionInfo() { Pubkey = "0471b4ea88d8cf3d4c58c12e1306a4fdfde64b3a511e8b023c0412032869f47e1ddeaa9b82318a7d1b8d08eb484cfe93a51c319750b65e3df8ffd822a446251b64" }, null, + null, + new StreamTaskResourcePool(), new Dictionary() { { "tmp", "value" } }); + peer.HandleRpcException(new RpcException(Status.DefaultSuccess), ""); } } diff --git a/test/AElf.OS.Network.Grpc.Tests/Helpers/GrpcTestPeerHelper.cs b/test/AElf.OS.Network.Grpc.Tests/Helpers/GrpcTestPeerHelper.cs index d2928780d7..dca3231c74 100644 --- a/test/AElf.OS.Network.Grpc.Tests/Helpers/GrpcTestPeerHelper.cs +++ b/test/AElf.OS.Network.Grpc.Tests/Helpers/GrpcTestPeerHelper.cs @@ -21,7 +21,7 @@ public static GrpcPeer CreateBasicPeer(string ip, string pubkey) new PeerConnectionInfo { Pubkey = pubkey, - SessionId = new byte[] {0, 1, 2}, + SessionId = new byte[] { 0, 1, 2 }, ConnectionTime = TimestampHelper.GetUtcNow(), NodeVersion = NodeVersion }); @@ -30,17 +30,19 @@ public static GrpcPeer CreateBasicPeer(string ip, string pubkey) public static GrpcPeer CreatePeerWithInfo(string ip, PeerConnectionInfo info) { AElfPeerEndpointHelper.TryParse(ip, out var endpoint); - var peer = new GrpcPeer(new GrpcClient(CreateMockChannel(), null), endpoint, info); - peer.InboundSessionId = new byte[] {0, 1, 2}; + var client = new GrpcClient(CreateMockChannel(), null); + var peer = new GrpcPeer(client, endpoint, info); + peer.InboundSessionId = new byte[] { 0, 1, 2 }; return peer; } public static GrpcPeer CreatePeerWithClient(string ip, string pubkey, PeerService.PeerServiceClient client) { AElfPeerEndpointHelper.TryParse(ip, out var endpoint); - var peer = new GrpcPeer(new GrpcClient(CreateMockChannel(), client), endpoint, - new PeerConnectionInfo {Pubkey = pubkey, SessionId = new byte[] {0, 1, 2}, NodeVersion = NodeVersion}); - peer.InboundSessionId = new byte[] {0, 1, 2}; + var grpcClient = new GrpcClient(CreateMockChannel(), client); + var info = new PeerConnectionInfo { Pubkey = pubkey, SessionId = new byte[] { 0, 1, 2 }, NodeVersion = NodeVersion }; + var peer = new GrpcPeer(grpcClient, endpoint, info); + peer.InboundSessionId = new byte[] { 0, 1, 2 }; return peer; } @@ -66,14 +68,15 @@ public static GrpcPeer CreateNewPeer(string ipAddress = "127.0.0.1:2000", bool i Pubkey = pubkey, ProtocolVersion = KernelConstants.ProtocolVersion, ConnectionTime = TimestampHelper.GetUtcNow(), - SessionId = new byte[] {0, 1, 2}, + SessionId = new byte[] { 0, 1, 2 }, IsInbound = true, NodeVersion = NodeVersion }; AElfPeerEndpointHelper.TryParse(ipAddress, out var endpoint); - var peer = new GrpcPeer(new GrpcClient(channel, client), endpoint, connectionInfo); - peer.InboundSessionId = new byte[] {0, 1, 2}; + var grpcClient = new GrpcClient(channel, client); + var peer = new GrpcPeer(grpcClient, endpoint, connectionInfo); + peer.InboundSessionId = new byte[] { 0, 1, 2 }; return peer; } diff --git a/test/AElf.OS.Network.Grpc.Tests/Helpers/NetworkTestHelper.cs b/test/AElf.OS.Network.Grpc.Tests/Helpers/NetworkTestHelper.cs index a8e8cd0ca4..5a8acab627 100644 --- a/test/AElf.OS.Network.Grpc.Tests/Helpers/NetworkTestHelper.cs +++ b/test/AElf.OS.Network.Grpc.Tests/Helpers/NetworkTestHelper.cs @@ -9,7 +9,7 @@ namespace AElf.OS.Network.Grpc; public static class NetworkTestHelper { public static Handshake CreateValidHandshake(ECKeyPair producer, long bestChainHeight, - int chainId = NetworkTestConstants.DefaultChainId, int port = 0) + int chainId = NetworkTestConstants.DefaultChainId, int port = 0, string nodeVersion= "1.3.0.0") { var data = new HandshakeData { @@ -21,7 +21,8 @@ public static Handshake CreateValidHandshake(ECKeyPair producer, long bestChainH BestChainHeight = bestChainHeight, LastIrreversibleBlockHash = HashHelper.ComputeFrom("LastIrreversibleBlockHash"), LastIrreversibleBlockHeight = 1, - Time = TimestampHelper.GetUtcNow() + Time = TimestampHelper.GetUtcNow(), + NodeVersion = nodeVersion }; var signature = diff --git a/test/AElf.OS.Network.Grpc.Tests/Service/GrpcStreamServiceTests.cs b/test/AElf.OS.Network.Grpc.Tests/Service/GrpcStreamServiceTests.cs new file mode 100644 index 0000000000..32b9f64691 --- /dev/null +++ b/test/AElf.OS.Network.Grpc.Tests/Service/GrpcStreamServiceTests.cs @@ -0,0 +1,145 @@ +using System; +using System.Collections.Generic; +using System.Security.Cryptography; +using System.Threading; +using System.Threading.Tasks; +using AElf.CSharp.Core.Extension; +using AElf.Kernel; +using AElf.OS.Network.Application; +using AElf.OS.Network.Grpc.Helpers; +using AElf.OS.Network.Infrastructure; +using AElf.OS.Network.Protocol; +using AElf.OS.Network.Protocol.Types; +using Google.Protobuf; +using Google.Protobuf.Collections; +using Grpc.Core; +using Grpc.Core.Testing; +using Grpc.Core.Utils; +using Moq; +using NSubstitute.ExceptionExtensions; +using Shouldly; +using Xunit; + +namespace AElf.OS.Network.Grpc; + +public class GrpcStreamServiceTests : GrpcNetworkWithChainTestBase +{ + private readonly OSTestHelper _osTestHelper; + private readonly IStreamService _streamService; + private readonly IPeerPool _peerPool; + + public GrpcStreamServiceTests() + { + _osTestHelper = GetRequiredService(); + _streamService = GetRequiredService(); + _peerPool = GetRequiredService(); + } + + private ServerCallContext BuildServerCallContext(Metadata metadata = null, string address = null) + { + return TestServerCallContext.Create("mock", null, TimestampHelper.GetUtcNow().AddHours(1).ToDateTime(), + metadata ?? new Metadata(), CancellationToken.None, + address ?? "ipv4:127.0.0.1:5555", null, null, m => TaskUtils.CompletedTask, () => new WriteOptions(), + writeOptions => { }); + } + + [Fact] + public void StreamMessageMetaStreamContextTest() + { + var context = new StreamMessageMetaStreamContext(new MapField() + { + { GrpcConstants.PubkeyMetadataKey, "111" }, + { GrpcConstants.SessionIdMetadataKey, "222" }, + { GrpcConstants.PeerInfoMetadataKey, "333" }, + }); + Assert.True(context.GetPeerInfo().Equals("333")); + Assert.True(context.GetPubKey().Equals("111")); + Assert.True(context.GetSessionId().Equals("222")); + context.SetPeerInfo("123"); + Assert.True(context.GetPeerInfo().Equals("123")); + + var context1 = new ServiceStreamContext(BuildServerCallContext()); + Assert.True(context1.GetPeerInfo() == null); + Assert.True(context1.GetPubKey() == null); + Assert.True(context1.GetSessionId() == null); + context1.SetPeerInfo("123"); + Assert.True(context1.GetPeerInfo().Equals("123")); + } + + [Fact] + public async Task StreamTaskResourcePoolTest() + { + var pool = new StreamTaskResourcePool(); + var requestId = "23"; + var p = new TaskCompletionSource(); + pool.RegistryTaskPromiseAsync(requestId, MessageType.Ping, p); + pool.TrySetResult(requestId, new StreamMessage { Message = new PongReply().ToByteString(), MessageType = MessageType.Ping, RequestId = requestId, StreamType = StreamType.Reply }); + var res = await pool.GetResultAsync(p, requestId, 100); + res.Item1.ShouldBe(true); + } + + [Fact] + public void RequestIdTest() + { + var requestId = CommonHelper.GenerateRequestId(); + Task.Delay(5); + var l = CommonHelper.GetRequestLatency(requestId); + Assert.True(l >= 0); + } + + [Fact] + public async Task GrpcStreamPeerTest() + { + var mockClient = new Mock(); + var grpcClient = new GrpcClient(new("127.0.0.1:9999", ChannelCredentials.Insecure), mockClient.Object); + var peer = new GrpcStreamPeer(grpcClient, null, new PeerConnectionInfo() { Pubkey = "0471b4ea88d8cf3d4c58c12e1306a4fdfde64b3a511e8b023c0412032869f47e1ddeaa9b82318a7d1b8d08eb484cfe93a51c319750b65e3df8ffd822a446251b64" }, null, null, + new StreamTaskResourcePool(), new Dictionary() { { "tmp", "value" } }); + Assert.Throws(peer.BuildCall); + + peer.StartServe(null); + peer.SetStreamSendCallBack(null); + peer.DisposeAsync().ShouldThrow(); + peer.DisconnectAsync(true).ShouldThrow(); + + var block = new BlockWithTransactions + { Header = _osTestHelper.GenerateBlock(HashHelper.ComputeFrom("PreBlockHash"), 100).Header }; + peer.GetBlocksAsync(block.GetHash(), 10).ShouldThrow(); + peer.HandShakeAsync(new HandshakeRequest()).ShouldThrow(); + peer.ConfirmHandshakeAsync().ShouldThrow(); + peer.GetBlockByHashAsync(block.GetHash()).ShouldThrow(); + peer.GetNodesAsync().ShouldThrow(); + peer.CheckHealthAsync().ShouldThrow(); + peer.HandleRpcException(new RpcException(Status.DefaultSuccess), ""); + + var nodePubkey = "0471b4ea88d8cf3d4c58c12e1306a4fdfde64b3a511e8b023c0412032869f47e1ddeaa9b82318a7d1b8d08eb484cfe93a51c319750b65e3df8ffd822a446251b64"; + var peerback = new GrpcStreamBackPeer(null, new PeerConnectionInfo() { Pubkey = nodePubkey }, null, + new StreamTaskResourcePool(), new Dictionary() { { GrpcConstants.PubkeyMetadataKey, nodePubkey } }); + peerback.ConnectionStatus.ShouldBe("Stream Closed"); + var res = await peerback.TryRecoverAsync(); + res.ShouldBe(true); + var re = peerback.HandleRpcException(new RpcException(new Status(StatusCode.Cancelled, "")), ""); + re.ExceptionType.ShouldBe(NetworkExceptionType.Unrecoverable); + re = peerback.HandleRpcException(new RpcException(new Status(StatusCode.Unknown, "")), ""); + re.ExceptionType.ShouldBe(NetworkExceptionType.Unrecoverable); + } + + [Fact] + public async Task StreamServiceTests() + { + var mockClient = new Mock(); + var grpcClient = new GrpcClient(new("127.0.0.1:9999", ChannelCredentials.Insecure), mockClient.Object); + var nodePubkey = "0471b4ea88d8cf3d4c58c12e1306a4fdfde64b3a511e8b023c0412032869f47e1ddeaa9b82318a7d1b8d08eb484cfe93a51c319750b65e3df8ffd822a446251b64"; + var peer = new GrpcStreamPeer(grpcClient, null, new PeerConnectionInfo() { Pubkey = "0471b4ea88d8cf3d4c58c12e1306a4fdfde64b3a511e8b023c0412032869f47e1ddeaa9b82318a7d1b8d08eb484cfe93a51c319750b65e3df8ffd822a446251b64" }, null, null, + new StreamTaskResourcePool(), new Dictionary() { { GrpcConstants.PubkeyMetadataKey, nodePubkey } }); + peer.InboundSessionId = "123".GetBytes(); + _peerPool.TryAddPeer(peer); + var sessionId = HandshakeProvider.GenerateRandomToken(); + _streamService.ProcessStreamRequestAsync(new StreamMessage { StreamType = StreamType.Reply, MessageType = MessageType.HealthCheck, RequestId = "123", Message = new HealthCheckRequest().ToByteString() }, + BuildServerCallContext(new Metadata() { { GrpcConstants.SessionIdMetadataKey, sessionId }, })); + _streamService.ProcessStreamRequestAsync(new StreamMessage { StreamType = StreamType.Request, MessageType = MessageType.HealthCheck, RequestId = "123", Message = new HealthCheckRequest().ToByteString() }, + BuildServerCallContext(new Metadata() { { GrpcConstants.SessionIdMetadataKey, sessionId }, })); + + _streamService.ProcessStreamPeerException(new NetworkException("", NetworkExceptionType.Unrecoverable), peer).ShouldThrow(); + await _streamService.ProcessStreamPeerException(new NetworkException("", NetworkExceptionType.PeerUnstable), peer); + } +} \ No newline at end of file