Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add diff support #150

Merged
merged 8 commits into from
May 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelogs/fragments/150-diff.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
minor_changes:
- openssl_csr - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150).
- openssl_csr_pipe - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150).
- openssl_privatekey - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150).
- openssl_privatekey_pipe - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150).
- openssl_publickey - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150).
- x509_certificate - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150).
- x509_certificate_pipe - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150).
- x509_crl - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150).
32 changes: 28 additions & 4 deletions plugins/module_utils/crypto/module_backends/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
cryptography_compare_public_keys,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate_info import (
get_certificate_info,
)

MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
MINIMAL_PYOPENSSL_VERSION = '0.15'

Expand Down Expand Up @@ -95,6 +99,19 @@ def __init__(self, module, backend):
self.check_csr_subject = True
self.check_csr_extensions = True

self.diff_before = self._get_info(None)
self.diff_after = self._get_info(None)

def _get_info(self, data):
if data is None:
return dict()
try:
result = get_certificate_info(self.module, self.backend, data, prefer_one_fingerprint=True)
result['can_parse_certificate'] = True
return result
except Exception as exc:
return dict(can_parse_certificate=False)

@abc.abstractmethod
def generate_certificate(self):
"""(Re-)Generate certificate."""
Expand All @@ -108,6 +125,7 @@ def get_certificate_data(self):
def set_existing(self, certificate_bytes):
"""Set existing certificate bytes. None indicates that the key does not exist."""
self.existing_certificate_bytes = certificate_bytes
self.diff_after = self.diff_before = self._get_info(self.existing_certificate_bytes)

def has_existing(self):
"""Query whether an existing certificate is/has been there."""
Expand Down Expand Up @@ -284,13 +302,19 @@ def dump(self, include_certificate):
'privatekey': self.privatekey_path,
'csr': self.csr_path
}
# Get hold of certificate bytes
certificate_bytes = self.existing_certificate_bytes
if self.cert is not None:
certificate_bytes = self.get_certificate_data()
self.diff_after = self._get_info(certificate_bytes)
if include_certificate:
# Get hold of certificate bytes
certificate_bytes = self.existing_certificate_bytes
if self.cert is not None:
certificate_bytes = self.get_certificate_data()
# Store result
result['certificate'] = certificate_bytes.decode('utf-8') if certificate_bytes else None

result['diff'] = dict(
before=self.diff_before,
after=self.diff_after,
)
return result


Expand Down
15 changes: 10 additions & 5 deletions plugins/module_utils/crypto/module_backends/certificate_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def _get_all_extensions(self):
def _get_ocsp_uri(self):
pass

def get_info(self):
def get_info(self, prefer_one_fingerprint=False):
result = dict()
self.cert = load_certificate(None, content=self.content, backend=self.backend)

Expand Down Expand Up @@ -195,14 +195,19 @@ def get_info(self):

result['public_key'] = self._get_public_key_pem()

public_key_info = get_publickey_info(self.module, self.backend, key=self._get_public_key_object())
public_key_info = get_publickey_info(
self.module,
self.backend,
key=self._get_public_key_object(),
prefer_one_fingerprint=prefer_one_fingerprint)
result.update({
'public_key_type': public_key_info['type'],
'public_key_data': public_key_info['public_data'],
'public_key_fingerprints': public_key_info['fingerprints'],
})

result['fingerprints'] = get_fingerprint_of_bytes(self._get_der_bytes())
result['fingerprints'] = get_fingerprint_of_bytes(
self._get_der_bytes(), prefer_one=prefer_one_fingerprint)

if self.backend != 'pyopenssl':
ski = self._get_subject_key_identifier()
Expand Down Expand Up @@ -512,12 +517,12 @@ def _get_ocsp_uri(self):
return None


def get_certificate_info(module, backend, content):
def get_certificate_info(module, backend, content, prefer_one_fingerprint=False):
if backend == 'cryptography':
info = CertificateInfoRetrievalCryptography(module, content)
elif backend == 'pyopenssl':
info = CertificateInfoRetrievalPyOpenSSL(module, content)
return info.get_info()
return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)


