Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service generated CSR #73

Merged
merged 16 commits into from
Sep 29, 2021
Merged
4 changes: 4 additions & 0 deletions examples/get_cert_service_tpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ def main():
request.ip_addresses = ["127.0.0.1", "192.168.1.1"]
request.uniform_resource_identifiers = ["http://wgtest.uri.com","https://ragnartest.uri.com"]
request.user_principal_names = ["upn1@venafi.example.com", "upn2@venafi.example.com"]
# Specify whether or not to return the private key. It is False by default.
# A password should be defined for the private key if include_private_key is True.
request.include_private_key = True
request.key_password = 'Foo.Bar.Pass.123!'
# Specify ordering certificates in chain. Root can be CHAIN_OPTION_FIRST ("first")
# or CHAIN_OPTION_LAST ("last"). By default it is CHAIN_OPTION_LAST.
# You can also specify CHAIN_OPTION_IGNORE ("ignore") to ignore chain (supported only for TPP).
Expand Down
18 changes: 11 additions & 7 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ def test_tpp_token_enroll(self):
def test_tpp_token_enroll_with_service_generated_csr(self):
cn = random_word(10) + ".venafi.example.com"
try:
_, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, service_generated_csr=True)
_, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, password="FooBarPass123",
service_generated_csr=True)
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
self.assertEqual(cert_config["Origin"], "Venafi VCert-Python")
except Exception as err:
Expand Down Expand Up @@ -527,18 +528,21 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None
request.csr = csr
elif service_generated_csr:
request.csr_origin = CSR_ORIGIN_SERVICE
request.include_private_key = True

conn.request_cert(request, zone)
cert = conn.retrieve_cert(request)
# print("Certificate is:\n %s" % cert_pem)
# print("Private key is:\n %s:" % request.private_key_pem)
# and save into file
f = open("./cert.pem", "w")
f.write(cert.full_chain)
if not service_generated_csr:
f = open("./cert.key", "w")
f.write(request.private_key_pem)
f.close()
with open("./cert.pem", "w") as f:
f.write(cert.full_chain)
with open("./cert.key", "w") as f2:
if request.include_private_key:
assert cert.key is not None
f2.write(cert.key)
else:
f2.write(request.private_key_pem)

cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
assert isinstance(cert, x509.Certificate)
Expand Down
6 changes: 4 additions & 2 deletions vcert/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from .policy import PolicySpecification
from .ssh_utils import SSHCertRequest, SSHRetrieveResponse


MIME_JSON = "application/json"
MIME_HTML = "text/html"
MIME_TEXT = "text/plain"
Expand Down Expand Up @@ -264,7 +263,8 @@ def __init__(self, cert_id=None,
origin=None,
custom_fields=None,
timeout=DEFAULT_TIMEOUT,
csr_origin=CSR_ORIGIN_LOCAL
csr_origin=CSR_ORIGIN_LOCAL,
include_private_key=False
):
"""
:param str cert_id: Certificate request id. Generating by server.
Expand All @@ -285,6 +285,7 @@ def __init__(self, cert_id=None,
:param list[CustomField] custom_fields: list of custom fields values to be added to the certificate.
:param int timeout: Timeout for the certificate to be retrieved from server. Measured in seconds.
:param str csr_origin: The origin of the CSR, either user provided, locally generated or service generated.
:param bool include_private_key: Indicates if the private key should be returned by the server or not.
"""

self.chain_option = CHAIN_OPTION_LAST # "last"
Expand Down Expand Up @@ -318,6 +319,7 @@ def __init__(self, cert_id=None,
self.custom_fields = custom_fields
self.cert_guid = None
self.timeout = timeout
self.include_private_key = include_private_key

def __setattr__(self, key, value):
if key == "key_password":
Expand Down
6 changes: 5 additions & 1 deletion vcert/connection_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,11 @@ def retrieve_cert(self, request):
status = 0
if status == HTTPStatus.OK:
log.debug("Certificate found, parsing response...")
return parse_pem(data, request.chain_option)
cert_response = parse_pem(data, request.chain_option)
if cert_response.key is None and request.private_key is not None:
log.debug("Adding local private key to response...")
cert_response.key = request.private_key_pem
return cert_response
elif (time.time() - time_start) < request.timeout:
log.debug("Waiting for certificate...")
time.sleep(2)
Expand Down
4 changes: 3 additions & 1 deletion vcert/connection_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ def retrieve_cert(self, certificate_request):
).sign(root_ca_private_key, hashes.SHA256(), default_backend())

