Skip to content

Commit

Permalink
Merge pull request #3404 from AElfProject/release/1.4.0
Browse files Browse the repository at this point in the history
Release v1.4.0
  • Loading branch information
jason-aelf authored Jun 14, 2023
2 parents a0804d7 + c0f24b9 commit ff547f2
Show file tree
Hide file tree
Showing 52 changed files with 2,098 additions and 376 deletions.
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
parameters:
parts: 3
n: 2
codecoverage: true
codecoverage: false
- template: templates/build-template-window.yml
parameters:
parts: 3
Expand Down
61 changes: 47 additions & 14 deletions protobuf/peer_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,27 @@ import "aelf/core.proto";

service PeerService {

rpc Ping (PingRequest) returns (PongReply) {}
rpc CheckHealth (HealthCheckRequest) returns (HealthCheckReply) {}

rpc RequestBlock (BlockRequest) returns (BlockReply) {}
rpc RequestBlocks (BlocksRequest) returns (BlockList) {}
rpc Ping (PingRequest) returns (PongReply) {}
rpc CheckHealth (HealthCheckRequest) returns (HealthCheckReply) {}

rpc BlockBroadcastStream (stream BlockWithTransactions) returns (VoidReply) {}

rpc TransactionBroadcastStream (stream aelf.Transaction) returns (VoidReply) {}
rpc AnnouncementBroadcastStream (stream BlockAnnouncement) returns (VoidReply) {}
rpc RequestBlock (BlockRequest) returns (BlockReply) {}
rpc RequestBlocks (BlocksRequest) returns (BlockList) {}

rpc LibAnnouncementBroadcastStream (stream LibAnnouncement) returns (VoidReply) {}
rpc BlockBroadcastStream (stream BlockWithTransactions) returns (VoidReply) {}

rpc GetNodes (NodesRequest) returns (NodeList) {}
rpc TransactionBroadcastStream (stream aelf.Transaction) returns (VoidReply) {}
rpc AnnouncementBroadcastStream (stream BlockAnnouncement) returns (VoidReply) {}

rpc DoHandshake (HandshakeRequest) returns (HandshakeReply) {}
rpc ConfirmHandshake (ConfirmHandshakeRequest) returns (VoidReply) {}
rpc LibAnnouncementBroadcastStream (stream LibAnnouncement) returns (VoidReply) {}

rpc Disconnect (DisconnectReason) returns (VoidReply) {}
rpc RequestByStream (stream StreamMessage) returns (stream StreamMessage) {}

rpc GetNodes (NodesRequest) returns (NodeList) {}

rpc DoHandshake (HandshakeRequest) returns (HandshakeReply) {}
rpc ConfirmHandshake (ConfirmHandshakeRequest) returns (VoidReply) {}

rpc Disconnect (DisconnectReason) returns (VoidReply) {}
}

