Support gcs caching for parallel processing #113

Merged · 1 commit · Apr 24, 2020
25 changes: 22 additions & 3 deletions bionic/cache.py
@@ -767,10 +767,29 @@ def __init__(self, url):
         self.url = url
         bucket_name, object_prefix = self._bucket_and_object_names_from_url(url)

-        logger.info("Initializing GCS client ...")
-        self._client = get_gcs_client_without_warnings()
-        self._bucket = self._client.get_bucket(bucket_name)
+        self._bucket_name = bucket_name
         self._object_prefix = object_prefix
+        self._init_client()
+
+    def __getstate__(self):
+        # Copy the object's state from self.__dict__ which contains
+        # all our instance attributes. Always use the dict.copy()
+        # method to avoid modifying the original state.
+        state = self.__dict__.copy()
+        # Remove the unpicklable entries.
+        del state["_client"]
+        del state["_bucket"]
+        return state
+
+    def __setstate__(self, state):
+        # Restore instance attributes.
+        self.__dict__.update(state)
+        # Restore the client and bucket.
+        self._init_client()
+
+    def _init_client(self):
+        self._client = get_gcs_client_without_warnings()
+        self._bucket = self._client.get_bucket(self._bucket_name)

     def blob_from_url(self, url):
         object_name = self._validated_object_name_from_url(url)
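For context, the __getstate__/__setstate__ pair added above is the standard pickling-hook pattern: drop members that cannot be pickled (here the GCS client and bucket) before serialization, and rebuild them after deserialization in the worker process. Below is a minimal, self-contained sketch of that pattern; the GcsToolExample class and the plain object used as a stand-in for the real GCS client are illustrative assumptions, not code from this PR.

import pickle


class GcsToolExample:
    # Hypothetical illustration of the pattern applied in bionic/cache.py above.
    def __init__(self, bucket_name):
        self._bucket_name = bucket_name
        self._init_client()

    def _init_client(self):
        # Stand-in for a real GCS client, which holds sockets and credentials
        # and therefore cannot be pickled.
        self._client = object()

    def __getstate__(self):
        # Copy the instance attributes and drop the unpicklable client.
        state = self.__dict__.copy()
        del state["_client"]
        return state

    def __setstate__(self, state):
        # Restore the attributes, then rebuild the client in the new process.
        self.__dict__.update(state)
        self._init_client()


tool = GcsToolExample("my-bucket")
clone = pickle.loads(pickle.dumps(tool))  # no error: the client is recreated
assert clone._bucket_name == "my-bucket"

Because the client is rebuilt on unpickling, each worker process ends up with its own GCS client instead of trying to ship a live connection across process boundaries.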
4 changes: 0 additions & 4 deletions bionic/flow.py
@@ -45,10 +45,6 @@
     oneline,
 )

-import logging
-
-logger = logging.getLogger(__name__)
-
 DEFAULT_PROTOCOL = protos.CombinedProtocol(
     protos.ParquetDataFrameProtocol(),
     protos.ImageProtocol(),
5 changes: 5 additions & 0 deletions bionic/util.py
@@ -13,6 +13,10 @@

 from .optdep import import_optional_dependency, oneline

+import logging
+
+logger = logging.getLogger(__name__)
+

 def n_present(*items):
     "Returns the number of non-None arguments."

@@ -123,6 +127,7 @@ def get_gcs_client_without_warnings(cache_value=True):
     warnings.filterwarnings(
         "ignore", "Your application has authenticated using end user credentials"
     )
+    logger.info("Initializing GCS client ...")
     return gcs.Client()

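Moving the log line here puts it where the client is actually constructed, and since the factory caches its client (note the cache_value=True default, and the "cached client initialization" remark in the test file below), the message is emitted when a fresh client is built rather than every time a GCS cache object is created. The following is a hypothetical sketch of a per-process cached factory; the helper name, the module-level cache variable, and the use of warnings.catch_warnings are assumptions, since the actual caching code in bionic/util.py lies outside this hunk.

import logging
import warnings

logger = logging.getLogger(__name__)

_cached_client = None


def get_gcs_client_cached():
    # Hypothetical: build the client once per process and reuse it, so the
    # "Initializing GCS client ..." message is logged at most once per worker.
    global _cached_client
    if _cached_client is None:
        from google.cloud import storage  # optional dependency

        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                "Your application has authenticated using end user credentials",
            )
            logger.info("Initializing GCS client ...")
            _cached_client = storage.Client()
    return _cached_client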
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -19,7 +19,7 @@ def process_executor(request):
         return None

     loky = import_optional_dependency("loky", purpose="parallel processing")
-    return loky.get_reusable_executor()
+    return loky.get_reusable_executor(max_workers=1)


 @pytest.fixture(scope="session")
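loky's get_reusable_executor returns a process-pool executor that is reused across calls within a process; with max_workers=1, every submitted task runs in a single worker process, which still exercises the pickling path added in bionic/cache.py. A small, self-contained usage sketch follows; the square helper and the printout are illustrative, not taken from the test suite.

from loky import get_reusable_executor


def square(x):
    return x * x


if __name__ == "__main__":
    # Same call as in the test fixture above: a reusable, single-worker pool.
    executor = get_reusable_executor(max_workers=1)
    print(list(executor.map(square, range(5))))  # [0, 1, 4, 9, 16]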
1 change: 0 additions & 1 deletion tests/test_flow/test_persistence_gcs.py
@@ -53,7 +53,6 @@ def gcs_builder(builder, tmp_gcs_url_prefix):
 # place.
 # TODO Now that we have a workspace fixture and cached client initialization,
 # this may not be true anymore.
-@pytest.mark.no_parallel
 def test_gcs_caching(gcs_builder, make_counter):
     # Setup.
