Skip to content

Commit

Permalink
feat: added tests and more general functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
robinvandernoord committed Jan 17, 2024
1 parent 6680b3a commit be86df5
Show file tree
Hide file tree
Showing 13 changed files with 363 additions and 67 deletions.
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ directory = "src"
include = []
exclude = []
# 'all':
stop-after-first-failure = false
stop-after-first-failure = true
# pytest:
coverage = 100
badge = true
# --format json indent
json-indent = 4
json-indent = 2


[tool.black]
Expand Down Expand Up @@ -181,4 +181,4 @@ add_ignore = [
[tool.pytest.ini_options]
pythonpath = [
"src",
]
]
120 changes: 73 additions & 47 deletions src/twofas/_security.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import base64
import getpass
import hashlib
import json
import logging
import sys
import time
from getpass import getpass
import warnings
from pathlib import Path
from typing import Any, Optional

import cryptography.exceptions
import keyring
import keyring.backends.SecretService
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from keyring.backend import KeyringBackend

from ._types import AnyDict, TwoFactorAuthDetails
from ._types import AnyDict, TwoFactorAuthDetails, into_class

# Suppress keyring warnings
keyring_logger = logging.getLogger("keyring")
Expand All @@ -30,15 +31,20 @@ def _decrypt(encrypted: str, passphrase: str) -> list[AnyDict]:
aesgcm = AESGCM(key)
credentials_dec = aesgcm.decrypt(nonce, credentials_enc, None)
dec = json.loads(credentials_dec) # type: list[AnyDict]
if not isinstance(dec, list):
if not isinstance(dec, list): # pragma: no cover
raise TypeError("Unexpected data structure in input file.")
return dec


def decrypt(encrypted: str, passphrase: str) -> list[TwoFactorAuthDetails]:
"""
Raises:
PermissionError
"""
try:
dicts = _decrypt(encrypted, passphrase)
return [TwoFactorAuthDetails.load(d) for d in dicts]
return into_class(dicts, TwoFactorAuthDetails)
except cryptography.exceptions.InvalidTag as e:
# wrong passphrase!
raise PermissionError("Invalid passphrase for file.") from e
Expand All @@ -53,58 +59,78 @@ def hash_string(data: Any) -> str:
return sha256.hexdigest()


tmp = Path("/tmp")
tmp_file = tmp / ".2fas"

# APPNAME is session specific but with global prefix for easy clean up
PREFIX = "2fas:"

if tmp_file.exists() and (session := tmp_file.read_text()) and session.startswith(PREFIX):
# existing session
APPNAME = session
else:
# new session!
session = hash_string((time.time())) # random enough for this purpose
APPNAME = f"{PREFIX}{session}"
tmp_file.write_text(APPNAME)

class KeyringManager:
appname: str = ""
tmp_file = Path("/tmp") / ".2fas"

def __init__(self) -> None:
self._init()

def _init(self) -> None:
# so you can call init again to set active appname (for pytest)
tmp_file = self.tmp_file
# APPNAME is session specific but with global prefix for easy clean up

if tmp_file.exists() and (session := tmp_file.read_text()) and session.startswith(PREFIX):
# existing session
self.appname = session
else:
# new session!
session = hash_string((time.time())) # random enough for this purpose
self.appname = f"{PREFIX}{session}"
tmp_file.write_text(self.appname)

@classmethod
def _retrieve_credentials(cls, filename: str, appname: str) -> Optional[str]:
return keyring.get_password(appname, hash_string(filename))

def retrieve_credentials(self, filename: str) -> Optional[str]:
return self._retrieve_credentials(filename, self.appname)

@classmethod
def _save_credentials(cls, filename: str, passphrase: str, appname: str) -> None:
keyring.set_password(appname, hash_string(filename), passphrase)

def retrieve_credentials(filename: str) -> Optional[str]:
return keyring.get_password(APPNAME, hash_string(filename))
def save_credentials(self, filename: str) -> str:
passphrase = getpass.getpass("Passphrase? ")
self._save_credentials(filename, passphrase, self.appname)

return passphrase

