-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConsumerTest.py
executable file
·72 lines (63 loc) · 2.65 KB
/
ConsumerTest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python3.5
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType
import argparse
import signal
import time
import struct
import codecs
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-b", help = "Broker to connect to.", type = str)
parser.add_argument("-t", help = "Topic to subscribe to.", type = str)
parser.add_argument("-p", help = "Name of the file holding the parser. Defaults to \"BaseDataParser\".", type = str)
parser.add_argument("-d", help = "Run in debug mode. Will not clean up curses-library on exit so that error messages can be seen.", action="store_true")
parser.add_argument("-s", help = "Set to consume messages from the beginning.", action="store_true")
args = parser.parse_args()
class_name_str = "BaseDataParser"
if(args.p != None):
class_name_str = args.p
if (args.b == None or args.t == None):
print("Broker and topic must be given as arguments.")
exit(0)
client = KafkaClient(hosts=args.b)
#topic = client.topics[bytes(args.t, "utf-8")]
topic = client.topics[codecs.encode(args.t, "utf-8")]
#consumer = topic.get_simple_consumer(fetch_message_max_bytes = 1024 * 1024 * 50)
#consumer = topic.get_simple_consumer(fetch_message_max_bytes = 1024 * 1024 * 50, consumer_group=bytes("mygroup", "utf-8"), auto_offset_reset=OffsetType.LATEST, reset_offset_on_start=True, consumer_timeout_ms=50)
start_offset = OffsetType.LATEST
if (args.s):
start_offset = OffsetType.EARLIEST
consumer = topic.get_simple_consumer(fetch_message_max_bytes = 1024 * 1024 * 50, consumer_group=codecs.encode(args.t, "utf-8"), auto_offset_reset=start_offset, reset_offset_on_start=True, consumer_timeout_ms=50)
#consumer.
parser = None
try:
exec("from %s import %s" % (class_name_str, class_name_str))
except Exception as e:
print("Unable to import parser with name \"%s\"." % class_name_str)
print("Got error: " + str(e))
exit(1)
try:
parser = eval("%s()" % class_name_str)
if (not args.d):
parser.debug = False
except Exception as e:
try:
del parser
except:
pass
print("Unable to instantiate \"%s\"" % class_name_str)
print("Got error: " + str(e))
exit(1)
while (True):
try:
msg = consumer.consume(block = True)
if (msg != None):
parser.parse_data(msg.value, msg.offset)
else:
parser.no_data()
except KeyboardInterrupt:
break
if __name__ == "__main__":
main()