Skip to content

Commit

Permalink
Merge pull request #73 from Venafi/service_generated_csr
Browse files Browse the repository at this point in the history
Service generated CSR
  • Loading branch information
rvelaVenafi authored Sep 29, 2021
2 parents f46d60f + eda2d39 commit 12ecdcf
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 103 deletions.
4 changes: 4 additions & 0 deletions examples/get_cert_service_tpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ def main():
request.ip_addresses = ["127.0.0.1", "192.168.1.1"]
request.uniform_resource_identifiers = ["http://wgtest.uri.com","https://ragnartest.uri.com"]
request.user_principal_names = ["upn1@venafi.example.com", "upn2@venafi.example.com"]
# Specify whether or not to return the private key. It is False by default.
# A password should be defined for the private key if include_private_key is True.
request.include_private_key = True
request.key_password = 'Foo.Bar.Pass.123!'
# Specify ordering certificates in chain. Root can be CHAIN_OPTION_FIRST ("first")
# or CHAIN_OPTION_LAST ("last"). By default it is CHAIN_OPTION_LAST.
# You can also specify CHAIN_OPTION_IGNORE ("ignore") to ignore chain (supported only for TPP).
Expand Down
18 changes: 11 additions & 7 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ def test_tpp_token_enroll(self):
def test_tpp_token_enroll_with_service_generated_csr(self):
cn = random_word(10) + ".venafi.example.com"
try:
_, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, service_generated_csr=True)
_, _, _, _, cert_guid = enroll(self.tpp_conn, self.tpp_zone, cn=cn, password="FooBarPass123",
service_generated_csr=True)
cert_config = self.tpp_conn._get_certificate_details(cert_guid)
self.assertEqual(cert_config["Origin"], "Venafi VCert-Python")
except Exception as err:
Expand Down Expand Up @@ -527,18 +528,21 @@ def enroll(conn, zone, cn=None, private_key=None, public_key=None, password=None
request.csr = csr
elif service_generated_csr:
request.csr_origin = CSR_ORIGIN_SERVICE
request.include_private_key = True

conn.request_cert(request, zone)
cert = conn.retrieve_cert(request)
# print("Certificate is:\n %s" % cert_pem)
# print("Private key is:\n %s:" % request.private_key_pem)
# and save into file
f = open("./cert.pem", "w")
f.write(cert.full_chain)
if not service_generated_csr:
f = open("./cert.key", "w")
f.write(request.private_key_pem)
f.close()
with open("./cert.pem", "w") as f:
f.write(cert.full_chain)
with open("./cert.key", "w") as f2:
if request.include_private_key:
assert cert.key is not None
f2.write(cert.key)
else:
f2.write(request.private_key_pem)

cert = x509.load_pem_x509_certificate(cert.cert.encode(), default_backend())
assert isinstance(cert, x509.Certificate)
Expand Down
6 changes: 4 additions & 2 deletions vcert/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from .policy import PolicySpecification
from .ssh_utils import SSHCertRequest, SSHRetrieveResponse


MIME_JSON = "application/json"
MIME_HTML = "text/html"
MIME_TEXT = "text/plain"
Expand Down Expand Up @@ -264,7 +263,8 @@ def __init__(self, cert_id=None,
origin=None,
custom_fields=None,
timeout=DEFAULT_TIMEOUT,
csr_origin=CSR_ORIGIN_LOCAL
csr_origin=CSR_ORIGIN_LOCAL,
include_private_key=False
):
"""
:param str cert_id: Certificate request id. Generating by server.
Expand All @@ -285,6 +285,7 @@ def __init__(self, cert_id=None,
:param list[CustomField] custom_fields: list of custom fields values to be added to the certificate.
:param int timeout: Timeout for the certificate to be retrieved from server. Measured in seconds.
:param str csr_origin: The origin of the CSR, either user provided, locally generated or service generated.
:param bool include_private_key: Indicates if the private key should be returned by the server or not.
"""

self.chain_option = CHAIN_OPTION_LAST # "last"
Expand Down Expand Up @@ -318,6 +319,7 @@ def __init__(self, cert_id=None,
self.custom_fields = custom_fields
self.cert_guid = None
self.timeout = timeout
self.include_private_key = include_private_key

def __setattr__(self, key, value):
if key == "key_password":
Expand Down
6 changes: 5 additions & 1 deletion vcert/connection_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,11 @@ def retrieve_cert(self, request):
status = 0
if status == HTTPStatus.OK:
log.debug("Certificate found, parsing response...")
return parse_pem(data, request.chain_option)
cert_response = parse_pem(data, request.chain_option)
if cert_response.key is None and request.private_key is not None:
log.debug("Adding local private key to response...")
cert_response.key = request.private_key_pem
return cert_response
elif (time.time() - time_start) < request.timeout:
log.debug("Waiting for certificate...")
time.sleep(2)
Expand Down
4 changes: 3 additions & 1 deletion vcert/connection_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ def retrieve_cert(self, certificate_request):
).sign(root_ca_private_key, hashes.SHA256(), default_backend())

log.info("This certificate is for test purposes only. Don't use it in production!")
return parse_pem(cert.public_bytes(serialization.Encoding.PEM).decode(), certificate_request.chain_option)
response = parse_pem(cert.public_bytes(serialization.Encoding.PEM).decode(), certificate_request.chain_option)
response.key = certificate_request.private_key_pem
return response

def revoke_cert(self, request):
raise NotImplementedError
Expand Down
48 changes: 2 additions & 46 deletions vcert/connection_tpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,16 @@
from __future__ import (absolute_import, division, generators, unicode_literals, print_function, nested_scopes,
with_statement)

import base64
import logging as log
import re
import time

import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import SignatureAlgorithmOID as AlgOID

from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType, CSR_ORIGIN_LOCAL, CSR_ORIGIN_SERVICE, \
CSR_ORIGIN_PROVIDED
from .common import MIME_JSON, CertField, ZoneConfig, Policy, KeyType
from .connection_tpp_abstract import AbstractTPPConnection, URLS
from .errors import (ServerUnexptedBehavior, ClientBadData, CertificateRequestError, AuthenticationError,
CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError)
from .errors import (ServerUnexptedBehavior, ClientBadData, AuthenticationError)
from .http import HTTPStatus
from .pem import parse_pem

TOKEN_HEADER_NAME = "x-venafi-api-key" # nosec

Expand Down Expand Up @@ -115,43 +108,6 @@ def auth(self):
log.error("Authentication status is not %s but %s. Exiting" % (HTTPStatus.OK, status[0]))
raise AuthenticationError

def retrieve_cert(self, cert_request):
log.debug("Getting certificate status for id %s" % cert_request.id)

retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", IncludeChain='true')

if cert_request.chain_option == "last":
retrieve_request['RootFirstOrder'] = 'false'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == "first":
retrieve_request['RootFirstOrder'] = 'true'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == "ignore":
retrieve_request['IncludeChain'] = 'false'
else:
log.error("chain option %s is not valid" % cert_request.chain_option)
raise ClientBadData

time_start = time.time()
while True:
try:
status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request)
except VenafiError:
log.debug("Certificate with id %s not found." % cert_request.id)
status = 0

