Skip to content

Commit

Permalink
added kms asymmetric samples [(#1638)](GoogleCloudPlatform/python-doc…
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche authored and busunkim96 committed Jun 4, 2020
1 parent 9785f2b commit bddfb6c
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 0 deletions.
136 changes: 136 additions & 0 deletions packages/google-cloud-kms/samples/snippets/asymmetric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#!/bin/python
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.rom googleapiclient import discovery

import base64
import hashlib

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding, utils


# [START kms_get_asymmetric_public]
def getAsymmetricPublicKey(client, key_path):
"""Retrieves the public key from a saved asymmetric key pair on Cloud KMS
"""
request = client.projects() \
.locations() \
.keyRings() \
.cryptoKeys() \
.cryptoKeyVersions() \
.getPublicKey(name=key_path)
response = request.execute()
key_txt = response['pem'].encode('ascii')
key = serialization.load_pem_public_key(key_txt, default_backend())
return key
# [END kms_get_asymmetric_public]


# [START kms_decrypt_rsa]
def decryptRSA(ciphertext, client, key_path):
"""Decrypt a given ciphertext using an RSA private key stored on Cloud KMS
"""
request = client.projects() \
.locations() \
.keyRings() \
.cryptoKeys() \
.cryptoKeyVersions() \
.asymmetricDecrypt(name=key_path,
body={'ciphertext': ciphertext})
response = request.execute()
plaintext = base64.b64decode(response['plaintext']).decode('utf-8')
return plaintext
# [END kms_decrypt_rsa]


# [START kms_encrypt_rsa]
def encryptRSA(message, client, key_path):
"""Encrypt message locally using an RSA public key retrieved from Cloud KMS
"""
public_key = getAsymmetricPublicKey(client, key_path)
pad = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None)
ciphertext = public_key.encrypt(message.encode('ascii'), pad)
ciphertext = base64.b64encode(ciphertext).decode('utf-8')
return ciphertext
# [END kms_encrypt_rsa]


# [START kms_sign_asymmetric]
def signAsymmetric(message, client, key_path):
"""Create a signature for a message using a private key stored on Cloud KMS
"""
digest_bytes = hashlib.sha256(message.encode('ascii')).digest()
digest64 = base64.b64encode(digest_bytes)

digest_JSON = {'sha256': digest64.decode('utf-8')}
request = client.projects() \
.locations() \
.keyRings() \
.cryptoKeys() \
.cryptoKeyVersions() \
.asymmetricSign(name=key_path,
body={'digest': digest_JSON})
response = request.execute()
return response.get('signature', None)
# [END kms_sign_asymmetric]


# [START kms_verify_signature_rsa]
def verifySignatureRSA(signature, message, client, key_path):
"""Verify the validity of an 'RSA_SIGN_PSS_2048_SHA256' signature
for the specified plaintext message
"""
public_key = getAsymmetricPublicKey(client, key_path)

digest_bytes = hashlib.sha256(message.encode('ascii')).digest()
sig_bytes = base64.b64decode(signature)

try:
# Attempt verification
public_key.verify(sig_bytes,
digest_bytes,
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
salt_length=32),
utils.Prehashed(hashes.SHA256()))
# No errors were thrown. Verification was successful
return True
except InvalidSignature:
return False
# [END kms_verify_signature_rsa]


# [START kms_verify_signature_ec]
def verifySignatureEC(signature, message, client, key_path):
"""Verify the validity of an 'EC_SIGN_P224_SHA256' signature
for the specified plaintext message
"""
public_key = getAsymmetricPublicKey(client, key_path)

digest_bytes = hashlib.sha256(message.encode('ascii')).digest()
sig_bytes = base64.b64decode(signature)

try:
# Attempt verification
public_key.verify(sig_bytes,
digest_bytes,
ec.ECDSA(utils.Prehashed(hashes.SHA256())))
# No errors were thrown. Verification was successful
return True
except InvalidSignature:
return False
# [END kms_verify_signature_ec]
141 changes: 141 additions & 0 deletions packages/google-cloud-kms/samples/snippets/asymmetric_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#!/bin/python
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from os import environ
from time import sleep

from cryptography.hazmat.backends.openssl.ec import _EllipticCurvePublicKey
from cryptography.hazmat.backends.openssl.rsa import _RSAPublicKey
from googleapiclient import discovery
from googleapiclient.errors import HttpError
import sample