// **** No reply *****
Expand All @@ -45,3 +47,34 @@ message HealthCheckRequest {
message HealthCheckReply {
}

message StreamMessage {
StreamType stream_type = 1;
MessageType message_type = 2;
string request_id = 3;
bytes message = 4;
map<string, string> meta = 5;
}

enum StreamType {
UNKNOWN = 0;
REQUEST = 1;
REPLY = 2;
}

enum MessageType {
ANY = 0;

HAND_SHAKE = 1;
PING = 2;
CONFIRM_HAND_SHAKE = 3;
HEALTH_CHECK = 4;
REQUEST_BLOCK = 5;
REQUEST_BLOCKS = 6;
GET_NODES = 7;

BLOCK_BROADCAST = 8;
TRANSACTION_BROADCAST = 9;
ANNOUNCEMENT_BROADCAST = 10;
LIB_ANNOUNCEMENT_BROADCAST = 11;
DISCONNECT = 12;
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ await _transactionBlockIndexService.ValidateTransactionBlockIndexExistsInBranchA
block.Header.PreviousBlockHash);
if (!blockIndexExists)
continue;
Logger.LogDebug("Transaction: {TransactionId} repackaged", transactionId);
Logger.LogDebug("Transaction: {TransactionId} repackaged", transactionId.ToHex());
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public async Task MergeBlockStateAsync(long lastIrreversibleBlockHeight, Hash la

Logger.LogDebug(
"Start merge lib height: {LastIrreversibleBlockHeight}, lib block hash: {LastIrreversibleBlockHash}, merge count: {BlockIndexesCount}",
lastIrreversibleBlockHeight, lastIrreversibleBlockHash, blockIndexes.Count);
lastIrreversibleBlockHeight, lastIrreversibleBlockHash.ToHex(), blockIndexes.Count);

foreach (var blockIndex in blockIndexes)
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ await _transactionResultService.GetTransactionResultAsync(transactionId, blockHa
== null)
{
Logger.LogWarning(
$"Fail to load transaction result. block hash : {blockHash}, tx id: {transactionId}");
"Fail to load transaction result. block hash : {blockHash}, tx id: {transactionId}", blockHash.ToHex(), transactionId.ToHex());

return null;
}
Expand Down
2 changes: 1 addition & 1 deletion src/AElf.Kernel.Types/Block/BlockHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public byte[] GetHashBytes()
private byte[] GetSignatureData()
{
if (!VerifyFields())
throw new InvalidOperationException($"Invalid block header: {this}.");
throw new InvalidOperationException($"Invalid block header: PreviousBlockHash={PreviousBlockHash?.ToHex()}, mtr={MerkleTreeRootOfTransactions?.ToHex()}, ChainId={ChainId}, Height={Height}, Time={Time}.");

if (Signature.IsEmpty)
return this.ToByteArray();
Expand Down
2 changes: 2 additions & 0 deletions src/AElf.Kernel.Types/KernelConstants.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using Google.Protobuf.WellKnownTypes;

namespace AElf.Kernel;
Expand All @@ -16,4 +17,5 @@ public static class KernelConstants
public const string SignaturePlaceholder = "SignaturePlaceholder";
public const string BlockExecutedDataKey = "BlockExecutedData";
public static Duration AllowedFutureBlockTimeSpan = new() { Seconds = 4 };
public static string SupportStreamMinVersion = "1.4.0.0";
}
10 changes: 5 additions & 5 deletions src/AElf.Launcher/AElf.Launcher.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
<ServerGarbageCollection>true</ServerGarbageCollection>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\AElf.Blockchains.MainChain\AElf.Blockchains.MainChain.csproj"/>
<ProjectReference Include="..\AElf.Blockchains.SideChain\AElf.Blockchains.SideChain.csproj"/>
<ProjectReference Include="..\AElf.Blockchains.MainChain\AElf.Blockchains.MainChain.csproj" />
<ProjectReference Include="..\AElf.Blockchains.SideChain\AElf.Blockchains.SideChain.csproj" />
<None Update="Dockerfile">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
Expand All @@ -15,9 +15,9 @@
</None>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="5.2.2"/>
<PackageReference Include="Volo.Abp.Autofac" Version="5.2.2"/>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<PackageReference Include="Volo.Abp.AspNetCore.Mvc" Version="5.2.2" />
<PackageReference Include="Volo.Abp.Autofac" Version="5.2.2" />
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>
<ItemGroup>
<Content Include="appsettings.Development.json">
Expand Down
6 changes: 3 additions & 3 deletions src/AElf.OS.Core/AElf.OS.Core.csproj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\common.props"/>
<Import Project="..\..\common.props" />
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>AElf.OS</RootNamespace>
Expand All @@ -8,8 +8,8 @@
<Description>Core module for the OS layer.</Description>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\AElf.Kernel.Node\AElf.Kernel.Node.csproj"/>
<ProjectReference Include="..\AElf.Kernel.Token\AElf.Kernel.Token.csproj"/>
<ProjectReference Include="..\AElf.Kernel.Node\AElf.Kernel.Node.csproj" />
<ProjectReference Include="..\AElf.Kernel.Token\AElf.Kernel.Token.csproj" />
</ItemGroup>
<ItemGroup>
<CommonMessage Include="..\..\protobuf\network_types.proto">
Expand Down
2 changes: 2 additions & 0 deletions src/AElf.OS.Core/Network/Application/INetworkService.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using AElf.Kernel;
using AElf.OS.Network.Infrastructure;
using AElf.OS.Network.Types;
using AElf.Types;

