Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Commit phase - AKKA Model #422

Closed
wants to merge 10 commits into from
30 changes: 30 additions & 0 deletions neo/Consensus/CommitAgreement.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.IO;
using Neo.IO;

namespace Neo.Consensus
{
internal class CommitAgreement : ConsensusMessage
{
/// <summary>
/// Block hash of the signature
/// </summary>
public UInt256 BlockHash;

/// <summary>
/// Constructors
/// </summary>
public CommitAgreement() : base(ConsensusMessageType.CommitAgreement) { }

public override void Deserialize(BinaryReader reader)
{
base.Deserialize(reader);
BlockHash = reader.ReadSerializable<UInt256>();
}

public override void Serialize(BinaryWriter writer)
{
base.Serialize(writer);
writer.Write(BlockHash);
}
}
}
39 changes: 38 additions & 1 deletion neo/Consensus/ConsensusContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,42 @@ internal class ConsensusContext : IDisposable
public byte[] ExpectedView;
public KeyPair KeyPair;

private UInt256[] Commits;
private Block _header = null;
public UInt256 CommitHash => _header?.Hash;

public int M => Validators.Length - (Validators.Length - 1) / 3;

public bool TryToCommit(ConsensusPayload payload, CommitAgreement message)
{
// Already received
if (Commits[payload.ValidatorIndex] != null) return false;

// Store received block hash
Commits[payload.ValidatorIndex] = message.BlockHash;

// Check count
return _header != null && Commits.Where(u => u != null && u == _header.Hash).Count() >= M;
}

public void ChangeView(byte view_number)
{
State &= ConsensusState.SignatureSent;
ViewNumber = view_number;
PrimaryIndex = GetPrimaryIndex(view_number);

if (State == ConsensusState.Initial)
{
TransactionHashes = null;
Signatures = new byte[Validators.Length][];
Commits = new UInt256[Validators.Length];
}

if (MyIndex >= 0)
{
ExpectedView[MyIndex] = view_number;
}

_header = null;
}

Expand All @@ -67,10 +89,10 @@ public ConsensusPayload MakeChangeView()
});
}

private Block _header = null;
public Block MakeHeader()
{
if (TransactionHashes == null) return null;

if (_header == null)
{
_header = new Block
Expand All @@ -84,10 +106,23 @@ public Block MakeHeader()
NextConsensus = NextConsensus,
Transactions = new Transaction[0]
};

Commits[MyIndex] = _header.Hash;
}

return _header;
}

public ConsensusPayload MakeCommitAgreement()
{
if (_header == null) return null;

return MakePayload(new CommitAgreement()
{
BlockHash = _header.Hash
});
}