log.info("This certificate is for test purposes only. Don't use it in production!")
return parse_pem(cert.public_bytes(serialization.Encoding.PEM).decode(), certificate_request.chain_option)
response = parse_pem(cert.public_bytes(serialization.Encoding.PEM).decode(), certificate_request.chain_option)
response.key = certificate_request.private_key_pem
return response

def revoke_cert(self, request):
raise NotImplementedError
Expand Down
48 changes: 2 additions & 46 deletions vcert/connection_tpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,16 @@
from __future__ import (absolute_import, division, generators, unicode_literals, print_function, nested_scopes,
with_statement)

import base64
import logging as log
import re
import time

import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import SignatureAlgorithmOID as AlgOID

from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType, CSR_ORIGIN_LOCAL, CSR_ORIGIN_SERVICE, \
CSR_ORIGIN_PROVIDED
from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType
from .connection_tpp_abstract import AbstractTPPConnection, URLS
from .errors import (ServerUnexptedBehavior, ClientBadData, CertificateRequestError, AuthenticationError,
CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError)
from .errors import (ServerUnexptedBehavior, ClientBadData, AuthenticationError)
from .http import HTTPStatus
from .pem import parse_pem

TOKEN_HEADER_NAME = "x-venafi-api-key" # nosec

Expand Down Expand Up @@ -115,43 +108,6 @@ def auth(self):
log.error("Authentication status is not %s but %s. Exiting" % (HTTPStatus.OK, status[0]))
raise AuthenticationError

def retrieve_cert(self, cert_request):
log.debug("Getting certificate status for id %s" % cert_request.id)

retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", IncludeChain='true')

if cert_request.chain_option == "last":
retrieve_request['RootFirstOrder'] = 'false'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == "first":
retrieve_request['RootFirstOrder'] = 'true'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == "ignore":
retrieve_request['IncludeChain'] = 'false'
else:
log.error("chain option %s is not valid" % cert_request.chain_option)
raise ClientBadData

time_start = time.time()
while True:
try:
status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request)
except VenafiError:
log.debug("Certificate with id %s not found." % cert_request.id)
status = 0

if status == HTTPStatus.OK:
pem64 = data['CertificateData']
pem = base64.b64decode(pem64)
return parse_pem(pem.decode(), cert_request.chain_option)
elif (time.time() - time_start) < cert_request.timeout:
log.debug("Waiting for certificate...")
time.sleep(2)
else:
raise RetrieveCertificateTimeoutError(
'Operation timed out at %d seconds while retrieving certificate with id %s'
% (cert_request.timeout, cert_request.id))

def revoke_cert(self, request):
if not (request.id or request.thumbprint):
raise ClientBadData
Expand Down
57 changes: 55 additions & 2 deletions vcert/connection_tpp_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import logging as log
import re
import time
from pprint import pprint

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import SignatureAlgorithmOID as AlgOID
from pprint import pprint

from vcert.pem import parse_pem
from vcert.common import CertField, CommonConnection, CertificateRequest, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \
CSR_ORIGIN_SERVICE, KeyType
CSR_ORIGIN_SERVICE, KeyType, CHAIN_OPTION_LAST, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE
from vcert.errors import VenafiError, ServerUnexptedBehavior, ClientBadData, RetrieveCertificateTimeoutError, \
CertificateRequestError, CertificateRenewError
from vcert.http import HTTPStatus
Expand Down Expand Up @@ -126,6 +129,56 @@ def request_cert(self, request, zone):
log.error("Request status is not %s. %s." % HTTPStatus.OK, status)
raise CertificateRequestError