def save_credentials(filename: str) -> str:
passphrase = getpass("Passphrase? ")
keyring.set_password(APPNAME, hash_string(filename), passphrase)
@classmethod
def _delete_credentials(cls, filename: str, appname: str) -> None:
keyring.delete_password(appname, hash_string(filename))

return passphrase
def delete_credentials(self, filename: str) -> None:
self._delete_credentials(filename, self.appname)

@classmethod
def _cleanup_keyring(cls, appname: str) -> int:
kr: keyring.backends.SecretService.Keyring | KeyringBackend = keyring.get_keyring()
if not hasattr(kr, "get_preferred_collection"): # pragma: no cover
warnings.warn(f"Can't clean up this keyring backend! {type(kr)}", category=RuntimeWarning)
return -1

def cleanup_keyring() -> None:
import keyring.backends.SecretService
collection = kr.get_preferred_collection()

kr: keyring.backends.SecretService.Keyring | KeyringBackend = keyring.get_keyring()
if not hasattr(kr, "get_preferred_collection"):
print(f"Can't clean up this keyring backend! {type(kr)}", file=sys.stderr) # todo: only if verbose
return
# get old 2fas: keyring items:
return len(
[
item
for item in collection.get_all_items()
if (
service := item.get_attributes().get("service", "")
) # must have a 'service' attribute, otherwise it's unrelated
and service.startswith(PREFIX) # must be a 2fas: service, otherwise it's unrelated
and service != appname # must not be the currently active session
]
)

c = kr.get_preferred_collection()
def cleanup_keyring(self) -> None:
self._cleanup_keyring(self.appname)

# get old 2fas: keyring items:
relevant = [
_
for _ in c.get_all_items()
if (
service := _.get_attributes().get("service", "")
) # must have a 'service' attribute, otherwise it's unrelated
and service.startswith(PREFIX) # must be a 2fas: service, otherwise it's unrelated
and service != APPNAME # must not be the currently active session
]

print(
"removed",
len([_.delete() for _ in relevant]),
"old items",
# todo: only if verbose I guess
file=sys.stderr,
)
keyring_manager = KeyringManager()
28 changes: 24 additions & 4 deletions src/twofas/_types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import typing
from typing import Optional

from configuraptor import TypedConfig
from configuraptor import TypedConfig, asdict, asjson
from pyotp import TOTP

AnyDict = dict[str, typing.Any]
Expand Down Expand Up @@ -41,9 +41,6 @@ class TwoFactorAuthDetails(TypedConfig):

_topt: Optional[TOTP] = None # lazily loaded when calling .totp or .generate()

def __repr__(self) -> str:
return f"<2fas '{self.name}'>"

@property
def totp(self) -> TOTP:
if not self._topt:
Expand All @@ -52,3 +49,26 @@ def totp(self) -> TOTP:

def generate(self) -> str:
return self.totp.now()

def generate_int(self) -> int:
# !!! usually not prefered, because this drops leading zeroes!!
return int(self.totp.now())

def as_dict(self) -> AnyDict:
return asdict(self, with_top_level_key=False, exclude_internals=2)

def as_json(self) -> str:
return asjson(self, with_top_level_key=False, indent=2, exclude_internals=2)

def __str__(self) -> str:
return f"<2fas '{self.name}'>"

def __repr__(self) -> str:
return self.as_json()


T_TypedConfig = typing.TypeVar("T_TypedConfig", bound=TypedConfig)


def into_class(entries: list[AnyDict], klass: typing.Type[T_TypedConfig]) -> list[T_TypedConfig]:
return [klass.load(d) for d in entries]
107 changes: 102 additions & 5 deletions src/twofas/core.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,114 @@
import json
import sys
import typing
from collections import defaultdict
from pathlib import Path
from typing import Optional

from ._security import decrypt, retrieve_credentials, save_credentials
from ._types import TwoFactorAuthDetails
from ._security import decrypt, keyring_manager
from ._types import TwoFactorAuthDetails, into_class
from .utils import flatten, fuzzy_match