def select_backend(module, backend, content):
Expand Down
33 changes: 29 additions & 4 deletions plugins/module_utils/crypto/module_backends/csr.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
pyopenssl_parse_name_constraints,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.csr_info import (
get_csr_info,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec


Expand Down Expand Up @@ -177,6 +181,20 @@ def __init__(self, module, backend):
self.existing_csr = None
self.existing_csr_bytes = None

self.diff_before = self._get_info(None)
self.diff_after = self._get_info(None)

def _get_info(self, data):
if data is None:
return dict()
try:
result = get_csr_info(
self.module, self.backend, data, validate_signature=False, prefer_one_fingerprint=True)
result['can_parse_csr'] = True
return result
except Exception as exc:
return dict(can_parse_csr=False)

@abc.abstractmethod
def generate_csr(self):
"""(Re-)Generate CSR."""
Expand All @@ -190,6 +208,7 @@ def get_csr_data(self):
def set_existing(self, csr_bytes):
"""Set existing CSR bytes. None indicates that the CSR does not exist."""
self.existing_csr_bytes = csr_bytes
self.diff_after = self.diff_before = self._get_info(self.existing_csr_bytes)

def has_existing(self):
"""Query whether an existing CSR is/has been there."""
Expand Down Expand Up @@ -238,13 +257,19 @@ def dump(self, include_csr):
'name_constraints_permitted': self.name_constraints_permitted,
'name_constraints_excluded': self.name_constraints_excluded,
}
# Get hold of CSR bytes
csr_bytes = self.existing_csr_bytes
if self.csr is not None:
csr_bytes = self.get_csr_data()
self.diff_after = self._get_info(csr_bytes)
if include_csr:
# Get hold of CSR bytes
csr_bytes = self.existing_csr_bytes
if self.csr is not None:
csr_bytes = self.get_csr_data()
# Store result
result['csr'] = csr_bytes.decode('utf-8') if csr_bytes else None

result['diff'] = dict(
before=self.diff_before,
after=self.diff_after,
)
return result


Expand Down
12 changes: 8 additions & 4 deletions plugins/module_utils/crypto/module_backends/csr_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _get_all_extensions(self):
def _is_signature_valid(self):
pass

def get_info(self):
def get_info(self, prefer_one_fingerprint=False):
result = dict()
self.csr = load_certificate_request(None, content=self.content, backend=self.backend)

Expand All @@ -162,7 +162,11 @@ def get_info(self):

result['public_key'] = self._get_public_key_pem()

