Skip to content

Commit

Permalink
Fix #97: Concurrency Issue
Browse files Browse the repository at this point in the history
The issue was there all the time, but became critical in 9950dd0 with version
0.3.3.

`class _S3ConfigurationMap` is not thread safe because both
`set/get_configuration()` ask for `self.arguments` and `self.resources`
being `None`. However, since the configuration map is a singleton, when
using the library in a multithreaded environment, it can happen that
half the part of `_initial_setup` has run and `self.arguments` is indeed
not `None`, but `self.resources` is still `None` because in another
thread, creation of the boto3 resource is still ongoing. Then, our
thread would *not* call `_initial_setup()`, but still die on
`self.resources` being `None` when trying to access it, leading to the
error described in #97.

I could not make the bug appear without introducing the IO delay by
creating the resource in `_initial_setup()`. Maybe it is a CPython
implementation detail that makes `_initial_setup()` atomic or incredibly
rare to not be atomic without the creation of the boto resource.

Anyways, the setup is now synchronised with a lock, which is required,
again, given that `_S3ConfigurationMap` is a singleton.

Here is the code that reliably triggers the bug without the change:

```py
from multiprocessing.pool import ThreadPool

from s3path import S3Path

def do(i):
    dst = S3Path.from_uri('s3://something') / f'hello_{i}.txt'
    dst.write_bytes(b'hello')

ThreadPool(processes=20).map(do, range(100))
```
  • Loading branch information
sbrandtb committed Mar 1, 2022
1 parent 0341262 commit 21c1cef
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions s3path.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from distutils.version import StrictVersion
from io import DEFAULT_BUFFER_SIZE, UnsupportedOperation
from pathlib import _PosixFlavour, _Accessor, PurePath, Path
from threading import Lock

try:
import boto3
Expand Down Expand Up @@ -58,14 +59,20 @@ def __init__(self, default_resource_kwargs, **default_arguments):
self.default_arguments = default_arguments
self.arguments = None
self.resources = None
self.setup_lock = Lock()
self.is_setup = False

@property
def default_resource(self):
return boto3.resource('s3', **self.default_resource_kwargs)

def _initial_setup(self):
self.arguments = {PureS3Path('/'): self.default_arguments}
self.resources = {PureS3Path('/'): self.default_resource}
def _delayed_setup(self):
""" Resolves a circular dependency between us and PureS3Path """
with self.setup_lock:
if not self.is_setup:
self.arguments = {PureS3Path('/'): self.default_arguments}
self.resources = {PureS3Path('/'): self.default_resource}
self.is_setup = True

def __repr__(self):
return '{name}(arguments={arguments}, resources={resources})'.format(
Expand All @@ -74,17 +81,15 @@ def __repr__(self):
resources=self.resources)

def set_configuration(self, path, *, resource=None, arguments=None):
if self.arguments is None and self.resources is None:
self._initial_setup()
self._delayed_setup()
if arguments is not None:
self.arguments[path] = arguments
if resource is not None:
self.resources[path] = resource

@lru_cache()
def get_configuration(self, path):
if self.arguments is None and self.resources is None:
self._initial_setup()
self._delayed_setup()
resources = arguments = None
for path in chain([path], path.parents):
if resources is None and path in self.resources:
Expand Down

0 comments on commit 21c1cef

Please sign in to comment.