From 7fe601901ef74783092842481a273ed3bf657c2d Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Tue, 28 Sep 2021 11:56:21 -0500 Subject: [PATCH 01/16] - Added option to return the private key when retrieving certificates. --- examples/get_cert_service_tpp.py | 4 +++ tests/test_e2e.py | 9 +++--- vcert/common.py | 6 ++-- vcert/connection_tpp.py | 48 ++---------------------------- vcert/connection_tpp_abstract.py | 51 ++++++++++++++++++++++++++++++-- vcert/connection_tpp_token.py | 45 +--------------------------- 6 files changed, 65 insertions(+), 98 deletions(-) diff --git a/examples/get_cert_service_tpp.py b/examples/get_cert_service_tpp.py index 627710f..f490a58 100644 --- a/examples/get_cert_service_tpp.py +++ b/examples/get_cert_service_tpp.py @@ -52,6 +52,10 @@ def main(): request.ip_addresses = ["127.0.0.1", "192.168.1.1"] request.uniform_resource_identifiers = ["http://wgtest.uri.com","https://ragnartest.uri.com"] request.user_principal_names = ["upn1@venafi.example.com", "upn2@venafi.example.com"] + # Specify whether or not to return the private key. It is True by default. + # request.include_private_key = False + # Specify a password for the private key. This may be required or not and depends entirely on the Zone being used. + request.key_password = 'Foo.Bar.Pass.123!' # Specify ordering certificates in chain. Root can be CHAIN_OPTION_FIRST ("first") # or CHAIN_OPTION_LAST ("last"). By default it is CHAIN_OPTION_LAST. # You can also specify CHAIN_OPTION_IGNORE ("ignore") to ignore chain (supported only for TPP). diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 04cb6cd..27735f5 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py 
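Review bot: The example hard-codes the private-key password as a literal (`request.key_password = 'Foo.Bar.Pass.123!'`), and the same pattern reappears in the PATCH 07 hunk of this file and in the `password="Foo.Bar.123!"` argument added to tests/test_e2e.py in PATCH 03. Examples get copied into real integrations, so read it from the environment instead, e.g. `request.key_password = os.environ['VCERT_KEY_PASSWORD']` (with `import os` at the top of the example), so the secret never lands in source control. The comment above it should also state that this password is what protects the key the server hands back, so it needs to be as strong as the key it wraps (presumably the server encrypts the returned key under it — confirm against the TPP API). `main()` starts and ends outside the hunk.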
@@ -535,10 +535,11 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None # and save into file f = open("./cert.pem", "w") f.write(cert.full_chain) - if not service_generated_csr: - f = open("./cert.key", "w") - f.write(request.private_key_pem) - f.close() + with open("./cert.key", "w"): + if request.include_private_key: + f.write(cert.key) + else: + f.write(request.private_key_pem) cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend()) assert isinstance(cert, x509.Certificate) diff --git a/vcert/common.py b/vcert/common.py index 982e601..6322a30 100644 --- a/vcert/common.py +++ b/vcert/common.py @@ -38,7 +38,6 @@ from .policy import PolicySpecification from .ssh_utils import SSHCertRequest, SSHRetrieveResponse - MIME_JSON = "application/json" MIME_HTML = "text/html" MIME_TEXT = "text/plain" @@ -264,7 +263,8 @@ def __init__(self, cert_id=None, origin=None, custom_fields=None, timeout=DEFAULT_TIMEOUT, - csr_origin=CSR_ORIGIN_LOCAL + csr_origin=CSR_ORIGIN_LOCAL, + include_private_key=True ): """ :param str cert_id: Certificate request id. Generating by server. @@ -285,6 +285,7 @@ def __init__(self, cert_id=None, :param list[CustomField] custom_fields: list of custom fields values to be added to the certificate. :param int timeout: Timeout for the certificate to be retrieved from server. Measured in seconds. :param str csr_origin: The origin of the CSR, either user provided, locally generated or service generated. + :param bool include_private_key: Indicates if the private key should be returned by the server or not. """ self.chain_option = CHAIN_OPTION_LAST # "last" @@ -318,6 +319,7 @@ def __init__(self, cert_id=None, self.custom_fields = custom_fields self.cert_guid = None self.timeout = timeout + self.include_private_key = include_private_key def __setattr__(self, key, value): if key == "key_password": diff --git a/vcert/connection_tpp.py b/vcert/connection_tpp.py index 4dfa929..8f39e49 100644 
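Review bot: `with open("./cert.key", "w"):` opens the key file without binding it, so the following `f.write(cert.key)` / `f.write(request.private_key_pem)` go to the still-open `cert.pem` handle from two lines above: the private key is appended to the certificate file, cert.key is left empty, and the original `f.close()` is gone so neither handle is closed explicitly. PATCH 11 and PATCH 13 later in this series rebind the handle as `f2`, so the series should be squashed so this state never lands in history. Even with that fix the key file is created with the process umask; a private-key file should be created `0o600`, e.g. `fd = os.open("./cert.key", os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)` followed by `with os.fdopen(fd, "w") as f2:` (requires `os` to be imported in the test module). The enclosing `enroll()` starts and ends outside the hunk.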
--- a/vcert/connection_tpp.py +++ b/vcert/connection_tpp.py @@ -17,23 +17,16 @@ from __future__ import (absolute_import, division, generators, unicode_literals, print_function, nested_scopes, with_statement) -import base64 import logging as log import re import time import requests -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.x509 import SignatureAlgorithmOID as AlgOID -from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType, CSR_ORIGIN_LOCAL, CSR_ORIGIN_SERVICE, \ - CSR_ORIGIN_PROVIDED +from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType from .connection_tpp_abstract import AbstractTPPConnection, URLS -from .errors import (ServerUnexptedBehavior, ClientBadData, CertificateRequestError, AuthenticationError, - CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError) +from .errors import (ServerUnexptedBehavior, ClientBadData, AuthenticationError) from .http import HTTPStatus -from .pem import parse_pem TOKEN_HEADER_NAME = "x-venafi-api-key" # nosec 
@@ -115,43 +108,6 @@ def auth(self): log.error("Authentication status is not %s but %s. Exiting" % (HTTPStatus.OK, status[0])) raise AuthenticationError - def retrieve_cert(self, cert_request): - log.debug("Getting certificate status for id %s" % cert_request.id) - - retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", IncludeChain='true') - - if cert_request.chain_option == "last": - retrieve_request['RootFirstOrder'] = 'false' - retrieve_request['IncludeChain'] = 'true' - elif cert_request.chain_option == "first": - retrieve_request['RootFirstOrder'] = 'true' - retrieve_request['IncludeChain'] = 'true' - elif cert_request.chain_option == "ignore": - retrieve_request['IncludeChain'] = 'false' - else: - log.error("chain option %s is not valid" % cert_request.chain_option) - raise ClientBadData - - time_start = time.time() - while True: - try: - status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request) - except VenafiError: - log.debug("Certificate with id %s not found." % cert_request.id) - status = 0 - - if status == HTTPStatus.OK: - pem64 = data['CertificateData'] - pem = base64.b64decode(pem64) - return parse_pem(pem.decode(), cert_request.chain_option) - elif (time.time() - time_start) < cert_request.timeout: - log.debug("Waiting for certificate...") - time.sleep(2) - else: - raise RetrieveCertificateTimeoutError( - 'Operation timed out at %d seconds while retrieving certificate with id %s' - % (cert_request.timeout, cert_request.id)) - def revoke_cert(self, request): if not (request.id or request.thumbprint): raise ClientBadData diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index 42c79fe..2894146 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py 
@@ -13,16 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import base64 import logging as log import re import time +from pprint import pprint + from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.x509 import SignatureAlgorithmOID as AlgOID -from pprint import pprint +from pem import parse_pem from vcert.common import CertField, CommonConnection, CertificateRequest, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \ - CSR_ORIGIN_SERVICE, KeyType + CSR_ORIGIN_SERVICE, KeyType, CHAIN_OPTION_LAST, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE from vcert.errors import VenafiError, ServerUnexptedBehavior, ClientBadData, RetrieveCertificateTimeoutError, \ CertificateRequestError, CertificateRenewError from vcert.http import HTTPStatus 
@@ -126,6 +129,50 @@ def request_cert(self, request, zone): log.error("Request status is not %s. %s." % HTTPStatus.OK, status) raise CertificateRequestError + def retrieve_cert(self, cert_request): + log.debug("Getting certificate status for id %s" % cert_request.id) + + retrieve_request = dict(CertificateDN=cert_request.id, + Format="base64", + IncludeChain='true', + IncludePrivateKey=cert_request.include_private_key) + + if cert_request.key_password: + retrieve_request['Password'] = cert_request.key_password + + if cert_request.chain_option == CHAIN_OPTION_LAST: + retrieve_request['RootFirstOrder'] = 'false' + retrieve_request['IncludeChain'] = 'true' + elif cert_request.chain_option == CHAIN_OPTION_FIRST: + retrieve_request['RootFirstOrder'] = 'true' + retrieve_request['IncludeChain'] = 'true' + elif cert_request.chain_option == CHAIN_OPTION_IGNORE: + retrieve_request['IncludeChain'] = 'false' + else: + log.error("chain option %s is not valid" % cert_request.chain_option) + raise ClientBadData + + time_start = time.time() + while True: + try: + status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request) + except VenafiError: + log.debug("Certificate with id %s not found." % cert_request.id) + status = 0 + + if status == HTTPStatus.OK: + pem64 = data['CertificateData'] + pem = base64.b64decode(pem64) + return parse_pem(pem.decode(), cert_request.chain_option) + elif (time.time() - time_start) < cert_request.timeout: + asd = time.time() - time_start + log.debug("Waiting for certificate...") + time.sleep(2) + else: + raise RetrieveCertificateTimeoutError( + 'Operation timed out at %d seconds while retrieving certificate with id %s' + % (cert_request.timeout, cert_request.id)) + def renew_cert(self, request, reuse_key=False): if not request.id and not request.thumbprint: log.debug("Request id or thumbprint must be specified for TPP") diff --git a/vcert/connection_tpp_token.py b/vcert/connection_tpp_token.py index 966e97f..a237d71 100644 
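Review bot: `IncludePrivateKey=cert_request.include_private_key` puts a Python bool into a request whose neighbouring flags (`IncludeChain`, `RootFirstOrder`) are the strings `'true'`/`'false'`, and the later hunks in PATCH 09, 13, 14 and 15 move the value between `True`, `'true'` and the attribute again, which suggests the expected encoding was never pinned down; pick one representation for all three flags and add a comment naming it. The request also asks the server for the private key regardless of `csr_origin`, so with the PATCH 01 default of `include_private_key=True` a caller who generated the key locally still requests key export on every retrieval (PATCH 07 gates this on `CSR_ORIGIN_SERVICE` and defaults to False, which is the right direction and should be the only state that lands). `asd = time.time() - time_start` is an unused local left from debugging and should be removed. `renew_cert` at the end of the hunk continues outside it.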
--- a/vcert/connection_tpp_token.py +++ b/vcert/connection_tpp_token.py @@ -18,22 +18,16 @@ from __future__ import (absolute_import, division, generators, unicode_literals, print_function, nested_scopes, with_statement) -import base64 import logging as log import re import time import requests -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.x509 import SignatureAlgorithmOID as AlgOID from .common import MIME_JSON, TokenInfo, Authentication, KeyType, Policy, ZoneConfig, CertField from .connection_tpp_abstract import AbstractTPPConnection, URLS -from .errors import (ClientBadData, ServerUnexptedBehavior, AuthenticationError, CertificateRequestError, - CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError) +from .errors import (ClientBadData, ServerUnexptedBehavior, AuthenticationError) from .http import HTTPStatus -from .pem import parse_pem HEADER_AUTHORIZATION = "Authorization" # type: str 
@@ -129,43 +123,6 @@ def auth(self): def import_cert(self, request): raise NotImplementedError - def retrieve_cert(self, cert_request): - log.debug("Getting certificate status for id %s" % cert_request.id) - - retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", IncludeChain='true') - - if cert_request.chain_option == "last": - retrieve_request['RootFirstOrder'] = 'false' - retrieve_request['IncludeChain'] = 'true' - elif cert_request.chain_option == "first": - retrieve_request['RootFirstOrder'] = 'true' - retrieve_request['IncludeChain'] = 'true' - elif cert_request.chain_option == "ignore": - retrieve_request['IncludeChain'] = 'false' - else: - log.error("chain option %s is not valid" % cert_request.chain_option) - raise ClientBadData - - time_start = time.time() - while True: - try: - status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request) - except VenafiError: - log.debug("Certificate with id %s not found." % cert_request.id) - status = 0 - - if status == HTTPStatus.OK: - pem64 = data['CertificateData'] - pem = base64.b64decode(pem64) - return parse_pem(pem.decode(), cert_request.chain_option) - elif (time.time() - time_start) < cert_request.timeout: - log.debug("Waiting for certificate...") - time.sleep(2) - else: - raise RetrieveCertificateTimeoutError( - 'Operation timed out at %d seconds while retrieving certificate with id %s' - % (cert_request.timeout, cert_request.id)) - def revoke_cert(self, request): if not (request.id or request.thumbprint): raise ClientBadData From ff59fd114d141d3854c61b6e20615ac536166551 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Tue, 28 Sep 2021 15:10:28 -0500 Subject: [PATCH 02/16] - Updated test cases to account for private key when CSR is service generated. --- tests/test_e2e.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 27735f5..19b6865 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py 
@@ -537,6 +537,7 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None f.write(cert.full_chain) with open("./cert.key", "w"): if request.include_private_key: + assert cert.key is not None f.write(cert.key) else: f.write(request.private_key_pem) From 932d69138d6a315cd1bc9565cf038dfb67bb8b10 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Tue, 28 Sep 2021 15:23:35 -0500 Subject: [PATCH 03/16] - Updated test cases to account for private key when CSR is service generated. --- tests/test_e2e.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 19b6865..37a0981 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -290,7 +290,8 @@ def test_tpp_token_enroll(self): def test_tpp_token_enroll_with_service_generated_csr(self): cn = random_word(10) + ".venafi.example.com" try: - _, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, service_generated_csr=True) + _, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, password="Foo.Bar.123!", + service_generated_csr=True) cert_config = self.tpp_conn._get_certificate_details(cert_guid) self.assertEqual(cert_config["Origin"], "Venafi VCert-Python") except Exception as err: From 33c45df9f47e6b17fe567ec781d9113ad9ecd1a4 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Tue, 28 Sep 2021 17:14:51 -0500 Subject: [PATCH 04/16] - Added private key (when available) to retrieve_cert() operation response object. --- vcert/connection_tpp_abstract.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index 2894146..c7b8c05 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py 
@@ -163,9 +163,12 @@ def retrieve_cert(self, cert_request): if status == HTTPStatus.OK: pem64 = data['CertificateData'] pem = base64.b64decode(pem64) - return parse_pem(pem.decode(), cert_request.chain_option) + cert_response = parse_pem(pem.decode(), cert_request.chain_option) + if cert_response.key is None and cert_request.private_key is not None: + log.debug("Adding private key to response...") + cert_response.key = cert_request.private_key_pem + return cert_response elif (time.time() - time_start) < cert_request.timeout: - asd = time.time() - time_start log.debug("Waiting for certificate...") time.sleep(2) else: From 07294afe0a658040434e5d23e0536eebe625512b Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 10:57:04 -0500 Subject: [PATCH 05/16] - Fixed import issue. --- vcert/connection_tpp_abstract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index c7b8c05..da93e71 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py @@ -23,7 +23,7 @@ from cryptography.hazmat.backends import default_backend from cryptography.x509 import SignatureAlgorithmOID as AlgOID -from pem import parse_pem +from vcert.pem import parse_pem from vcert.common import CertField, CommonConnection, CertificateRequest, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \ CSR_ORIGIN_SERVICE, KeyType, CHAIN_OPTION_LAST, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE from vcert.errors import VenafiError, ServerUnexptedBehavior, ClientBadData, RetrieveCertificateTimeoutError, \ From 8d827c763e1f46fec8052798bb422a2868a8f167 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 12:07:30 -0500 Subject: [PATCH 06/16] - Fixed issues with test suite. --- vcert/connection_cloud.py | 6 +++++- vcert/connection_fake.py | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/vcert/connection_cloud.py b/vcert/connection_cloud.py index 18d3274..d4bec97 100644 
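Review bot: The fallback guards on `cert_request.private_key is not None` but assigns `cert_request.private_key_pem`; presumably `private_key_pem` is serialised from `private_key` (that property is outside this page), in which case the two agree, but a comment pinning the relationship would settle it, e.g. `# private_key_pem is derived from private_key, so the guard covers both`. After this change `cert_response.key` carries private key material in both the service-generated and the local-key case, so any caller that logs or serialises the response object must treat it as a secret; the docstring of the response type should say so. The same fallback is added to connection_cloud.py in PATCH 06 and should stay identical to this one. `retrieve_cert` starts and ends outside the hunk.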
--- a/vcert/connection_cloud.py +++ b/vcert/connection_cloud.py @@ -356,7 +356,11 @@ def retrieve_cert(self, request): status = 0 if status == HTTPStatus.OK: log.debug("Certificate found, parsing response...") - return parse_pem(data, request.chain_option) + cert_response = parse_pem(data, request.chain_option) + if cert_response.key is None and request.private_key is not None: + log.debug("Adding private key to response...") + cert_response.key = request.private_key_pem + return cert_response elif (time.time() - time_start) < request.timeout: log.debug("Waiting for certificate...") time.sleep(2) diff --git a/vcert/connection_fake.py b/vcert/connection_fake.py index 22b07d6..79d06c3 100644 --- a/vcert/connection_fake.py +++ b/vcert/connection_fake.py @@ -179,7 +179,9 @@ def retrieve_cert(self, certificate_request): ).sign(root_ca_private_key, hashes.SHA256(), default_backend()) log.info("This certificate is for test purposes only. Don't use it in production!") - return parse_pem(cert.public_bytes(serialization.Encoding.PEM).decode(), certificate_request.chain_option) + response = parse_pem(cert.public_bytes(serialization.Encoding.PEM).decode(), certificate_request.chain_option) + response.key = certificate_request.private_key_pem + return response def revoke_cert(self, request): raise NotImplementedError From 2a40d6f1651f58105c544a02f65ee312e377e6a2 Mon Sep 17 00:00:00 2001 
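Review bot: `connection_fake.py` assigns `response.key = certificate_request.private_key_pem` unconditionally, whereas the TPP and Cloud hunks in this series only do so under `if cert_response.key is None and request.private_key is not None:`; for a request without a local key this presumably yields `None` or raises inside `private_key_pem`, depending on how that property is defined outside the page. Mirror the guard from connection_cloud.py: `if certificate_request.private_key is not None:` before the assignment, so the fake backend behaves like the real ones in the service-generated CSR case. `retrieve_cert` in connection_fake.py starts outside the hunk.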
From: Russel Vela Date: Wed, 29 Sep 2021 12:50:58 -0500 Subject: [PATCH 07/16] - Changed default value of include_private_key parameter in the CertificateRequest object. Default value is now False. - Request field 'IncludePrivateKey' is set only for service generated CSR use case. - Request field 'Password' is set only for service generated CSR use case. - Fixed issues on the test suite. - Updated example get_cert_service_tpp to reflect this changes. --- examples/get_cert_service_tpp.py | 6 +++--- tests/test_e2e.py | 1 + vcert/common.py | 2 +- vcert/connection_cloud.py | 2 +- vcert/connection_tpp_abstract.py | 9 +++++---- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/examples/get_cert_service_tpp.py b/examples/get_cert_service_tpp.py index f490a58..eb48923 100644 --- a/examples/get_cert_service_tpp.py +++ b/examples/get_cert_service_tpp.py @@ -52,9 +52,9 @@ def main(): request.ip_addresses = ["127.0.0.1", "192.168.1.1"] request.uniform_resource_identifiers = ["http://wgtest.uri.com","https://ragnartest.uri.com"] request.user_principal_names = ["upn1@venafi.example.com", "upn2@venafi.example.com"] - # Specify whether or not to return the private key. It is True by default. - # request.include_private_key = False - # Specify a password for the private key. This may be required or not and depends entirely on the Zone being used. + # Specify whether or not to return the private key. It is False by default. + # A password should be defined for the private key if include_private_key is True. + request.include_private_key = True request.key_password = 'Foo.Bar.Pass.123!' # Specify ordering certificates in chain. Root can be CHAIN_OPTION_FIRST ("first") # or CHAIN_OPTION_LAST ("last"). By default it is CHAIN_OPTION_LAST. diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 37a0981..1b65156 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py 
@@ -528,6 +528,7 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None request.csr = csr elif service_generated_csr: request.csr_origin = CSR_ORIGIN_SERVICE + request.include_private_key = True conn.request_cert(request, zone) cert = conn.retrieve_cert(request) diff --git a/vcert/common.py b/vcert/common.py index 6322a30..6ac4af6 100644 --- a/vcert/common.py +++ b/vcert/common.py @@ -264,7 +264,7 @@ def __init__(self, cert_id=None, custom_fields=None, timeout=DEFAULT_TIMEOUT, csr_origin=CSR_ORIGIN_LOCAL, - include_private_key=True + include_private_key=False ): """ :param str cert_id: Certificate request id. Generating by server. diff --git a/vcert/connection_cloud.py b/vcert/connection_cloud.py index d4bec97..1c21cad 100644 --- a/vcert/connection_cloud.py +++ b/vcert/connection_cloud.py @@ -358,7 +358,7 @@ def retrieve_cert(self, request): log.debug("Certificate found, parsing response...") cert_response = parse_pem(data, request.chain_option) if cert_response.key is None and request.private_key is not None: - log.debug("Adding private key to response...") + log.debug("Adding local private key to response...") cert_response.key = request.private_key_pem return cert_response elif (time.time() - time_start) < request.timeout: diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index da93e71..9658ce3 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py 
@@ -134,11 +134,12 @@ def retrieve_cert(self, cert_request): retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", - IncludeChain='true', - IncludePrivateKey=cert_request.include_private_key) + IncludeChain='true') - if cert_request.key_password: - retrieve_request['Password'] = cert_request.key_password + if cert_request.csr_origin == CSR_ORIGIN_SERVICE: + retrieve_request["IncludePrivateKey"] = cert_request.include_private_key + if cert_request.key_password: + retrieve_request['Password'] = cert_request.key_password if cert_request.chain_option == CHAIN_OPTION_LAST: retrieve_request['RootFirstOrder'] = 'false' From 0fbbd438df5af3ad09ad87ef581941a7fa5e855c Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 13:30:38 -0500 Subject: [PATCH 08/16] debug --- tests/test_e2e.py | 2 +- vcert/connection_tpp_abstract.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 1b65156..f3d0393 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -295,7 +295,7 @@ def test_tpp_token_enroll_with_service_generated_csr(self): cert_config = self.tpp_conn._get_certificate_details(cert_guid) self.assertEqual(cert_config["Origin"], "Venafi VCert-Python") except Exception as err: - self.fail("Error in test: %s" % err.message) + self.fail("Error in test: %s" % err) def test_tpp_token_enroll_with_custom_fields(self): cn = random_word(10) + ".venafi.example.com" diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index 9658ce3..5275fc7 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py @@ -156,6 +156,7 @@ def retrieve_cert(self, cert_request): time_start = time.time() while True: try: + log.info(retrieve_request) status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request) except VenafiError: log.debug("Certificate with id %s not found." % cert_request.id) 
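Review bot: `log.info(retrieve_request)` in the PATCH 08 hunk writes the whole request dict, including the `Password` value set a few lines above in the PATCH 07 hunk, to the log at INFO level, so the private-key password ends up in plaintext wherever the application's logs are shipped. PATCH 13 removes the line again, but the "debug" commits 08 through 16 should be squashed before merge so that neither this line nor the commented-out request fields in PATCH 10 ever exist in history. If request logging is genuinely needed, log a redacted copy: `log.debug({k: ('***' if k == 'Password' else v) for k, v in retrieve_request.items()})`. `retrieve_cert` starts and ends outside both hunks.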
From cea796fa578d5a0da803234f3862c9ce63873b7c Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 13:53:31 -0500 Subject: [PATCH 09/16] debug --- vcert/connection_tpp_abstract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index 5275fc7..51b3440 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py @@ -134,7 +134,7 @@ def retrieve_cert(self, cert_request): retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", - IncludeChain='true') + IncludeChain=True) if cert_request.csr_origin == CSR_ORIGIN_SERVICE: retrieve_request["IncludePrivateKey"] = cert_request.include_private_key From 4bd7dd469cdf151649ffafc608512fa49427fccf Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 15:32:03 -0500 Subject: [PATCH 10/16] debug --- tests/test_e2e.py | 2 +- vcert/connection_tpp_abstract.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index f3d0393..4b28c57 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -539,7 +539,7 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None f.write(cert.full_chain) with open("./cert.key", "w"): if request.include_private_key: - assert cert.key is not None + # assert cert.key is not None f.write(cert.key) else: f.write(request.private_key_pem) diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index 51b3440..aeebe05 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py 
@@ -136,10 +136,10 @@ def retrieve_cert(self, cert_request): Format="base64", IncludeChain=True) - if cert_request.csr_origin == CSR_ORIGIN_SERVICE: - retrieve_request["IncludePrivateKey"] = cert_request.include_private_key - if cert_request.key_password: - retrieve_request['Password'] = cert_request.key_password + # if cert_request.csr_origin == CSR_ORIGIN_SERVICE: + # retrieve_request['IncludePrivateKey'] = cert_request.include_private_key + # if cert_request.key_password: + # retrieve_request['Password'] = cert_request.key_password if cert_request.chain_option == CHAIN_OPTION_LAST: retrieve_request['RootFirstOrder'] = 'false' From 20db5bde84ffa056d0c286ceda9bbaaf11b0ad30 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 15:46:21 -0500 Subject: [PATCH 11/16] debug --- tests/test_e2e.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 4b28c57..7fd495c 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -535,14 +535,14 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None # print("Certificate is:\n %s" % cert_pem) # print("Private key is:\n %s:" % request.private_key_pem) # and save into file - f = open("./cert.pem", "w") - f.write(cert.full_chain) - with open("./cert.key", "w"): - if request.include_private_key: - # assert cert.key is not None - f.write(cert.key) - else: - f.write(request.private_key_pem) + with open("./cert.pem", "w") as f: + f.write(cert.full_chain) + # with open("./cert.key", "w") as f2: + # if request.include_private_key: + # assert cert.key is not None + # f2.write(cert.key) + # else: + # f2.write(request.private_key_pem) cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend()) assert isinstance(cert, x509.Certificate) From f1c98a432fd6fec99fe8d3f8c37a56931df317bb Mon Sep 17 00:00:00 2001 
From: Russel Vela Date: Wed, 29 Sep 2021 15:47:16 -0500 Subject: [PATCH 12/16] debug --- tests/test_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 7fd495c..edab874 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -295,7 +295,7 @@ def test_tpp_token_enroll_with_service_generated_csr(self): cert_config = self.tpp_conn._get_certificate_details(cert_guid) self.assertEqual(cert_config["Origin"], "Venafi VCert-Python") except Exception as err: - self.fail("Error in test: %s" % err) + self.fail("Error in test: %s" % err.message) def test_tpp_token_enroll_with_custom_fields(self): cn = random_word(10) + ".venafi.example.com" From 719cd74513d39032b77c4b9c104d4164f4154003 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 16:03:19 -0500 Subject: [PATCH 13/16] debug --- tests/test_e2e.py | 12 ++++++------ vcert/connection_tpp_abstract.py | 9 ++++----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/tests/test_e2e.py b/tests/test_e2e.py index edab874..4afeb44 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -537,12 +537,12 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None # and save into file with open("./cert.pem", "w") as f: f.write(cert.full_chain) - # with open("./cert.key", "w") as f2: - # if request.include_private_key: - # assert cert.key is not None - # f2.write(cert.key) - # else: - # f2.write(request.private_key_pem) + with open("./cert.key", "w") as f2: + if request.include_private_key: + assert cert.key is not None + f2.write(cert.key) + else: + f2.write(request.private_key_pem) cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend()) assert isinstance(cert, x509.Certificate) diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index aeebe05..6aad8d6 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py 
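Review bot: `self.fail("Error in test: %s" % err.message)` reverts the PATCH 08 change; `Exception` instances have no `message` attribute in Python 3, so on a failing enrolment the handler itself raises `AttributeError` and the real cause is masked. Keep `self.fail("Error in test: %s" % err)` as PATCH 08 had it. In the PATCH 13 hunk of the same file, `with open("./cert.key", "w") as f2:` fixes the unbound handle from PATCH 01, but the private-key file is still created with the process umask; create it with mode `0o600` via `os.open`/`os.fdopen` so the key is not readable by other local users. Both enclosing functions start outside their hunks.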
@@ -136,10 +136,10 @@ def retrieve_cert(self, cert_request): Format="base64", IncludeChain=True) - # if cert_request.csr_origin == CSR_ORIGIN_SERVICE: - # retrieve_request['IncludePrivateKey'] = cert_request.include_private_key - # if cert_request.key_password: - # retrieve_request['Password'] = cert_request.key_password + if cert_request.csr_origin == CSR_ORIGIN_SERVICE: + retrieve_request['IncludePrivateKey'] = True + if cert_request.key_password: + retrieve_request['Password'] = cert_request.key_password if cert_request.chain_option == CHAIN_OPTION_LAST: retrieve_request['RootFirstOrder'] = 'false' @@ -156,7 +156,6 @@ def retrieve_cert(self, cert_request): time_start = time.time() while True: try: - log.info(retrieve_request) status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request) except VenafiError: log.debug("Certificate with id %s not found." % cert_request.id) From 38708140a2310289f2430d25de1019fe31dcf624 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 16:12:46 -0500 Subject: [PATCH 14/16] debug --- vcert/connection_tpp_abstract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index 6aad8d6..e0a1248 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py @@ -137,7 +137,7 @@ def retrieve_cert(self, cert_request): IncludeChain=True) if cert_request.csr_origin == CSR_ORIGIN_SERVICE: - retrieve_request['IncludePrivateKey'] = True + retrieve_request['IncludePrivateKey'] = 'true' if cert_request.key_password: retrieve_request['Password'] = cert_request.key_password From 64516778039fe24e9bbc5750f8d70b0dfc3b35b4 Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 16:50:06 -0500 Subject: [PATCH 15/16] debug --- tests/test_e2e.py | 2 +- vcert/connection_tpp_abstract.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) 
diff --git a/tests/test_e2e.py b/tests/test_e2e.py index 4afeb44..b0a62cf 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -290,7 +290,7 @@ def test_tpp_token_enroll(self): def test_tpp_token_enroll_with_service_generated_csr(self): cn = random_word(10) + ".venafi.example.com" try: - _, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, password="Foo.Bar.123!", + _, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, password="FooBarPass123", service_generated_csr=True) cert_config = self.tpp_conn._get_certificate_details(cert_guid) self.assertEqual(cert_config["Origin"], "Venafi VCert-Python") diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index e0a1248..8e058da 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py @@ -137,7 +137,7 @@ def retrieve_cert(self, cert_request): IncludeChain=True) if cert_request.csr_origin == CSR_ORIGIN_SERVICE: - retrieve_request['IncludePrivateKey'] = 'true' + retrieve_request['IncludePrivateKey'] = cert_request.include_private_key if cert_request.key_password: retrieve_request['Password'] = cert_request.key_password From eda2d392ab90f52528cd7a78df98ecc7add1f63c Mon Sep 17 00:00:00 2001 From: Russel Vela Date: Wed, 29 Sep 2021 17:03:55 -0500 Subject: [PATCH 16/16] debug --- vcert/connection_tpp_abstract.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vcert/connection_tpp_abstract.py b/vcert/connection_tpp_abstract.py index 8e058da..7fcc798 100644 --- a/vcert/connection_tpp_abstract.py +++ b/vcert/connection_tpp_abstract.py 
@@ -139,7 +139,9 @@ def retrieve_cert(self, cert_request): if cert_request.csr_origin == CSR_ORIGIN_SERVICE: retrieve_request['IncludePrivateKey'] = cert_request.include_private_key if cert_request.key_password: - retrieve_request['Password'] = cert_request.key_password + # The password is encoded when assigned (for local use, I suppose). + # decode is needed to send a raw string + retrieve_request['Password'] = cert_request.key_password.decode() if cert_request.chain_option == CHAIN_OPTION_LAST: retrieve_request['RootFirstOrder'] = 'false'
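Review bot: `cert_request.key_password.decode()` assumes the attribute is always `bytes`; the `__setattr__` hook on `CertificateRequest` visible in the PATCH 01 hunk of vcert/common.py special-cases `key_password`, but its body is outside this page, so whether a `str` can reach this line cannot be confirmed — make it tolerant: `pw = cert_request.key_password` and `retrieve_request['Password'] = pw.decode() if isinstance(pw, bytes) else pw`. Replace the speculative comment ("for local use, I suppose") with a statement of what is known, e.g. `# key_password is stored as bytes by CertificateRequest.__setattr__; the API expects the plain string`, after confirming it against common.py. The password travels in the request body and relies on the TLS connection for confidentiality, so `retrieve_request` must never be logged as-is. `retrieve_cert` starts and ends outside the hunk.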