Threading errors #15

Open · wants to merge 2 commits into master
137 changes: 136 additions & 1 deletion .gitignore
@@ -1,2 +1,137 @@
*.pyc
# Created by https://www.gitignore.io/api/python

### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

### Python Patch ###
.venv/

### Python.VirtualEnv Stack ###
# Virtualenv
# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/
[Bb]in
[Ii]nclude
[Ll]ib
[Ll]ib64
[Ll]ocal
[Ss]cripts
pyvenv.cfg
pip-selfcheck.json


# End of https://www.gitignore.io/api/python

fabfile_config.py
97 changes: 55 additions & 42 deletions lambda_functions/copy_keys.py
@@ -38,6 +38,8 @@
from botocore.exceptions import ClientError
from Queue import Queue, Empty
import json
import traceback
import sys


# Constants
@@ -78,9 +80,10 @@ def collect_metadata(response):
# Classes

class KeySynchronizer(Thread):
def __init__(self, job_queue=None, source=None, destination=None, region=None):
def __init__(self, job_queue=None, err_queue=None, source=None, destination=None, region=None):
super(KeySynchronizer, self).__init__()
self.job_queue = job_queue
self.err_queue = err_queue
self.source = source
self.destination = destination
self.s3 = boto3.client('s3', region_name=region)
@@ -113,60 +116,65 @@ def copy_object(self, key):
)

def run(self):
while not self.job_queue.empty():
try:
key = self.job_queue.get(True, 1)
except Empty:
return

source_response = self.s3.head_object(Bucket=self.source, Key=key)
try:
destination_response = self.s3.head_object(Bucket=self.destination, Key=key)
except ClientError as e:
if int(e.response['Error']['Code']) == 404: # 404 = we need to copy this.
if 'WebsiteRedirectLocation' in source_response:
try:
while not self.job_queue.empty():
try:
key = self.job_queue.get(True, 1)
except Empty:
return

source_response = self.s3.head_object(Bucket=self.source, Key=key)
try:
destination_response = self.s3.head_object(Bucket=self.destination, Key=key)
except ClientError as e:
if int(e.response['Error']['Code']) == 404: # 404 = we need to copy this.
if 'WebsiteRedirectLocation' in source_response:
self.copy_redirect(key, source_response['WebsiteRedirectLocation'])
else:
self.copy_object(key)
continue
else: # All other return codes are unexpected.
raise e

if 'WebsiteRedirectLocation' in source_response:
if (
source_response['WebsiteRedirectLocation'] !=
destination_response.get('WebsiteRedirectLocation', None)
):
self.copy_redirect(key, source_response['WebsiteRedirectLocation'])
else:
self.copy_object(key)
continue
else: # All other return codes are unexpected.
raise e

if 'WebsiteRedirectLocation' in source_response:
if (
source_response['WebsiteRedirectLocation'] !=
destination_response.get('WebsiteRedirectLocation', None)
):
self.copy_redirect(key, source_response['WebsiteRedirectLocation'])
continue

source_etag = source_response.get('ETag', None)
destination_etag = destination_response.get('ETag', None)
if source_etag != destination_etag:
self.copy_object(key)
continue

source_metadata = collect_metadata(source_response)
destination_metadata = collect_metadata(destination_response)
if source_metadata == destination_metadata:
logger.info(
'Key: ' + key + ' from bucket: ' + self.source +
' is already current in destination bucket: ' + self.destination
)
continue
else:
self.copy_object(key)

source_etag = source_response.get('ETag', None)
destination_etag = destination_response.get('ETag', None)
if source_etag != destination_etag:
self.copy_object(key)
continue

source_metadata = collect_metadata(source_response)
destination_metadata = collect_metadata(destination_response)
if source_metadata == destination_metadata:
logger.info(
'Key: ' + key + ' from bucket: ' + self.source +
' is already current in destination bucket: ' + self.destination
)
continue
else:
self.copy_object(key)
except Exception as e:
self.err_queue.put(sys.exc_info())


# Functions

def sync_keys(source=None, destination=None, region=None, keys=None):
job_queue = Queue()
err_queue = Queue()
worker_threads = []

for i in range(THREAD_PARALLELISM):
worker_threads.append(KeySynchronizer(
job_queue=job_queue,
err_queue=err_queue,
source=source,
destination=destination,
region=region,
@@ -186,6 +194,11 @@ def sync_keys(source=None, destination=None, region=None, keys=None):
for t in worker_threads:
t.join()

if not err_queue.empty():
ex_type, ex, tb = err_queue.get()
logger.error('\n'.join(traceback.format_exception(ex_type, ex, tb)))
raise ex


def handler(event, context):
assert(isinstance(event, dict))
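
A note on the pattern this change applies: CPython's `Thread.run()` does not propagate exceptions to the thread that called `join()`, so before this PR an uncaught error in a `KeySynchronizer` worker was at best printed to stderr and the sync appeared to succeed. The fix wraps the whole work loop, captures `sys.exc_info()` into a shared `err_queue`, and lets the parent thread log and re-raise after joining. A minimal, self-contained sketch of the same pattern, using Python 3's `queue` module in place of the Python 2 `Queue` import this codebase uses (`Worker` and `process` are hypothetical stand-ins for `KeySynchronizer` and the per-key S3 work):

```python
import queue
import sys
import traceback
from threading import Thread


def process(job):
    # Stand-in for the real per-key S3 work.
    if job == "bad-key":
        raise ValueError("simulated failure for " + job)


class Worker(Thread):
    def __init__(self, job_queue, err_queue):
        super().__init__()
        self.job_queue = job_queue
        self.err_queue = err_queue

    def run(self):
        # Thread.run() swallows uncaught exceptions, so the entire work
        # loop is wrapped and any failure is handed to the parent thread.
        try:
            while not self.job_queue.empty():
                try:
                    job = self.job_queue.get(True, 1)
                except queue.Empty:
                    return
                process(job)
        except Exception:
            self.err_queue.put(sys.exc_info())


job_queue, err_queue = queue.Queue(), queue.Queue()
for job in ("key-1", "bad-key", "key-2"):
    job_queue.put(job)

workers = [Worker(job_queue, err_queue) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

if not err_queue.empty():
    ex_type, ex, tb = err_queue.get()
    print("".join(traceback.format_exception(ex_type, ex, tb)))
    raise ex
```

Running the sketch logs the traceback captured inside the worker and then raises the `ValueError` in the main thread, which is the behaviour the PR adds to both Lambda functions.
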
45 changes: 29 additions & 16 deletions lambda_functions/delete_orphaned_keys.py
@@ -32,6 +32,8 @@

# Imports

import traceback
import sys
import logging
import boto3
from threading import Thread
@@ -57,40 +59,46 @@
# Classes

class ObsoleteKeyDeleter(Thread):
def __init__(self, job_queue=None, source=None, destination=None, region=None):
def __init__(self, job_queue=None, err_queue=None, source=None, destination=None, region=None):
super(ObsoleteKeyDeleter, self).__init__()
self.job_queue = job_queue
self.err_queue = err_queue
self.source = source
self.destination = destination
self.s3 = boto3.client('s3', region_name=region)

def run(self):
while not self.job_queue.empty():
try:
key = self.job_queue.get(True, 1)
except Empty:
return

try:
self.s3.head_object(Bucket=self.source, Key=key)
logger.info('Key: ' + key + ' is present in source bucket, nothing to do.')
except ClientError as e:
if int(e.response['Error']['Code']) == 404: # The key was not found.
logger.info('Key: ' + key + ' is not present in source bucket. Deleting orphaned key.')
self.s3.delete_object(Bucket=self.destination, Key=key)
else:
raise e
try:
while not self.job_queue.empty():
try:
key = self.job_queue.get(True, 1)
except Empty:
return

try:
self.s3.head_object(Bucket=self.source, Key=key)
logger.info('Key: ' + key + ' is present in source bucket, nothing to do.')
except ClientError as e:
if int(e.response['Error']['Code']) == 404: # The key was not found.
logger.info('Key: ' + key + ' is not present in source bucket. Deleting orphaned key.')
self.s3.delete_object(Bucket=self.destination, Key=key)
else:
raise e
except Exception as e:
self.err_queue.put(sys.exc_info())


# Functions

def delete_obsolete_keys(source=None, destination=None, region=None, keys=None):
job_queue = Queue()
err_queue = Queue()
worker_threads = []

for i in range(THREAD_PARALLELISM):
worker_threads.append(ObsoleteKeyDeleter(
job_queue=job_queue,
err_queue=err_queue,
source=source,
destination=destination,
region=region,
@@ -107,6 +115,11 @@ def delete_obsolete_keys(source=None, destination=None, region=None, keys=None):
for t in worker_threads:
t.join()

if not err_queue.empty():
ex_type, ex, tb = err_queue.get()
logger.error('\n'.join(traceback.format_exception(ex_type, ex, tb)))
raise ex


def handler(event, context):
assert(isinstance(event, dict))
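
One detail of the re-raise step in both files: `raise ex` raises the captured instance with a traceback that starts at the `raise` site, so the original worker traceback survives only in the `logger.error` line (Python 2's three-argument `raise ex_type, ex, tb` form could preserve it directly). Also, only the first queued error is surfaced; any others remain in `err_queue`. For reference, a small sketch of how the captured `sys.exc_info()` tuple can be re-raised with its original traceback attached on Python 3 (`reraise_first_error` is a hypothetical helper, not part of this PR):

```python
import queue
import sys


def reraise_first_error(err_queue):
    """Hypothetical helper: re-raise the first captured worker error
    with its original traceback attached (Python 3 idiom)."""
    if not err_queue.empty():
        ex_type, ex, tb = err_queue.get()
        raise ex.with_traceback(tb)


# Capture an exception the way a worker thread would, then re-raise it.
errs = queue.Queue()
try:
    raise RuntimeError("simulated worker failure")
except Exception:
    errs.put(sys.exc_info())

reraise_first_error(errs)  # traceback points at the original raise site
```
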