-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmyToken.py
218 lines (165 loc) · 6.23 KB
/
myToken.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
import sys
import threading
import time
from threading import Thread, Lock
from time import sleep
from channel import *
# Shared counters updated by the worker threads inside process().
# Number of writes performed so far; workers stop once it reaches TOTALCOUNT.
writing_count = 0
# Running total written to the data file; each write adds `delta` to it.
accumulate = 0
# Defaults for the run parameters. When the file is executed as a script,
# NP, DATAFILE, delta, TOTALCOUNT, LOGFILE and MAX_TIME are all overwritten
# from sys.argv in the __main__ block.
# Number of worker threads / ring nodes.
NP = 20
# Amount added to `accumulate` on every write.
delta = 100
# File that receives the latest `accumulate` and `writing_count` values.
DATAFILE = "datafile.txt"
# File that receives one appended line per write.
LOGFILE = "log.txt"
# Total number of writes after which every worker returns.
TOTALCOUNT = 20
# Held by a worker for the duration of one write (counter update + file output).
mutex = Lock()
# Serialises every use of the shared channel object `c`.
channel_mutex = Lock()
# Guards appends to `ids_in_ring`.
ids_in_ring_mutex = Lock()
# Guards appends to the log file (see write_log).
log_file_mutex = Lock()
# Guards rewrites of the data file (see write_datafile).
datafile_mutex = Lock()
# Guards increments of `writing_count`.
writing_count_mutex = Lock()
# Guards increments of `accumulate`.
accumulate_mutex = Lock()
# Workers other than the last one wait on this condition; the last worker
# notifies all of them once it has joined the ring.
creation_cond = threading.Condition()
# Program start in milliseconds since the epoch; set in the __main__ block.
start_time = 0
# Single channel object shared by all threads.
# NOTE(review): Channel presumably comes from `from channel import *` - confirm.
c = Channel()
# Ring ids returned by c.join("RING"), appended by each worker as it starts.
ids_in_ring = []
# Upper bound for the random delay before a worker wants the token; compared
# against elapsed time.time() * 1000, i.e. milliseconds.
MAX_TIME = 50000
def pass_token(current_node, left_node):
    """Send the "token" message from ``current_node`` to ``left_node``.

    Both ids are looked up (as UTF-8 encoded strings) in the member list
    returned by ``c.subgroup("RING")``. The list is fetched once so that the
    computed positions and the members read back always refer to the same
    snapshot; previously each index was applied to a separately fetched list.

    Args:
        current_node: ring id of the sending node.
        left_node: ring id of the node that receives the token.

    Raises:
        Whatever the member lookup raises when an id is not in the ring
        (``ValueError`` if ``c.subgroup`` returns a list - TODO confirm).
    """
    with channel_mutex:
        ring = c.subgroup("RING")
        sender = ring[ring.index(str(current_node).encode("UTF-8"))].decode("utf-8")
        receiver = ring[ring.index(str(left_node).encode("UTF-8"))].decode("utf-8")
        # NOTE(review): bind() presumably sets the identity used by sendTo();
        # confirm against the channel module.
        c.bind(sender)
        c.sendTo([receiver], "token")
def pass_request(current_node, right_node):
    """Send a token request from ``current_node`` to ``right_node``.

    Both ids are looked up (as UTF-8 encoded strings) in the member list
    returned by ``c.subgroup("RING")``. The list is fetched once so that the
    computed positions and the members read back always refer to the same
    snapshot; previously each index was applied to a separately fetched list.

    Args:
        current_node: ring id of the requesting (or forwarding) node.
        right_node: ring id of the node the request is sent to.

    Raises:
        Whatever the member lookup raises when an id is not in the ring
        (``ValueError`` if ``c.subgroup`` returns a list - TODO confirm).
    """
    with channel_mutex:
        ring = c.subgroup("RING")
        sender = ring[ring.index(str(current_node).encode("UTF-8"))].decode("utf-8")
        receiver = ring[ring.index(str(right_node).encode("UTF-8"))].decode("utf-8")
        # NOTE(review): bind() presumably sets the identity used by sendTo();
        # confirm against the channel module.
        c.bind(sender)
        # The payload is the literal "token" even though this is a request;
        # the receive helpers in this file only test for a non-None result,
        # so the payload text is not inspected.
        c.sendTo([receiver], "token")
