Skip to content

Commit

Permalink
Add new feature
Browse files Browse the repository at this point in the history
Feature to verify the url before starting the session. Improve code quality.
  • Loading branch information
kaustubhrprabhu committed Sep 10, 2023
1 parent e78e94e commit 918d116
Showing 1 changed file with 57 additions and 56 deletions.
113 changes: 57 additions & 56 deletions knockknock.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,23 @@ class KnockKnock:
cgreenbg = '\33[30;42m'
cyellowbg = '\33[30;43m'

timeout = 5
headers = None
proxies = None

print_lock_ = threading.Lock()
paths = None
agents = None
found = []
total_found = 0
total_scanned = 0

def __init__(self, url:str, random_agent:bool) -> None:
self.url = self.validate_url(url)
def __init__(self, url:str, multithread:bool, random_agent:bool) -> None:
self.get_paths()
self.random_agent = random_agent
if random_agent:
self.get_agents()
self.url = self.validate_url(url)
self.multithread = multithread
self.random_agent = random_agent

def get_agents(self):
try:
Expand All @@ -39,6 +45,16 @@ def validate_url(self, url):
valid_url = url.removesuffix('/')
if url[:4] != 'http':
valid_url = 'http://' + url
print(f'\n{self.cyellow}Verifying url, please wait...{self.cend}', end='')
try:
requests.get(valid_url)
except requests.exceptions.RequestException:
print(f'\x1b[1K\r{self.cred}Failed to validate:{self.cend} {valid_url}')
print(f'\t╟═══ Check the url format')
print(f'\t╟═══ Check whether url is valid')
print(f'\t╚═══ Check your network connection')
sys.exit(1)
print(f'\x1b[1K\r{self.cgreen}Valid:{self.cend} {valid_url}\n')
return valid_url

def get_paths(self):
Expand All @@ -49,56 +65,47 @@ def get_paths(self):
print(f'{self.cred} paths.txt file is missing!{self.cend}')
sys.exit(1)

def scan(self, path, headers=None):
def scan(self, path):
full_url = self.url + path
if self.random_agent:
self.headers = {'user-agent': random.choice(self.agents)}
sys.stdout.write(f'\x1b[1K\r{self.cyellow}Scanning:{self.cend} {path}')
try:
r = requests.get(full_url, timeout=5, headers=headers)
r = requests.get(full_url, timeout=self.timeout, headers=self.headers, proxies=self.proxies)
except requests.exceptions.RequestException:
with self.print_lock_:
sys.stdout.write(f'\x1b[1K\r{self.cred}Unable to scan:{self.cend} {path}')
else:
self.total_scanned += 1
if r.status_code == 200:
self.found.append(full_url)
self.total_found += 1
with self.print_lock_:
return print(f'{self.cgreen}[+]{self.cend} {full_url}')
with self.print_lock_:
return print(f'{self.cred}[-]{self.cend} {full_url}')
except requests.exceptions.RequestException:
pass

def run_single_thread(self):
sys.stdout.write(f'\x1b[1K\r{self.cgreen}[+] {self.cend}{full_url}\n')
else:
with self.print_lock_:
sys.stdout.write(f'\x1b[1K\r{self.cred}[-] {self.cend}{full_url}')

def run_scan(self):
print(f'{self.cgreen}Session started...{self.cend}\n')
header = None
try:
for path in self.paths:
if self.random_agent:
header = {'user-agent': random.choice(self.agents)}
self.scan(path, header)
if self.multithread:
threads = []
for path in self.paths:
thread = threading.Thread(target=self.scan, args=(path,))
threads.append(thread)
thread.daemon = True
thread.start()
for t in threads:
t.join()
else:
for path in self.paths:
self.scan(path)
print(f'\n\n{self.cgreen}Session completed!{self.cend}')
print(f'\t╟═══[📄] Total pages found: {self.total_found}')
print(f'\t╚═══[📚] Total pages scanned: {self.total_scanned}')
except KeyboardInterrupt:
print(f'\n{self.cred}Session terminated{self.cend}')

def run_multi_thread(self):
print(f'{self.cgreen}Session started...{self.cend}\n')
header = None
threads = []
for path in self.paths:
try:
if self.random_agent:
header = {'user-agent': random.choice(self.agents)}
thread = threading.Thread(target=self.scan, args=(path, header))
threads.append(thread)
thread.start()
except KeyboardInterrupt:
print(f'\n{self.cred}Session is about to stop...{self.cend}\n')
break
for t in threads:
try:
t.join()
except KeyboardInterrupt:
print(f'\n{self.cred}Please wait...{self.cend}\n')
pass

def result(self):
print(f'\n\t{self.cyellowbg} Total found: {len(self.found)} {self.cend}')
if self.found:
for url in self.found:
print(f'\t{self.cgreen}{url}{self.cend}')
sys.stdout.write(f'\x1b[1K\r\n{self.cred}Session terminated!{self.cend}\n\x1b[1K\r')
sys.exit(1)


if __name__ == '__main__':
Expand All @@ -107,7 +114,7 @@ def result(self):
█▄▀ █▄░█ █▀█ █▀▀ █▄▀ █▄▀ █▄░█ █▀█ █▀▀ █▄▀
█░█ █░▀█ █▄█ █▄▄ █░█ █░█ █░▀█ █▄█ █▄▄ █░█
🔥 v0.2 made by Kaustubh Prabhu 🔥
🔥 v0.2.1 made by Kaustubh Prabhu 🔥
[https://github.com/kaustubhrprabhu/KnockKnock.git]
''' + '\33[0m')
Expand All @@ -118,11 +125,5 @@ def result(self):
parser.add_argument('-r', '--random-agent', help='use random user-agents', dest='ragent', action='store_true')
args = parser.parse_args()

knockknock = KnockKnock(args.url, args.ragent)

if args.fast:
knockknock.run_multi_thread()
else:
knockknock.run_single_thread()

knockknock.result()
knockknock = KnockKnock(args.url, args.fast, args.ragent)
knockknock.run_scan()

0 comments on commit 918d116

Please sign in to comment.