scrape.py
import multiprocessing
import sys
import re
import requests
import urllib3
from urllib.parse import urlparse

# 1. Use this to scrape resources from a list of given URLs.
# 2. In Burp, start a new scan and add them as "URLs to Scan".
# 3. Selectively disable other extensions that add active scanner checks and run an "Audit checks - extensions only" scan.
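# Example invocation (illustrative file name):
#   $ python scrape.py targets.txt
# targets.txt holds one base URL per line (e.g. https://example.com); discovered
# resources are written to RESULTS_FILE (results.txt) for import into Burp.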
RESOURCES_PATTERN = r'(?:(?:href|src)=(?:["\']([^\'"]*)[\'"]|([^\s<>]+)))' # @d0nutptr
EXCLUDED_EXTENSIONS = [r"html?", r"as.x?", r"php\d?"]
RESULTS_FILE = "results.txt"
PROCESSES_COUNT = 4
DONE_FLAG = "__done__"
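
# Illustrative examples of what RESOURCES_PATTERN captures from a response body:
#   <script src="/static/app.js"></script>  ->  "/static/app.js"  (quoted match)
#   <img src=/img/logo.png>                  ->  "/img/logo.png"   (unquoted match)
# EXCLUDED_EXTENSIONS then drops page-like extensions (html/htm, asp/aspx-style, php, php5, ...),
# leaving static resources such as scripts, images, and stylesheets.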


def initiate(pool, results, urls):
    jobs = []
    for url in urls:
        job = pool.apply_async(scrape, (url, results))
        jobs.append(job)
    try:
        for job in jobs:
            job.get()
    except KeyboardInterrupt:
        print("Killed.")
        try:
            pool.terminate()
            pool.close()
        finally:
            sys.exit(0)


def scrape(url, queue):
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    results = set()
    print("Scraping %s ..." % url)
    try:
        response = requests.get(url, verify=False, timeout=3)
        if response.history:
            # The request was redirected: use the final URL's origin as the base.
            url = "{uri.scheme}://{uri.netloc}".format(uri=urlparse(response.url))
        content = response.content
    except:
        print("Failed on %s: %s" % (url, sys.exc_info()[1]))
        return
    matches = re.findall(RESOURCES_PATTERN, content.decode("utf-8", "replace"))
    for match in matches:
        for group in match:
            results.add(group)
    # Keep only same-origin or relative references.
    results = [result for result in results if is_same_origin(url, result) or is_relative(result)]
    # Keep only paths that end in a file name with a non-excluded extension.
    results = [
        result
        for result in results
        if ("." in result.split("/")[-1] and not is_excluded(result.split("/")[-1].split(".")[-1]))
    ]
    results = [get_full_url(url, result) for result in results]
    print("Found %s resources on %s" % (len(results), url))
    for result in results:
        queue.put(result.replace(" ", "%20"))


def writer(queue):
    results = set()
    while True:
        try:
            entry = queue.get()
            if entry == DONE_FLAG:
                return results
            results.add(entry)
        except:
            # Interrupted (e.g. KeyboardInterrupt) while waiting on the queue.
            break
    return results


def is_same_origin(origin, url):
    return url.startswith(origin + "/") or url.startswith("//%s/" % origin.split("/")[2])


def is_relative(url):
    return url.startswith("/") and not (url.startswith("//") or url.startswith("/\\"))


def is_excluded(extension):
    return any(re.match(ep, extension) for ep in EXCLUDED_EXTENSIONS)


def get_full_url(origin, url):
    if url.startswith(origin):
        return url
    if url.startswith("//"):
        return origin.split("/")[0] + url
    if url.startswith("/"):
        return origin + url
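
# Illustrative behaviour of the URL helpers, assuming an origin of https://example.com:
#   is_same_origin("https://example.com", "https://example.com/js/app.js")  -> True
#   is_relative("/static/app.js")                                           -> True
#   get_full_url("https://example.com", "//example.com/js/app.js")          -> "https://example.com/js/app.js"
#   get_full_url("https://example.com", "/static/app.js")                   -> "https://example.com/static/app.js"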


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: %s <domain_list>" % sys.argv[0])
        sys.exit()
    with open(sys.argv[1]) as f:
        urls = [line.strip().rstrip("/") for line in f.readlines()]
    results = multiprocessing.Manager().Queue()
    # One pool worker collects results; the remaining workers scrape.
    p = multiprocessing.Pool(PROCESSES_COUNT)
    wjob = p.apply_async(writer, (results,))
    initiate(p, results, urls)
    results.put(DONE_FLAG)
    resources = wjob.get()
    p.close()
    with open(RESULTS_FILE, "w", encoding="utf-8") as f:
        for resource in resources:
            f.write("%s\n" % resource)