def receive_token(current_node, right_node):
    """Try to receive a message sent to ``current_node`` by ``right_node``.

    Both ids are looked up (as UTF-8 encoded strings) in the member list
    returned by ``c.subgroup("RING")``. The list is fetched once so that the
    computed positions and the members read back always refer to the same
    snapshot; previously each index was applied to a separately fetched list.

    Args:
        current_node: ring id of the receiving node.
        right_node: ring id of the node the message is expected from.

    Returns:
        True if ``c.recvFrom`` returned something other than None, else False.

    Raises:
        Whatever the member lookup raises when an id is not in the ring
        (``ValueError`` if ``c.subgroup`` returns a list - TODO confirm).
    """
    with channel_mutex:
        ring = c.subgroup("RING")
        receiver = ring[ring.index(str(current_node).encode("UTF-8"))].decode("utf-8")
        source = ring[ring.index(str(right_node).encode("UTF-8"))].decode("utf-8")
        c.bind(receiver)
        # NOTE(review): the second argument is presumably a timeout; confirm
        # its meaning and unit against the channel module.
        rec = c.recvFrom([source], 1)
    return rec is not None
def receive_request_for_token(current_node, left_node):
    """Try to receive a message sent to ``current_node`` by ``left_node``.

    Both ids are looked up (as UTF-8 encoded strings) in the member list
    returned by ``c.subgroup("RING")``. The list is fetched once so that the
    computed positions and the members read back always refer to the same
    snapshot; previously each index was applied to a separately fetched list.

    Args:
        current_node: ring id of the receiving node.
        left_node: ring id of the node the request is expected from.

    Returns:
        True if ``c.recvFrom`` returned something other than None, else False.

    Raises:
        Whatever the member lookup raises when an id is not in the ring
        (``ValueError`` if ``c.subgroup`` returns a list - TODO confirm).
    """
    with channel_mutex:
        ring = c.subgroup("RING")
        receiver = ring[ring.index(str(current_node).encode("UTF-8"))].decode("utf-8")
        source = ring[ring.index(str(left_node).encode("UTF-8"))].decode("utf-8")
        c.bind(receiver)
        # NOTE(review): the second argument is presumably a timeout; confirm
        # its meaning and unit against the channel module.
        rec = c.recvFrom([source], 1)
    return rec is not None
def write_log(line, filename):
    """Append ``line`` to ``filename`` while holding ``log_file_mutex``.

    Args:
        line: text to append; written as given, no newline is added.
        filename: path of the log file, opened in 'a+' mode.
    """
    with log_file_mutex, open(filename, 'a+') as log_handle:
        log_handle.write(line)
def write_datafile(acc, count, filename):
    """Overwrite ``filename`` with ``acc`` and ``count`` on two lines.

    The file is opened in 'w+' mode while holding ``datafile_mutex``, so any
    previous content is replaced. The first line is ``str(acc)`` followed by a
    newline; the second is ``str(count)`` with no trailing newline.

    Args:
        acc: value written on the first line.
        count: value written on the second line.
        filename: path of the data file.
    """
    with datafile_mutex, open(filename, 'w+') as data_handle:
        first_line = str(acc) + "\n"
        second_line = str(count)
        data_handle.write(first_line)
        data_handle.write(second_line)