public_key_info = get_publickey_info(self.module, self.backend, key=self._get_public_key_object())
public_key_info = get_publickey_info(
self.module,
self.backend,
key=self._get_public_key_object(),
prefer_one_fingerprint=prefer_one_fingerprint)
result.update({
'public_key_type': public_key_info['type'],
'public_key_data': public_key_info['public_data'],
Expand Down Expand Up @@ -429,12 +433,12 @@ def _is_signature_valid(self):
return False


def get_csr_info(module, backend, content, validate_signature=True):
def get_csr_info(module, backend, content, validate_signature=True, prefer_one_fingerprint=False):
if backend == 'cryptography':
info = CSRInfoRetrievalCryptography(module, content, validate_signature=validate_signature)
elif backend == 'pyopenssl':
info = CSRInfoRetrievalPyOpenSSL(module, content, validate_signature=validate_signature)
return info.get_info()
return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)


def select_backend(module, backend, content, validate_signature=True):
Expand Down
39 changes: 35 additions & 4 deletions plugins/module_utils/crypto/module_backends/privatekey.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
identify_private_key_format,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.privatekey_info import (
PrivateKeyConsistencyError,
PrivateKeyParseError,
get_privatekey_info,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec


Expand Down Expand Up @@ -102,6 +108,25 @@ def __init__(self, module, backend):
self.existing_private_key = None
self.existing_private_key_bytes = None

self.diff_before = self._get_info(None)
self.diff_after = self._get_info(None)

def _get_info(self, data):
if data is None:
return dict()
result = dict(can_parse_key=False)
try:
result.update(get_privatekey_info(
self.module, self.backend, data, passphrase=self.passphrase,
return_private_key_data=False, prefer_one_fingerprint=True))
except PrivateKeyConsistencyError as exc:
result.update(exc.result)
except PrivateKeyParseError as exc:
result.update(exc.result)
except Exception as exc:
pass
return result

@abc.abstractmethod
def generate_private_key(self):
"""(Re-)Generate private key."""
Expand All @@ -125,6 +150,7 @@ def get_private_key_data(self):
def set_existing(self, privatekey_bytes):
"""Set existing private key bytes. None indicates that the key does not exist."""
self.existing_private_key_bytes = privatekey_bytes
self.diff_after = self.diff_before = self._get_info(self.existing_private_key_bytes)

def has_existing(self):
"""Query whether an existing private key is/has been there."""
Expand Down Expand Up @@ -215,11 +241,12 @@ def dump(self, include_key):
}
if self.type == 'ECC':
result['curve'] = self.curve
# Get hold of private key bytes
pk_bytes = self.existing_private_key_bytes
if self.private_key is not None:
pk_bytes = self.get_private_key_data()
self.diff_after = self._get_info(pk_bytes)
if include_key:
# Get hold of private key bytes
pk_bytes = self.existing_private_key_bytes
if self.private_key is not None:
pk_bytes = self.get_private_key_data()
# Store result
if pk_bytes:
if identify_private_key_format(pk_bytes) == 'raw':
Expand All @@ -229,6 +256,10 @@ def dump(self, include_key):
else:
result['privatekey'] = None

result['diff'] = dict(
before=self.diff_before,
after=self.diff_after,
)
return result


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def _get_key_info(self):
def _is_key_consistent(self, key_public_data, key_private_data):
pass

def get_info(self):
def get_info(self, prefer_one_fingerprint=False):
result = dict(
can_parse_key=False,
key_is_consistent=None,
Expand All @@ -227,7 +227,8 @@ def get_info(self):

result['public_key'] = self._get_public_key(binary=False)
pk = self._get_public_key(binary=True)
result['public_key_fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict()
result['public_key_fingerprints'] = get_fingerprint_of_bytes(
pk, prefer_one=prefer_one_fingerprint) if pk is not None else dict()

key_type, key_public_data, key_private_data = self._get_key_info()
result['type'] = key_type
Expand Down Expand Up @@ -386,14 +387,14 @@ def _is_key_consistent(self, key_public_data, key_private_data):
return None


def get_privatekey_info(module, backend, content, passphrase=None, return_private_key_data=False):
def get_privatekey_info(module, backend, content, passphrase=None, return_private_key_data=False, prefer_one_fingerprint=False):
if backend == 'cryptography':
info = PrivateKeyInfoRetrievalCryptography(
module, content, passphrase=passphrase, return_private_key_data=return_private_key_data)
elif backend == 'pyopenssl':
info = PrivateKeyInfoRetrievalPyOpenSSL(
module, content, passphrase=passphrase, return_private_key_data=return_private_key_data)
return info.get_info()
return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)


def select_backend(module, backend, content, passphrase=None, return_private_key_data=False):
Expand Down
9 changes: 5 additions & 4 deletions plugins/module_utils/crypto/module_backends/publickey_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def _get_public_key(self, binary):
def _get_key_info(self):
pass

def get_info(self):
def get_info(self, prefer_one_fingerprint=False):
result = dict()
if self.key is None:
try:
Expand All @@ -218,7 +218,8 @@ def get_info(self):
raise PublicKeyParseError(to_native(e))

pk = self._get_public_key(binary=True)
result['fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict()
result['fingerprints'] = get_fingerprint_of_bytes(
pk, prefer_one=prefer_one_fingerprint) if pk is not None else dict()

key_type, key_public_data = self._get_key_info()
result['type'] = key_type
Expand Down Expand Up @@ -276,12 +277,12 @@ def _get_key_info(self):
return key_type, key_public_data


def get_publickey_info(module, backend, content=None, key=None):
def get_publickey_info(module, backend, content=None, key=None, prefer_one_fingerprint=False):
if backend == 'cryptography':
info = PublicKeyInfoRetrievalCryptography(module, content=content, key=key)
elif backend == 'pyopenssl':
info = PublicKeyInfoRetrievalPyOpenSSL(module, content=content, key=key)
return info.get_info()
return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint)


def select_backend(module, backend, content=None, key=None):
Expand Down
Loading