private ConsensusPayload MakePayload(ConsensusMessage message)
{
message.ViewNumber = ViewNumber;
Expand Down Expand Up @@ -137,6 +172,8 @@ public void Reset(Wallet wallet)
Signatures = new byte[Validators.Length][];
ExpectedView = new byte[Validators.Length];
KeyPair = null;
Commits = new UInt256[Validators.Length];

for (int i = 0; i < Validators.Length; i++)
{
WalletAccount account = wallet.GetAccount(Validators[i]);
Expand Down
4 changes: 3 additions & 1 deletion neo/Consensus/ConsensusMessageType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ internal enum ConsensusMessageType : byte
PrepareRequest = 0x20,
[ReflectionCache(typeof(PrepareResponse))]
PrepareResponse = 0x21,
[ReflectionCache(typeof(CommitAgreement))]
CommitAgreement = 0x022,
}
}
}
84 changes: 76 additions & 8 deletions neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ private void ChangeTimer(TimeSpan delay)
private void CheckExpectedView(byte view_number)
{
if (context.ViewNumber == view_number) return;
if (context.State.HasFlag(ConsensusState.CommitSent))
{
if (context.State.HasFlag(ConsensusState.SignatureSent))
{
// If signature was sent, we send again

SignAndRelay(context.MakePrepareResponse(context.Signatures[context.MyIndex]));
Copy link
Contributor

@igormcoelho igormcoelho Oct 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps you need to Relay your CommitAgreement again too. Not sure. Or is it implicit by PrepareResponse message?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shargon ,if a node (in commit phase) receives change view request, and re-send prepareResponse. From current code, the node (who have sent the changeView request ) will not accept prepareResponse because the viewNumber does not match. We may need to find a way to accept the re-send prepareResponse.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is just for dropped packets (network errors), maybe there are a disconnection, and this packet never arrive to your CN.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. Imagine one CN(A) sent ChangeView request to other CN if it did not collect enough PrepareResponse on time because the network error. Then A increases its view number. But when other CN(B) receives the ChangeView message, it resends prepareResponse because B is in commit phase now.

In this situation, even if A re-connect with B, A will not accept B's resent prepareResponse message because view numbers do not match anymore. But we need to let A accepts this message and move on to commit phase as B, right ?

}

return;
}

if (context.ExpectedView.Count(p => p == view_number) >= context.M)
{
InitializeConsensus(view_number);
Expand All @@ -82,8 +94,30 @@ private void CheckExpectedView(byte view_number)

private void CheckSignatures()
{
if (!context.State.HasFlag(ConsensusState.CommitSent) &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shargon,
I believe that here we should check the partial signatures instead if the complete and, then, at lines 353 (

context.Signatures[context.MyIndex] = context.MakeHeader().Sign(context.KeyPair);
) and 52 (
context.Signatures[context.MyIndex] = context.MakeHeader().Sign(context.KeyPair);
) they should not be send.

context.Signatures.Count(p => p != null) >= context.M &&
context.TransactionHashes.All(p => context.Transactions.ContainsKey(p)))
{
// Send my commit

context.State |= ConsensusState.CommitSent;
SignAndRelay(context.MakeCommitAgreement());

Log($"Commit sent: height={context.BlockIndex} hash={context.CommitHash} state={context.State}");
}
}

private void OnCommitAgreement(ConsensusPayload payload, CommitAgreement message)
{
if (context.State.HasFlag(ConsensusState.BlockSent) ||
!context.TryToCommit(payload, message)) return;

Log($"{nameof(OnCommitAgreement)}: height={payload.BlockIndex} hash={message.BlockHash.ToString()} view={message.ViewNumber} index={payload.ValidatorIndex}");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please move log() before if()? From the log, it looks like CN send out block when it receives only one commitAgreement message. This is very confusing.

commit
commit-2


if (context.Signatures.Count(p => p != null) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p)))
{
context.State |= ConsensusState.BlockSent;

Contract contract = Contract.CreateMultiSigContract(context.M, context.Validators);
Block block = context.MakeHeader();
ContractParametersContext sc = new ContractParametersContext(block);
Expand All @@ -96,8 +130,8 @@ private void CheckSignatures()
sc.Verifiable.Witnesses = sc.GetWitnesses();
block.Transactions = context.TransactionHashes.Select(p => context.Transactions[p]).ToArray();
Log($"relay block: {block.Hash}");

system.LocalNode.Tell(new LocalNode.Relay { Inventory = block });
context.State |= ConsensusState.BlockSent;
}
}

Expand Down Expand Up @@ -178,18 +212,21 @@ private void Log(string message, LogLevel level = LogLevel.Info)

private void OnChangeViewReceived(ConsensusPayload payload, ChangeView message)
{
Log($"{nameof(OnChangeViewReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} nv={message.NewViewNumber}");
if (message.NewViewNumber <= context.ExpectedView[payload.ValidatorIndex])
return;

Log($"{nameof(OnChangeViewReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} nv={message.NewViewNumber}");

context.ExpectedView[payload.ValidatorIndex] = message.NewViewNumber;
CheckExpectedView(message.NewViewNumber);
}

private void OnConsensusPayload(ConsensusPayload payload)
{
if (payload.ValidatorIndex == context.MyIndex) return;
if (payload.Version != ConsensusContext.Version)
return;
if (payload.ValidatorIndex == context.MyIndex ||
payload.Version != ConsensusContext.Version ||
payload.ValidatorIndex >= context.Validators.Length) return;

if (payload.PrevHash != context.PrevHash || payload.BlockIndex != context.BlockIndex)
{
if (context.Snapshot.Height + 1 < payload.BlockIndex)
Expand All @@ -198,7 +235,7 @@ private void OnConsensusPayload(ConsensusPayload payload)
}
return;
}
if (payload.ValidatorIndex >= context.Validators.Length) return;

