Skip to content

Commit

Permalink
Merge pull request #115 from dajiaji/add-new
Browse files Browse the repository at this point in the history
Add new() as constructor.
  • Loading branch information
dajiaji authored Jun 4, 2021
2 parents f2c655d + 65444d5 commit f39a013
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 166 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Changes
Unreleased
----------

- Introduce new() into CWT/COSE. `#115 <https://github.com/dajiaji/python-cwt/pull/115>`__
- Rename Claims.from_dict to Claims.new. `#115 <https://github.com/dajiaji/python-cwt/pull/115>`__
- Rename COSEKey.from_dict to COSEKey.new. `#115 <https://github.com/dajiaji/python-cwt/pull/115>`__
- Rename Recipient.from_dict to Recipient.new. `#115 <https://github.com/dajiaji/python-cwt/pull/115>`__
- Add Signer for encode_and_sign function. `#114 <https://github.com/dajiaji/python-cwt/pull/114>`__
- Divide CWT options into independent parameters. `#113 <https://github.com/dajiaji/python-cwt/pull/113>`__

Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ decoded = cwt.decode(token, key)
# }

# If you want to treat the result like a JWT;
readable = Claims.from_dict(decoded)
readable = Claims.new(decoded)

# readable.iss == 'coaps://as.example'
# readable.sub == 'dajiaji'
Expand Down Expand Up @@ -228,7 +228,7 @@ raw = cwt.decode(token, public_key)
# raw[-70002][0] == "bar"
# raw[-70003]["baz"] == "qux"
# raw[-70004] == 123
readable = Claims.from_dict(raw)
readable = Claims.new(raw)
# readable.get(-70001) == "foo"
# readable.get(-70002)[0] == "bar"
# readable.get(-70003)["baz"] == "qux"
Expand Down Expand Up @@ -267,7 +267,7 @@ token = cwt.encode(
private_key,
)
raw = cwt.decode(token, public_key)
readable = Claims.from_dict(
readable = Claims.new(
raw,
private_claims_names={
"ext_1": -70001,
Expand Down Expand Up @@ -352,10 +352,10 @@ with open("./public_key_of_issuer.pem") as key_file:

# Verifies and decodes the CWT received from the presenter.
raw = cwt.decode(token, public_key)
decoded = Claims.from_dict(raw)
decoded = Claims.new(raw)

# Extracts the PoP key from the CWT.
extracted_pop_key = COSEKey.from_dict(decoded.cnf) # = raw[8][1]
extracted_pop_key = COSEKey.new(decoded.cnf) # = raw[8][1]

# Then, verifies the message sent by the presenter
# with the signature which is also sent by the presenter as follows:
Expand Down
12 changes: 4 additions & 8 deletions cwt/claims.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import json
from typing import Any, Dict, List, TypeVar, Union
from typing import Any, Dict, List, Union

from .const import CWT_CLAIM_NAMES
from .cose_key import COSEKey

T = TypeVar("T", bound="Claims")


class Claims:
"""
Expand Down Expand Up @@ -71,9 +69,7 @@ def __init__(
return

@classmethod
def from_dict(
cls, claims: Dict[int, Any], private_claim_names: Dict[str, int] = {}
) -> Any:
def new(cls, claims: Dict[int, Any], private_claim_names: Dict[str, int] = {}):
"""
Create a Claims object from a CBOR-like(Dict[int, Any]) claim object.
Expand Down Expand Up @@ -105,7 +101,7 @@ def from_json(
cls,
claims: Union[str, bytes, Dict[str, Any]],
private_claim_names: Dict[str, int] = {},
) -> T:
):
"""
Converts a JWT claims object into a CWT claims object which has numeric
keys. If a key string in JSON data cannot be mapped to a numeric key,
Expand Down Expand Up @@ -166,7 +162,7 @@ def from_json(
for i in [-259, -258, 7]:
if i in cbor_claims and isinstance(cbor_claims[i], str):
cbor_claims[i] = cbor_claims[i].encode("utf-8")
return cls.from_dict(cbor_claims, private_claim_names)
return cls.new(cbor_claims, private_claim_names)

@classmethod
def validate(cls, claims: Dict[int, Any]):
Expand Down
18 changes: 11 additions & 7 deletions cwt/cose.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ class COSE(CBORProcessor):
def __init__(
self, alg_auto_inclusion: int = False, kid_auto_inclusion: int = False
):
if not isinstance(alg_auto_inclusion, bool):
raise ValueError("alg_auto_inclusion should be bool.")
self._alg_auto_inclusion = alg_auto_inclusion

if not isinstance(kid_auto_inclusion, bool):
raise ValueError("kid_auto_inclusion should be bool.")
self._kid_auto_inclusion = kid_auto_inclusion

@classmethod
def new(cls, alg_auto_inclusion: int = False, kid_auto_inclusion: int = False):
"""
Constructor.
Expand All @@ -29,13 +39,7 @@ def __init__(
kid_auto_inclusion(bool): The indicator whether ``kid`` parameter is included
in a proper header bucket automatically or not.
"""
if not isinstance(alg_auto_inclusion, bool):
raise ValueError("alg_auto_inclusion should be bool.")
self._alg_auto_inclusion = alg_auto_inclusion

if not isinstance(kid_auto_inclusion, bool):
raise ValueError("kid_auto_inclusion should be bool.")
self._kid_auto_inclusion = kid_auto_inclusion
return cls(alg_auto_inclusion, kid_auto_inclusion)

def encode_and_mac(
self,
Expand Down
10 changes: 5 additions & 5 deletions cwt/cose_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class COSEKey:
"""

@staticmethod
def from_dict(cose_key: Dict[int, Any]) -> COSEKeyInterface:
def new(cose_key: Dict[int, Any]) -> COSEKeyInterface:
"""
Create a COSE key from a CBOR-like dictionary with numeric keys.
Expand Down Expand Up @@ -148,7 +148,7 @@ def from_symmetric_key(
except Exception:
raise ValueError("Unsupported or unknown key_ops.")
cose_key[4] = key_ops_labels
return cls.from_dict(cose_key)
return cls.new(cose_key)

@classmethod
def from_bytes(cls, key_data: bytes) -> COSEKeyInterface:
Expand All @@ -164,7 +164,7 @@ def from_bytes(cls, key_data: bytes) -> COSEKeyInterface:
DecodeError: Failed to decode the key data.
"""
cose_key = cbor2.loads(key_data)
return cls.from_dict(cose_key)
return cls.new(cose_key)

@classmethod
def from_jwk(cls, data: Union[str, bytes, Dict[str, Any]]) -> COSEKeyInterface:
Expand Down Expand Up @@ -270,7 +270,7 @@ def from_jwk(cls, data: Union[str, bytes, Dict[str, Any]]) -> COSEKeyInterface:
if use != 0:
cose_key[4] = []
cose_key[4].append(use)
return cls.from_dict(cose_key)
return cls.new(cose_key)

@classmethod
def from_pem(
Expand Down Expand Up @@ -438,4 +438,4 @@ def from_pem(
)
else:
raise ValueError(f"Unsupported or unknown key: {type(k)}.")
return cls.from_dict(cose_key)
return cls.new(cose_key)
40 changes: 23 additions & 17 deletions cwt/cwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,25 @@ class CWT(CBORProcessor):

def __init__(
self, expires_in: int = CWT_DEFAULT_EXPIRES_IN, leeway: int = CWT_DEFAULT_LEEWAY
):
if not isinstance(expires_in, int):
raise ValueError("expires_in should be int.")
if expires_in <= 0:
raise ValueError("expires_in should be positive number.")
self._expires_in = expires_in

if not isinstance(leeway, int):
raise ValueError("leeway should be int.")
if leeway <= 0:
raise ValueError("leeway should be positive number.")
self._leeway = leeway

self._cose = COSE(kid_auto_inclusion=True, alg_auto_inclusion=True)
self._claim_names: Dict[str, int] = {}

@classmethod
def new(
cls, expires_in: int = CWT_DEFAULT_EXPIRES_IN, leeway: int = CWT_DEFAULT_LEEWAY
):
"""
Constructor.
Expand All @@ -42,27 +61,14 @@ def __init__(
Examples:
>>> from cwt import CWT, COSEKey
>>> ctx = CWT(expires_in=3600*24, leeway=10)
>>> ctx = CWT.new(expires_in=3600*24, leeway=10)
>>> key = COSEKey.from_symmetric_key(alg="HS256")
>>> token = ctx.encode(
... {"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"},
... key,
... )
"""
if not isinstance(expires_in, int):
raise ValueError("expires_in should be int.")
if expires_in <= 0:
raise ValueError("expires_in should be positive number.")
self._expires_in = expires_in

if not isinstance(leeway, int):
raise ValueError("leeway should be int.")
if leeway <= 0:
raise ValueError("leeway should be positive number.")
self._leeway = leeway

self._cose = COSE(kid_auto_inclusion=True, alg_auto_inclusion=True)
self._claim_names: Dict[str, int] = {}
return cls(expires_in, leeway)

@property
def expires_in(self) -> int:
Expand All @@ -81,7 +87,7 @@ def leeway(self) -> int:

def encode(
self,
claims: Union[Claims, Dict[str, Any], Dict[int, Any], bytes, str],
claims: Union[Claims, Dict[str, Any], Dict[int, Any], bytes],
key: COSEKeyInterface,
nonce: bytes = b"",
recipients: Optional[List[RecipientInterface]] = None,
Expand All @@ -101,7 +107,7 @@ def encode(
parameter set to identify the usage.
Args:
claims (Union[Claims, Dict[str, Any], Dict[int, Any], bytes, str]): A CWT
claims (Union[Claims, Dict[str, Any], Dict[int, Any], bytes]): A CWT
claims object, or a JWT claims object, text string or byte string.
key (COSEKeyInterface): A COSE key used to generate a MAC for the claims.
recipients (List[RecipientInterface]): A list of recipient information structures.
Expand Down
2 changes: 1 addition & 1 deletion cwt/encrypted_cose_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,4 @@ def to_cose_key(
VerifyError: Failed to verify the COSE key.
"""
res = cbor2.loads(COSE().decode(CBORTag(16, key), encryption_key))
return COSEKey.from_dict(res)
return COSEKey.new(res)
4 changes: 2 additions & 2 deletions cwt/recipient.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Recipient:
"""

@classmethod
def from_dict(
def new(
cls,
protected: dict = {},
unprotected: dict = {},
Expand Down Expand Up @@ -120,4 +120,4 @@ def from_json(cls, data: Union[str, bytes, Dict[str, Any]]) -> RecipientInterfac
raise ValueError("k should be str.")
key = base64url_decode(recipient["k"])

return cls.from_dict(protected, unprotected, key_ops=key_ops, key=key)
return cls.new(protected, unprotected, key_ops=key_ops, key=key)
4 changes: 2 additions & 2 deletions cwt/recipients.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ def _create_recipient(cls, recipient: List[Any]) -> RecipientInterface:
if not isinstance(recipient[2], bytes):
raise ValueError("ciphertext should be bytes.")
if len(recipient) == 3:
return Recipient.from_dict(protected, recipient[1], recipient[2])
return Recipient.new(protected, recipient[1], recipient[2])
if not isinstance(recipient[3], list):
raise ValueError("recipients should be list.")
recipients: List[RecipientInterface] = []
for r in recipient[3]:
recipients.append(cls._create_recipient(r))
return Recipient.from_dict(protected, recipient[1], recipient[2], recipients)
return Recipient.new(protected, recipient[1], recipient[2], recipients)

def derive_key(
self,
Expand Down
Loading

0 comments on commit f39a013

Please sign in to comment.