def retrieve_cert(self, cert_request):
log.debug("Getting certificate status for id %s" % cert_request.id)

retrieve_request = dict(CertificateDN=cert_request.id,
Format="base64",
IncludeChain=True)

if cert_request.csr_origin == CSR_ORIGIN_SERVICE:
retrieve_request['IncludePrivateKey'] = cert_request.include_private_key
if cert_request.key_password:
# The password is encoded when assigned (for local use, I suppose).
# decode is needed to send a raw string
retrieve_request['Password'] = cert_request.key_password.decode()

if cert_request.chain_option == CHAIN_OPTION_LAST:
retrieve_request['RootFirstOrder'] = 'false'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == CHAIN_OPTION_FIRST:
retrieve_request['RootFirstOrder'] = 'true'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == CHAIN_OPTION_IGNORE:
retrieve_request['IncludeChain'] = 'false'
else:
log.error("chain option %s is not valid" % cert_request.chain_option)
raise ClientBadData

time_start = time.time()
while True:
try:
status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request)
except VenafiError:
log.debug("Certificate with id %s not found." % cert_request.id)
status = 0

if status == HTTPStatus.OK:
pem64 = data['CertificateData']
pem = base64.b64decode(pem64)
cert_response = parse_pem(pem.decode(), cert_request.chain_option)
if cert_response.key is None and cert_request.private_key is not None:
log.debug("Adding private key to response...")
cert_response.key = cert_request.private_key_pem
return cert_response
elif (time.time() - time_start) < cert_request.timeout:
log.debug("Waiting for certificate...")
time.sleep(2)
else:
raise RetrieveCertificateTimeoutError(
'Operation timed out at %d seconds while retrieving certificate with id %s'
% (cert_request.timeout, cert_request.id))

def renew_cert(self, request, reuse_key=False):
if not request.id and not request.thumbprint:
log.debug("Request id or thumbprint must be specified for TPP")
Expand Down
45 changes: 1 addition & 44 deletions vcert/connection_tpp_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@
from __future__ import (absolute_import, division, generators, unicode_literals, print_function, nested_scopes,
with_statement)

import base64
import logging as log
import re
import time

import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import SignatureAlgorithmOID as AlgOID

from .common import MIME_JSON, TokenInfo, Authentication, KeyType, Policy, ZoneConfig, CertField
from .connection_tpp_abstract import AbstractTPPConnection, URLS
from .errors import (ClientBadData, ServerUnexptedBehavior, AuthenticationError, CertificateRequestError,
CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError)
from .errors import (ClientBadData, ServerUnexptedBehavior, AuthenticationError)
from .http import HTTPStatus
from .pem import parse_pem

HEADER_AUTHORIZATION = "Authorization" # type: str

Expand Down Expand Up @@ -129,43 +123,6 @@ def auth(self):
def import_cert(self, request):
raise NotImplementedError

def retrieve_cert(self, cert_request):
log.debug("Getting certificate status for id %s" % cert_request.id)

retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", IncludeChain='true')

if cert_request.chain_option == "last":
retrieve_request['RootFirstOrder'] = 'false'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == "first":
retrieve_request['RootFirstOrder'] = 'true'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == "ignore":
retrieve_request['IncludeChain'] = 'false'
else:
log.error("chain option %s is not valid" % cert_request.chain_option)
raise ClientBadData

time_start = time.time()
while True:
try:
status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request)
except VenafiError:
log.debug("Certificate with id %s not found." % cert_request.id)
status = 0

if status == HTTPStatus.OK:
pem64 = data['CertificateData']
pem = base64.b64decode(pem64)
return parse_pem(pem.decode(), cert_request.chain_option)
elif (time.time() - time_start) < cert_request.timeout:
log.debug("Waiting for certificate...")
time.sleep(2)
else:
raise RetrieveCertificateTimeoutError(
'Operation timed out at %d seconds while retrieving certificate with id %s'
% (cert_request.timeout, cert_request.id))

def revoke_cert(self, request):
if not (request.id or request.thumbprint):
raise ClientBadData
Expand Down