Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bls proof of replication #1603

Merged
merged 10 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions lib/archethic/db/embedded_impl/encoding.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Archethic.DB.EmbeddedImpl.Encoding do

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ProofOfValidation
alias Archethic.TransactionChain.Transaction.ProofOfReplication
alias Archethic.TransactionChain.TransactionData
alias Archethic.TransactionChain.TransactionData.Ledger
alias Archethic.TransactionChain.TransactionData.Ownership
Expand Down Expand Up @@ -167,8 +168,14 @@ defmodule Archethic.DB.EmbeddedImpl.Encoding do
]
end

defp encode_validation_fields(%Transaction{proof_of_validation: proof_of_validation}) do
[{"proof_of_validation", ProofOfValidation.serialize(proof_of_validation)}]
defp encode_validation_fields(%Transaction{
proof_of_validation: proof_of_validation,
proof_of_replication: proof_of_replication
}) do
[
{"proof_of_validation", ProofOfValidation.serialize(proof_of_validation)},
{"proof_of_replication", ProofOfReplication.serialize(proof_of_replication)}
]
end

def decode(_tx_version, _protocol_version, "type", <<type::8>>, acc),
Expand Down Expand Up @@ -331,6 +338,11 @@ defmodule Archethic.DB.EmbeddedImpl.Encoding do
Map.put(acc, :proof_of_validation, proof_of_validation)
end

def decode(_tx_version, _protocol_version, "proof_of_replication", <<rest::bitstring>>, acc) do
{proof_of_replication, _} = ProofOfReplication.deserialize(rest)
Map.put(acc, :proof_of_replication, proof_of_replication)
end

def decode(_tx_version, _protocol_version, column, data, acc), do: Map.put(acc, column, data)

defp deserialize_ownerships(_, 0, _, _), do: []
Expand Down
31 changes: 30 additions & 1 deletion lib/archethic/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ defmodule Archethic.Mining do

alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.CrossValidationStamp
alias Archethic.TransactionChain.Transaction.ProofOfReplication
alias Archethic.TransactionChain.Transaction.ProofOfReplication.Signature
alias Archethic.TransactionChain.Transaction.ProofOfValidation
alias Archethic.TransactionChain.Transaction.ValidationStamp

Expand All @@ -34,7 +36,7 @@ defmodule Archethic.Mining do
# version 5->6 the POI changed and is now done with tx.data.recipients.args serialized with :extended mode
# version 6->7 add Add consumed inputs in tx.validation_stamp.ledger_operations
# version 7->8 movement resolved address are now the genesis address of the destination
# version 8->9 add proof of validation workflow and field tx.proof_of_validation
# version 8->9 add proof of validation / replication workflow and field tx.proof_of_validation / tx.proof_of_replication
@protocol_version 9

@lock_threshold 0.75
Expand Down Expand Up @@ -249,6 +251,33 @@ defmodule Archethic.Mining do
|> DistributedWorkflow.add_proof_of_validation(proof, from)
end

@doc """
Add a replication signature to the transaction process
"""
@spec add_replication_signature(
tx_address :: Crypto.prepended_hash(),
replication_signature :: Signature.t()
) :: :ok
def add_replication_signature(tx_address, replication_signature = %Signature{}) do
pid = get_mining_process!(tx_address)
if pid, do: send(pid, {:add_replication_signature, replication_signature})
:ok
end

@doc """
Add a proof of replication to the transaction mining process
"""
@spec add_proof_of_replication(
tx_address :: Crypto.prepended_hash(),
proof_of_validation :: ProofOfReplication.t(),
from :: Crypto.key()
) :: :ok
def add_proof_of_replication(tx_address, proof, from) do
tx_address
|> get_mining_process!()
|> DistributedWorkflow.add_proof_of_replication(proof, from)
end

@doc """
Confirm the replication from a storage node
"""
Expand Down
183 changes: 170 additions & 13 deletions lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ defmodule Archethic.Mining.DistributedWorkflow do
alias Archethic.P2P.Message.CrossValidationDone
alias Archethic.P2P.Message.NotifyPreviousChain
alias Archethic.P2P.Message.ProofOfValidationDone
alias Archethic.P2P.Message.ProofOfReplicationDone
alias Archethic.P2P.Message.ReplicationAttestationMessage
alias Archethic.P2P.Message.RequestReplicationSignature
alias Archethic.P2P.Message.ReplicatePendingTransactionChain
alias Archethic.P2P.Message.ReplicateTransaction
alias Archethic.P2P.Message.ValidateTransaction
Expand All @@ -42,6 +44,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.CrossValidationStamp
alias Archethic.TransactionChain.Transaction.ProofOfReplication
alias Archethic.TransactionChain.Transaction.ProofOfValidation
alias Archethic.TransactionChain.Transaction.ValidationStamp

