Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add duo univ prompt support #50

Merged
merged 3 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion alohomora/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import sys

__version__ = '3.0.3'
__version__ = '3.1.0'
__author__ = 'Viasat'
__author_email__ = 'vice-support@viasat.com'
__license__ = '(c) 2022 Viasat, Inc. See the LICENSE file for more details.'
Expand Down
323 changes: 209 additions & 114 deletions alohomora/req.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,199 @@ def login_one_factor(self, username, password):
return (True, assertion)
return (False, response)

def _get_duo_plugin_payload(self, soup, post_url, auth_device):
''' Get the DUO plugin payload '''
payload = { inputtag.get('name', ''): inputtag.get('value', '')
for inputtag in soup.find_all('input') }

xsrf = payload.get('_xsrf', '')

# Post data to emulate the plugin determination
(response, soup) = self._do_post(post_url, data=payload)

sid = unquote(urlparse.urlparse(response.request.url).query[4:])
new_action = self._get_form_action(soup)
device = self._get_duo_device(soup, auth_device)
factor = self._get_auth_factor(soup, device)

do_wa = device.value.startswith('WA')
plugin_payload = {
'sid': sid,
'device': device.value,
'factor': factor.name,
'out_of_date': ''
}

if factor.name == "Passcode":
plugin_payload['passcode'] = factor.value

if do_wa:
# configure webauthn plugin info
plugin_payload['factor'] = factor.name if (
device.name != "Security Key (U2F)"
and not device.value.startswith('WA')) else "WebAuthn Credential"

return (plugin_payload, xsrf, new_action)

def _wait_for_duo_status(self, allowed, duo_status_endpoint, sid, txid):
''' Poll Duo for MFA status '''
while not allowed:
# call again to get status of request
# for a push notification, this will hang until the user approves/denies
# for a phone call, you need to keep polling until the user approves/denies
(status, _) = self._do_post(duo_status_endpoint,
data={'sid': sid, 'txid': txid}, soup=False)
status_data = json.loads(status.text)

if status_data['stat'] != 'OK':
LOG.error(f"Returned from second status call: {status.text}")
alohomora.die("Sorry, there was a problem talking to Duo.")
if status_data['response']['status_code'] == 'allow':
LOG.info("Login allowed!")
allowed = True
elif status_data['response']['status_code'] == 'deny':
LOG.error(f"Login disallowed: {status.text}")
alohomora.die("The login was blocked!")
else:
print(f"Still waiting... ({status_data['response']['status_code']})")
LOG.info(f"Still waiting... ({status_data['response']['status_code']})")
LOG.debug(str(status_data))
time.sleep(2)

return status_data

def _process_webauthn_request(self, status_data, plugin_payload, duo_host, sid, iframe=False):
''' Wait for webauthn device to be approved '''
if not iframe:
duo_prompt_endpoint = f'https://{duo_host}/frame/v4/prompt'
duo_status_endpoint = f'https://{duo_host}/frame/v4/status'
else:
duo_prompt_endpoint = f'https://{duo_host}/frame/prompt'
duo_status_endpoint = f'https://{duo_host}/frame/status'
opts = status_data['response']['webauthn_credential_request_options']
challenges = [r for r in opts['allowCredentials']
if self._validate_webauthn_request(duo_host, opts['extensions']['appid'])]
if not challenges:
alohomora.die('Sorry, there was a problem talking to Duo.')
resp = self._get_webauthn_response(opts)
LOG.debug(resp)

# include the session ID as passed to us earlier
plugin_payload['sid'] = sid
# webauthn_credential and webauthn_finish are magic strings here
plugin_payload['device'] = 'webauthn_credential'
plugin_payload['factor'] = 'webauthn_finish'
# these are a copy/paste from the duo integration's POST data
plugin_payload['out_of_date'] = None
plugin_payload['days_out_of_date'] = None
plugin_payload['days_to_block'] = 'None'
# finally, the response data itself needs to be a JSON string
plugin_payload['response_data'] = json.dumps(resp,separators=(',', ':'))

LOG.debug(plugin_payload)

(status, _) = self._do_post(duo_prompt_endpoint, data=plugin_payload,
soup=False)
status_data = json.loads(status.text)
# Response is of form
# {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}}
txid = json.loads(status.text)['response']['txid']
LOG.debug("Received transaction ID %s from response %s", txid, status.text)

# Initial call will NOT block
(status, _) = self._do_post(duo_status_endpoint,
data={'sid': sid, 'txid': txid}, soup=False)
status_data = json.loads(status.text)
LOG.info(str(status_data))
if status_data['stat'] != 'OK':
LOG.error("Returned from inital status call: %s", status.text)
alohomora.die("Sorry, there was a problem talking to Duo.")
allowed = status_data['response']['status_code'] == 'allow'
if not allowed:
alohomora.die("Sorry, there was a problem with your security key, try again.")