if status == HTTPStatus.OK:
pem64 = data['CertificateData']
pem = base64.b64decode(pem64)
return parse_pem(pem.decode(), cert_request.chain_option)
elif (time.time() - time_start) < cert_request.timeout:
log.debug("Waiting for certificate...")
time.sleep(2)
else:
raise RetrieveCertificateTimeoutError(
'Operation timed out at %d seconds while retrieving certificate with id %s'
% (cert_request.timeout, cert_request.id))

def revoke_cert(self, request):
if not (request.id or request.thumbprint):
raise ClientBadData
Expand Down
57 changes: 55 additions & 2 deletions vcert/connection_tpp_abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import logging as log
import re
import time
from pprint import pprint

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import SignatureAlgorithmOID as AlgOID
from pprint import pprint

from vcert.pem import parse_pem
from vcert.common import CertField, CommonConnection, CertificateRequest, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, \
CSR_ORIGIN_SERVICE, KeyType
CSR_ORIGIN_SERVICE, KeyType, CHAIN_OPTION_LAST, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE
from vcert.errors import VenafiError, ServerUnexptedBehavior, ClientBadData, RetrieveCertificateTimeoutError, \
CertificateRequestError, CertificateRenewError
from vcert.http import HTTPStatus
Expand Down Expand Up @@ -126,6 +129,56 @@ def request_cert(self, request, zone):
log.error("Request status is not %s. %s." % HTTPStatus.OK, status)
raise CertificateRequestError