# Worker body for one ring node; started as a Thread target with pid = 0..NP-1
# by the __main__ block. Each worker repeatedly waits a random delay, then
# tries to obtain the token, writes to the data/log files while holding it,
# and forwards requests and the token to its neighbours.
# NOTE(review): this listing has lost its indentation, so the exact nesting of
# the branches below cannot be confirmed from here; the comments describe each
# statement on its own and must be re-checked against the indented original.
# NOTE(review): `random` is not among the visible imports; presumably it is
# re-exported by `from channel import *` - confirm or import it explicitly.
def process(pid):
# Per-thread random generator, seeded from the program start time plus pid.
local_random = random.Random()
local_random.seed(start_time + pid)
# Native thread id; only used in the log line.
os_pid = threading.get_native_id()
global accumulate
global writing_count
# Number of writes performed by this worker.
my_updates_count = 0
# Join the "RING" group and record the returned id in the shared list.
with channel_mutex:
id_in_ring = int(c.join("RING"))
with ids_in_ring_mutex:
ids_in_ring.append(id_in_ring)
has_token = False
wants_token = False
received_request = False
passed_request = False
global mutex
# The worker with pid 0 starts out holding the token.
if pid == 0:
has_token = True
# Every worker except the last waits until the last one has joined the ring.
# NOTE(review): wait() is called without a predicate, so this relies on the
# earlier workers already waiting when the last one notifies (the __main__
# block sleeps one second between thread starts).
if pid != NP - 1:
with creation_cond:
creation_cond.wait()
if pid == NP - 1:
with creation_cond:
creation_cond.notify_all()
# Neighbours are taken by position in ids_in_ring.
# NOTE(review): assumes ids_in_ring[i] is the ring id of the worker with
# pid i, i.e. that workers appended in pid order - confirm.
# The token is passed to the left neighbour and received from the right one;
# requests are passed to the right neighbour and received from the left one.
left_neighbor = ids_in_ring[(pid + 1) % NP]
right_neighbor = ids_in_ring[(pid - 1) % NP]
# -1 means no delay has been drawn yet for the next request.
time_for_next_request = -1
time_after_last_reset = 0
# Requests received and not yet answered by passing the token on.
requests_count = 0
while True:
# Stop once the shared write counter has reached the configured total.
if writing_count >= TOTALCOUNT:
return
if time_for_next_request == -1:
# Draw a new random delay in [0, MAX_TIME] (milliseconds) and restart the timer.
time_for_next_request = local_random.randint(0, MAX_TIME)
time_after_last_reset = time.time() * 1000
passed_request = False
# Milliseconds left until this worker wants the token.
remaining_time = time_for_next_request - ((time.time() * 1000) - time_after_last_reset)
if remaining_time <= 0:
wants_token = True
if wants_token:
# Poll the right neighbour for the token.
if not has_token:
has_token = receive_token(id_in_ring, right_node=right_neighbor)
# Send a request to the right neighbour, at most once per drawn delay.
if not has_token and not passed_request:
pass_request(id_in_ring, right_node=right_neighbor)
passed_request = True
print("process %d wants the token" % (pid))
# Holding the token: update the shared counters and write both files.
if has_token:
with mutex:
with accumulate_mutex:
accumulate += delta
with writing_count_mutex:
writing_count += 1
my_updates_count += 1
write_datafile(accumulate, writing_count, DATAFILE)
print("Writing %d" % pid)
line = "t=%d, pid= %d, ospid=%d, new=%d, my_updates=%d ,total_count=%d is writing to file\n" % (
(time.time() * 1000) - start_time, pid, os_pid, accumulate, my_updates_count, writing_count)
write_log(line, LOGFILE)
# Reset so a fresh delay is drawn on a later iteration.
time_for_next_request = -1
wants_token = False
# Not interested in the token itself but with requests pending: poll for the
# token so it can be handed on.
if not has_token and not wants_token and requests_count > 0:
has_token = receive_token(id_in_ring, right_node=right_neighbor)
# Poll the left neighbour for a token request.
received_request = receive_request_for_token(id_in_ring, left_node=left_neighbor)
if received_request:
requests_count += 1
# Without the token, forward the request to the right neighbour.
if not has_token:
pass_request(id_in_ring, right_neighbor)
# With the token and a pending request: hand the token to the left neighbour.
if has_token:
if requests_count > 0:
has_token = False
pass_token(id_in_ring, left_neighbor)
requests_count -= 1
def get_time_since_start():
    """Return the milliseconds elapsed since the module-level ``start_time``."""
    now_ms = time.time() * 1000
    return now_ms - start_time
if __name__ == '__main__':
    # Script entry point. Expected arguments, in order:
    #   NP DATAFILE delta TOTALCOUNT LOGFILE MAX_TIME
    # Fail with a usage message rather than a bare IndexError when arguments
    # are missing.
    if len(sys.argv) < 7:
        sys.stderr.write(
            "usage: %s NP DATAFILE DELTA TOTALCOUNT LOGFILE MAX_TIME\n" % sys.argv[0])
        sys.exit(2)
    NP = int(sys.argv[1])
    DATAFILE = sys.argv[2]
    delta = int(sys.argv[3])
    TOTALCOUNT = int(sys.argv[4])
    LOGFILE = sys.argv[5]
    MAX_TIME = int(sys.argv[6])
    # Program start in whole milliseconds; used for seeding and log timestamps.
    start_time = round(time.time() * 1000)
    print("Start time:")
    print(start_time)
    print("Please wait for initalization....\n")
    # Start one worker per ring node, one second apart; process() relies on
    # the last worker starting after the earlier ones are already waiting.
    for i in range(0, NP):
        sleep(1)
        t = Thread(target=process, args=(i,))
        t.start()