Commit a7c7901

Merge pull request #67 from martindurant/more_fuse

Improve FUSE

martindurant authored Feb 26, 2018
2 parents dccd3f5 + b6210af · commit a7c7901
Showing 4 changed files with 694 additions and 70 deletions.
21 changes: 17 additions & 4 deletions gcsfs/cli/gcsfuse.py
@@ -14,21 +14,34 @@
               help="Billing Project ID")
 @click.option('--foreground/--background', default=True,
               help="Run in the foreground or as a background process")
+@click.option('--threads/--no-threads', default=True,
+              help="Whether to run with threads")
+@click.option('--cache_files', type=int, default=10,
+              help="Number of open files to cache")
 @click.option('-v', '--verbose', count=True,
               help="Set logging level. '-v' for 'gcsfuse' logging."
               "'-v -v' for complete debug logging.")
-def main(bucket, mount_point, token, project_id, foreground, verbose):
-    """ Mount a Google Cloud Storage (GCS) bucket to a local directory """
-
-    if verbose == 1:
-        logging.basicConfig(level=logging.INFO)
-        logging.getLogger("gcsfs.gcsfuse").setLevel(logging.DEBUG)
-    if verbose > 1:
-        logging.basicConfig(level=logging.DEBUG)
-
+def main(bucket, mount_point, token, project_id, foreground, threads,
+         cache_files, verbose):
+    """ Mount a Google Cloud Storage (GCS) bucket to a local directory """
+    fmt = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
+    if verbose == 1:
+        logging.basicConfig(level=logging.INFO, format=fmt)
+        logging.getLogger("gcsfs.gcsfuse").setLevel(logging.DEBUG)
+    if verbose > 1:
+        logging.basicConfig(level=logging.DEBUG, format=fmt)
+
     print("Mounting bucket %s to directory %s" % (bucket, mount_point))
-    FUSE(GCSFS(bucket, token=token, project=project_id),
-         mount_point, nothreads=True, foreground=foreground)
+    print('foreground:', foreground, ', nothreads:', not threads)
+    FUSE(GCSFS(bucket, token=token, project=project_id, nfiles=cache_files),
+         mount_point, nothreads=not threads, foreground=foreground)
 
 
 if __name__ == '__main__':
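For reference, here is roughly what a mount with the new options amounts to when called directly from Python — a minimal sketch, assuming the same imports the CLI module uses (FUSE from fusepy, GCSFS from gcsfs.gcsfuse); the bucket, mount point, project, and token method are illustrative only:

from fuse import FUSE
from gcsfs.gcsfuse import GCSFS

# Roughly: gcsfuse mybucket /mnt/gcs --no-threads --cache_files 20
# (all names here are hypothetical)
fs = GCSFS('mybucket', token='cloud', project='myproject', nfiles=20)
FUSE(fs, '/mnt/gcs', nothreads=True, foreground=True)

With --no-threads, threads is False, so nothreads=not threads comes out True, matching the FUSE call above.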
87 changes: 39 additions & 48 deletions gcsfs/core.py
@@ -30,9 +30,7 @@
 import warnings
 
 from requests.exceptions import RequestException
-from .utils import HtmlError
-from .utils import is_retriable
-from .utils import read_block
+from .utils import HtmlError, is_retriable, read_block
 
 PY2 = sys.version_info.major == 2
 
@@ -264,7 +262,8 @@ class GCSFileSystem(object):
     default_block_size = DEFAULT_BLOCK_SIZE
 
     def __init__(self, project=DEFAULT_PROJECT, access='full_control',
-                 token=None, block_size=None, consistency='none', cache_timeout = None):
+                 token=None, block_size=None, consistency='none',
+                 cache_timeout=None):
         if access not in self.scopes:
             raise ValueError('access must be one of {}', self.scopes)
         if project is None:
@@ -279,7 +278,6 @@ def __init__(self, project=DEFAULT_PROJECT, access='full_control',
         self.session = None
         self.connect(method=token)
 
-
         self._singleton[0] = self
 
         self.cache_timeout = cache_timeout
@@ -417,9 +415,8 @@ def _save_tokens():
         except Exception as e:
             warnings.warn('Saving token cache failed: ' + str(e))
 
+    @_tracemethod
     def _call(self, method, path, *args, **kwargs):
-        logger.debug("_call(%s, %s, args=%s, kwargs=%s)", method, path, args, kwargs)
-
         for k, v in list(kwargs.items()):
             # only pass parameters that have values
             if v is None:
@@ -453,7 +450,6 @@ def buckets(self):
         """Return list of available project buckets."""
         return [b["name"] for b in self._list_buckets()["items"]]
 
-
     @classmethod
     def _process_object(self, bucket, object_metadata):
         """Process object resource into gcsfs object information format.
@@ -492,12 +488,11 @@ def _get_object(self, path):
             # listing.
             raise FileNotFoundError(path)
 
-        result = self._process_object(bucket, self._call('get', 'b/{}/o/{}', bucket, key))
+        result = self._process_object(bucket, self._call('get', 'b/{}/o/{}',
+                                                         bucket, key))
 
-        logger.debug("_get_object result: %s", result)
         return result
 
-
     @_tracemethod
     def _maybe_get_cached_listing(self, path):
         logger.debug("_maybe_get_cached_listing: %s", path)
@@ -506,7 +501,8 @@ def _maybe_get_cached_listing(self, path):
         cache_age = time.time() - retrieved_time
         if self.cache_timeout is not None and cache_age > self.cache_timeout:
             logger.debug(
-                "expired cache path: %s retrieved_time: %.3f cache_age: %.3f cache_timeout: %.3f",
+                "expired cache path: %s retrieved_time: %.3f cache_age: "
+                "%.3f cache_timeout: %.3f",
                 path, retrieved_time, cache_age, self.cache_timeout
             )
             del self._listing_cache[path]
@@ -532,15 +528,16 @@ def _list_objects(self, path):
 
     @_tracemethod
     def _do_list_objects(self, path, max_results = None):
-        """Return depaginated object listing for the given {bucket}/{prefix}/ path."""
+        """Object listing for the given {bucket}/{prefix}/ path."""
         bucket, prefix = split_path(path)
         if not prefix:
             prefix = None
 
         prefixes = []
         items = []
         page = self._call(
-            'get', 'b/{}/o/', bucket, delimiter="/", prefix=prefix, maxResults=max_results)
+            'get', 'b/{}/o/', bucket, delimiter="/", prefix=prefix,
+            maxResults=max_results)
 
         assert page["kind"] == "storage#objects"
         prefixes.extend(page.get("prefixes", []))
@@ -549,29 +546,25 @@ def _do_list_objects(self, path, max_results = None):
 
         while next_page_token is not None:
             page = self._call(
-                'get', 'b/{}/o/', bucket, delimiter="/", prefix=prefix, maxResults=max_results,
-                pageToken=next_page_token)
+                'get', 'b/{}/o/', bucket, delimiter="/", prefix=prefix,
+                maxResults=max_results, pageToken=next_page_token)
 
             assert page["kind"] == "storage#objects"
             prefixes.extend(page.get("prefixes", []))
             items.extend(page.get("items", []))
             next_page_token = page.get('nextPageToken', None)
 
         result = {
-            "kind" : "storage#objects",
-            "prefixes" : prefixes,
-            "items" : [self._process_object(bucket, i) for i in items],
+            "kind": "storage#objects",
+            "prefixes": prefixes,
+            "items": [self._process_object(bucket, i) for i in items],
         }
 
-        logger.debug("_list_objects result: %s", {k : len(result[k]) for k in ("prefixes", "items")})
-
         return result
 
     @_tracemethod
     def _list_buckets(self):
         """Return list of all buckets under the current project."""
 
-        logger.debug("_list_buckets")
-
         items = []
         page = self._call(
             'get', 'b/', project=self.project
@@ -590,34 +583,32 @@ def _list_buckets(self):
             next_page_token = page.get('nextPageToken', None)
 
         result = {
-            "kind" : "storage#buckets",
-            "items" : items,
+            "kind": "storage#buckets",
+            "items": items,
         }
 
-        logger.debug("_list_buckets result: %s", {k : len(result[k]) for k in ("items",)})
-
         return result
 
     @_tracemethod
     def invalidate_cache(self, path=None):
         """
-        Invalidate listing cache for given path, so that it is reloaded on next use.
+        Invalidate listing cache for given path, it is reloaded on next use.
         Parameters
         ----------
         path: string or None
-            If None, clear all listings cached else listings at or under given path.
+            If None, clear all listings cached else listings at or under given
+            path.
         """
-
         if not path:
             logger.debug("invalidate_cache clearing cache")
             self._listing_cache.clear()
         else:
             path = norm_path(path)
             logger.debug("invalidate_cache prefix: %s", path)
 
-            invalid_keys = [k for k in self._listing_cache if k.startswith(path)]
-            logger.debug("invalidate_cache keys: %s", invalid_keys)
+            invalid_keys = [k for k in self._listing_cache
+                            if k.startswith(path)]
 
             for k in invalid_keys:
                 self._listing_cache.pop(k, None)
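As a usage sketch of the listing cache these changes rely on (bucket and project names are hypothetical; cache_timeout is the constructor argument added above, in seconds):

from gcsfs import GCSFileSystem

fs = GCSFileSystem(project='myproject', cache_timeout=60)
fs.ls('mybucket')                # listing fetched from GCS and cached
fs.ls('mybucket')                # served from the listing cache
fs.invalidate_cache('mybucket')  # drop cached listings at or under this path
fs.ls('mybucket')                # fetched fresh on next use

Passing no path clears the whole cache, per the docstring above.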
Expand Down Expand Up @@ -658,31 +649,29 @@ def ls(self, path, detail=False):
elif path.endswith("/"):
return self._ls(path, detail)
else:
combined_listing = self._ls(path, detail) + self._ls(path + "/", detail)
combined_listing = self._ls(path, detail) + self._ls(path + "/",
detail)
if detail:
combined_entries = dict((l["path"],l) for l in combined_listing )
combined_entries.pop(path+"/", None)
combined_entries = dict(
(l["path"], l) for l in combined_listing)
combined_entries.pop(path + "/", None)
return list(combined_entries.values())
else:
return list(set(combined_listing) - {path + "/"})

@_tracemethod
def _ls(self, path, detail=False):
listing = self._list_objects(path)
bucket, key = split_path(path)

if not detail:
result = []

# Convert item listing into list of 'item' and 'subdir/'
# entries. Items may be of form "key/", in which case there
# will be duplicate entries in prefix and item_names.
item_names = [
f["name"] for f in listing["items"] if f["name"]
]
item_names = [f["name"] for f in listing["items"] if f["name"]]
prefixes = [p for p in listing["prefixes"]]

logger.debug("path: %s item_names: %s prefixes: %s", path, item_names, prefixes)

return [
posixpath.join(bucket, n) for n in set(item_names + prefixes)
]
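To make the item/prefix merge concrete, a self-contained sketch using a hypothetical listing payload shaped like the _list_objects result:

import posixpath

# "data/sub/" appears both as an item and as a prefix; the set() union
# below collapses the duplicate, exactly as in _ls.
listing = {
    "items": [{"name": "data/a.csv"}, {"name": "data/sub/"}],
    "prefixes": ["data/sub/"],
}
item_names = [f["name"] for f in listing["items"] if f["name"]]
prefixes = [p for p in listing["prefixes"]]
print(sorted(posixpath.join("mybucket", n)
             for n in set(item_names + prefixes)))
# -> ['mybucket/data/a.csv', 'mybucket/data/sub/']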
@@ -712,7 +701,6 @@ def walk(self, path, detail=False):
             raise ValueError("path must include at least target bucket")
 
         if path.endswith('/'):
-            results = []
             listing = self.ls(path, detail=True)
 
             files = [l for l in listing if l["storageClass"] != "DIRECTORY"]
@@ -799,6 +787,8 @@ def info(self, path):
         bucket, key = split_path(path)
         if not key:
             # Return a pseudo dir for the bucket root
+            # TODO: check that it exists (either is in bucket list,
+            # or can list it)
             return {
                 'bucket': bucket,
                 'name': "/",
@@ -933,6 +923,7 @@ def touch(self, path):
         with self.open(path, 'wb'):
             pass
 
+    @_tracemethod
     def read_block(self, fn, offset, length, delimiter=None):
         """ Read a block of bytes from a GCS file
@@ -1283,17 +1274,17 @@ def _fetch(self, start, end):
             # First read
             self.start = start
             self.end = end + self.blocksize
-            self.cache = _fetch_range(self.details, self.gcsfs.session, start,
-                                      self.end)
+            self.cache = _fetch_range(self.details, self.gcsfs.session,
+                                      self.start, self.end)
         if start < self.start:
             if self.end - end > self.blocksize:
                 self.start = start
                 self.end = end + self.blocksize
                 self.cache = _fetch_range(self.details, self.gcsfs.session,
                                           self.start, self.end)
             else:
-                new = _fetch_range(self.details, self.gcsfs.session, start,
-                                   self.start)
+                new = _fetch_range(self.details, self.gcsfs.session,
+                                   start, self.start)
                 self.start = start
                 self.cache = new + self.cache
         if end > self.end:
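A hedged sketch of what this read-ahead cache buys callers (bucket, object, and sizes are hypothetical; block_size is the existing open() argument that sets self.blocksize):

from gcsfs import GCSFileSystem

fs = GCSFileSystem(project='myproject')
with fs.open('mybucket/data.bin', 'rb', block_size=5 * 2 ** 20) as f:
    first = f.read(1000)  # _fetch pulls start..end+blocksize in one request
    more = f.read(1000)   # falls inside self.cache, so no new request

Sequential reads therefore cost roughly one GCS range request per blocksize bytes rather than one per read() call.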
@@ -1384,6 +1375,7 @@ def __exit__(self, *args):
         self.close()
 
 
+@_tracemethod
 def _fetch_range(obj_dict, session, start=None, end=None):
     """ Get data from GCS
Expand All @@ -1392,7 +1384,6 @@ def _fetch_range(obj_dict, session, start=None, end=None):
start, end : None or integers
if not both None, fetch only given range
"""
logger.debug("Fetch: %s, %i-%i", obj_dict['name'], start, end)
if start is not None or end is not None:
start = start or 0
end = end or 0