Expand Down Expand Up @@ -144,6 +147,18 @@ defmodule Archethic.Mining.DistributedWorkflow do
GenStateMachine.cast(pid, {:add_proof_of_validation, proof, node_public_key})
end

@doc """
Add the proof of replication in the mining process
"""
@spec add_proof_of_replication(
worker_pid :: pid(),
proof :: ProofOfReplication.t(),
node_public_key :: Crypto.key()
) :: :ok
def add_proof_of_replication(pid, proof, node_public_key) do
GenStateMachine.cast(pid, {:add_proof_of_replication, proof, node_public_key})
end

defp get_context_timeout(:hosting),
do: @start_mining_message_drift + @context_notification_timeout * 2

Expand Down Expand Up @@ -255,7 +270,10 @@ defmodule Archethic.Mining.DistributedWorkflow do
resolved_addresses: resolved_addresses,
contract_context: contract_context,
genesis_address: genesis_address,
proof_elected_nodes: ProofOfValidation.get_election(authorized_nodes, tx.address)
validation_proof_elected_nodes:
ProofOfValidation.get_election(authorized_nodes, tx.address),
replication_proof_elected_nodes:
ProofOfReplication.get_election(authorized_nodes, tx.address)
)

role = if node_public_key == coordinator_key, do: :coordinator, else: :cross_validator
Expand Down Expand Up @@ -743,6 +761,11 @@ defmodule Archethic.Mining.DistributedWorkflow do
end
end

def handle_event(:info, {:add_cross_validation_stamp, _}, _, _) do
# Receiving remaining cross validation stamp while proof of validation is already created
:keep_state_and_data
end