Expand Down Expand Up @@ -28,4 +29,5 @@ Task<bool> RemovePeerByPubkeyAsync(string peerPubkey,
Task CheckPeersHealthAsync();
void CheckNtpDrift();
bool IsPeerPoolFull();
Task<List<NodeInfo>> GetNodesAsync(IPeer peer);
}
38 changes: 29 additions & 9 deletions src/AElf.OS.Core/Network/Application/NetworkService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Task BroadcastAnnounceAsync(BlockHeader blockHeader)
Logger.LogInformation(ex, $"Could not broadcast announcement to {peer} " +
$"- status {peer.ConnectionStatus}.");

await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}
});
}
Expand Down Expand Up @@ -171,7 +171,7 @@ public Task BroadcastTransactionAsync(Transaction transaction)
Logger.LogWarning(ex, $"Could not broadcast transaction to {peer} " +
$"- status {peer.ConnectionStatus}.");

await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}
});
}
Expand Down Expand Up @@ -201,7 +201,7 @@ public Task BroadcastLibAnnounceAsync(Hash libHash, long libHeight)
{
Logger.LogWarning(ex, $"Could not broadcast lib announcement to {peer} " +
$"- status {peer.ConnectionStatus}.");
await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}
});
}
Expand Down Expand Up @@ -256,7 +256,6 @@ public async Task<Response<List<BlockWithTransactions>>> GetBlocksAsync(Hash pre

if (peer == null)
throw new InvalidOperationException($"Could not find peer {peerPubkey}.");

var response = await Request(peer, p => p.GetBlocksAsync(previousBlock, count));

if (response.Success && response.Payload != null
Expand Down Expand Up @@ -343,7 +342,7 @@ private void EnqueueBlock(IPeer peer, BlockWithTransactions blockWithTransaction
if (ex != null)
{
Logger.LogWarning(ex, $"Could not broadcast block to {peer} - status {peer.ConnectionStatus}.");
await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}
});
}
Expand Down Expand Up @@ -372,13 +371,33 @@ private async Task<Response<T>> Request<T>(IPeer peer, Func<IPeer, Task<T>> func
if (ex.ExceptionType == NetworkExceptionType.HandlerException)
return new Response<T>(default);

await HandleNetworkException(peer, ex);
await HandleNetworkExceptionAsync(peer, ex);
}

return new Response<T>();
}

private async Task HandleNetworkException(IPeer peer, NetworkException exception)
public async Task<List<NodeInfo>> GetNodesAsync(IPeer peer)
{
try
{
var nodeList = await peer.GetNodesAsync();

if (nodeList?.Nodes == null)
return new List<NodeInfo>();

Logger.LogDebug("get nodes: {nodeList} from peer: {peer}.", nodeList, peer);
return nodeList.Nodes.ToList();
}
catch (Exception e)
{
if (e is NetworkException exception) await HandleNetworkExceptionAsync(peer, exception);
Logger.LogWarning(e, "get nodes failed. peer={peer}", peer);
return new List<NodeInfo>();
}
}