def retrieve_cert(self, cert_request):
log.debug("Getting certificate status for id %s" % cert_request.id)

retrieve_request = dict(CertificateDN=cert_request.id,
Format="base64",
IncludeChain=True)

if cert_request.csr_origin == CSR_ORIGIN_SERVICE:
retrieve_request['IncludePrivateKey'] = cert_request.include_private_key
if cert_request.key_password:
# The password is encoded when assigned (for local use, I suppose).
# decode is needed to send a raw string
retrieve_request['Password'] = cert_request.key_password.decode()

if cert_request.chain_option == CHAIN_OPTION_LAST:
retrieve_request['RootFirstOrder'] = 'false'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == CHAIN_OPTION_FIRST:
retrieve_request['RootFirstOrder'] = 'true'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == CHAIN_OPTION_IGNORE:
retrieve_request['IncludeChain'] = 'false'
else:
log.error("chain option %s is not valid" % cert_request.chain_option)
raise ClientBadData

time_start = time.time()
while True:
try:
status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request)
except VenafiError:
log.debug("Certificate with id %s not found." % cert_request.id)
status = 0

if status == HTTPStatus.OK:
pem64 = data['CertificateData']
pem = base64.b64decode(pem64)
cert_response = parse_pem(pem.decode(), cert_request.chain_option)
if cert_response.key is None and cert_request.private_key is not None:
log.debug("Adding private key to response...")
cert_response.key = cert_request.private_key_pem
return cert_response
elif (time.time() - time_start) < cert_request.timeout:
log.debug("Waiting for certificate...")
time.sleep(2)
else:
raise RetrieveCertificateTimeoutError(
'Operation timed out at %d seconds while retrieving certificate with id %s'
% (cert_request.timeout, cert_request.id))

def renew_cert(self, request, reuse_key=False):
if not request.id and not request.thumbprint:
log.debug("Request id or thumbprint must be specified for TPP")
Expand Down
45 changes: 1 addition & 44 deletions vcert/connection_tpp_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,16 @@
from __future__ import (absolute_import, division, generators, unicode_literals, print_function, nested_scopes,
with_statement)

import base64
import logging as log
import re
import time

import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import SignatureAlgorithmOID as AlgOID

from .common import MIME_JSON, TokenInfo, Authentication, KeyType, Policy, ZoneConfig, CertField
from .connection_tpp_abstract import AbstractTPPConnection, URLS
from .errors import (ClientBadData, ServerUnexptedBehavior, AuthenticationError, CertificateRequestError,
CertificateRenewError, VenafiError, RetrieveCertificateTimeoutError)
from .errors import (ClientBadData, ServerUnexptedBehavior, AuthenticationError)
from .http import HTTPStatus
from .pem import parse_pem

HEADER_AUTHORIZATION = "Authorization" # type: str

Expand Down Expand Up @@ -129,43 +123,6 @@ def auth(self):
def import_cert(self, request):
raise NotImplementedError

def retrieve_cert(self, cert_request):
log.debug("Getting certificate status for id %s" % cert_request.id)

retrieve_request = dict(CertificateDN=cert_request.id, Format="base64", IncludeChain='true')

if cert_request.chain_option == "last":
retrieve_request['RootFirstOrder'] = 'false'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == "first":
retrieve_request['RootFirstOrder'] = 'true'
retrieve_request['IncludeChain'] = 'true'
elif cert_request.chain_option == "ignore":
retrieve_request['IncludeChain'] = 'false'
else:
log.error("chain option %s is not valid" % cert_request.chain_option)
raise ClientBadData

time_start = time.time()
while True:
try:
status, data = self._post(URLS.CERTIFICATE_RETRIEVE, data=retrieve_request)
except VenafiError:
log.debug("Certificate with id %s not found." % cert_request.id)
status = 0

if status == HTTPStatus.OK:
pem64 = data['CertificateData']
pem = base64.b64decode(pem64)
return parse_pem(pem.decode(), cert_request.chain_option)
elif (time.time() - time_start) < cert_request.timeout:
log.debug("Waiting for certificate...")
time.sleep(2)
else:
raise RetrieveCertificateTimeoutError(
'Operation timed out at %d seconds while retrieving certificate with id %s'
% (cert_request.timeout, cert_request.id))

def revoke_cert(self, request):
if not (request.id or request.thumbprint):
raise ClientBadData
Expand Down

0 comments on commit 12ecdcf

Please sign in to comment.