def handle_event(
:internal,
:create_proof_of_validation,
Expand Down Expand Up @@ -772,7 +795,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
|> ValidationContext.get_confirmed_validation_nodes()
|> P2P.broadcast_message(message)

{:next_state, :replication, %{data | context: new_context}}
{:next_state, :wait_proof_of_replication, %{data | context: new_context}}
end

def handle_event(
Expand Down Expand Up @@ -812,25 +835,135 @@ defmodule Archethic.Mining.DistributedWorkflow do
if ValidationContext.valid_proof_of_validation?(context, proof) do
Logger.info("Received proof of validation", meta)
new_context = ValidationContext.add_proof_of_validation(context, proof)
{:next_state, :replication, Map.put(data, :context, new_context)}
{:next_state, :wait_proof_of_replication, Map.put(data, :context, new_context)}
else
Logger.warning("Received invalid proof of validation", meta)
:keep_state_and_data
end
end

def handle_event(:enter, :wait_cross_replication_stamps, :wait_proof_of_replication, %{
context: context
}) do
request_replication_signature(context)
:keep_state_and_data
end

def handle_event(
:enter,
:wait_cross_replication_stamps,
:replication,
%{context: context}
:info,
{:add_replication_signature, replication_signature},
:wait_proof_of_replication,
data = %{
node_public_key: node_public_key,
context:
context = %ValidationContext{
transaction: %Transaction{address: tx_address, type: type},
coordinator_node: %Node{last_public_key: coordinator_key}
}
}
) do
request_replication(context)
Logger.info("Add replication signature",
transaction_address: Base.encode16(tx_address),
transaction_type: type
)

new_context = ValidationContext.add_replication_signature(context, replication_signature)
new_data = Map.put(data, :context, new_context)

case ValidationContext.get_replication_signatures_state(new_context) do
:reached ->
if node_public_key == coordinator_key,
do: {:keep_state, new_data, {:next_event, :internal, :create_proof_of_replication}},
else: {:keep_state, new_data}

:not_reached ->
{:keep_state, new_data}
end
end

def handle_event(:info, {:add_replication_signature, _}, _, _) do
# Receiving remaining replication signature while proof of replication is already created
:keep_state_and_data
end

def handle_event(:info, {:add_cross_validation_stamp, _}, :replication, _) do
# Receiving remaining cross validation stamp while proof of validation is already created
def handle_event(
:internal,
:create_proof_of_replication,
:wait_proof_of_replication,
data = %{
context:
context = %ValidationContext{
transaction: %Transaction{address: tx_address, type: type}
}
}
) do
Logger.info("Create proof of replication",
transaction_address: Base.encode16(tx_address),
transaction_type: type
)

new_context =
%ValidationContext{proof_of_replication: proof_of_replication} =
ValidationContext.create_proof_of_replication(context)

message = %ProofOfReplicationDone{
address: tx_address,
proof_of_replication: proof_of_replication
}

context
|> ValidationContext.get_confirmed_validation_nodes()
|> P2P.broadcast_message(message)

{:next_state, :replication, %{data | context: new_context}}
end

def handle_event(
:cast,
{:add_proof_of_replication, _proof, from},
:wait_proof_of_replication,
%{
context: %ValidationContext{
coordinator_node: %Node{first_public_key: coordinator_key},
transaction: %Transaction{address: tx_address, type: type}
}
}
)
when from != coordinator_key do
Logger.warning(
"Received proof of replication from non coordinator node #{Base.encode16(from)} while coordinator is #{Base.encode16(coordinator_key)}",
transaction_address: Base.encode16(tx_address),
transaction_type: type
)

:keep_state_and_data
end

def handle_event(
:cast,
{:add_proof_of_replication, proof, _},
:wait_proof_of_replication,
data = %{
context:
context = %ValidationContext{
transaction: %Transaction{address: tx_address, type: type}
}
}
) do
meta = [transaction_address: Base.encode16(tx_address), transaction_type: type]

if ValidationContext.valid_proof_of_replication?(context, proof) do
Logger.info("Received proof of replication", meta)
new_context = ValidationContext.add_proof_of_replication(context, proof)
{:next_state, :replication, Map.put(data, :context, new_context)}
else
Logger.warning("Received invalid proof of replication", meta)
:keep_state_and_data
end
end

def handle_event(:enter, :wait_proof_of_replication, :replication, %{context: context}) do
request_replication(context)
:keep_state_and_data
end

Expand Down Expand Up @@ -1168,7 +1301,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
storage_nodes = ValidationContext.get_chain_replication_nodes(context)

Logger.info(
"Send validated transaction to #{Enum.map_join(storage_nodes, ",", &Node.endpoint/1)} for replication's validation",
"Request transaction validation to storage nodes #{Enum.map_join(storage_nodes, ",", &Node.endpoint/1)}",
transaction_address: Base.encode16(tx.address),
transaction_type: tx.type
)
Expand All @@ -1185,7 +1318,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
P2P.broadcast_message(storage_nodes, message)
end

defp request_replication(
defp request_replication_signature(
context = %ValidationContext{
transaction: %Transaction{address: address, type: type},
genesis_address: genesis_address,
Expand All @@ -1194,6 +1327,30 @@ defmodule Archethic.Mining.DistributedWorkflow do
) do
storage_nodes = ValidationContext.get_chain_replication_nodes(context)

Logger.info(
"Request replication signature to storage nodes #{Enum.map_join(storage_nodes, ",", &Node.endpoint/1)}",
transaction_address: Base.encode16(address),
transaction_type: type
)

message = %RequestReplicationSignature{
address: address,
genesis_address: genesis_address,
proof_of_validation: proof_of_validation
}

P2P.broadcast_message(storage_nodes, message)
end

defp request_replication(
context = %ValidationContext{
transaction: %Transaction{address: address, type: type},
genesis_address: genesis_address,
proof_of_replication: proof_of_replication
}
) do
storage_nodes = ValidationContext.get_chain_replication_nodes(context)

Logger.info(
"Send validated transaction to #{Enum.map_join(storage_nodes, ",", &Node.endpoint/1)}",
transaction_address: Base.encode16(address),
Expand All @@ -1203,7 +1360,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
message = %ReplicatePendingTransactionChain{
address: address,
genesis_address: genesis_address,
proof_of_validation: proof_of_validation
proof_of_replication: proof_of_replication
}

P2P.broadcast_message(storage_nodes, message)
Expand Down
Loading