T_TwoFactorAuthDetails = typing.TypeVar("T_TwoFactorAuthDetails", bound=TwoFactorAuthDetails)

def load_services(filename: str) -> list[TwoFactorAuthDetails]:

class TwoFactorStorage(typing.Generic[T_TwoFactorAuthDetails]):
_multidict: defaultdict[str, list[T_TwoFactorAuthDetails]]
count: int

def __init__(self, _klass: typing.Type[T_TwoFactorAuthDetails] = None) -> None:
# _klass is purely for annotation atm

self._multidict = defaultdict(list) # one name can map to multiple keys
self.count = 0

def __bool__(self) -> bool:
return self.count > 0

def add(self, entries: list[T_TwoFactorAuthDetails]) -> None:
for entry in entries:
self._multidict[entry.name].append(entry)

self.count += len(entries)

def __getitem__(self, item: str) -> list[T_TwoFactorAuthDetails]:
# class[property] syntax
return self._multidict[item]

def keys(self) -> list[str]:
return list(self._multidict.keys())

def _fuzzy_find(self, find: typing.Optional[str], fuzz_threshold: int) -> list[T_TwoFactorAuthDetails]:
if find is None:
# don't loop
return list(self)

all_items = self._multidict.items()

find = find.upper()
# if nothing found exactly, try again but fuzzy (could be slower)
# search in key:
fuzzy = [
# search in key
v
for k, v in all_items
if fuzzy_match(k.upper(), find) > fuzz_threshold
]
if fuzzy and (flat := flatten(fuzzy)):
return flat

# search in value:
return [
# search in value instead
v
for v in list(self)
if fuzzy_match(str(v).upper(), find) > fuzz_threshold
]

def find(self, target: Optional[str] = None, fuzz_threshold: int = 75) -> list[T_TwoFactorAuthDetails]:
# first try exact match:
if items := self._multidict.get(target or ""):
return items
# else: fuzzy match:
return self._fuzzy_find(target, fuzz_threshold)

def all(self) -> list[T_TwoFactorAuthDetails]:
return list(self)

def __iter__(self) -> typing.Generator[T_TwoFactorAuthDetails, None, None]:
for entries in self._multidict.values():
yield from entries

def __repr__(self) -> str:
return f"<TwoFactorStorage with {len(self._multidict)} keys and {self.count} entries>"


def load_services(filename: str, _max_retries: int = 0) -> TwoFactorStorage[TwoFactorAuthDetails]:
filepath = Path(filename)
with filepath.open() as f:
data_raw = f.read()
data = json.loads(data_raw)

storage = TwoFactorStorage(TwoFactorAuthDetails)

if decrypted := data["services"]:
services = into_class(decrypted, TwoFactorAuthDetails)
storage.add(services)
return storage

encrypted = data["servicesEncrypted"]
password = retrieve_credentials(filename) or save_credentials(filename)

return decrypt(encrypted, password)
retries = 0
while True:
password = keyring_manager.retrieve_credentials(filename) or keyring_manager.save_credentials(filename)
try:
entries = decrypt(encrypted, password)
storage.add(entries)
return storage
except PermissionError as e:
retries += 1 # only really useful for pytest
print(e, file=sys.stderr)
keyring_manager.delete_credentials(filename)

if _max_retries and retries > _max_retries:
raise e
13 changes: 5 additions & 8 deletions src/twofas/main.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
import sys

from ._security import cleanup_keyring
from ._security import keyring_manager
from .core import load_services


def _main(filename: str = '') -> None:
cleanup_keyring()
def _main(filename: str = "") -> None:
keyring_manager.cleanup_keyring()

if not filename:
print('todo: remember previous file(s)')
print("todo: remember previous file(s)")
return

decrypted = load_services(filename)

print([(_, _.generate()) for _ in decrypted])
print([(_, _.generate()) for _ in decrypted.all()])


def main() -> None:
return _main(*sys.argv[1:])


if __name__ == "__main__":
# fixme: soft-code
# todo: store prefered 2fas file somewhere
# todo: deal with unencrypted ones
main()
Loading

0 comments on commit be86df5

Please sign in to comment.