factor_name = 'webauthn_finish'
return (factor_name, txid, allowed, status_data)

def login_two_factor(self, response_1fa, auth_device=None):
"""Log in with the second factor, borrowing first factor data if necessary"""
# If redirected to duosecurity we are doing the oidc flow
if urlparse.urlparse(response_1fa.url).netloc.endswith('duosecurity.com'):
LOG.debug("Using DUO universal prompt")
return self._login_two_factor_duo_univ(response_1fa, auth_device)
else:
LOG.debug("Using DUO iframe prompt")
return self._login_two_factor_iframe(response_1fa, auth_device)

def _login_two_factor_duo_univ(self, response_1fa, auth_device=None):
"""Log in with the second factor, borrowing first factor data if necessary"""
soup = BeautifulSoup(response_1fa.content, 'html.parser')
post_url = response_1fa.url
duo_host = urlparse.urlparse(post_url).netloc

(plugin_payload, xsrf, _) = self._get_duo_plugin_payload(soup, post_url, auth_device)

sid = plugin_payload['sid']
factor_name = plugin_payload['factor']

# Start the Duo Auth request
(status, _) = self._do_post(f'https://{duo_host}/frame/v4/prompt',
data=plugin_payload,
soup=False)

# Response is of form
# {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}}
response = json.loads(status.text)['response']
txid = response['txid']

LOG.debug("Received response: %s", status.text)
LOG.debug("Received response %s", response)
LOG.debug("Received transaction ID %s", txid)

# Initial call will NOT block
(status, _) = self._do_post(f'https://{duo_host}/frame/v4/status',
data={'sid': sid, 'txid': txid}, soup=False)

status_data = json.loads(status.text)
if status_data['stat'] != 'OK':
LOG.error("Returned from inital status call: %s", status.text)
alohomora.die("Sorry, there was a problem talking to Duo.")
allowed = status_data['response']['status_code'] == 'allow'

# If not immediately approved, poll for status
if not allowed:
# there should never be a case where `allowed` is True if the user picked Security Key
if status_data['response']['status_code'] == 'webauthn_sent':
(factor_name, txid, allowed, _) = self._process_webauthn_request(
status_data,
plugin_payload,
duo_host,
sid)
else:
self._wait_for_duo_status(allowed,
f'https://{duo_host}/frame/v4/status',
sid,
txid)

payload = {
"sid": sid,
"txid": txid,
"factor": factor_name,
"device_key": "",
"_xsrf": xsrf,
"dampen_choice": False,
}

duo_exit_url = f'https://{duo_host}/frame/v4/oidc/exit'

(response, soup) = self._do_post(duo_exit_url,
data=payload, soup=True)

assertion = self._get_assertion(soup)
return (True, assertion)

def _login_two_factor_iframe(self, response_1fa, auth_device=None):
"""Log in with the second factor, borrowing first factor data if necessary"""
soup_1fa = BeautifulSoup(response_1fa.text, 'html.parser')
duo_host = None
sig_request = None
Expand All @@ -394,68 +584,23 @@ def login_two_factor(self, response_1fa, auth_device=None):
'Origin': origin_duo_host
})

payload = {}
for inputtag in soup.find_all('input'):
name = inputtag.get('name', '')
value = inputtag.get('value', '')
# Populate all parameters with the existing value (picks up hidden fields too)
payload[name] = value

# Post data to emulate the plugin determination
LOG.info('Posting plugin information to Duo')
(response, soup) = self._do_post(frame_url, data=payload)

sid = unquote(urlparse.urlparse(response.request.url).query[4:])
new_action = self._get_form_action(soup)
device = self._get_duo_device(soup, auth_device)
factor = self._get_auth_factor(soup, device)

do_wa = device.value.startswith('WA')
# Finally send the POST request for an auth to Duo
payload = {
'sid': sid,
'device': device.value,
'factor': factor.name,
'out_of_date': ''
}
LOG.debug("Payload: %s", payload)
if factor.name == "Passcode":
payload['passcode'] = factor.value
prompt_sid_url = response.url
headers = {'Referer': prompt_sid_url,
'Origin': origin_duo_host}
if do_wa:
# pull in the webauthn prompt
popup_url = (f'https://{duo_host}{new_action}/'
f'webauthn_auth_popup?sid={sid}&wkey={device.value}')
LOG.debug("Popup URL: %s", popup_url)
(status, soup) = self._do_get(
popup_url,
headers=headers)
payload = {
'sid': sid,
'device': device.value,
'factor': factor.name if (
device.name != "Security Key (U2F)"
and not device.value.startswith('WA')) else "WebAuthn Credential",
}
headers = {'Referer': popup_url, 'Origin': origin_duo_host}
(payload, _, new_action) = self._get_duo_plugin_payload(soup, frame_url, auth_device)