ConsensusMessage message;
try
{
Expand All @@ -208,8 +245,10 @@ private void OnConsensusPayload(ConsensusPayload payload)
{
return;
}

if (message.ViewNumber != context.ViewNumber && message.Type != ConsensusMessageType.ChangeView)
return;

switch (message.Type)
{
case ConsensusMessageType.ChangeView:
Expand All @@ -221,6 +260,9 @@ private void OnConsensusPayload(ConsensusPayload payload)
case ConsensusMessageType.PrepareResponse:
OnPrepareResponseReceived(payload, (PrepareResponse)message);
break;
case ConsensusMessageType.CommitAgreement:
OnCommitAgreement(payload, (CommitAgreement)message);
break;
}
}

Expand All @@ -233,15 +275,22 @@ private void OnPersistCompleted(Block block)

private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest message)
{
if (context.State.HasFlag(ConsensusState.RequestReceived))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good move! Avoiding this repeated message.

return;

Log($"{nameof(OnPrepareRequestReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} tx={message.TransactionHashes.Length}");
if (!context.State.HasFlag(ConsensusState.Backup) || context.State.HasFlag(ConsensusState.RequestReceived))

if (!context.State.HasFlag(ConsensusState.Backup))
return;

if (payload.ValidatorIndex != context.PrimaryIndex) return;

if (payload.Timestamp <= context.Snapshot.GetHeader(context.PrevHash).Timestamp || payload.Timestamp > DateTime.UtcNow.AddMinutes(10).ToTimestamp())
{
Log($"Timestamp incorrect: {payload.Timestamp}", LogLevel.Warning);
return;
}

context.State |= ConsensusState.RequestReceived;
context.Timestamp = payload.Timestamp;
context.Nonce = message.Nonce;
Expand Down Expand Up @@ -271,9 +320,11 @@ private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest m

private void OnPrepareResponseReceived(ConsensusPayload payload, PrepareResponse message)
{
Log($"{nameof(OnPrepareResponseReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}");
if (context.State.HasFlag(ConsensusState.BlockSent)) return;
if (context.Signatures[payload.ValidatorIndex] != null) return;

Log($"{nameof(OnPrepareResponseReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good!


Block header = context.MakeHeader();
if (header == null || !Crypto.Default.VerifySignature(header.GetHashData(), message.Signature, context.Validators[payload.ValidatorIndex].EncodePoint(false))) return;
context.Signatures[payload.ValidatorIndex] = message.Signature;
Expand Down Expand Up @@ -360,6 +411,20 @@ public static Props Props(NeoSystem system, Wallet wallet)

private void RequestChangeView()
{
if (context.State.HasFlag(ConsensusState.CommitSent))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove this and filter before, @shargon.

else if ((context.State.HasFlag(ConsensusState.Primary) && context.State.HasFlag(ConsensusState.RequestSent)) || (context.State.HasFlag(ConsensusState.Backup) && !context.State.HasFlag(ConsensusState.CommitSent))    )

at line

else if ((context.State.HasFlag(ConsensusState.Primary) && context.State.HasFlag(ConsensusState.RequestSent)) || context.State.HasFlag(ConsensusState.Backup))

{
// Lock view change on timer

if (context.State.HasFlag(ConsensusState.SignatureSent))
{
// If signature was sent, we send again

SignAndRelay(context.MakePrepareResponse(context.Signatures[context.MyIndex]));
}

return;
}

context.State |= ConsensusState.ViewChanging;
context.ExpectedView[context.MyIndex]++;
Log($"request change view: height={context.BlockIndex} view={context.ViewNumber} nv={context.ExpectedView[context.MyIndex]} state={context.State}");
Expand All @@ -370,6 +435,8 @@ private void RequestChangeView()

private void SignAndRelay(ConsensusPayload payload)
{
if (payload == null) return;

ContractParametersContext sc;
try
{
Expand All @@ -380,6 +447,7 @@ private void SignAndRelay(ConsensusPayload payload)
{
return;
}

sc.Verifiable.Witnesses = sc.GetWitnesses();
system.LocalNode.Tell(new LocalNode.SendDirectly { Inventory = payload });
}
Expand Down
3 changes: 2 additions & 1 deletion neo/Consensus/ConsensusState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ internal enum ConsensusState : byte
SignatureSent = 0x10,
BlockSent = 0x20,
ViewChanging = 0x40,
CommitSent = 0x80,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct 0x80.

}
}
}