Skip to content

Commit

Permalink
Remove automatic YubiCloud upload
Browse files Browse the repository at this point in the history
  • Loading branch information
dainnilsson committed Jun 3, 2024
1 parent d279c69 commit 2bcf81b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 135 deletions.
40 changes: 9 additions & 31 deletions ykman/_cli/otp.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,8 @@
EnumChoice,
is_yk4_fips,
)
from .. import __version__
from ..scancodes import encode, KEYBOARD_LAYOUT
from ..otp import (
_PrepareUploadFailed,
_prepare_upload_key,
is_in_fips_mode,
generate_static_pw,
parse_oath_key,
Expand All @@ -76,7 +73,6 @@
import os
import struct
import click
import webbrowser


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -366,7 +362,7 @@ def delete(ctx, slot, force):
"--upload",
is_flag=True,
required=False,
help="upload credential to YubiCloud (opens a browser, can't be used with --force)",
hidden=True,
)
@click.option(
"-O",
Expand Down Expand Up @@ -395,19 +391,23 @@ def yubiotp(
Program a Yubico OTP credential.
"""

info = ctx.obj["info"]
session = _get_session(ctx)
serial = None

if upload:
raise CliFail(
"Automated YubiCloud upload support has been ended. "
"You can manually upload a credential by saving it as a CSV file "
"(use -O/--config-output) and then submitting it to "
"https://upload.yubico.com"
)

if public_id and serial_public_id:
ctx.fail("Invalid options: --public-id conflicts with --serial-public-id.")

if private_id and generate_private_id:
ctx.fail("Invalid options: --private-id conflicts with --generate-public-id.")

if upload and force:
ctx.fail("Invalid options: --upload conflicts with --force.")

if key and generate_key:
ctx.fail("Invalid options: --key conflicts with --generate-key.")

Expand Down Expand Up @@ -461,23 +461,6 @@ def yubiotp(
key = click_prompt("Enter secret key")
key = bytes.fromhex(key)

if upload:
click.confirm("Upload credential to YubiCloud?", abort=True, err=True)

try:
upload_url = _prepare_upload_key(
key,
public_id,
private_id,
serial=info.serial,
user_agent="ykman/" + __version__,
)
click.echo("Upload to YubiCloud initiated successfully.")
logger.info("Initiated YubiCloud upload")
except _PrepareUploadFailed as e:
error_msg = "\n".join(e.messages())
raise CliFail("Upload to YubiCloud failed.\n" + error_msg)

force or click.confirm(
f"Program a YubiOTP credential in slot {slot}?", abort=True, err=True
)
Expand All @@ -501,11 +484,6 @@ def yubiotp(
config_output.write(csv + "\n")
logger.info(f"Configuration parameters written to {_fname(config_output)}")

if upload:
logger.info("Launching browser for YubiCloud upload")
click.echo("Opening upload form in browser: " + upload_url)
webbrowser.open_new_tab(upload_url)


@otp.command()
@click_slot_argument
Expand Down
104 changes: 0 additions & 104 deletions ykman/otp.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,124 +25,20 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from . import __version__
from .scancodes import KEYBOARD_LAYOUT
from yubikit.core.otp import modhex_encode
from yubikit.yubiotp import YubiOtpSession
from yubikit.oath import parse_b32_key
from enum import Enum
from http.client import HTTPSConnection
from datetime import datetime
from typing import Iterable, Optional

import json
import struct
import random
import logging

logger = logging.getLogger(__name__)


_UPLOAD_HOST = "upload.yubico.com"
_UPLOAD_PATH = "/prepare"


class _PrepareUploadError(Enum):
# Defined here
CONNECTION_FAILED = "Failed to open HTTPS connection."
NOT_FOUND = "Upload request not recognized by server."
SERVICE_UNAVAILABLE = (
"Service temporarily unavailable, try again later." # noqa: E501
)

# Defined in upload project
PRIVATE_ID_INVALID_LENGTH = "Private ID must be 12 characters long."
PRIVATE_ID_NOT_HEX = (
"Private ID must consist only of hex characters (0-9A-F)." # noqa: E501
)
PRIVATE_ID_UNDEFINED = "Private ID is required."
PUBLIC_ID_INVALID_LENGTH = "Public ID must be 12 characters long."
PUBLIC_ID_NOT_MODHEX = "Public ID must consist only of modhex characters (cbdefghijklnrtuv)." # noqa: E501
PUBLIC_ID_NOT_VV = 'Public ID must begin with "vv".'
PUBLIC_ID_OCCUPIED = "Public ID is already in use."
PUBLIC_ID_UNDEFINED = "Public ID is required."
SECRET_KEY_INVALID_LENGTH = "Secret key must be 32 character long." # nosec
SECRET_KEY_NOT_HEX = (
"Secret key must consist only of hex characters (0-9A-F)." # noqa: E501 # nosec
)
SECRET_KEY_UNDEFINED = "Secret key is required." # nosec
SERIAL_NOT_INT = "Serial number must be an integer."
SERIAL_TOO_LONG = "Serial number is too long."

def message(self):
return self.value


class _PrepareUploadFailed(Exception):
def __init__(self, status, content, error_ids):
super().__init__(f"Upload to YubiCloud failed with status {status}: {content}")
self.status = status
self.content = content
self.errors = [
e if isinstance(e, _PrepareUploadError) else _PrepareUploadError[e]
for e in error_ids
]

def messages(self):
return [e.message() for e in self.errors]


def _prepare_upload_key(
key,
public_id,
private_id,
serial=None,
user_agent="python-yubikey-manager/" + __version__,
):
modhex_public_id = modhex_encode(public_id)
data = {
"aes_key": key.hex(),
"serial": serial or 0,
"public_id": modhex_public_id,
"private_id": private_id.hex(),
}

httpconn = HTTPSConnection(_UPLOAD_HOST, timeout=1) # nosec

try:
httpconn.request(
"POST",
_UPLOAD_PATH,
body=json.dumps(data, indent=False, sort_keys=True).encode("utf-8"),
headers={"Content-Type": "application/json", "User-Agent": user_agent},
)
except Exception:
logger.error("Failed to connect to %s", _UPLOAD_HOST, exc_info=True)
raise _PrepareUploadFailed(None, None, [_PrepareUploadError.CONNECTION_FAILED])

resp = httpconn.getresponse()
if resp.status == 200:
url = json.loads(resp.read().decode("utf-8"))["finish_url"]
return url
else:
resp_body = resp.read()
logger.debug("Upload failed with status %d: %s", resp.status, resp_body)
if resp.status == 404:
raise _PrepareUploadFailed(
resp.status, resp_body, [_PrepareUploadError.NOT_FOUND]
)
elif resp.status == 503:
raise _PrepareUploadFailed(
resp.status, resp_body, [_PrepareUploadError.SERVICE_UNAVAILABLE]
)
else:
try:
errors = json.loads(resp_body.decode("utf-8")).get("errors")
except Exception:
errors = []
raise _PrepareUploadFailed(resp.status, resp_body, errors)


def is_in_fips_mode(session: YubiOtpSession) -> bool:
"""Check if the OTP application of a FIPS YubiKey is in FIPS approved mode.
Expand Down

0 comments on commit 2bcf81b

Please sign in to comment.