(status, _) = self._do_post(f'https://{duo_host}{new_action}', data=payload,
headers=headers, soup=False)
soup=False)

# Response is of form
# {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}}
LOG.debug("Received response: %s", status.text)
response = json.loads(status.text)['response']
txid = response['txid']
sid = payload['sid']
LOG.debug("Received response %s", response)
LOG.debug("Received transaction ID %s", txid)

headers = {'Referer': prompt_sid_url, 'Origin': origin_duo_host}
# Initial call will NOT block
(status, _) = self._do_post(f'https://{duo_host}/frame/status',
data={'sid': sid, 'txid': txid}, headers=headers, soup=False)
data={'sid': sid, 'txid': txid}, soup=False)
# text from this will be something like
# {
# "stat": "OK",
Expand Down Expand Up @@ -502,72 +647,22 @@ def login_two_factor(self, response_1fa, auth_device=None):
print(status_data['response']['status'])
allowed = status_data['response']['status_code'] == 'allow'

# there should never be a case where `allowed` is True if the user picked Security Key
if status_data['response']['status_code'] == 'webauthn_sent':
opts = status_data['response']['webauthn_credential_request_options']
challenges = [r for r in opts['allowCredentials']
if self._validate_webauthn_request(duo_host, opts['extensions']['appid'])]
if not challenges:
alohomora.die('Sorry, there was a problem talking to Duo.')
resp = self._get_webauthn_response(opts)
LOG.debug(resp)

# include the session ID as passed to us earlier
payload['sid'] = sid
# webauthn_credential and webauthn_finish are magic strings here
payload['device'] = 'webauthn_credential'
payload['factor'] = 'webauthn_finish'
# these are a copy/paste from the duo integration's POST data
payload['out_of_date'] = None
payload['days_out_of_date'] = None
payload['days_to_block'] = 'None'
# finally, the response data itself needs to be a JSON string
payload['response_data'] = json.dumps(resp,separators=(',', ':'))

LOG.debug(payload)

(status, _) = self._do_post(f'https://{duo_host}{new_action}', data=payload,
headers=headers, soup=False)
status_data = json.loads(status.text)
# Response is of form
# {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}}
txid = json.loads(status.text)['response']['txid']
LOG.debug("Received transaction ID %s from response %s", txid, status.text)

# Initial call will NOT block
(status, _) = self._do_post(f'https://{duo_host}/frame/status',
data={'sid': sid, 'txid': txid}, headers=headers, soup=False)
status_data = json.loads(status.text)
LOG.info(str(status_data))
if status_data['stat'] != 'OK':
LOG.error("Returned from inital status call: %s", status.text)
alohomora.die("Sorry, there was a problem talking to Duo.")
print(status_data['response']['status'])
allowed = status_data['response']['status_code'] == 'allow'
if not allowed:
alohomora.die("Sorry, there was a problem with your security key, try again.")

while not allowed:
# call again to get status of request
# for a push notification, this will hang until the user approves/denies
# for a phone call, you need to keep polling until the user approves/denies
(status, _) = self._do_post(f'https://{duo_host}/frame/status',
data={'sid': sid, 'txid': txid}, soup=False)
status_data = json.loads(status.text)

if status_data['stat'] != 'OK':
LOG.error("Returned from second status call: %s", status.text)
alohomora.die("Sorry, there was a problem talking to Duo.")
if status_data['response']['status_code'] == 'allow':
LOG.info("Login allowed!")
allowed = True
elif status_data['response']['status_code'] == 'deny':
LOG.error("Login disallowed: %s", status.text)
alohomora.die("The login was blocked!")
# If not immediately approved, poll for status
if not allowed:
# there should never be a case where `allowed` is True if the user picked Security Key
if status_data['response']['status_code'] == 'webauthn_sent':
(factor_name, txid, allowed, status_data) = self._process_webauthn_request(
status_data,
payload,
duo_host,
sid,
iframe=True)
else:
LOG.info("Still waiting... (%s)", status_data['response']['status_code'])
LOG.debug(str(status_data))
time.sleep(2)
status_data = self._wait_for_duo_status(
allowed,
f'https://{duo_host}/frame/status',
sid,
txid)

signed_auth = ''
if 'result_url' in status_data['response']:
Expand Down