-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsensus-contract.py
144 lines (112 loc) · 4.97 KB
/
consensus-contract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python
"""
Author:
Usage: mpiexec -n <thread #> python -O consensus-contract.py
Input: None
Output: <thread #> files with committed transactions in ./consensus_output/
Start Date: 02/05/2022
Desc: A quick prototype of blockchain using MPI
Change History:
Idea: once the hpc cluster receives the data request, the receiver node will broadcast the request to the fellow nodes (no cluster concept like BAASH), the secret holders nodes will forward the secrets along with their votes, the receiver compute node does not need to know who are the fellow nodes that hold the secrets.
"""
from mpi4py import MPI
import sys, datetime, os, contract
### 2PQC: 2-Phase Quorum Commit Protocol: A MPI-based 2-phase commit protocol with quorum check
### The idea is simple: why can't we plug in the quorum check into the 2PC protocol in HPC blockchain?
### If we really think about 2PC and consensus, 2PC is nothing but checking all participants are ready/done for commit
### On the other hand, consensus requires us to reach 51% decision. That's it.
### In HPC, we don't care about Byzantine failures; there's only the fail-restart scenario.
def dc_2pqc(received_txn, output_path):
    """A distributed commit protocol named two-phase quorum commit (2PQC).

    Phase 1 (prepare): rank 0 broadcasts a "prepare" request and gathers a
    ready vote from every rank. Phase 2 (commit): rank 0 decides to commit
    when at least half the ranks voted ready, broadcasts the decision, and
    each rank appends the transaction to its own local log file.

    Args:
        received_txn (string): received transaction to be committed
        output_path (string): the output directory of the committed transactions
    Returns:
        None
    """
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    ##################
    # Phase 1: prepare
    ##################
    request = comm.bcast("prepare", root=0)
    if __debug__:
        sys.stdout.write("Rank %d receives request: %s\n" % (rank, request))
    # for now, assume all ranks are ready to commit; we can change it later
    ready = 1  # 1: ready; 0: not ready
    reply = comm.gather(ready, root=0)
    if __debug__ and 0 == rank:
        sys.stdout.write("Phase 1 done. Reply: %s (rank %d)\n" % (reply, rank))
    #################
    # Phase 2: commit
    #################
    ready_to_commit = 0
    if 0 == rank:
        # quorum check: we don't need all votes, only the majority
        if sum(reply) >= size / 2:
            ready_to_commit = 1
    # Broadcast the decision -- essential and easy to forget: non-root ranks
    # must learn the coordinator's verdict before they touch local state.
    ready_to_commit = comm.bcast(ready_to_commit, root=0)
    if __debug__ and 0 == rank:
        sys.stdout.write("sum(reply) = %d, ready_to_commit = %d\n" % (sum(reply), ready_to_commit))
    local_commit = 0
    if ready_to_commit:
        # Commit the local transaction by appending it to this rank's log.
        # (The original looped over every rank and matched on rank == r,
        # which is equivalent to a single unconditional local write.)
        # `with` guarantees the file handle is closed even if write() fails.
        with open(output_path + "/log" + str(rank) + ".txt", "a+") as fp:
            fp.write("log" + str(rank) + " at " + str(datetime.datetime.now()) + ": " + received_txn + "\n")
        local_commit = 1  # the transaction is now written to local disk
    done = comm.gather(local_commit, root=0)
    # report the final result of the (decentralized) transaction
    if 0 == rank:
        if sum(done) >= size / 2.0:
            sys.stdout.write("Transaction committed successfully.\n")
        else:
            sys.stdout.write("Transaction failed to commit.\n")
### submit a txn request from a client, which is always rank-0
def submit_txn(txn):
    """Broadcast a client transaction from rank 0 to every rank.

    Args:
        txn (string): the transaction text (only rank 0's value matters;
            other ranks' inputs are overwritten by the broadcast)
    Returns:
        string: the transaction as seen by all ranks after the broadcast
    """
    # Bug fix: the original read a global `comm` that only exists when the
    # module runs as __main__; acquire the communicator locally so this
    # function also works when the module is imported.
    comm = MPI.COMM_WORLD
    txn = comm.bcast(txn, root=0)
    return txn
### process the txn on each node (i.e., rank)
def process_txn():
    """Apply the transaction to the local replica (placeholder).

    Intentionally a no-op for now; a real implementation would mutate the
    local replica's state here.
    """
    # For example, deduct $100 from A and credit it to B on a local replica
    pass
### commit a txn request
def commit_txn(received_txn, output_path):
    """Durably commit a received transaction across all ranks.

    Many distributed commit protocols exist (2PC, PBFT, ...); for now this
    simply delegates to the two-phase quorum commit implementation.

    Args:
        received_txn (string): the transaction to commit
        output_path (string): directory receiving the per-rank log files
    """
    dc_2pqc(received_txn, output_path)
### Entry point
if __name__ == '__main__':
    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()
    name = MPI.Get_processor_name()
    # Create the output directory. exist_ok=True avoids the race where two
    # ranks both pass an exists() check and then both call makedirs.
    output_path = "./consensus_output"
    os.makedirs(output_path, exist_ok=True)
    # Simple point-to-point smoke test between ranks 0 and 1; guarded by
    # size > 1 so a single-rank run (`mpiexec -n 1`) doesn't crash on dest=1.
    if __debug__ and size > 1:
        if rank == 0:
            data = {'a': 7, 'b': 3.14}
            comm.send(data, dest=1, tag=11)
        elif rank == 1:
            data = comm.recv(source=0, tag=11)
            sys.stdout.write("%s received at rank %d\n"
                             % (data, rank))
    # Transaction submitter, usually a client, assumed always on rank 0;
    # the broadcast inside submit_txn delivers the text to the other ranks.
    txn = ""
    if 0 == rank:
        txn = "A transfers $100.00 USD to B."
    for i in range(500):
        received_txn = submit_txn(txn + str(i))
        if __debug__:
            sys.stdout.write("received_txn = %s received at rank %d \n" % (received_txn, rank))
        # process the transaction, on all nodes (currently a no-op stub)
        process_txn()
        # commit the transaction, coordinated by the rank-0 leader
        commit_txn(received_txn, output_path)