From 08bd4c7f8e6f5e446f9304cfee2533046569be1a Mon Sep 17 00:00:00 2001
From: Eric L Frederich
Date: Tue, 30 Oct 2018 20:55:15 -0400
Subject: [PATCH 1/2] Use .gitignore file from gitignore.io

---
 .gitignore | 137 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 136 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index e5f3dbb..4188796 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,137 @@
-*.pyc
+# Created by https://www.gitignore.io/api/python
+
+### Python ###
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+### Python Patch ###
+.venv/
+
+### Python.VirtualEnv Stack ###
+# Virtualenv
+# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/
+[Bb]in
+[Ii]nclude
+[Ll]ib
+[Ll]ib64
+[Ll]ocal
+[Ss]cripts
+pyvenv.cfg
+pip-selfcheck.json
+
+
+# End of https://www.gitignore.io/api/python
 fabfile_config.py

From 67472c81445ee87f30a8a4fce73d0eee5ce98e0a Mon Sep 17 00:00:00 2001
From: Eric L Frederich
Date: Wed, 31 Oct 2018 13:20:57 -0400
Subject: [PATCH 2/2] Explicitly propagate errors from threads back to main thread.
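
Without this change an exception raised inside a worker thread dies with
that thread: Thread.run() swallows it, join() returns normally, and the
handler reports success even when keys were skipped. Each worker now wraps
its loop in try/except and pushes sys.exc_info() onto a shared error queue;
after join() the main thread logs the formatted traceback and re-raises the
first captured error. A minimal, self-contained sketch of the pattern (the
names here are illustrative, not code from this repo):

    import sys
    import traceback
    from Queue import Queue  # the stdlib module is named 'queue' on Python 3
    from threading import Thread

    def worker(err_queue):
        try:
            raise RuntimeError('boom')  # stand-in for the real S3 work
        except Exception:
            # Capture (type, value, traceback) so the main thread can log
            # the full traceback and re-raise the original error.
            err_queue.put(sys.exc_info())

    err_queue = Queue()
    t = Thread(target=worker, args=(err_queue,))
    t.start()
    t.join()  # returns normally even though the worker failed

    if not err_queue.empty():
        ex_type, ex, tb = err_queue.get()
        # format_exception() returns newline-terminated strings.
        print(''.join(traceback.format_exception(ex_type, ex, tb)))
        raise ex  # surface the failure to the caller

A Queue is used as the error channel because put() is thread-safe, so
workers can report failures without any extra locking.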
---
 lambda_functions/copy_keys.py            | 97 ++++++++++++++----------
 lambda_functions/delete_orphaned_keys.py | 45 +++++++----
 2 files changed, 84 insertions(+), 58 deletions(-)

diff --git a/lambda_functions/copy_keys.py b/lambda_functions/copy_keys.py
index 000d098..c791c74 100644
--- a/lambda_functions/copy_keys.py
+++ b/lambda_functions/copy_keys.py
@@ -38,6 +38,8 @@ from botocore.exceptions import ClientError
 from Queue import Queue, Empty
 import json
+import traceback
+import sys
 
 
 # Constants
@@ -78,9 +80,10 @@ def collect_metadata(response):
 
 # Classes
 class KeySynchronizer(Thread):
-    def __init__(self, job_queue=None, source=None, destination=None, region=None):
+    def __init__(self, job_queue=None, err_queue=None, source=None, destination=None, region=None):
         super(KeySynchronizer, self).__init__()
         self.job_queue = job_queue
+        self.err_queue = err_queue
         self.source = source
         self.destination = destination
         self.s3 = boto3.client('s3', region_name=region)
@@ -113,60 +116,65 @@ def copy_object(self, key):
         )
 
     def run(self):
-        while not self.job_queue.empty():
-            try:
-                key = self.job_queue.get(True, 1)
-            except Empty:
-                return
-
-            source_response = self.s3.head_object(Bucket=self.source, Key=key)
-            try:
-                destination_response = self.s3.head_object(Bucket=self.destination, Key=key)
-            except ClientError as e:
-                if int(e.response['Error']['Code']) == 404:  # 404 = we need to copy this.
-                    if 'WebsiteRedirectLocation' in source_response:
+        try:
+            while not self.job_queue.empty():
+                try:
+                    key = self.job_queue.get(True, 1)
+                except Empty:
+                    return
+
+                source_response = self.s3.head_object(Bucket=self.source, Key=key)
+                try:
+                    destination_response = self.s3.head_object(Bucket=self.destination, Key=key)
+                except ClientError as e:
+                    if int(e.response['Error']['Code']) == 404:  # 404 = we need to copy this.
+                        if 'WebsiteRedirectLocation' in source_response:
+                            self.copy_redirect(key, source_response['WebsiteRedirectLocation'])
+                        else:
+                            self.copy_object(key)
+                        continue
+                    else:  # All other return codes are unexpected.
+                        raise e
+
+                if 'WebsiteRedirectLocation' in source_response:
+                    if (
+                        source_response['WebsiteRedirectLocation'] !=
+                        destination_response.get('WebsiteRedirectLocation', None)
+                    ):
                         self.copy_redirect(key, source_response['WebsiteRedirectLocation'])
-                    else:
-                        self.copy_object(key)
                         continue
-                else:  # All other return codes are unexpected.
-                    raise e
-
-            if 'WebsiteRedirectLocation' in source_response:
-                if (
-                    source_response['WebsiteRedirectLocation'] !=
-                    destination_response.get('WebsiteRedirectLocation', None)
-                ):
-                    self.copy_redirect(key, source_response['WebsiteRedirectLocation'])
-                    continue
-
-            source_etag = source_response.get('ETag', None)
-            destination_etag = destination_response.get('ETag', None)
-            if source_etag != destination_etag:
-                self.copy_object(key)
-                continue
-
-            source_metadata = collect_metadata(source_response)
-            destination_metadata = collect_metadata(destination_response)
-            if source_metadata == destination_metadata:
-                logger.info(
-                    'Key: ' + key + ' from bucket: ' + self.source +
-                    ' is already current in destination bucket: ' + self.destination
-                )
-                continue
-            else:
-                self.copy_object(key)
+
+                source_etag = source_response.get('ETag', None)
+                destination_etag = destination_response.get('ETag', None)
+                if source_etag != destination_etag:
+                    self.copy_object(key)
+                    continue
+
+                source_metadata = collect_metadata(source_response)
+                destination_metadata = collect_metadata(destination_response)
+                if source_metadata == destination_metadata:
+                    logger.info(
+                        'Key: ' + key + ' from bucket: ' + self.source +
+                        ' is already current in destination bucket: ' + self.destination
+                    )
+                    continue
+                else:
+                    self.copy_object(key)
+        except Exception as e:
+            self.err_queue.put(sys.exc_info())
 
 
 # Functions
 def sync_keys(source=None, destination=None, region=None, keys=None):
     job_queue = Queue()
+    err_queue = Queue()
     worker_threads = []
     for i in range(THREAD_PARALLELISM):
         worker_threads.append(KeySynchronizer(
             job_queue=job_queue,
+            err_queue=err_queue,
             source=source,
             destination=destination,
             region=region,
@@ -186,6 +194,11 @@ def sync_keys(source=None, destination=None, region=None, keys=None):
     for t in worker_threads:
         t.join()
 
+    if not err_queue.empty():
+        ex_type, ex, tb = err_queue.get()
+        logger.error('\n'.join(traceback.format_exception(ex_type, ex, tb)))
+        raise ex
+
 
 def handler(event, context):
     assert(isinstance(event, dict))
diff --git a/lambda_functions/delete_orphaned_keys.py b/lambda_functions/delete_orphaned_keys.py
index d261302..7a3ead8 100644
--- a/lambda_functions/delete_orphaned_keys.py
+++ b/lambda_functions/delete_orphaned_keys.py
@@ -32,6 +32,8 @@
 
 
 # Imports
+import traceback
+import sys
 import logging
 import boto3
 from threading import Thread
@@ -57,40 +59,46 @@
 
 # Classes
 class ObsoleteKeyDeleter(Thread):
-    def __init__(self, job_queue=None, source=None, destination=None, region=None):
+    def __init__(self, job_queue=None, err_queue=None, source=None, destination=None, region=None):
         super(ObsoleteKeyDeleter, self).__init__()
         self.job_queue = job_queue
+        self.err_queue = err_queue
         self.source = source
         self.destination = destination
         self.s3 = boto3.client('s3', region_name=region)
 
     def run(self):
-        while not self.job_queue.empty():
-            try:
-                key = self.job_queue.get(True, 1)
-            except Empty:
-                return
-
-            try:
-                self.s3.head_object(Bucket=self.source, Key=key)
-                logger.info('Key: ' + key + ' is present in source bucket, nothing to do.')
-            except ClientError as e:
-                if int(e.response['Error']['Code']) == 404:  # The key was not found.
-                    logger.info('Key: ' + key + ' is not present in source bucket. Deleting orphaned key.')
-                    self.s3.delete_object(Bucket=self.destination, Key=key)
-                else:
-                    raise e
+        try:
+            while not self.job_queue.empty():
+                try:
+                    key = self.job_queue.get(True, 1)
+                except Empty:
+                    return
+
+                try:
+                    self.s3.head_object(Bucket=self.source, Key=key)
+                    logger.info('Key: ' + key + ' is present in source bucket, nothing to do.')
+                except ClientError as e:
+                    if int(e.response['Error']['Code']) == 404:  # The key was not found.
+                        logger.info('Key: ' + key + ' is not present in source bucket. Deleting orphaned key.')
+                        self.s3.delete_object(Bucket=self.destination, Key=key)
+                    else:
+                        raise e
+        except Exception as e:
+            self.err_queue.put(sys.exc_info())
 
 
 # Functions
 def delete_obsolete_keys(source=None, destination=None, region=None, keys=None):
     job_queue = Queue()
+    err_queue = Queue()
     worker_threads = []
     for i in range(THREAD_PARALLELISM):
         worker_threads.append(ObsoleteKeyDeleter(
             job_queue=job_queue,
+            err_queue=err_queue,
             source=source,
             destination=destination,
             region=region,
@@ -107,6 +115,11 @@ def delete_obsolete_keys(source=None, destination=None, region=None, keys=None):
     for t in worker_threads:
         t.join()
 
+    if not err_queue.empty():
+        ex_type, ex, tb = err_queue.get()
+        logger.error('\n'.join(traceback.format_exception(ex_type, ex, tb)))
+        raise ex
+
 
 def handler(event, context):
     assert(isinstance(event, dict))
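
With the second patch applied, a failure inside a worker thread now
surfaces in the caller instead of being silently dropped. A hypothetical
smoke test (the bucket names and the import path are placeholders, not
taken from this repo):

    from botocore.exceptions import ClientError
    from lambda_functions.copy_keys import sync_keys

    try:
        sync_keys(
            source='example-source-bucket',      # placeholder bucket
            destination='example-dest-bucket',   # placeholder bucket
            region='us-east-1',
            keys=['index.html'],
        )
    except ClientError as e:
        # Previously the worker thread swallowed the error and sync_keys()
        # returned normally; now the error is logged with its traceback
        # and re-raised here.
        print('worker error propagated: %s' % e)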