private async Task HandleNetworkExceptionAsync(IPeer peer, NetworkException exception)
{
if (exception.ExceptionType == NetworkExceptionType.Unrecoverable)
{
Expand All @@ -398,8 +417,9 @@ private async Task RecoverPeerAsync(IPeer peer)
return;

var success = await peer.TryRecoverAsync();

if (!success)
if (success)
await _networkServer.BuildStreamForPeerAsync(peer);
else
await _networkServer.TrySchedulePeerReconnectionAsync(peer);
}

Expand Down
19 changes: 19 additions & 0 deletions src/AElf.OS.Core/Network/Events/StreamMessageReceivedEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Google.Protobuf;

namespace AElf.OS.Network.Events;

public class StreamMessageReceivedEvent
{
public StreamMessageReceivedEvent(ByteString message, string clientPubkey, string requestId)
{
Message = message;
ClientPubkey = clientPubkey;
RequestId = requestId;
}

public ByteString Message { get; }

public string ClientPubkey { get; }

public string RequestId { get; }
}
16 changes: 16 additions & 0 deletions src/AElf.OS.Core/Network/Events/StreamPeerExceptionEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using AElf.OS.Network.Application;
using AElf.OS.Network.Infrastructure;

namespace AElf.OS.Network.Events;

public class StreamPeerExceptionEvent
{
public NetworkException Exception { get; }
public IPeer Peer { get; }

public StreamPeerExceptionEvent(NetworkException exception, IPeer peer)
{
Exception = exception;
Peer = peer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ public interface IAElfNetworkServer
Task StopAsync(bool gracefulDisconnect = true);
void CheckNtpDrift();
Task<bool> CheckEndpointAvailableAsync(DnsEndPoint endpoint);
Task<bool> BuildStreamForPeerAsync(IPeer peer);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using AElf.Kernel.Account.Application;
using AElf.OS.Network.Application;
using AElf.OS.Network.Domain;
using AElf.OS.Network.Extensions;
using Microsoft.Extensions.Logging;
Expand All @@ -28,18 +28,20 @@ public class PeerDiscoveryJobProcessor : IPeerDiscoveryJobProcessor, ISingletonD
private readonly IDiscoveredNodeCacheProvider _discoveredNodeCacheProvider;
private readonly IAElfNetworkServer _networkServer;
private readonly INodeManager _nodeManager;
private readonly INetworkService _networkService;

private TransformManyBlock<IPeer, NodeInfo> _discoverNodesDataflow;
private ActionBlock<NodeInfo> _processNodeDataflow;

public PeerDiscoveryJobProcessor(INodeManager nodeManager,
IDiscoveredNodeCacheProvider discoveredNodeCacheProvider, IAElfNetworkServer networkServer,
IAccountService accountService)
IAccountService accountService, INetworkService networkService)
{
_nodeManager = nodeManager;
_discoveredNodeCacheProvider = discoveredNodeCacheProvider;
_networkServer = networkServer;
_accountService = accountService;
_networkService = networkService;
CreatePeerDiscoveryDataflow();

Logger = NullLogger<PeerDiscoveryJobProcessor>.Instance;
Expand Down Expand Up @@ -80,21 +82,7 @@ private void CreatePeerDiscoveryDataflow()

private async Task<List<NodeInfo>> DiscoverNodesAsync(IPeer peer)
{
try
{
var nodeList = await peer.GetNodesAsync();

if (nodeList?.Nodes == null)
return new List<NodeInfo>();

Logger.LogDebug($"Discover nodes: {nodeList} from peer: {peer}.");
return nodeList.Nodes.ToList();
}
catch (Exception e)
{
Logger.LogWarning(e, "Discover nodes failed.");
return new List<NodeInfo>();
}
return await _networkService.GetNodesAsync(peer);
}

private async Task ProcessNodeAsync(NodeInfo node)
Expand Down
1 change: 1 addition & 0 deletions src/AElf.OS.Core/Network/NetworkConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public static class NetworkConstants
public const int DefaultMaxBufferedTransactionCount = 100;
public const int DefaultMaxBufferedBlockCount = 50;
public const int DefaultMaxBufferedAnnouncementCount = 200;
public const int DefaultMaxBufferedStreamCount = 200;

public const int DefaultPeerReconnectionPeriod = 60_000; // 1 min
public const int DefaultMaximumReconnectionTime = 60_000 * 60 * 24; // 1 day
Expand Down
Loading

0 comments on commit ff547f2

Please sign in to comment.