def create_key_helper(key_id, key_path, purpose, algorithm, t):
try:
t.client.projects() \
.locations() \
.keyRings() \
.cryptoKeys() \
.create(parent='{}/keyRings/{}'.format(t.parent, t.keyring),
body={'purpose': purpose,
'versionTemplate': {
'algorithm': algorithm
}
},
cryptoKeyId=key_id) \
.execute()
return True
except HttpError:
# key already exists
return False


def setup_module(module):
"""
Set up keys in project if needed
"""
t = TestKMSSamples()
try:
# create keyring
t.client.projects() \
.locations() \
.keyRings() \
.create(parent=t.parent, body={}, keyRingId=t.keyring) \
.execute()
except HttpError:
# keyring already exists
pass
s1 = create_key_helper(t.rsaDecryptId, t.rsaDecrypt, 'ASYMMETRIC_DECRYPT',
'RSA_DECRYPT_OAEP_2048_SHA256', t)
s2 = create_key_helper(t.rsaSignId, t.rsaSign, 'ASYMMETRIC_SIGN',
'RSA_SIGN_PSS_2048_SHA256', t)
s3 = create_key_helper(t.ecSignId, t.ecSign, 'ASYMMETRIC_SIGN',
'EC_SIGN_P224_SHA256', t)
if s1 or s2 or s3:
# leave time for keys to initialize
sleep(20)


class TestKMSSamples:

project_id = environ['GCLOUD_PROJECT']
keyring = 'kms-asymmetric-samples4'
parent = 'projects/{}/locations/global'.format(project_id)

rsaSignId = 'rsa-sign'
rsaDecryptId = 'rsa-decrypt'
ecSignId = 'ec-sign'

rsaSign = '{}/keyRings/{}/cryptoKeys/{}/cryptoKeyVersions/1' \
.format(parent, keyring, rsaSignId)
rsaDecrypt = '{}/keyRings/{}/cryptoKeys/{}/cryptoKeyVersions/1' \
.format(parent, keyring, rsaDecryptId)
ecSign = '{}/keyRings/{}/cryptoKeys/{}/cryptoKeyVersions/1' \
.format(parent, keyring, ecSignId)

message = 'test message 123'

client = discovery.build('cloudkms', 'v1')

def test_get_public_key(self):
rsa_key = sample.getAsymmetricPublicKey(self.client, self.rsaDecrypt)
assert isinstance(rsa_key, _RSAPublicKey), 'expected RSA key'
ec_key = sample.getAsymmetricPublicKey(self.client, self.ecSign)
assert isinstance(ec_key, _EllipticCurvePublicKey), 'expected EC key'

def test_rsa_encrypt_decrypt(self):
ciphertext = sample.encryptRSA(self.message,
self.client,
self.rsaDecrypt)
# ciphertext should be 344 characters with base64 and RSA 2048
assert len(ciphertext) == 344, \
'ciphertext should be 344 chars; got {}'.format(len(ciphertext))
assert ciphertext[-2:] == '==', 'cipher text should end with =='
plaintext = sample.decryptRSA(ciphertext, self.client, self.rsaDecrypt)
assert plaintext == self.message

def test_rsa_sign_verify(self):
sig = sample.signAsymmetric(self.message, self.client, self.rsaSign)
# ciphertext should be 344 characters with base64 and RSA 2048
assert len(sig) == 344, \
'sig should be 344 chars; got {}'.format(len(sig))
assert sig[-2:] == '==', 'sig should end with =='
success = sample.verifySignatureRSA(sig,
self.message,
self.client,
self.rsaSign)
assert success is True, 'RSA verification failed'
success = sample.verifySignatureRSA(sig,
self.message+'.',
self.client,
self.rsaSign)
assert success is False, 'verify should fail with modified message'

def test_ec_sign_verify(self):
sig = sample.signAsymmetric(self.message, self.client, self.ecSign)
assert len(sig) > 50 and len(sig) < 300, \
'sig outside expected length range'
success = sample.verifySignatureEC(sig,
self.message,
self.client,
self.ecSign)
assert success is True, 'EC verification failed'
success = sample.verifySignatureEC(sig,
self.message+'.',
self.client,
self.ecSign)
assert success is False, 'verify should fail with modified message'
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
google-api-python-client==1.6.6
google-auth==1.4.1
google-auth-httplib2==0.0.3
cryptography==2.3.1

0 comments on commit bddfb6c

Please sign in to comment.