-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsniffer.py
116 lines (103 loc) · 4.03 KB
/
sniffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#! /usr/local/bin/python3.5
from threading import Thread
import readchar
from frame_sender.frame_sender import FrameSender
from network_analyzer.tracking_connection import TrackingConnection
from printer.data_printer import print_ethernet_frame, print_ipv4_packet, \
print_tcp_segment, print_udp_segment, print_ipv6_packet, print_arp_packet
from settings.mode_parser import ModeParser
from unpacker.frame import Frame
from unpacker.packet import IPv4Packet, IPv6Packet, ARPPacket
from unpacker.segment import TCPSegment, UDPSegment
class Sniffer:
def __init__(self):
mode_parser = ModeParser()
self.console, self.plot, self.dump, self.send = \
mode_parser.get_settings()
if (
not self.console
and not self.plot
and not self.dump
and not self.send):
print(
'Use "-h" or "--help" as arguments to get info about '
'available options'
)
exit(0)
self.sock = mode_parser.get_socket()
if self.send:
try:
filename = mode_parser.get_dump_name()
except IndexError:
print('Wrong filename')
exit(-1)
frame_sender = FrameSender(filename, self.sock)
frame_sender.send_frames()
print('Completed')
exit(0)
self.tracking_connections = []
if self.plot:
from network_analyzer.plot_handler import PlotHandler
self.tracking_connections = mode_parser.get_tracking_connections(
self.dump)
self.plot_handler = PlotHandler(self.tracking_connections)
else:
self.tracking_connections = [
TrackingConnection('ALL', 'ALL', self.dump, True)
]
self.sniffer = Thread(target=self.sniff)
self.quit_handler = Thread(target=self.wait_for_quit)
self.finish = False
def start(self):
self.sniffer.start()
self.quit_handler.start()
if self.plot:
self.plot_handler.start()
self.sniffer.join()
self.quit_handler.join()
def wait_for_quit(self):
while True:
a = readchar.readchar()
if a == ' ' or a == b' ':
self.finish = True
if self.plot:
self.plot_handler.finish = True
print("Closing...")
return
def sniff(self):
while not self.finish:
raw_data = self.sock.get_raw_frame()
frame = Frame(raw_data)
if self.console:
print('-' * 20)
print_ethernet_frame(frame)
parsed_packet = None
if frame.ether_type == 'IPv4':
parsed_packet = IPv4Packet(frame.data)
if self.console:
print_ipv4_packet(parsed_packet)
elif frame.ether_type == 'IPv6':
parsed_packet = IPv6Packet(frame.data)
if self.console:
print_ipv6_packet(parsed_packet)
elif frame.ether_type == 'ARP':
parsed_packet = ARPPacket(frame.data)
if self.console:
print_arp_packet(parsed_packet)
for tunnel in self.tracking_connections:
tunnel.check_packet(raw_data, parsed_packet)
if (isinstance(parsed_packet, IPv4Packet) or
isinstance(parsed_packet, IPv6Packet)):
if parsed_packet.protocol == 6:
tcp_segment = TCPSegment(parsed_packet.data)
if self.console:
print_tcp_segment(tcp_segment)
elif parsed_packet.protocol == 17:
udp_segment = UDPSegment(parsed_packet.data)
if self.console:
print_udp_segment(udp_segment)
for connection in self.tracking_connections:
connection.close()
if __name__ == "__main__":
sniffer = Sniffer()
sniffer.start()