-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGCSSignedUrlGenerator.py
84 lines (70 loc) · 2.79 KB
/
GCSSignedUrlGenerator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import base64
import datetime
import md5
import sys
import time
import Crypto.Hash.SHA256 as SHA256
import Crypto.PublicKey.RSA as RSA
import Crypto.Signature.PKCS1_v1_5 as PKCS1_v1_5
import OpenSSL
import OpenSSL.crypto as SSL_Crypto
import urllib
class GCSSignedUrlGenerator(object):
    """Generates signed URLs for Google Cloud Storage resources.

    The signature is an RSA PKCS#1 v1.5 signature over a SHA256 digest of a
    newline-separated string (verb, content MD5, content type, expiration
    timestamp, resource path), returned base64-encoded and attached to the
    URL as the ``Signature`` query parameter.

    The private key is either passed to the constructor in DER form or
    loaded afterwards from a PKCS#12 bundle with ``importP12Key``.
    """

    # HTTP verbs accepted by makeSignedUrl.
    _ALLOWED_METHODS = ('GET', 'PUT', 'DELETE')

    # Lifetime applied when no expiration was ever supplied by the caller.
    _DEFAULT_LIFETIME = datetime.timedelta(days=1)

    def __init__(self, client_id_email,
                 gcs_api_endpoint='https://storage.googleapis.com',
                 key_der=None):
        """Creates a GCSSignedUrlGenerator that can be used to build signed URLs.

        Args:
            client_id_email: GCS service account email.
            gcs_api_endpoint: Base URL for GCS API.
            key_der: Private key in DER format, or None when the key will be
                loaded later through importP12Key.
        """
        self._key_der = key_der
        self._client_id_email = client_id_email
        self._gcs_api_endpoint = gcs_api_endpoint
        # Last expiration explicitly passed to makeSignedUrl; None until then.
        self._expiration = None

    def _base64Sign(self, plaintext):
        """Signs plaintext and returns the base64-encoded signature.

        Args:
            plaintext: The string to sign.

        Returns:
            The base64 encoding of the PKCS#1 v1.5 signature computed over
            the SHA256 digest of plaintext with the stored private key.
        """
        digest = SHA256.new(plaintext)
        private_key = RSA.importKey(self._key_der)
        signature_bytes = PKCS1_v1_5.new(private_key).sign(digest)
        return base64.b64encode(signature_bytes)

    def _makeSignatureString(self, verb, path, content_md5, content_type):
        """Creates the signature string for signing according to GCS docs.

        Reads self._expiration_ts, which makeSignedUrl sets before calling
        into the signing helpers.

        Args:
            verb: HTTP method of the request being signed.
            path: Resource path being signed.
            content_md5: Content MD5 value, or '' when not used.
            content_type: Content type value, or '' when not used.

        Returns:
            The five fields joined by newlines, in the order verb,
            content_md5, content_type, expiration timestamp, path.
        """
        signature_string = ('{verb}\n'
                            '{content_md5}\n'
                            '{content_type}\n'
                            '{expiration}\n'
                            '{resource}')
        return signature_string.format(
            verb=verb,
            content_md5=content_md5,
            content_type=content_type,
            expiration=self._expiration_ts,
            resource=path
        )

    def _makeUrl(self, verb, path, content_type='', content_md5=''):
        """Builds the pieces of a signed URL.

        Args:
            verb: HTTP method of the request being signed.
            path: Resource path; appended verbatim to the API endpoint.
            content_type: Content type to include in the signed string.
            content_md5: Content MD5 to include in the signed string.

        Returns:
            A (base_url, query_params) tuple: the endpoint joined with path,
            and a dict holding the GoogleAccessId, Expires and Signature
            query parameters.
        """
        base_url = '%s%s' % (self._gcs_api_endpoint, path)
        signature_string = self._makeSignatureString(
            verb, path, content_md5, content_type)
        signature_signed = self._base64Sign(signature_string)
        query_params = {
            'GoogleAccessId': self._client_id_email,
            'Expires': str(self._expiration_ts),
            'Signature': signature_signed
        }
        return base_url, query_params

    def _resolveExpiration(self, expiration):
        """Returns the expiration datetime to use for one signed URL.

        An explicitly supplied expiration is remembered on the instance and
        reused by later calls that pass none. The one-day default is NOT
        remembered: it is recomputed from the current time on every call, so
        a long-lived generator never hands out a stale, already-expired
        default.

        Args:
            expiration: A datetime, or a falsy value to fall back to the
                remembered expiration or the default lifetime.

        Returns:
            The datetime at which the URL should expire.
        """
        if expiration:
            self._expiration = expiration
        if self._expiration:
            return self._expiration
        return datetime.datetime.now() + self._DEFAULT_LIFETIME

    def importP12Key(self, p12_key, p12_passphrase):
        """Loads the private key from a PKCS#12 bundle and stores it as DER.

        Args:
            p12_key: Contents of the .p12 key bundle.
            p12_passphrase: Passphrase protecting the bundle.
        """
        # NOTE(review): load_pkcs12 is deprecated in recent pyOpenSSL
        # releases - confirm against the pinned pyOpenSSL version.
        p12 = SSL_Crypto.load_pkcs12(p12_key, p12_passphrase)
        self._key_der = SSL_Crypto.dump_privatekey(
            SSL_Crypto.FILETYPE_ASN1, p12.get_privatekey())

    def makeSignedUrl(self, path, method='GET', expiration=None):
        """Creates the expiration timestamp and returns the ready signed URL.

        Args:
            path: Resource path; appended verbatim to the API endpoint.
            method: HTTP method the URL is valid for: 'GET', 'PUT' or
                'DELETE'.
            expiration: Optional datetime at which the URL expires. When
                given it is also remembered for subsequent calls. When no
                expiration has ever been given, the URL expires one day
                after the moment of this call.

        Returns:
            The signed URL as a string: endpoint + path + '?' + url-encoded
            GoogleAccessId, Expires and Signature parameters.

        Raises:
            ValueError: If method is not one of the supported verbs.
        """
        if method not in self._ALLOWED_METHODS:
            # ValueError is an Exception subclass, so existing handlers that
            # catch Exception keep working; args are unchanged.
            raise ValueError('Error', "Available methods: ['GET', 'PUT', 'DELETE']")
        expires_at = self._resolveExpiration(expiration)
        # mktime interprets the time tuple as local time, which matches the
        # naive datetime.now() used for the default.
        self._expiration_ts = int(time.mktime(expires_at.timetuple()))
        base_url, query_params = self._makeUrl(method, path)
        return '%s?%s' % (base_url, urllib.urlencode(query_params))