From aa3ad6c68915b3bf7675c7c1903d0739ad7d4817 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 1 Feb 2018 15:32:54 -0500 Subject: [PATCH 01/33] Add more logging to fuse NB: these remain print statements for now --- gcsfs/gcsfuse.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index dfce460e..e9818eb2 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -53,6 +53,7 @@ def getattr(self, path, fh=None): def readdir(self, path, fh): path = ''.join([self.root, path]) + print("List", path, fh) files = self.gcs.ls(path) files = [f.rstrip('/').rsplit('/', 1)[1] for f in files] return ['.', '..'] + files @@ -71,32 +72,35 @@ def rmdir(self, path): self.gcs.rm(path, False) def read(self, path, size, offset, fh): - print('read', path, size, offset, fh) fn = ''.join([self.root, path]) - f = self.cache[fn] + print('read #{} ({}) offset: {}, size: {}'.format( + fh, fn, offset,size)) + f = self.cache[fh] f.seek(offset) out = f.read(size) return out def write(self, path, data, offset, fh): - print('write', path, offset, fh) + fn = ''.join([self.root, path]) + print('write #{} ({}) offset'.format(fh, fn, offset)) f = self.cache[fh] f.write(data) return len(data) def create(self, path, flags): - print('create', path, oct(flags)) fn = ''.join([self.root, path]) + print('create', fn, oct(flags), end=' ') self.gcs.touch(fn) # this makes sure directory entry exists - wasteful! # write (but ignore creation flags) f = self.gcs.open(fn, 'wb') self.cache[self.counter] = f + print('-> fh #', self.counter) self.counter += 1 return self.counter - 1 def open(self, path, flags): - print('open', path, oct(flags)) fn = ''.join([self.root, path]) + print('open', fn, oct(flags), end=' ') if flags % 2 == 0: # read f = self.gcs.open(fn, 'rb') @@ -104,27 +108,29 @@ def open(self, path, flags): # write (but ignore creation flags) f = self.gcs.open(fn, 'wb') self.cache[self.counter] = f + print('-> fh #', self.counter) self.counter += 1 return self.counter - 1 def truncate(self, path, length, fh=None): - print('truncate', path, length, fh) fn = ''.join([self.root, path]) + print('truncate #{} ({}) to {}'.format(fh, fn, length)) if length != 0: raise NotImplementedError # maybe should be no-op since open with write sets size to zero anyway self.gcs.touch(fn) def unlink(self, path): - print('delete', path) fn = ''.join([self.root, path]) + print('delete', fn) try: self.gcs.rm(fn, False) except (IOError, FileNotFoundError): raise FuseOSError(EIO) def release(self, path, fh): - print('close', path, fh) + fn = ''.join([self.root, path]) + print('close #{} ({})'.format(fh, fn)) try: f = self.cache[fh] f.close() From 8886e3e4705c132598f6e1d85bf9a4f2f9f712f3 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 1 Feb 2018 16:16:18 -0500 Subject: [PATCH 02/33] experimental caching for fuse --- gcsfs/gcsfuse.py | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index e9818eb2..2b180eb9 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -14,6 +14,37 @@ def str_to_time(s): return t.to_datetime64().view('int64') / 1e9 +class SmallChunkCacher: + def __init__(self, cutoff=10000, maxmem=50 * 2 ** 20): + self.cache = {} + self.cutoff = cutoff + self.maxmem = maxmem + self.mem = 0 + + def read(self, fn, offset, size, f): + if size > self.cutoff: + # big reads are likely sequential + f.seek(offset) + return f.read(size) + if fn not in self.cache: + self.cache[fn] 
= [] + c = self.cache[fn] + for chunk in c: + if chunk['start'] < offset and chunk['end'] > offset + size: + print('cache hit') + start = offset - chunk['start'] + return chunk['data'][start:start + size] + print('cache miss') + f.seek(offset) + out = f.read(size) + c.append({'start': f.start, 'end': f.end, 'data': f.cache}) + self.mem += len(f.cache) + return out + + def close(self, fn): + self.cache.pop(fn, None) + + class GCSFS(Operations): def __init__(self, path='.', gcs=None, **fsargs): @@ -22,6 +53,7 @@ def __init__(self, path='.', gcs=None, **fsargs): else: self.gcs = gcs self.cache = {} + self.chunk_cacher = SmallChunkCacher() self.counter = 0 self.root = path @@ -53,7 +85,7 @@ def getattr(self, path, fh=None): def readdir(self, path, fh): path = ''.join([self.root, path]) - print("List", path, fh) + print("List", path, fh, flush=True) files = self.gcs.ls(path) files = [f.rstrip('/').rsplit('/', 1)[1] for f in files] return ['.', '..'] + files @@ -76,8 +108,7 @@ def read(self, path, size, offset, fh): print('read #{} ({}) offset: {}, size: {}'.format( fh, fn, offset,size)) f = self.cache[fh] - f.seek(offset) - out = f.read(size) + out = self.chunk_cacher.read(fn, offset, size, f) return out def write(self, path, data, offset, fh): From d131f61c043fc8cf72ee624656161aaaf7949fa0 Mon Sep 17 00:00:00 2001 From: Alex Ford Date: Fri, 2 Feb 2018 02:07:17 +0000 Subject: [PATCH 03/33] Add per-method debug tracing. Add decorator-based method tracing to `gcsfuse.GCSFS` and `core.GCSFileSystem` interface methods. Add `--verbose` command-line option to `gcsfuse` to support debug logging control. --- gcsfs/cli/gcsfuse.py | 13 ++++++++++++- gcsfs/core.py | 30 ++++++++++++++++++++++++++++++ gcsfs/gcsfuse.py | 29 +++++++++++++++++++++-------- requirements.txt | 1 + 4 files changed, 64 insertions(+), 9 deletions(-) diff --git a/gcsfs/cli/gcsfuse.py b/gcsfs/cli/gcsfuse.py index bdcf9dbf..0b05d07f 100644 --- a/gcsfs/cli/gcsfuse.py +++ b/gcsfs/cli/gcsfuse.py @@ -1,4 +1,5 @@ import click +import logging from fuse import FUSE from gcsfs.gcsfuse import GCSFS @@ -13,7 +14,17 @@ help="Billing Project ID") @click.option('--foreground/--background', default=True, help="Run in the foreground or as a background process") -def main(bucket, mount_point, token, project_id, foreground): +@click.option('-v', '--verbose', count=True, + help="Set logging level. '-v' for 'gcsfuse' logging." 
+ "'-v -v' for complete debug logging.") +def main(bucket, mount_point, token, project_id, foreground, verbose): + + if verbose == 1: + logging.basicConfig(level=logging.INFO) + logging.getLogger("gcsfs.gcsfuse").setLevel(logging.DEBUG) + if verbose > 1: + logging.basicConfig(level=logging.DEBUG) + """ Mount a Google Cloud Storage (GCS) bucket to a local directory """ print("Mounting bucket %s to directory %s" % (bucket, mount_point)) FUSE(GCSFS(bucket, token=token, project=project_id), diff --git a/gcsfs/core.py b/gcsfs/core.py index 6f755e5e..d65be455 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -4,6 +4,8 @@ """ from __future__ import print_function +import decorator + import array from base64 import b64encode import google.auth as gauth @@ -33,6 +35,11 @@ logger = logging.getLogger(__name__) +@decorator.decorator +def _tracemethod(f, self, *args, **kwargs): + logger.debug("%s(args=%s, kwargs=%s)", f.__name__, args, kwargs) + return f(self, *args, **kwargs) + # client created 23-Sept-2017 not_secret = {"client_id": "586241054156-0asut23a7m10790r2ik24309flribp7j" ".apps.googleusercontent.com", @@ -528,6 +535,7 @@ def _list_buckets(self): return result + @_tracemethod def invalidate_cache(self, path=None): """ Invalidate listing cache for given path, so that it is reloaded on next use. @@ -551,6 +559,7 @@ def invalidate_cache(self, path=None): for k in invalid_keys: self._listing_cache.pop(k, None) + @_tracemethod def mkdir(self, bucket, acl='projectPrivate', default_acl='bucketOwnerFullControl'): """ @@ -570,11 +579,13 @@ def mkdir(self, bucket, acl='projectPrivate', json={"name": bucket}) self.invalidate_cache(bucket) + @_tracemethod def rmdir(self, bucket): """Delete an empty bucket""" self._call('delete', 'b/' + bucket) self.invalidate_cache(bucket) + @_tracemethod def ls(self, path, detail=False): """List objects under the given '/{bucket}/{prefix} path.""" if path in ['/', '']: @@ -602,6 +613,7 @@ def ls(self, path, detail=False): else: item_details = listing["items"] + pseudodirs = [{ 'bucket': bucket, 'name': key + prefix, @@ -614,6 +626,7 @@ def ls(self, path, detail=False): return item_details + pseudodirs + @_tracemethod def walk(self, path, detail=False): """ Return all real keys belows path. """ bucket, prefix = split_path(path) @@ -645,6 +658,7 @@ def walk(self, path, detail=False): else: return [posixpath.join(f["bucket"], f['name']) for f in files] + @_tracemethod def du(self, path, total=False, deep=False): if deep: files = self.walk(path, True) @@ -654,6 +668,7 @@ def du(self, path, total=False, deep=False): return sum(f['size'] for f in files) return {f['name']: f['size'] for f in files} + @_tracemethod def glob(self, path): """ Find files by glob-matching. 
@@ -681,6 +696,7 @@ def glob(self, path): f.replace('//', '/').rstrip('/'))] return out + @_tracemethod def exists(self, path): bucket, key = split_path(path) try: @@ -702,6 +718,7 @@ def exists(self, path): except FileNotFoundError: return False + @_tracemethod def info(self, path): bucket, key = split_path(path) if not key: @@ -735,14 +752,17 @@ def info(self, path): else: raise + @_tracemethod def url(self, path): return self.info(path)['mediaLink'] + @_tracemethod def cat(self, path): """ Simple one-shot get of file data """ details = self.info(path) return _fetch_range(details, self.session) + @_tracemethod def get(self, rpath, lpath, blocksize=5 * 2 ** 20): with self.open(rpath, 'rb', block_size=blocksize) as f1: with open(lpath, 'wb') as f2: @@ -752,6 +772,7 @@ def get(self, rpath, lpath, blocksize=5 * 2 ** 20): break f2.write(d) + @_tracemethod def put(self, lpath, rpath, blocksize=5 * 2 ** 20, acl=None): with self.open(rpath, 'wb', block_size=blocksize, acl=acl) as f1: with open(lpath, 'rb') as f2: @@ -761,10 +782,12 @@ def put(self, lpath, rpath, blocksize=5 * 2 ** 20, acl=None): break f1.write(d) + @_tracemethod def head(self, path, size=1024): with self.open(path, 'rb') as f: return f.read(size) + @_tracemethod def tail(self, path, size=1024): if size > self.info(path)['size']: return self.cat(path) @@ -772,6 +795,7 @@ def tail(self, path, size=1024): f.seek(-size, 2) return f.read() + @_tracemethod def merge(self, path, paths, acl=None): """Concatenate objects within a single bucket""" bucket, key = split_path(path) @@ -782,16 +806,19 @@ def merge(self, path, paths, acl=None): "kind": "storage#composeRequest", 'destination': {'name': key, 'bucket': bucket}}) + @_tracemethod def copy(self, path1, path2, acl=None): b1, k1 = split_path(path1) b2, k2 = split_path(path2) self._call('post', 'b/{}/o/{}/copyTo/b/{}/o/{}', b1, k1, b2, k2, destinationPredefinedAcl=acl) + @_tracemethod def mv(self, path1, path2, acl=None): self.copy(path1, path2, acl) self.rm(path1) + @_tracemethod def rm(self, path, recursive=False): """Delete keys. 
If recursive, also delete all keys given by walk(path)""" @@ -803,6 +830,7 @@ def rm(self, path, recursive=False): self._call('delete', "b/{}/o/{}", bucket, key) self.invalidate_cache(posixpath.dirname(norm_path(path))) + @_tracemethod def open(self, path, mode='rb', block_size=None, acl=None, consistency=None, metadata=None): """ @@ -823,10 +851,12 @@ def open(self, path, mode='rb', block_size=None, acl=None, GCSFile(self, path, mode, block_size, consistency=const, metadata=metadata)) + @_tracemethod def touch(self, path): with self.open(path, 'wb'): pass + @_tracemethod def read_block(self, fn, offset, length, delimiter=None): """ Read a block of bytes from a GCS file diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index dfce460e..f1976407 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -1,5 +1,7 @@ from __future__ import print_function import os +import logging +import decorator import stat import pandas as pd from errno import ENOENT, EIO @@ -8,6 +10,12 @@ from pwd import getpwnam from grp import getgrnam +logger = logging.getLogger(__name__) + +@decorator.decorator +def _tracemethod(f, self, *args, **kwargs): + logger.debug("%s(args=%s, kwargs=%s)", f.__name__, args, kwargs) + return f(self, *args, **kwargs) def str_to_time(s): t = pd.to_datetime(s) @@ -25,6 +33,7 @@ def __init__(self, path='.', gcs=None, **fsargs): self.counter = 0 self.root = path + @_tracemethod def getattr(self, path, fh=None): try: info = self.gcs.info(''.join([self.root, path])) @@ -51,12 +60,14 @@ def getattr(self, path, fh=None): return data + @_tracemethod def readdir(self, path, fh): path = ''.join([self.root, path]) files = self.gcs.ls(path) files = [f.rstrip('/').rsplit('/', 1)[1] for f in files] return ['.', '..'] + files + @_tracemethod def mkdir(self, path, mode): bucket, key = core.split_path(path) if not self.gcs.info(path): @@ -65,27 +76,28 @@ def mkdir(self, path, mode): 'size': 0, 'storageClass': 'DIRECTORY', 'name': path.rstrip('/') + '/'}) + @_tracemethod def rmdir(self, path): info = self.gcs.info(path) if info['storageClass': 'DIRECTORY']: self.gcs.rm(path, False) + @_tracemethod def read(self, path, size, offset, fh): - print('read', path, size, offset, fh) fn = ''.join([self.root, path]) f = self.cache[fn] f.seek(offset) out = f.read(size) return out + @_tracemethod def write(self, path, data, offset, fh): - print('write', path, offset, fh) f = self.cache[fh] f.write(data) return len(data) + @_tracemethod def create(self, path, flags): - print('create', path, oct(flags)) fn = ''.join([self.root, path]) self.gcs.touch(fn) # this makes sure directory entry exists - wasteful! 
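            # (Context for the lines below: the file is opened for write and an
            #  integer file handle is allocated from self.counter; FUSE passes
            #  that handle back into write() and release().)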
# write (but ignore creation flags) @@ -94,8 +106,8 @@ def create(self, path, flags): self.counter += 1 return self.counter - 1 + @_tracemethod def open(self, path, flags): - print('open', path, oct(flags)) fn = ''.join([self.root, path]) if flags % 2 == 0: # read @@ -107,31 +119,32 @@ def open(self, path, flags): self.counter += 1 return self.counter - 1 + @_tracemethod def truncate(self, path, length, fh=None): - print('truncate', path, length, fh) fn = ''.join([self.root, path]) if length != 0: raise NotImplementedError # maybe should be no-op since open with write sets size to zero anyway self.gcs.touch(fn) + @_tracemethod def unlink(self, path): - print('delete', path) fn = ''.join([self.root, path]) try: self.gcs.rm(fn, False) except (IOError, FileNotFoundError): raise FuseOSError(EIO) + @_tracemethod def release(self, path, fh): - print('close', path, fh) try: f = self.cache[fh] f.close() self.cache.pop(fh, None) # should release any cache memory except Exception as e: - print(e) + logger.exception("exception on release") return 0 + @_tracemethod def chmod(self, path, mode): raise NotImplementedError diff --git a/requirements.txt b/requirements.txt index 946e469d..7e2e3419 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ google-auth>=1.2 google-auth-oauthlib requests +decorator From 9211b034cc56a789b8b52e8f6fc5e52125ddb22d Mon Sep 17 00:00:00 2001 From: Alex Ford Date: Fri, 2 Feb 2018 02:11:03 +0000 Subject: [PATCH 04/33] Bugfix prototype gcsfuse/per_dir_cache integration. Prototype `per_dir_cache` integration for gcsfuse. Minimal fixup to gcsfuse to support directory listing. --- gcsfs/core.py | 2 +- gcsfs/gcsfuse.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index d65be455..4f08252c 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -616,7 +616,7 @@ def ls(self, path, detail=False): pseudodirs = [{ 'bucket': bucket, - 'name': key + prefix, + 'name': prefix, 'kind': 'storage#object', 'size': 0, 'storageClass': 'DIRECTORY', diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index f1976407..eed5059a 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -64,7 +64,7 @@ def getattr(self, path, fh=None): def readdir(self, path, fh): path = ''.join([self.root, path]) files = self.gcs.ls(path) - files = [f.rstrip('/').rsplit('/', 1)[1] for f in files] + files = [os.path.basename(f.rstrip('/')) for f in files] return ['.', '..'] + files @_tracemethod From 91adc210bce1f75ba013beaf47b86629d68180ed Mon Sep 17 00:00:00 2001 From: Alex Ford Date: Fri, 2 Feb 2018 02:16:28 +0000 Subject: [PATCH 05/33] Fix GCSFS::read cache access. Fix error in GCSFS::read() cache key resolution. 
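(For context: FUSE passes read() the integer handle that open()/create()
returned, so the open-file table must be keyed by that handle rather than by
path; the same path can be open twice under different handles. A minimal
sketch of the handle-keyed pattern, with illustrative names rather than the
gcsfs API:

    class HandleTable:
        def __init__(self):
            self.open_files = {}   # fh (int) -> file-like object
            self.counter = 0

        def open(self, fileobj):
            fh = self.counter            # allocate the next handle
            self.open_files[fh] = fileobj
            self.counter += 1
            return fh                    # FUSE hands this back to read()

        def read(self, fh, offset, size):
            f = self.open_files[fh]      # keyed by handle, not by path
            f.seek(offset)
            return f.read(size)
)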
--- gcsfs/gcsfuse.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index eed5059a..8f1ee53f 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -85,7 +85,7 @@ def rmdir(self, path): @_tracemethod def read(self, path, size, offset, fh): fn = ''.join([self.root, path]) - f = self.cache[fn] + f = self.cache[fh] f.seek(offset) out = f.read(size) return out From 3ed7fb2c4ea10a969111174740b702bed8084e38 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Sat, 3 Feb 2018 18:28:56 -0500 Subject: [PATCH 06/33] file and chunk caching --- gcsfs/cli/gcsfuse.py | 6 ++-- gcsfs/core.py | 7 ++-- gcsfs/gcsfuse.py | 77 +++++++++++++++++++++++++++----------------- 3 files changed, 54 insertions(+), 36 deletions(-) diff --git a/gcsfs/cli/gcsfuse.py b/gcsfs/cli/gcsfuse.py index 0b05d07f..29fb781a 100644 --- a/gcsfs/cli/gcsfuse.py +++ b/gcsfs/cli/gcsfuse.py @@ -18,12 +18,12 @@ help="Set logging level. '-v' for 'gcsfuse' logging." "'-v -v' for complete debug logging.") def main(bucket, mount_point, token, project_id, foreground, verbose): - + fmt = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' if verbose == 1: - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.INFO, format=fmt) logging.getLogger("gcsfs.gcsfuse").setLevel(logging.DEBUG) if verbose > 1: - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.DEBUG, format=fmt) """ Mount a Google Cloud Storage (GCS) bucket to a local directory """ print("Mounting bucket %s to directory %s" % (bucket, mount_point)) diff --git a/gcsfs/core.py b/gcsfs/core.py index 646661a5..2f2ebee5 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -422,7 +422,7 @@ def _get_object(self, path): return cached_obj[0] else: # Should error on missing cache or reprobe? 
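            # (Context for the change below: a cached parent listing is treated
            #  as authoritative, so a key absent from it now raises
            #  FileNotFoundError instead of falling through to a live GET.)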
- pass + raise FileNotFoundError if not key: # Attempt to "get" the bucket root, return error instead of @@ -434,7 +434,6 @@ def _get_object(self, path): logger.debug("_get_object result: %s", result) return result - def _maybe_get_cached_listing(self, path): logger.debug("_maybe_get_cached_listing: %s", path) if path in self._listing_cache: @@ -466,7 +465,7 @@ def _list_objects(self, path): return listing def _do_list_objects(self, path, max_results = None): - """Return depaginated object listing for the given {bucket}/{prefix}/ path.""" + """Return depaginated object listing for the given path.""" logger.debug("_list_objects(%s, max_results=%s)", path, max_results) bucket, prefix = split_path(path) if prefix: @@ -724,6 +723,8 @@ def info(self, path): bucket, key = split_path(path) if not key: # Return a pseudo dir for the bucket root + # TODO: check that it exists (either is in bucket list, + # or can list it) return { 'bucket': bucket, 'name': "/", diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index ab5f3efb..545328ce 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -9,6 +9,7 @@ from gcsfs import GCSFileSystem, core from pwd import getpwnam from grp import getgrnam +import time logger = logging.getLogger(__name__) @@ -25,34 +26,48 @@ def str_to_time(s): class SmallChunkCacher: - def __init__(self, cutoff=10000, maxmem=50 * 2 ** 20): - self.cache = {} + def __init__(self, gcs, cutoff=10000, maxmem=50 * 2 ** 20, + nfiles=10): + self.gcs = gcs + self.file_cache = {} + self.block_cache = {} self.cutoff = cutoff self.maxmem = maxmem + self.nfiles = nfiles self.mem = 0 - def read(self, fn, offset, size, f): + def read(self, fn, offset, size): + f = self.file_cache[fn] if size > self.cutoff: # big reads are likely sequential f.seek(offset) return f.read(size) - if fn not in self.cache: - self.cache[fn] = [] - c = self.cache[fn] + if fn not in self.block_cache: + self.block_cache[fn] = [] + c = self.block_cache[fn] for chunk in c: if chunk['start'] < offset and chunk['end'] > offset + size: - print('cache hit') + logger.info('cache hit') start = offset - chunk['start'] return chunk['data'][start:start + size] - print('cache miss') + logger.info('cache miss') f.seek(offset) out = f.read(size) c.append({'start': f.start, 'end': f.end, 'data': f.cache}) self.mem += len(f.cache) return out + def open(self, fn): + if fn not in self.file_cache: + self.file_cache[fn] = self.gcs.open(fn, 'rb') + logger.info('{} inserted into cache'.format(fn)) + else: + logger.info('{} found in cache'.format(fn)) + return self.file_cache[fn] + def close(self, fn): - self.cache.pop(fn, None) + self.block_cache.pop(fn, None) + self.file_cache.pop(fn, None) class GCSFS(Operations): @@ -62,8 +77,8 @@ def __init__(self, path='.', gcs=None, **fsargs): self.gcs = GCSFileSystem(**fsargs) else: self.gcs = gcs - self.cache = {} - self.chunk_cacher = SmallChunkCacher() + self.cache = SmallChunkCacher(self.gcs) + self.write_cache = {} self.counter = 0 self.root = path @@ -73,6 +88,7 @@ def getattr(self, path, fh=None): info = self.gcs.info(''.join([self.root, path])) except FileNotFoundError: raise FuseOSError(ENOENT) + logger.info(str(list(self.gcs._listing_cache))) data = {'st_uid': 1000, 'st_gid': 1000} perm = 0o777 @@ -97,7 +113,7 @@ def getattr(self, path, fh=None): @_tracemethod def readdir(self, path, fh): path = ''.join([self.root, path]) - print("List", path, fh, flush=True) + logger.info("List {}, {}".format(path, fh)) files = self.gcs.ls(path) files = [os.path.basename(f.rstrip('/')) for f in files] return 
['.', '..'] + files @@ -120,28 +136,27 @@ def rmdir(self, path): @_tracemethod def read(self, path, size, offset, fh): fn = ''.join([self.root, path]) - print('read #{} ({}) offset: {}, size: {}'.format( + logger.info('read #{} ({}) offset: {}, size: {}'.format( fh, fn, offset,size)) - f = self.cache[fh] - out = self.chunk_cacher.read(fn, offset, size, f) + out = self.cache.read(fn, offset, size) return out @_tracemethod def write(self, path, data, offset, fh): fn = ''.join([self.root, path]) - print('write #{} ({}) offset'.format(fh, fn, offset)) - f = self.cache[fh] + logger.info('write #{} ({}) offset'.format(fh, fn, offset)) + f = self.write_cache[fh] f.write(data) return len(data) @_tracemethod def create(self, path, flags): fn = ''.join([self.root, path]) - print('create', fn, oct(flags), end=' ') + logger.info('create {} {}'.format(fn, oct(flags))) self.gcs.touch(fn) # this makes sure directory entry exists - wasteful! # write (but ignore creation flags) f = self.gcs.open(fn, 'wb') - self.cache[self.counter] = f + self.write_cache[self.counter] = f print('-> fh #', self.counter) self.counter += 1 return self.counter - 1 @@ -149,22 +164,22 @@ def create(self, path, flags): @_tracemethod def open(self, path, flags): fn = ''.join([self.root, path]) - print('open', fn, oct(flags), end=' ') + logger.info('open {} {}'.format(fn, oct(flags))) if flags % 2 == 0: # read - f = self.gcs.open(fn, 'rb') + f = self.cache.open(fn) else: # write (but ignore creation flags) f = self.gcs.open(fn, 'wb') - self.cache[self.counter] = f - print('-> fh #', self.counter) + self.write_cache[self.counter] = f + logger.info('-> fh #{}'.format(self.counter)) self.counter += 1 return self.counter - 1 @_tracemethod def truncate(self, path, length, fh=None): fn = ''.join([self.root, path]) - print('truncate #{} ({}) to {}'.format(fh, fn, length)) + logger.info('truncate #{} ({}) to {}'.format(fh, fn, length)) if length != 0: raise NotImplementedError # maybe should be no-op since open with write sets size to zero anyway @@ -173,7 +188,7 @@ def truncate(self, path, length, fh=None): @_tracemethod def unlink(self, path): fn = ''.join([self.root, path]) - print('delete', fn) + logger.info('delete', fn) try: self.gcs.rm(fn, False) except (IOError, FileNotFoundError): @@ -182,13 +197,15 @@ def unlink(self, path): @_tracemethod def release(self, path, fh): fn = ''.join([self.root, path]) - print('close #{} ({})'.format(fh, fn)) + logger.info('close #{} ({})'.format(fh, fn)) try: - f = self.cache[fh] - f.close() - self.cache.pop(fh, None) # should release any cache memory + if fh in self.write_cache: + # write mode + f = self.write_cache[fh] + f.close() + self.write_cache.pop(fh, None) except Exception as e: - logger.exception("exception on release") + logger.exception("exception on release:" + str(e)) return 0 @_tracemethod From d8a8055253428dbe3c6f040532897a80a8d253d7 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Sat, 3 Feb 2018 18:36:03 -0500 Subject: [PATCH 07/33] remove some prints --- gcsfs/core.py | 5 ----- gcsfs/gcsfuse.py | 3 +-- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 2f2ebee5..37821554 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -56,7 +56,6 @@ def _tracemethod(f, self, *args, **kwargs): bACLs = {"authenticatedRead", "private", "projectPrivate", "publicRead", "publicReadWrite"} DEFAULT_PROJECT = os.environ.get('GCSFS_DEFAULT_PROJECT', '') -DEBUG = False if PY2: FileNotFoundError = IOError @@ -140,8 +139,6 @@ def validate_response(r, path): 
except: msg = str(r.content) - if DEBUG: - print(r.url, r.headers, sep='\n') if "Not Found" in m: raise FileNotFoundError(path) elif "forbidden" in m: @@ -1284,8 +1281,6 @@ def _fetch_range(obj_dict, session, start=None, end=None): start, end : None or integers if not both None, fetch only given range """ - if DEBUG: - print('Fetch: ', start, end) logger.debug("Fetch: {}, {}-{}", obj_dict['name'], start, end) if start is not None or end is not None: start = start or 0 diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 545328ce..6160ddd9 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -88,7 +88,6 @@ def getattr(self, path, fh=None): info = self.gcs.info(''.join([self.root, path])) except FileNotFoundError: raise FuseOSError(ENOENT) - logger.info(str(list(self.gcs._listing_cache))) data = {'st_uid': 1000, 'st_gid': 1000} perm = 0o777 @@ -157,7 +156,7 @@ def create(self, path, flags): # write (but ignore creation flags) f = self.gcs.open(fn, 'wb') self.write_cache[self.counter] = f - print('-> fh #', self.counter) + logger.info('-> fh #{}'.format(self.counter)) self.counter += 1 return self.counter - 1 From ff0a88c381ec549eee118148f390ae494b8170b5 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Sun, 4 Feb 2018 14:08:22 -0500 Subject: [PATCH 08/33] Simple LRU file caching --- gcsfs/gcsfuse.py | 70 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 6160ddd9..29a456bf 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -1,4 +1,5 @@ from __future__ import print_function +from collections import OrderedDict, MutableMapping import os import logging import decorator @@ -25,27 +26,57 @@ def str_to_time(s): return t.to_datetime64().view('int64') / 1e9 +class LRUDict(MutableMapping): + """A dict that discards least-recently-used items""" + + def __init__(self, *args, size=128, **kwargs): + """Same arguments as OrderedDict with one additions: + + size: maximum number of entries + """ + self.data = OrderedDict(*args, **kwargs) + self.size = size + self.purge() + + def purge(self): + """Removes expired or overflowing entries.""" + # pop until maximum capacity is reached + extra = max(0, len(self.data) - self.size) + for _ in range(extra): + self.data.popitem(last=False) + + def __getitem__(self, key): + self.data.move_to_end(key) + return self.data[key] + + def __setitem__(self, key, value): + self.data[key] = value + self.purge() + + def __delitem__(self, key): + del self.data[key] + + def __iter__(self): + return iter(list(self.data)) + + def __len__(self): + return len(self.data) + + class SmallChunkCacher: - def __init__(self, gcs, cutoff=10000, maxmem=50 * 2 ** 20, - nfiles=10): + def __init__(self, gcs, cutoff=10000, nfiles=3): self.gcs = gcs - self.file_cache = {} - self.block_cache = {} + self.cache = LRUDict(size=nfiles) self.cutoff = cutoff - self.maxmem = maxmem self.nfiles = nfiles - self.mem = 0 def read(self, fn, offset, size): - f = self.file_cache[fn] + f, chunks = self.cache[fn] if size > self.cutoff: # big reads are likely sequential f.seek(offset) return f.read(size) - if fn not in self.block_cache: - self.block_cache[fn] = [] - c = self.block_cache[fn] - for chunk in c: + for chunk in chunks: if chunk['start'] < offset and chunk['end'] > offset + size: logger.info('cache hit') start = offset - chunk['start'] @@ -53,21 +84,20 @@ def read(self, fn, offset, size): logger.info('cache miss') f.seek(offset) out = f.read(size) - c.append({'start': f.start, 'end': f.end, 
'data': f.cache}) - self.mem += len(f.cache) + chunks.append({'start': f.start, 'end': f.end, 'data': f.cache}) + sizes = {f: "%.1fMB" % (sum( + [c['end'] - c['start'] + 1 for c in ch[1]])/2**20) + for f, ch in self.cache.items()} + logger.info('Cache Report: {}'.format(sizes)) return out def open(self, fn): - if fn not in self.file_cache: - self.file_cache[fn] = self.gcs.open(fn, 'rb') + if fn not in self.cache: + self.cache[fn] = self.gcs.open(fn, 'rb'), [] logger.info('{} inserted into cache'.format(fn)) else: logger.info('{} found in cache'.format(fn)) - return self.file_cache[fn] - - def close(self, fn): - self.block_cache.pop(fn, None) - self.file_cache.pop(fn, None) + return self.cache[fn][0] class GCSFS(Operations): From 3de01dd7b9b80df8119f9d7684b565af2b516988 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 5 Feb 2018 15:33:44 -0500 Subject: [PATCH 09/33] Add docstrings Also, reduce default file block size (reduces read times) --- gcsfs/core.py | 9 ++++----- gcsfs/gcsfuse.py | 26 +++++++++++++++++++++++++- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 37821554..b13e75b0 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -27,9 +27,7 @@ import warnings from requests.exceptions import RequestException -from .utils import HtmlError -from .utils import is_retriable -from .utils import read_block +from .utils import HtmlError, is_retriable, read_block PY2 = sys.version_info.major == 2 @@ -40,7 +38,7 @@ def _tracemethod(f, self, *args, **kwargs): logger.debug("%s(args=%s, kwargs=%s)", f.__name__, args, kwargs) return f(self, *args, **kwargs) -# client created 23-Sept-2017 +# client created 16-Jan-2018 not_secret = {"client_id": "586241054156-0asut23a7m10790r2ik24309flribp7j" ".apps.googleusercontent.com", "client_secret": "w6VkI99jS6e9mECscNztXvQv"} @@ -209,7 +207,8 @@ class GCSFileSystem(object): default_block_size = 5 * 2**20 def __init__(self, project=DEFAULT_PROJECT, access='full_control', - token=None, block_size=None, consistency='none', cache_timeout = 60 ): + token=None, block_size=None, consistency='none', + cache_timeout=60): if access not in self.scopes: raise ValueError('access must be one of {}', self.scopes) if project is None: diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 29a456bf..f2d4bd84 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -64,6 +64,19 @@ def __len__(self): class SmallChunkCacher: + """ + Cache open GCSFiles, and data chunks from small reads + + Parameters + ---------- + gcs : instance of GCSFileSystem + cutoff : int + Will store/fetch data from cache for calls to read() with values smaller + than this. + nfile : int + Number of files to store in LRU cache. + """ + def __init__(self, gcs, cutoff=10000, nfiles=3): self.gcs = gcs self.cache = LRUDict(size=nfiles) @@ -71,6 +84,12 @@ def __init__(self, gcs, cutoff=10000, nfiles=3): self.nfiles = nfiles def read(self, fn, offset, size): + """Reach block from file + + If size is less than cutoff, see if the relevant data is in the cache; + either return data from there, or call read() on underlying file object + and store the resultant block in the cache. + """ f, chunks = self.cache[fn] if size > self.cutoff: # big reads are likely sequential @@ -92,6 +111,10 @@ def read(self, fn, offset, size): return out def open(self, fn): + """Create cache entry, or return existing open file + + May result in the eviction of LRU file object and its data blocks. 
+ """ if fn not in self.cache: self.cache[fn] = self.gcs.open(fn, 'rb'), [] logger.info('{} inserted into cache'.format(fn)) @@ -104,7 +127,8 @@ class GCSFS(Operations): def __init__(self, path='.', gcs=None, **fsargs): if gcs is None: - self.gcs = GCSFileSystem(**fsargs) + # minimum block size: still read on 5MB boundaries. + self.gcs = GCSFileSystem(block_size=2 * 2 ** 20, **fsargs) else: self.gcs = gcs self.cache = SmallChunkCacher(self.gcs) From 0df77627e2af8e1a7b0d83671d8cc0928270b68d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 12:05:15 -0500 Subject: [PATCH 10/33] Make number of cached files configurable --- gcsfs/cli/gcsfuse.py | 13 ++++++++++--- gcsfs/core.py | 39 +++++++++++++++++++++++---------------- gcsfs/gcsfuse.py | 11 +++++++---- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/gcsfs/cli/gcsfuse.py b/gcsfs/cli/gcsfuse.py index 29fb781a..c98dd996 100644 --- a/gcsfs/cli/gcsfuse.py +++ b/gcsfs/cli/gcsfuse.py @@ -14,10 +14,16 @@ help="Billing Project ID") @click.option('--foreground/--background', default=True, help="Run in the foreground or as a background process") +@click.option('--threads/--no-threads', default=True, + help="Run in the foreground or as a background process") +@click.option('-cache-files', default=10, + help="Number of open files to cache") @click.option('-v', '--verbose', count=True, help="Set logging level. '-v' for 'gcsfuse' logging." "'-v -v' for complete debug logging.") -def main(bucket, mount_point, token, project_id, foreground, verbose): +def main(bucket, mount_point, token, project_id, foreground, threads, + nfiles, verbose): + """ Mount a Google Cloud Storage (GCS) bucket to a local directory """ fmt = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' if verbose == 1: logging.basicConfig(level=logging.INFO, format=fmt) @@ -25,10 +31,11 @@ def main(bucket, mount_point, token, project_id, foreground, verbose): if verbose > 1: logging.basicConfig(level=logging.DEBUG, format=fmt) - """ Mount a Google Cloud Storage (GCS) bucket to a local directory """ print("Mounting bucket %s to directory %s" % (bucket, mount_point)) + print('foreground:', foreground, ', nothreads:', not threads) FUSE(GCSFS(bucket, token=token, project=project_id), - mount_point, nothreads=True, foreground=foreground) + mount_point, nothreads=not threads, foreground=foreground, + nfiles=nfiles) if __name__ == '__main__': diff --git a/gcsfs/core.py b/gcsfs/core.py index b13e75b0..7614ef78 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -1173,38 +1173,35 @@ def _simple_upload(self): assert b64encode(self.md5.digest()) == md5.encode(), "MD5 checksum failed" def _fetch(self, start, end): - # force read to 5MB boundaries - start = start // (5 * 2**20) * 5 * 2**20 - end = (end // (5 * 2 ** 20) + 1) * 5 * 2 ** 20 if self.start is None and self.end is None: # First read - self.start = start - self.end = end + self.blocksize - self.cache = _fetch_range(self.details, self.gcsfs.session, start, - self.end) + self.start = _round_down(start) + self.end = _round_up(end + self.blocksize) + self.cache = _fetch_range(self.details, self.gcsfs.session, + self.start, self.end) if start < self.start: if self.end - end > self.blocksize: - self.start = start - self.end = end + self.blocksize + self.start = _round_down(start) + self.end = _round_up(end + self.blocksize) self.cache = _fetch_range(self.details, self.gcsfs.session, self.start, self.end) else: - new = _fetch_range(self.details, self.gcsfs.session, start, - self.start) - self.start = start + new = 
_fetch_range(self.details, self.gcsfs.session, + _round_down(start), self.start) + self.start = _round_down(start) self.cache = new + self.cache if end > self.end: if self.end > self.size: return if end - self.end > self.blocksize: - self.start = start - self.end = end + self.blocksize + self.start = _round_down(start) + self.end = _round_up(end + self.blocksize) self.cache = _fetch_range(self.details, self.gcsfs.session, self.start, self.end) else: new = _fetch_range(self.details, self.gcsfs.session, self.end, - end + self.blocksize) - self.end = end + self.blocksize + _round_up(end + self.blocksize)) + self.end = _round_up(end + self.blocksize) self.cache = self.cache + new def read(self, length=-1): @@ -1272,6 +1269,16 @@ def __exit__(self, *args): self.close() +def _round_down(x): + """Next lowest 5MB boundary""" + return x // (5 * 2**20) * 5 * 2**20 + + +def _round_up(x): + """Next highest 5MB boundary""" + return (x // (5 * 2 ** 20) + 1) * 5 * 2 ** 20 + + def _fetch_range(obj_dict, session, start=None, end=None): """ Get data from GCS diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index f2d4bd84..db4acfe4 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -51,6 +51,7 @@ def __getitem__(self, key): def __setitem__(self, key, value): self.data[key] = value + self.data.move_to_end(key) self.purge() def __delitem__(self, key): @@ -90,6 +91,8 @@ def read(self, fn, offset, size): either return data from there, or call read() on underlying file object and store the resultant block in the cache. """ + if fn not in self.cache: + self.open(fn) f, chunks = self.cache[fn] if size > self.cutoff: # big reads are likely sequential @@ -125,13 +128,13 @@ def open(self, fn): class GCSFS(Operations): - def __init__(self, path='.', gcs=None, **fsargs): + def __init__(self, path='.', gcs=None, nfiles=10, **fsargs): if gcs is None: # minimum block size: still read on 5MB boundaries. self.gcs = GCSFileSystem(block_size=2 * 2 ** 20, **fsargs) else: self.gcs = gcs - self.cache = SmallChunkCacher(self.gcs) + self.cache = SmallChunkCacher(self.gcs, nfiles=nfiles) self.write_cache = {} self.counter = 0 self.root = path @@ -220,10 +223,10 @@ def open(self, path, flags): logger.info('open {} {}'.format(fn, oct(flags))) if flags % 2 == 0: # read - f = self.cache.open(fn) + self.cache.open(fn) else: # write (but ignore creation flags) - f = self.gcs.open(fn, 'wb') + self.gcs.open(fn, 'wb') self.write_cache[self.counter] = f logger.info('-> fh #{}'.format(self.counter)) self.counter += 1 From ee3815fdc90614fe8622e0889154bb3448c01a87 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 12:09:08 -0500 Subject: [PATCH 11/33] Better CLI parameter --- gcsfs/cli/gcsfuse.py | 2 +- gcsfs/gcsfuse.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/gcsfs/cli/gcsfuse.py b/gcsfs/cli/gcsfuse.py index c98dd996..428b60dc 100644 --- a/gcsfs/cli/gcsfuse.py +++ b/gcsfs/cli/gcsfuse.py @@ -16,7 +16,7 @@ help="Run in the foreground or as a background process") @click.option('--threads/--no-threads', default=True, help="Run in the foreground or as a background process") -@click.option('-cache-files', default=10, +@click.option('--cache-files', type=int, default=10, help="Number of open files to cache") @click.option('-v', '--verbose', count=True, help="Set logging level. '-v' for 'gcsfuse' logging." 
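[Aside: the LRUDict change in the next hunk leans on OrderedDict's recency
ordering. A minimal standalone sketch of that behaviour:

    from collections import OrderedDict

    d = OrderedDict()
    d['a'], d['b'], d['c'] = 1, 2, 3
    d.move_to_end('a')        # mark 'a' most-recently-used
    d.popitem(last=False)     # evict the least-recently-used entry, ('b', 2)
    assert list(d) == ['c', 'a']

move_to_end() raises KeyError for a missing key in any case; the guard added
to __getitem__ below just makes the miss explicit before the recency update.]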
diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index db4acfe4..2884bd0c 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -46,6 +46,8 @@ def purge(self): self.data.popitem(last=False) def __getitem__(self, key): + if key not in self.data: + raise KeyError(key) self.data.move_to_end(key) return self.data[key] From 43656365b3d2720f2ef8c4a04e132cdeb5108683 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 12:11:05 -0500 Subject: [PATCH 12/33] arg loc --- gcsfs/cli/gcsfuse.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/gcsfs/cli/gcsfuse.py b/gcsfs/cli/gcsfuse.py index 428b60dc..c61ebc64 100644 --- a/gcsfs/cli/gcsfuse.py +++ b/gcsfs/cli/gcsfuse.py @@ -33,9 +33,8 @@ def main(bucket, mount_point, token, project_id, foreground, threads, print("Mounting bucket %s to directory %s" % (bucket, mount_point)) print('foreground:', foreground, ', nothreads:', not threads) - FUSE(GCSFS(bucket, token=token, project=project_id), - mount_point, nothreads=not threads, foreground=foreground, - nfiles=nfiles) + FUSE(GCSFS(bucket, token=token, project=project_id, nfiles=nfiles), + mount_point, nothreads=not threads, foreground=foreground) if __name__ == '__main__': From ec770a4d44380c1d86b3bf611a0edcc0524c2d0c Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 12:12:22 -0500 Subject: [PATCH 13/33] try again --- gcsfs/cli/gcsfuse.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gcsfs/cli/gcsfuse.py b/gcsfs/cli/gcsfuse.py index c61ebc64..46e7cad1 100644 --- a/gcsfs/cli/gcsfuse.py +++ b/gcsfs/cli/gcsfuse.py @@ -16,13 +16,13 @@ help="Run in the foreground or as a background process") @click.option('--threads/--no-threads', default=True, help="Run in the foreground or as a background process") -@click.option('--cache-files', type=int, default=10, +@click.option('--cache_files', type=int, default=10, help="Number of open files to cache") @click.option('-v', '--verbose', count=True, help="Set logging level. '-v' for 'gcsfuse' logging." 
"'-v -v' for complete debug logging.") def main(bucket, mount_point, token, project_id, foreground, threads, - nfiles, verbose): + cache_files, verbose): """ Mount a Google Cloud Storage (GCS) bucket to a local directory """ fmt = '%(asctime)s %(name)-12s %(levelname)-8s %(message)s' if verbose == 1: @@ -33,7 +33,7 @@ def main(bucket, mount_point, token, project_id, foreground, threads, print("Mounting bucket %s to directory %s" % (bucket, mount_point)) print('foreground:', foreground, ', nothreads:', not threads) - FUSE(GCSFS(bucket, token=token, project=project_id, nfiles=nfiles), + FUSE(GCSFS(bucket, token=token, project=project_id, nfiles=cache_files), mount_point, nothreads=not threads, foreground=foreground) From 151dba49663c371bfb263216915ce0acd5168f85 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 12:44:45 -0500 Subject: [PATCH 14/33] rational --- gcsfs/gcsfuse.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 2884bd0c..388b65be 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -108,10 +108,22 @@ def read(self, fn, offset, size): logger.info('cache miss') f.seek(offset) out = f.read(size) - chunks.append({'start': f.start, 'end': f.end, 'data': f.cache}) - sizes = {f: "%.1fMB" % (sum( - [c['end'] - c['start'] + 1 for c in ch[1]])/2**20) - for f, ch in self.cache.items()} + new = True + for chunk in chunks: + if chunk['end'] == f.start - 1: + chunk['end'] = f.end + chunk['data'] += f.cache + new = False + elif chunk['start'] == f.end + 1: + chunk['start'] = f.start + chunk['data'] = f.cache + chunk['data'] + new = False + if new: + chunks.append({'start': f.start, 'end': f.end, 'data': f.cache}) + + sizes = sum([sum( + [c['end'] - c['start'] + 1 for c in ch[1]])/2**20 + for f, ch in self.cache.items()]) logger.info('Cache Report: {}'.format(sizes)) return out From 60a92c405d1369439bd32008580881eba045fd49 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 12:51:45 -0500 Subject: [PATCH 15/33] add locks --- gcsfs/gcsfuse.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 388b65be..17562186 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -11,6 +11,7 @@ from pwd import getpwnam from grp import getgrnam import time +from threading import Lock logger = logging.getLogger(__name__) @@ -98,14 +99,18 @@ def read(self, fn, offset, size): f, chunks = self.cache[fn] if size > self.cutoff: # big reads are likely sequential + f.lock.acquire() f.seek(offset) - return f.read(size) + out = f.read(size) + f.lock.release() + return out for chunk in chunks: if chunk['start'] < offset and chunk['end'] > offset + size: logger.info('cache hit') start = offset - chunk['start'] return chunk['data'][start:start + size] logger.info('cache miss') + f.lock.acquire() f.seek(offset) out = f.read(size) new = True @@ -120,6 +125,7 @@ def read(self, fn, offset, size): new = False if new: chunks.append({'start': f.start, 'end': f.end, 'data': f.cache}) + f.lock.release() sizes = sum([sum( [c['end'] - c['start'] + 1 for c in ch[1]])/2**20 @@ -134,6 +140,7 @@ def open(self, fn): """ if fn not in self.cache: self.cache[fn] = self.gcs.open(fn, 'rb'), [] + self.cache[fn].lock = Lock() logger.info('{} inserted into cache'.format(fn)) else: logger.info('{} found in cache'.format(fn)) From 4affd48e6b50f8d8004b8fabd2e675712f5f756e Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 12:52:52 -0500 Subject: [PATCH 
16/33] oops --- gcsfs/gcsfuse.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 17562186..81e94a05 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -139,8 +139,9 @@ def open(self, fn): May result in the eviction of LRU file object and its data blocks. """ if fn not in self.cache: - self.cache[fn] = self.gcs.open(fn, 'rb'), [] - self.cache[fn].lock = Lock() + f = self.gcs.open(fn, 'rb') + self.cache[fn] = f, [] + f.lock = Lock() logger.info('{} inserted into cache'.format(fn)) else: logger.info('{} found in cache'.format(fn)) From bb3a1b3d64129620451a0d1b3dcd21fe0ff9c22d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 14:09:16 -0500 Subject: [PATCH 17/33] remove rounding --- gcsfs/core.py | 30 ++++++++++-------------------- gcsfs/gcsfuse.py | 45 +++++++++++++++++++-------------------------- 2 files changed, 29 insertions(+), 46 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 7614ef78..3917721d 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -1175,33 +1175,33 @@ def _simple_upload(self): def _fetch(self, start, end): if self.start is None and self.end is None: # First read - self.start = _round_down(start) - self.end = _round_up(end + self.blocksize) + self.start = start + self.end = end + self.blocksize self.cache = _fetch_range(self.details, self.gcsfs.session, self.start, self.end) if start < self.start: if self.end - end > self.blocksize: - self.start = _round_down(start) - self.end = _round_up(end + self.blocksize) + self.start = start + self.end = end + self.blocksize self.cache = _fetch_range(self.details, self.gcsfs.session, self.start, self.end) else: new = _fetch_range(self.details, self.gcsfs.session, - _round_down(start), self.start) - self.start = _round_down(start) + start, self.start) + self.start = start self.cache = new + self.cache if end > self.end: if self.end > self.size: return if end - self.end > self.blocksize: - self.start = _round_down(start) - self.end = _round_up(end + self.blocksize) + self.start = start + self.end = end + self.blocksize self.cache = _fetch_range(self.details, self.gcsfs.session, self.start, self.end) else: new = _fetch_range(self.details, self.gcsfs.session, self.end, - _round_up(end + self.blocksize)) - self.end = _round_up(end + self.blocksize) + end + self.blocksize) + self.end = end + self.blocksize self.cache = self.cache + new def read(self, length=-1): @@ -1269,16 +1269,6 @@ def __exit__(self, *args): self.close() -def _round_down(x): - """Next lowest 5MB boundary""" - return x // (5 * 2**20) * 5 * 2**20 - - -def _round_up(x): - """Next highest 5MB boundary""" - return (x // (5 * 2 ** 20) + 1) * 5 * 2 ** 20 - - def _fetch_range(obj_dict, session, start=None, end=None): """ Get data from GCS diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 81e94a05..eefe5721 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -99,38 +99,31 @@ def read(self, fn, offset, size): f, chunks = self.cache[fn] if size > self.cutoff: # big reads are likely sequential - f.lock.acquire() - f.seek(offset) - out = f.read(size) - f.lock.release() - return out + with f.lock: + f.seek(offset) + return f.read(size) for chunk in chunks: if chunk['start'] < offset and chunk['end'] > offset + size: logger.info('cache hit') start = offset - chunk['start'] return chunk['data'][start:start + size] logger.info('cache miss') - f.lock.acquire() - f.seek(offset) - out = f.read(size) - new = True - for chunk in chunks: - if chunk['end'] == f.start - 1: - chunk['end'] 
= f.end - chunk['data'] += f.cache - new = False - elif chunk['start'] == f.end + 1: - chunk['start'] = f.start - chunk['data'] = f.cache + chunk['data'] - new = False - if new: - chunks.append({'start': f.start, 'end': f.end, 'data': f.cache}) - f.lock.release() - - sizes = sum([sum( - [c['end'] - c['start'] + 1 for c in ch[1]])/2**20 - for f, ch in self.cache.items()]) - logger.info('Cache Report: {}'.format(sizes)) + with f.lock: + f.seek(offset) + out = f.read(size) + new = True + for chunk in chunks: + if chunk['end'] == f.start - 1: + chunk['end'] = f.end + chunk['data'] += f.cache + new = False + elif chunk['start'] == f.end + 1: + chunk['start'] = f.start + chunk['data'] = f.cache + chunk['data'] + new = False + if new: + chunks.append({'start': f.start, 'end': f.end, 'data': f.cache}) + return out def open(self, fn): From e64077108680f1ccf8ee21eec7b9a2005093f277 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 14:21:36 -0500 Subject: [PATCH 18/33] profile --- gcsfs/gcsfuse.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index eefe5721..447253fe 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -13,6 +13,9 @@ import time from threading import Lock +import cProfile +import atexit + logger = logging.getLogger(__name__) @@ -153,9 +156,15 @@ def __init__(self, path='.', gcs=None, nfiles=10, **fsargs): self.write_cache = {} self.counter = 0 self.root = path + prof = cProfile.Profile() + self.prof = prof + def dump(): + prof.dump_stats('/home/ubuntu/notebooks/out.prof') + atexit.register(dump) @_tracemethod def getattr(self, path, fh=None): + self.prof.enable(()) try: info = self.gcs.info(''.join([self.root, path])) except FileNotFoundError: @@ -178,15 +187,17 @@ def getattr(self, path, fh=None): data['st_size'] = info['size'] data['st_blksize'] = 5 * 2**20 data['st_nlink'] = 1 - + self.prof.disable()() return data @_tracemethod def readdir(self, path, fh): + self.prof.enable(()) path = ''.join([self.root, path]) logger.info("List {}, {}".format(path, fh)) files = self.gcs.ls(path) files = [os.path.basename(f.rstrip('/')) for f in files] + self.prof.disable()() return ['.', '..'] + files @_tracemethod @@ -206,10 +217,12 @@ def rmdir(self, path): @_tracemethod def read(self, path, size, offset, fh): + self.prof.enable()() fn = ''.join([self.root, path]) logger.info('read #{} ({}) offset: {}, size: {}'.format( fh, fn, offset,size)) out = self.cache.read(fn, offset, size) + self.prof.disable()() return out @_tracemethod From a02f3ba44e00e44b5685fc613baf823e116b3a7b Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 14:22:40 -0500 Subject: [PATCH 19/33] again --- gcsfs/gcsfuse.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 447253fe..b251c061 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -158,13 +158,15 @@ def __init__(self, path='.', gcs=None, nfiles=10, **fsargs): self.root = path prof = cProfile.Profile() self.prof = prof + def dump(): prof.dump_stats('/home/ubuntu/notebooks/out.prof') + atexit.register(dump) @_tracemethod def getattr(self, path, fh=None): - self.prof.enable(()) + self.prof.enable() try: info = self.gcs.info(''.join([self.root, path])) except FileNotFoundError: From b15192be4a1e613b0c9e6e165305825fdeb5b6ed Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 14:23:46 -0500 Subject: [PATCH 20/33] again --- gcsfs/gcsfuse.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 
deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index b251c061..2f9ab34f 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -189,17 +189,17 @@ def getattr(self, path, fh=None): data['st_size'] = info['size'] data['st_blksize'] = 5 * 2**20 data['st_nlink'] = 1 - self.prof.disable()() + self.prof.disable() return data @_tracemethod def readdir(self, path, fh): - self.prof.enable(()) + self.prof.enable() path = ''.join([self.root, path]) logger.info("List {}, {}".format(path, fh)) files = self.gcs.ls(path) files = [os.path.basename(f.rstrip('/')) for f in files] - self.prof.disable()() + self.prof.disable() return ['.', '..'] + files @_tracemethod @@ -219,12 +219,12 @@ def rmdir(self, path): @_tracemethod def read(self, path, size, offset, fh): - self.prof.enable()() + self.prof.enable() fn = ''.join([self.root, path]) logger.info('read #{} ({}) offset: {}, size: {}'.format( fh, fn, offset,size)) out = self.cache.read(fn, offset, size) - self.prof.disable()() + self.prof.disable() return out @_tracemethod From 27e1155708fbe21c839fcdd96b97d10f62b09919 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 14:56:17 -0500 Subject: [PATCH 21/33] better prof --- gcsfs/core.py | 1 - gcsfs/gcsfuse.py | 36 +++++++++++++++++------------------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 3917721d..dab320e6 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -397,7 +397,6 @@ def buckets(self): """Return list of available project buckets.""" return [b["name"] for b in self._list_buckets()["items"]] - @classmethod def _process_object(self, bucket, object_metadata): object_metadata["size"] = int(object_metadata.get("size", 0)) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 2f9ab34f..a2a63998 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -16,13 +16,24 @@ import cProfile import atexit +prof = cProfile.Profile() logger = logging.getLogger(__name__) +def dump(): + prof.dump_stats('/home/ubuntu/out.prof') + + +atexit.register(dump) + + @decorator.decorator def _tracemethod(f, self, *args, **kwargs): logger.debug("%s(args=%s, kwargs=%s)", f.__name__, args, kwargs) - return f(self, *args, **kwargs) + prof.enable() + out = f(self, *args, **kwargs) + prof.disable() + return out def str_to_time(s): @@ -100,16 +111,16 @@ def read(self, fn, offset, size): if fn not in self.cache: self.open(fn) f, chunks = self.cache[fn] - if size > self.cutoff: - # big reads are likely sequential - with f.lock: - f.seek(offset) - return f.read(size) for chunk in chunks: if chunk['start'] < offset and chunk['end'] > offset + size: logger.info('cache hit') start = offset - chunk['start'] return chunk['data'][start:start + size] + if size > self.cutoff: + # big reads are likely sequential + with f.lock: + f.seek(offset) + return f.read(size) logger.info('cache miss') with f.lock: f.seek(offset) @@ -156,17 +167,9 @@ def __init__(self, path='.', gcs=None, nfiles=10, **fsargs): self.write_cache = {} self.counter = 0 self.root = path - prof = cProfile.Profile() - self.prof = prof - - def dump(): - prof.dump_stats('/home/ubuntu/notebooks/out.prof') - - atexit.register(dump) @_tracemethod def getattr(self, path, fh=None): - self.prof.enable() try: info = self.gcs.info(''.join([self.root, path])) except FileNotFoundError: @@ -189,17 +192,14 @@ def getattr(self, path, fh=None): data['st_size'] = info['size'] data['st_blksize'] = 5 * 2**20 data['st_nlink'] = 1 - self.prof.disable() return data @_tracemethod def readdir(self, path, fh): - 
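        # (Context: readdir brackets the GCS listing in prof.enable() /
        #  prof.disable() so time spent in ls() is captured in the cProfile
        #  stats dumped at exit.)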
self.prof.enable() path = ''.join([self.root, path]) logger.info("List {}, {}".format(path, fh)) files = self.gcs.ls(path) files = [os.path.basename(f.rstrip('/')) for f in files] - self.prof.disable() return ['.', '..'] + files @_tracemethod @@ -219,12 +219,10 @@ def rmdir(self, path): @_tracemethod def read(self, path, size, offset, fh): - self.prof.enable() fn = ''.join([self.root, path]) logger.info('read #{} ({}) offset: {}, size: {}'.format( fh, fn, offset,size)) out = self.cache.read(fn, offset, size) - self.prof.disable() return out @_tracemethod From 11b2c8aedb884457a0f9b22efa71210e5f16b518 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 15:16:04 -0500 Subject: [PATCH 22/33] again --- gcsfs/gcsfuse.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index a2a63998..ee55dca5 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -17,10 +17,12 @@ import atexit prof = cProfile.Profile() +prof.enable() logger = logging.getLogger(__name__) def dump(): + prof.disable() prof.dump_stats('/home/ubuntu/out.prof') @@ -30,9 +32,7 @@ def dump(): @decorator.decorator def _tracemethod(f, self, *args, **kwargs): logger.debug("%s(args=%s, kwargs=%s)", f.__name__, args, kwargs) - prof.enable() out = f(self, *args, **kwargs) - prof.disable() return out From d3fbc0d25678ce98f034a01196bf76e3b3f27d67 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 15:36:02 -0500 Subject: [PATCH 23/33] remove faulty log --- gcsfs/core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index dab320e6..91270a0a 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -424,6 +424,8 @@ def _get_object(self, path): # listing. raise FileNotFoundError(path) + import pdb + pdb.set_trace() result = self._process_object(bucket, self._call('get', 'b/{}/o/{}', bucket, key)) logger.debug("_get_object result: %s", result) @@ -1268,6 +1270,7 @@ def __exit__(self, *args): self.close() +@_tracemethod def _fetch_range(obj_dict, session, start=None, end=None): """ Get data from GCS @@ -1276,7 +1279,6 @@ def _fetch_range(obj_dict, session, start=None, end=None): start, end : None or integers if not both None, fetch only given range """ - logger.debug("Fetch: {}, {}-{}", obj_dict['name'], start, end) if start is not None or end is not None: start = start or 0 end = end or 0 From c6457bb1a8aaba1cbf23ca0a02b2749b13b2728d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 15:38:27 -0500 Subject: [PATCH 24/33] remove pdb --- gcsfs/core.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 91270a0a..adeb9a1f 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -424,8 +424,6 @@ def _get_object(self, path): # listing. 
raise FileNotFoundError(path) - import pdb - pdb.set_trace() result = self._process_object(bucket, self._call('get', 'b/{}/o/{}', bucket, key)) logger.debug("_get_object result: %s", result) From 70a78c6011393e0bad904c6db360773b34e19b43 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 15:51:45 -0500 Subject: [PATCH 25/33] small --- gcsfs/core.py | 9 +++++++++ gcsfs/gcsfuse.py | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index adeb9a1f..ead4b04b 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -406,6 +406,7 @@ def _get_object(self, path): """Return object information at the given path.""" logger.debug("_get_object(%s)", path) bucket, key = split_path(path) + fn = "/".join([bucket, key.rstrip("/")]) + "/" # Check if parent dir is in listing cache parent = "/".join([bucket, posixpath.dirname(key.rstrip("/"))]) + "/" @@ -418,6 +419,14 @@ def _get_object(self, path): else: # Should error on missing cache or reprobe? raise FileNotFoundError + if fn in self._listing_cache: + return { + 'bucket': bucket, + 'name': key, + 'kind': 'storage#object', + 'size': 0, + 'storageClass': 'DIRECTORY', + } if not key: # Attempt to "get" the bucket root, return error instead of diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index ee55dca5..c721091a 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -160,7 +160,7 @@ class GCSFS(Operations): def __init__(self, path='.', gcs=None, nfiles=10, **fsargs): if gcs is None: # minimum block size: still read on 5MB boundaries. - self.gcs = GCSFileSystem(block_size=2 * 2 ** 20, **fsargs) + self.gcs = GCSFileSystem(block_size=5 * 2 ** 20, **fsargs) else: self.gcs = gcs self.cache = SmallChunkCacher(self.gcs, nfiles=nfiles) From 381a83c08ca9316b291392a16318d38db6fa91b8 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 16:00:00 -0500 Subject: [PATCH 26/33] mixed --- gcsfs/gcsfuse.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index c721091a..968b24fa 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -147,7 +147,8 @@ def open(self, fn): """ if fn not in self.cache: f = self.gcs.open(fn, 'rb') - self.cache[fn] = f, [] + chunk = f.read(5 * 2**20) + self.cache[fn] = f, [{'start': 0, 'end': 5 * 2**20, data:chunk}] f.lock = Lock() logger.info('{} inserted into cache'.format(fn)) else: @@ -160,7 +161,7 @@ class GCSFS(Operations): def __init__(self, path='.', gcs=None, nfiles=10, **fsargs): if gcs is None: # minimum block size: still read on 5MB boundaries. - self.gcs = GCSFileSystem(block_size=5 * 2 ** 20, **fsargs) + self.gcs = GCSFileSystem(block_size=2 * 2 ** 20, **fsargs) else: self.gcs = gcs self.cache = SmallChunkCacher(self.gcs, nfiles=nfiles) From 87546c9141bcd0f128d2efffbe773e283db22cda Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 16:01:30 -0500 Subject: [PATCH 27/33] timeout --- gcsfs/gcsfuse.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 968b24fa..633ef2a5 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -161,7 +161,8 @@ class GCSFS(Operations): def __init__(self, path='.', gcs=None, nfiles=10, **fsargs): if gcs is None: # minimum block size: still read on 5MB boundaries. 
- self.gcs = GCSFileSystem(block_size=2 * 2 ** 20, **fsargs) + self.gcs = GCSFileSystem(block_size=2 * 2 ** 20, + cache_timeout=600, **fsargs) else: self.gcs = gcs self.cache = SmallChunkCacher(self.gcs, nfiles=nfiles) From 2fe8647ddd3f3d595ec12fda61fb5a1d7a54589d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 8 Feb 2018 16:02:37 -0500 Subject: [PATCH 28/33] op --- gcsfs/gcsfuse.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 633ef2a5..4401b42a 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -148,7 +148,7 @@ def open(self, fn): if fn not in self.cache: f = self.gcs.open(fn, 'rb') chunk = f.read(5 * 2**20) - self.cache[fn] = f, [{'start': 0, 'end': 5 * 2**20, data:chunk}] + self.cache[fn] = f, [{'start': 0, 'end': 5 * 2**20, 'data': chunk}] f.lock = Lock() logger.info('{} inserted into cache'.format(fn)) else: From 69362e2165710c690fb8a0c44d342aa1fb8a915c Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 9 Feb 2018 09:55:35 -0500 Subject: [PATCH 29/33] more --- gcsfs/gcsfuse.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 4401b42a..973aadab 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -123,6 +123,8 @@ def read(self, fn, offset, size): return f.read(size) logger.info('cache miss') with f.lock: + bs = f.blocksize + f.blocksize = 2 * 2 ** 20 f.seek(offset) out = f.read(size) new = True @@ -137,6 +139,7 @@ def read(self, fn, offset, size): new = False if new: chunks.append({'start': f.start, 'end': f.end, 'data': f.cache}) + f.blocksize = bs return out @@ -161,8 +164,8 @@ class GCSFS(Operations): def __init__(self, path='.', gcs=None, nfiles=10, **fsargs): if gcs is None: # minimum block size: still read on 5MB boundaries. - self.gcs = GCSFileSystem(block_size=2 * 2 ** 20, - cache_timeout=600, **fsargs) + self.gcs = GCSFileSystem(block_size=30 * 2 ** 20, + cache_timeout=6000, **fsargs) else: self.gcs = gcs self.cache = SmallChunkCacher(self.gcs, nfiles=nfiles) @@ -223,7 +226,7 @@ def rmdir(self, path): def read(self, path, size, offset, fh): fn = ''.join([self.root, path]) logger.info('read #{} ({}) offset: {}, size: {}'.format( - fh, fn, offset,size)) + fh, fn, offset, size)) out = self.cache.read(fn, offset, size) return out From 024e6fe2982ae7a25e5b79fa986d1ccab5f9e1ff Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 22 Feb 2018 14:13:22 -0500 Subject: [PATCH 30/33] fix --- gcsfs/core.py | 23 +- gcsfs/gcsfuse.py | 20 +- gcsfs/tests/recordings/test_fuse.yaml | 483 ++++++++++++++++++++++++++ 3 files changed, 508 insertions(+), 18 deletions(-) create mode 100644 gcsfs/tests/recordings/test_fuse.yaml diff --git a/gcsfs/core.py b/gcsfs/core.py index ee9ab18a..b3b2279f 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -462,7 +462,6 @@ def _process_object(self, bucket, object_metadata): def _get_object(self, path): """Return object information at the given path.""" bucket, key = split_path(path) - fn = "/".join([bucket, key.rstrip("/")]) + "/" # Check if parent dir is in listing cache parent = "/".join([bucket, posixpath.dirname(key.rstrip("/"))]) + "/" @@ -481,7 +480,8 @@ def _get_object(self, path): # listing. 
raise FileNotFoundError(path) - result = self._process_object(bucket, self._call('get', 'b/{}/o/{}', bucket, key)) + result = self._process_object(bucket, self._call('get', 'b/{}/o/{}', + bucket, key)) logger.debug("_get_object result: %s", result) return result @@ -547,10 +547,11 @@ def _do_list_objects(self, path, max_results = None): next_page_token = page.get('nextPageToken', None) result = { - "kind" : "storage#objects", - "prefixes" : prefixes, - "items" : items, + "kind": "storage#objects", + "prefixes": prefixes, + "items": items } + [self._process_object(bucket, i) for i in items], return result @@ -577,8 +578,8 @@ def _list_buckets(self): next_page_token = page.get('nextPageToken', None) result = { - "kind" : "storage#buckets", - "items" : items, + "kind": "storage#buckets", + "items": items, } return result @@ -643,10 +644,12 @@ def ls(self, path, detail=False): elif path.endswith("/"): return self._ls(path, detail) else: - combined_listing = self._ls(path, detail) + self._ls(path + "/", detail) + combined_listing = self._ls(path, detail) + self._ls(path + "/", + detail) if detail: - combined_entries = dict((l["path"],l) for l in combined_listing ) - combined_entries.pop(path+"/", None) + combined_entries = dict( + (l["path"], l) for l in combined_listing) + combined_entries.pop(path + "/", None) return list(combined_entries.values()) else: return list(set(combined_listing) - {path + "/"}) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index be4dbf61..1ee70f62 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -16,16 +16,17 @@ import cProfile import atexit -prof = cProfile.Profile() -prof.enable() +if True: + prof = cProfile.Profile() + prof.enable() -def dump(): - prof.disable() - prof.dump_stats('/home/ubuntu/out.prof') + def dump(): + prof.disable() + prof.dump_stats(os.path.join(os.path.expanduser("~"), 'out.prof')) -atexit.register(dump) + atexit.register(dump) @decorator.decorator @@ -34,12 +35,15 @@ def _tracemethod(f, self, *args, **kwargs): out = f(self, *args, **kwargs) return out + logger = logging.getLogger(__name__) + @decorator.decorator def _tracemethod(f, self, *args, **kwargs): - logger.debug("%s(args=%s, kwargs=%s)", f.__name__, args, kwargs) - return f(self, *args, **kwargs) + logger.debug("%s(args=%s, kwargs=%s)", f.__name__, args, kwargs) + return f(self, *args, **kwargs) + def str_to_time(s): t = pd.to_datetime(s) diff --git a/gcsfs/tests/recordings/test_fuse.yaml b/gcsfs/tests/recordings/test_fuse.yaml new file mode 100644 index 00000000..0fea6afe --- /dev/null +++ b/gcsfs/tests/recordings/test_fuse.yaml @@ -0,0 +1,483 @@ +interactions: +- request: + body: client_secret=xxx&refresh_token=xxx&grant_type=refresh_token&client_id=xxx + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + Content-Length: ['208'] + content-type: [application/x-www-form-urlencoded] + method: POST + uri: https://www.googleapis.com/oauth2/v4/token + response: + body: + string: !!binary | + H4sIANYVj1oC/6tWSkxOTi0uji/Jz07NU7JSUKqoqFDSUVBKrSjILEotjs8ECRqbGRgAxTJTMJSB + +fEllQWpIEGn1MSi1CKlWgA253KRVgAAAA== + headers: + Cache-Control: ['no-cache, no-store, max-age=0, must-revalidate'] + Content-Encoding: [gzip] + Content-Type: [application/json; charset=UTF-8] + Pragma: [no-cache] + Server: [GSE] + Transfer-Encoding: [chunked] + Vary: [Origin, X-Origin] + X-Content-Type-Options: [nosniff] + X-Frame-Options: [SAMEORIGIN] + X-XSS-Protection: [1; mode=block] + status: {code: 200, message: OK} +- request: + body: null + headers: + Accept: 
['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/?project=test_project + response: + body: {string: "{\n \"kind\": \"storage#buckets\",\n \"items\": [\n {\n \"kind\": + \"storage#bucket\",\n \"id\": \"anaconda-enterprise\",\n \"selfLink\": + \"https://www.googleapis.com/storage/v1/b/anaconda-enterprise\",\n \"projectNumber\": + \"586241054156\",\n \"name\": \"anaconda-enterprise\",\n \"timeCreated\": + \"2017-07-05T23:53:06.552Z\",\n \"updated\": \"2017-07-14T17:39:54.178Z\",\n + \ \"metageneration\": \"3\",\n \"location\": \"US\",\n \"storageClass\": + \"MULTI_REGIONAL\",\n \"etag\": \"CAM=\"\n },\n {\n \"kind\": \"storage#bucket\",\n + \ \"id\": \"anaconda-public-data\",\n \"selfLink\": \"https://www.googleapis.com/storage/v1/b/anaconda-public-data\",\n + \ \"projectNumber\": \"586241054156\",\n \"name\": \"anaconda-public-data\",\n + \ \"timeCreated\": \"2017-04-05T20:22:12.865Z\",\n \"updated\": \"2017-07-10T16:32:07.980Z\",\n + \ \"metageneration\": \"2\",\n \"location\": \"US\",\n \"storageClass\": + \"MULTI_REGIONAL\",\n \"etag\": \"CAI=\"\n },\n {\n \"kind\": \"storage#bucket\",\n + \ \"id\": \"artifacts.test_project.appspot.com\",\n \"selfLink\": \"https://www.googleapis.com/storage/v1/b/artifacts.test_project.appspot.com\",\n + \ \"projectNumber\": \"586241054156\",\n \"name\": \"artifacts.test_project.appspot.com\",\n + \ \"timeCreated\": \"2016-05-17T18:29:22.774Z\",\n \"updated\": \"2016-05-17T18:29:22.774Z\",\n + \ \"metageneration\": \"1\",\n \"location\": \"US\",\n \"storageClass\": + \"STANDARD\",\n \"etag\": \"CAE=\"\n },\n {\n \"kind\": \"storage#bucket\",\n + \ \"id\": \"test_project_cloudbuild\",\n \"selfLink\": \"https://www.googleapis.com/storage/v1/b/test_project_cloudbuild\",\n + \ \"projectNumber\": \"586241054156\",\n \"name\": \"test_project_cloudbuild\",\n + \ \"timeCreated\": \"2017-11-03T20:06:49.744Z\",\n \"updated\": \"2017-11-03T20:06:49.744Z\",\n + \ \"metageneration\": \"1\",\n \"location\": \"US\",\n \"storageClass\": + \"STANDARD\",\n \"etag\": \"CAE=\"\n },\n {\n \"kind\": \"storage#bucket\",\n + \ \"id\": \"dataflow-anaconda-compute\",\n \"selfLink\": \"https://www.googleapis.com/storage/v1/b/dataflow-anaconda-compute\",\n + \ \"projectNumber\": \"586241054156\",\n \"name\": \"dataflow-anaconda-compute\",\n + \ \"timeCreated\": \"2017-09-14T18:55:42.848Z\",\n \"updated\": \"2017-09-14T18:55:42.848Z\",\n + \ \"metageneration\": \"1\",\n \"location\": \"US\",\n \"storageClass\": + \"MULTI_REGIONAL\",\n \"etag\": \"CAE=\"\n },\n {\n \"kind\": \"storage#bucket\",\n + \ \"id\": \"gcsfs-test\",\n \"selfLink\": \"https://www.googleapis.com/storage/v1/b/gcsfs-test\",\n + \ \"projectNumber\": \"586241054156\",\n \"name\": \"gcsfs-test\",\n \"timeCreated\": + \"2017-12-02T23:25:23.058Z\",\n \"updated\": \"2018-01-04T14:07:08.519Z\",\n + \ \"metageneration\": \"2\",\n \"location\": \"US\",\n \"storageClass\": + \"MULTI_REGIONAL\",\n \"etag\": \"CAI=\"\n },\n {\n \"kind\": \"storage#bucket\",\n + \ \"id\": \"gcsfs-testing\",\n \"selfLink\": \"https://www.googleapis.com/storage/v1/b/gcsfs-testing\",\n + \ \"projectNumber\": \"586241054156\",\n \"name\": \"gcsfs-testing\",\n + \ \"timeCreated\": \"2017-12-12T16:52:13.675Z\",\n \"updated\": \"2017-12-12T16:52:13.675Z\",\n + \ \"metageneration\": \"1\",\n \"location\": \"US\",\n \"storageClass\": + \"STANDARD\",\n \"etag\": \"CAE=\"\n }\n ]\n}\n"} + headers: + Cache-Control: ['private, max-age=0, must-revalidate, no-transform'] + 
Content-Length: ['2944'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 200, message: OK} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + Content-Length: ['0'] + method: DELETE + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/tmp%2Ftest%2Fa + response: + body: {string: "{\n \"error\": {\n \"errors\": [\n {\n \"domain\": \"global\",\n + \ \"reason\": \"notFound\",\n \"message\": \"Not Found\"\n }\n ],\n + \ \"code\": 404,\n \"message\": \"Not Found\"\n }\n}\n"} + headers: + Cache-Control: ['private, max-age=0'] + Content-Length: ['165'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 404, message: Not Found} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + Content-Length: ['0'] + method: DELETE + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/tmp%2Ftest%2Fb + response: + body: {string: "{\n \"error\": {\n \"errors\": [\n {\n \"domain\": \"global\",\n + \ \"reason\": \"notFound\",\n \"message\": \"Not Found\"\n }\n ],\n + \ \"code\": 404,\n \"message\": \"Not Found\"\n }\n}\n"} + headers: + Cache-Control: ['private, max-age=0'] + Content-Length: ['165'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 404, message: Not Found} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + Content-Length: ['0'] + method: DELETE + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/tmp%2Ftest%2Fc + response: + body: {string: "{\n \"error\": {\n \"errors\": [\n {\n \"domain\": \"global\",\n + \ \"reason\": \"notFound\",\n \"message\": \"Not Found\"\n }\n ],\n + \ \"code\": 404,\n \"message\": \"Not Found\"\n }\n}\n"} + headers: + Cache-Control: ['private, max-age=0'] + Content-Length: ['165'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 404, message: Not Found} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + Content-Length: ['0'] + method: DELETE + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/tmp%2Ftest%2Fd + response: + body: {string: "{\n \"error\": {\n \"errors\": [\n {\n \"domain\": \"global\",\n + \ \"reason\": \"notFound\",\n \"message\": \"Not Found\"\n }\n ],\n + \ \"code\": 404,\n \"message\": \"Not Found\"\n }\n}\n"} + headers: + Cache-Control: ['private, max-age=0'] + Content-Length: ['165'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 404, message: Not Found} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/.DS_Store + response: + body: {string: "{\n \"error\": {\n \"errors\": [\n {\n \"domain\": \"global\",\n + \ \"reason\": \"notFound\",\n \"message\": \"Not Found\"\n }\n ],\n + \ \"code\": 404,\n \"message\": \"Not Found\"\n }\n}\n"} + headers: + Cache-Control: ['private, max-age=0'] + Content-Length: ['165'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 404, message: Not Found} +- request: + 
body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/?delimiter=%2F + response: + body: {string: "{\n \"kind\": \"storage#objects\"\n}\n"} + headers: + Cache-Control: ['private, max-age=0, must-revalidate, no-transform'] + Content-Length: ['31'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 200, message: OK} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/.DS_Store + response: + body: {string: "{\n \"error\": {\n \"errors\": [\n {\n \"domain\": \"global\",\n + \ \"reason\": \"notFound\",\n \"message\": \"Not Found\"\n }\n ],\n + \ \"code\": 404,\n \"message\": \"Not Found\"\n }\n}\n"} + headers: + Cache-Control: ['private, max-age=0'] + Content-Length: ['165'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 404, message: Not Found} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello + response: + body: {string: "{\n \"error\": {\n \"errors\": [\n {\n \"domain\": \"global\",\n + \ \"reason\": \"notFound\",\n \"message\": \"Not Found\"\n }\n ],\n + \ \"code\": 404,\n \"message\": \"Not Found\"\n }\n}\n"} + headers: + Cache-Control: ['private, max-age=0'] + Content-Length: ['165'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 404, message: Not Found} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + Content-Length: ['0'] + method: POST + uri: https://www.googleapis.com/upload/storage/v1/b/gcsfs-testing/o?name=hello&uploadType=media + response: + body: {string: "{\n \"kind\": \"storage#object\",\n \"id\": \"gcsfs-testing/hello/1519326682064763\",\n + \"selfLink\": \"https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello\",\n + \"name\": \"hello\",\n \"bucket\": \"gcsfs-testing\",\n \"generation\": \"1519326682064763\",\n + \"metageneration\": \"1\",\n \"timeCreated\": \"2018-02-22T19:11:22.058Z\",\n + \"updated\": \"2018-02-22T19:11:22.058Z\",\n \"storageClass\": \"STANDARD\",\n + \"timeStorageClassUpdated\": \"2018-02-22T19:11:22.058Z\",\n \"size\": \"0\",\n + \"md5Hash\": \"1B2M2Y8AsgTpgAmY7PhCfg==\",\n \"mediaLink\": \"https://www.googleapis.com/download/storage/v1/b/gcsfs-testing/o/hello?generation=1519326682064763&alt=media\",\n + \"crc32c\": \"AAAAAA==\",\n \"etag\": \"CPvOuvmcutkCEAE=\"\n}\n"} + headers: + Cache-Control: ['no-cache, no-store, max-age=0, must-revalidate'] + Content-Length: ['657'] + Content-Type: [application/json; charset=UTF-8] + ETag: [CPvOuvmcutkCEAE=] + Pragma: [no-cache] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 200, message: OK} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello + response: + body: {string: "{\n \"kind\": \"storage#object\",\n \"id\": \"gcsfs-testing/hello/1519326682064763\",\n + \"selfLink\": \"https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello\",\n + 
\"name\": \"hello\",\n \"bucket\": \"gcsfs-testing\",\n \"generation\": \"1519326682064763\",\n + \"metageneration\": \"1\",\n \"timeCreated\": \"2018-02-22T19:11:22.058Z\",\n + \"updated\": \"2018-02-22T19:11:22.058Z\",\n \"storageClass\": \"STANDARD\",\n + \"timeStorageClassUpdated\": \"2018-02-22T19:11:22.058Z\",\n \"size\": \"0\",\n + \"md5Hash\": \"1B2M2Y8AsgTpgAmY7PhCfg==\",\n \"mediaLink\": \"https://www.googleapis.com/download/storage/v1/b/gcsfs-testing/o/hello?generation=1519326682064763&alt=media\",\n + \"crc32c\": \"AAAAAA==\",\n \"etag\": \"CPvOuvmcutkCEAE=\"\n}\n"} + headers: + Cache-Control: ['no-cache, no-store, max-age=0, must-revalidate'] + Content-Length: ['657'] + Content-Type: [application/json; charset=UTF-8] + ETag: [CPvOuvmcutkCEAE=] + Pragma: [no-cache] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 200, message: OK} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/._hello + response: + body: {string: "{\n \"error\": {\n \"errors\": [\n {\n \"domain\": \"global\",\n + \ \"reason\": \"notFound\",\n \"message\": \"Not Found\"\n }\n ],\n + \ \"code\": 404,\n \"message\": \"Not Found\"\n }\n}\n"} + headers: + Cache-Control: ['private, max-age=0'] + Content-Length: ['165'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 404, message: Not Found} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/?delimiter=%2F + response: + body: {string: "{\n \"kind\": \"storage#objects\",\n \"items\": [\n {\n \"kind\": + \"storage#object\",\n \"id\": \"gcsfs-testing/hello/1519326682064763\",\n + \ \"selfLink\": \"https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello\",\n + \ \"name\": \"hello\",\n \"bucket\": \"gcsfs-testing\",\n \"generation\": + \"1519326682064763\",\n \"metageneration\": \"1\",\n \"timeCreated\": + \"2018-02-22T19:11:22.058Z\",\n \"updated\": \"2018-02-22T19:11:22.058Z\",\n + \ \"storageClass\": \"STANDARD\",\n \"timeStorageClassUpdated\": \"2018-02-22T19:11:22.058Z\",\n + \ \"size\": \"0\",\n \"md5Hash\": \"1B2M2Y8AsgTpgAmY7PhCfg==\",\n \"mediaLink\": + \"https://www.googleapis.com/download/storage/v1/b/gcsfs-testing/o/hello?generation=1519326682064763&alt=media\",\n + \ \"crc32c\": \"AAAAAA==\",\n \"etag\": \"CPvOuvmcutkCEAE=\"\n }\n ]\n}\n"} + headers: + Cache-Control: ['private, max-age=0, must-revalidate, no-transform'] + Content-Length: ['740'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 200, message: OK} +- request: + body: hello + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + Content-Length: ['5'] + method: POST + uri: https://www.googleapis.com/upload/storage/v1/b/gcsfs-testing/o?name=hello&uploadType=media + response: + body: {string: "{\n \"kind\": \"storage#object\",\n \"id\": \"gcsfs-testing/hello/1519326683041310\",\n + \"selfLink\": \"https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello\",\n + \"name\": \"hello\",\n \"bucket\": \"gcsfs-testing\",\n \"generation\": \"1519326683041310\",\n + \"metageneration\": \"1\",\n \"timeCreated\": \"2018-02-22T19:11:23.032Z\",\n + \"updated\": \"2018-02-22T19:11:23.032Z\",\n \"storageClass\": 
\"STANDARD\",\n + \"timeStorageClassUpdated\": \"2018-02-22T19:11:23.032Z\",\n \"size\": \"5\",\n + \"md5Hash\": \"XUFAKrxLKna5cZ2REBfFkg==\",\n \"mediaLink\": \"https://www.googleapis.com/download/storage/v1/b/gcsfs-testing/o/hello?generation=1519326683041310&alt=media\",\n + \"crc32c\": \"mnG7TA==\",\n \"etag\": \"CJ6c9vmcutkCEAE=\"\n}\n"} + headers: + Cache-Control: ['no-cache, no-store, max-age=0, must-revalidate'] + Content-Length: ['657'] + Content-Type: [application/json; charset=UTF-8] + ETag: [CJ6c9vmcutkCEAE=] + Pragma: [no-cache] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 200, message: OK} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello + response: + body: {string: "{\n \"kind\": \"storage#object\",\n \"id\": \"gcsfs-testing/hello/1519326683041310\",\n + \"selfLink\": \"https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello\",\n + \"name\": \"hello\",\n \"bucket\": \"gcsfs-testing\",\n \"generation\": \"1519326683041310\",\n + \"metageneration\": \"1\",\n \"timeCreated\": \"2018-02-22T19:11:23.032Z\",\n + \"updated\": \"2018-02-22T19:11:23.032Z\",\n \"storageClass\": \"STANDARD\",\n + \"timeStorageClassUpdated\": \"2018-02-22T19:11:23.032Z\",\n \"size\": \"5\",\n + \"md5Hash\": \"XUFAKrxLKna5cZ2REBfFkg==\",\n \"mediaLink\": \"https://www.googleapis.com/download/storage/v1/b/gcsfs-testing/o/hello?generation=1519326683041310&alt=media\",\n + \"crc32c\": \"mnG7TA==\",\n \"etag\": \"CJ6c9vmcutkCEAE=\"\n}\n"} + headers: + Cache-Control: ['no-cache, no-store, max-age=0, must-revalidate'] + Content-Length: ['657'] + Content-Type: [application/json; charset=UTF-8] + ETag: [CJ6c9vmcutkCEAE=] + Pragma: [no-cache] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 200, message: OK} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/?delimiter=%2F + response: + body: {string: "{\n \"kind\": \"storage#objects\",\n \"items\": [\n {\n \"kind\": + \"storage#object\",\n \"id\": \"gcsfs-testing/hello/1519326683041310\",\n + \ \"selfLink\": \"https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello\",\n + \ \"name\": \"hello\",\n \"bucket\": \"gcsfs-testing\",\n \"generation\": + \"1519326683041310\",\n \"metageneration\": \"1\",\n \"timeCreated\": + \"2018-02-22T19:11:23.032Z\",\n \"updated\": \"2018-02-22T19:11:23.032Z\",\n + \ \"storageClass\": \"STANDARD\",\n \"timeStorageClassUpdated\": \"2018-02-22T19:11:23.032Z\",\n + \ \"size\": \"5\",\n \"md5Hash\": \"XUFAKrxLKna5cZ2REBfFkg==\",\n \"mediaLink\": + \"https://www.googleapis.com/download/storage/v1/b/gcsfs-testing/o/hello?generation=1519326683041310&alt=media\",\n + \ \"crc32c\": \"mnG7TA==\",\n \"etag\": \"CJ6c9vmcutkCEAE=\"\n }\n ]\n}\n"} + headers: + Cache-Control: ['private, max-age=0, must-revalidate, no-transform'] + Content-Length: ['740'] + Content-Type: [application/json; charset=UTF-8] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 200, message: OK} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + method: GET + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello + response: + body: {string: "{\n \"kind\": \"storage#object\",\n \"id\": \"gcsfs-testing/hello/1519326683041310\",\n + 
\"selfLink\": \"https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello\",\n + \"name\": \"hello\",\n \"bucket\": \"gcsfs-testing\",\n \"generation\": \"1519326683041310\",\n + \"metageneration\": \"1\",\n \"timeCreated\": \"2018-02-22T19:11:23.032Z\",\n + \"updated\": \"2018-02-22T19:11:23.032Z\",\n \"storageClass\": \"STANDARD\",\n + \"timeStorageClassUpdated\": \"2018-02-22T19:11:23.032Z\",\n \"size\": \"5\",\n + \"md5Hash\": \"XUFAKrxLKna5cZ2REBfFkg==\",\n \"mediaLink\": \"https://www.googleapis.com/download/storage/v1/b/gcsfs-testing/o/hello?generation=1519326683041310&alt=media\",\n + \"crc32c\": \"mnG7TA==\",\n \"etag\": \"CJ6c9vmcutkCEAE=\"\n}\n"} + headers: + Cache-Control: ['no-cache, no-store, max-age=0, must-revalidate'] + Content-Length: ['657'] + Content-Type: [application/json; charset=UTF-8] + ETag: [CJ6c9vmcutkCEAE=] + Pragma: [no-cache] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 200, message: OK} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + Range: [bytes=0-10485759] + method: GET + uri: https://www.googleapis.com/download/storage/v1/b/gcsfs-testing/o/hello?alt=media&generation=1519326683041310 + response: + body: {string: hello} + headers: + Cache-Control: ['no-cache, no-store, max-age=0, must-revalidate'] + Content-Disposition: [attachment] + Content-Length: ['5'] + Content-Range: [bytes 0-4/5] + Content-Type: [application/octet-stream] + ETag: [CJ6c9vmcutkCEAE=] + Pragma: [no-cache] + Server: [UploadServer] + Vary: [Origin, X-Origin] + X-Goog-Generation: ['1519326683041310'] + X-Goog-Metageneration: ['1'] + X-Goog-Storage-Class: [STANDARD] + status: {code: 206, message: Partial Content} +- request: + body: null + headers: + Accept: ['*/*'] + Accept-Encoding: ['gzip, deflate'] + Connection: [keep-alive] + Content-Length: ['0'] + method: DELETE + uri: https://www.googleapis.com/storage/v1/b/gcsfs-testing/o/hello + response: + body: {string: ''} + headers: + Cache-Control: ['no-cache, no-store, max-age=0, must-revalidate'] + Content-Length: ['0'] + Content-Type: [application/json] + Pragma: [no-cache] + Server: [UploadServer] + Vary: [Origin, X-Origin] + status: {code: 204, message: No Content} +version: 1 From c9737c32011bcbd96d719eda924eb6c05911eeae Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 22 Feb 2018 17:53:45 -0500 Subject: [PATCH 31/33] fix for py2 --- gcsfs/gcsfuse.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 1ee70f62..2a8da1ed 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -53,13 +53,15 @@ def str_to_time(s): class LRUDict(MutableMapping): """A dict that discards least-recently-used items""" - def __init__(self, *args, size=128, **kwargs): + DEFAULT_SIZE = 128 + + def __init__(self, *args, **kwargs): """Same arguments as OrderedDict with one additions: size: maximum number of entries """ + self.size = kwargs.pop('size', self.DEFAULT_SIZE) self.data = OrderedDict(*args, **kwargs) - self.size = size self.purge() def purge(self): From 1b0aec30475b9bcdc69b752fbe7d81173c319dc9 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Sat, 24 Feb 2018 10:41:24 -0500 Subject: [PATCH 32/33] style --- gcsfs/core.py | 37 ++++++++++++++----------------------- gcsfs/gcsfuse.py | 13 +------------ 2 files changed, 15 insertions(+), 35 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index c2ac7d7e..c242036e 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -415,9 +415,8 @@ def 
_save_tokens(): except Exception as e: warnings.warn('Saving token cache failed: ' + str(e)) + @_tracemethod def _call(self, method, path, *args, **kwargs): - logger.debug("_call(%s, %s, args=%s, kwargs=%s)", method, path, args, kwargs) - for k, v in list(kwargs.items()): # only pass parameters that have values if v is None: @@ -492,7 +491,6 @@ def _get_object(self, path): result = self._process_object(bucket, self._call('get', 'b/{}/o/{}', bucket, key)) - logger.debug("_get_object result: %s", result) return result @_tracemethod @@ -503,7 +501,8 @@ def _maybe_get_cached_listing(self, path): cache_age = time.time() - retrieved_time if self.cache_timeout is not None and cache_age > self.cache_timeout: logger.debug( - "expired cache path: %s retrieved_time: %.3f cache_age: %.3f cache_timeout: %.3f", + "expired cache path: %s retrieved_time: %.3f cache_age: " + "%.3f cache_timeout: %.3f", path, retrieved_time, cache_age, self.cache_timeout ) del self._listing_cache[path] @@ -556,20 +555,16 @@ def _do_list_objects(self, path, max_results = None): next_page_token = page.get('nextPageToken', None) result = { - "kind" : "storage#objects", - "prefixes" : prefixes, - "items" : [self._process_object(bucket, i) for i in items], + "kind": "storage#objects", + "prefixes": prefixes, + "items": [self._process_object(bucket, i) for i in items], } - logger.debug("_list_objects result: %s", {k : len(result[k]) for k in ("prefixes", "items")}) - return result + @_tracemethod def _list_buckets(self): """Return list of all buckets under the current project.""" - - logger.debug("_list_buckets") - items = [] page = self._call( 'get', 'b/', project=self.project @@ -597,12 +592,13 @@ def _list_buckets(self): @_tracemethod def invalidate_cache(self, path=None): """ - Invalidate listing cache for given path, so that it is reloaded on next use. + Invalidate listing cache for given path, it is reloaded on next use. Parameters ---------- path: string or None - If None, clear all listings cached else listings at or under given path. + If None, clear all listings cached else listings at or under given + path. """ if not path: @@ -610,10 +606,9 @@ def invalidate_cache(self, path=None): self._listing_cache.clear() else: path = norm_path(path) - logger.debug("invalidate_cache prefix: %s", path) - invalid_keys = [k for k in self._listing_cache if k.startswith(path)] - logger.debug("invalidate_cache keys: %s", invalid_keys) + invalid_keys = [k for k in self._listing_cache + if k.startswith(path)] for k in invalid_keys: self._listing_cache.pop(k, None) @@ -664,6 +659,7 @@ def ls(self, path, detail=False): else: return list(set(combined_listing) - {path + "/"}) + @_tracemethod def _ls(self, path, detail=False): listing = self._list_objects(path) bucket, key = split_path(path) @@ -673,14 +669,9 @@ def _ls(self, path, detail=False): # Convert item listing into list of 'item' and 'subdir/' # entries. Items may be of form "key/", in which case there # will be duplicate entries in prefix and item_names. 
- item_names = [ - f["name"] for f in listing["items"] if f["name"] - ] + item_names = [f["name"] for f in listing["items"] if f["name"]] prefixes = [p for p in listing["prefixes"]] - logger.debug("path: %s item_names: %s prefixes: %s", path, - item_names, prefixes) - return [ posixpath.join(bucket, n) for n in set(item_names + prefixes) ] diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 2a8da1ed..2749745e 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -138,18 +138,7 @@ def read(self, fn, offset, size): f.blocksize = 2 * 2 ** 20 f.seek(offset) out = f.read(size) - new = True - for chunk in chunks: - if chunk['end'] == f.start - 1: - chunk['end'] = f.end - chunk['data'] += f.cache - new = False - elif chunk['start'] == f.end + 1: - chunk['start'] = f.start - chunk['data'] = f.cache + chunk['data'] - new = False - if new: - chunks.append({'start': f.start, 'end': f.end, 'data': f.cache}) + chunks.append({'start': f.start, 'end': f.end, 'data': f.cache}) f.blocksize = bs return out From b6210afbcc336f4c44f55259c2e3abe344688969 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Sun, 25 Feb 2018 17:34:22 -0500 Subject: [PATCH 33/33] clean up --- gcsfs/cli/gcsfuse.py | 2 +- gcsfs/gcsfuse.py | 15 --------------- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/gcsfs/cli/gcsfuse.py b/gcsfs/cli/gcsfuse.py index 9fe8d2ff..06f69f49 100644 --- a/gcsfs/cli/gcsfuse.py +++ b/gcsfs/cli/gcsfuse.py @@ -15,7 +15,7 @@ @click.option('--foreground/--background', default=True, help="Run in the foreground or as a background process") @click.option('--threads/--no-threads', default=True, - help="Run in the foreground or as a background process") + help="Whether to run with threads") @click.option('--cache_files', type=int, default=10, help="Number of open files to cache") @click.option('-v', '--verbose', count=True, diff --git a/gcsfs/gcsfuse.py b/gcsfs/gcsfuse.py index 2749745e..49c09c49 100644 --- a/gcsfs/gcsfuse.py +++ b/gcsfs/gcsfuse.py @@ -13,21 +13,6 @@ import time from threading import Lock -import cProfile -import atexit - -if True: - prof = cProfile.Profile() - prof.enable() - - - def dump(): - prof.disable() - prof.dump_stats(os.path.join(os.path.expanduser("~"), 'out.prof')) - - - atexit.register(dump) - @decorator.decorator def _tracemethod(f, self, *args, **kwargs):
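
Patches 21, 22, 30 and 33 add, adjust and finally delete a single whole-process profiling recipe. Reassembled on its own, the pattern is: create a module-level cProfile.Profile, enable it at import time, and register an atexit hook that disables it and dumps the stats. The output path under the user's home directory is the author's choice from patch 30, not anything cProfile requires:

    import atexit
    import cProfile
    import os

    # module-level profiler, live for the whole process
    prof = cProfile.Profile()
    prof.enable()


    def dump():
        # stop collecting and write the stats out at interpreter exit
        prof.disable()
        prof.dump_stats(os.path.join(os.path.expanduser("~"), "out.prof"))


    atexit.register(dump)

The resulting file can be inspected with the standard pstats module, e.g. pstats.Stats(os.path.expanduser("~/out.prof")).sort_stats("cumulative").print_stats(20).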
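The read path that the caching patches (26 through 29, then simplified in 32) converge on has three branches: a small read wholly contained in an already-fetched chunk is served from memory; a large read bypasses the cache, since it is probably part of a sequential scan; anything else is a miss, which fetches a whole chunk and remembers it. Below is a minimal, self-contained sketch of that pattern, not the committed SmallChunkCacher: it assumes a generic opener callable, keeps a separate per-file lock rather than attaching one to the file object, and fetches an explicit chunk instead of temporarily shrinking the file's blocksize and borrowing its internal buffer; names such as SmallChunkCache, chunk_size and the 10000-byte cutoff are illustrative.

    from threading import Lock


    class SmallChunkCache:
        """Serve small random reads from memory; pass big reads through."""

        def __init__(self, opener, cutoff=10000, chunk_size=2 * 2 ** 20):
            self.opener = opener          # callable: filename -> file-like
            self.cutoff = cutoff          # reads above this bypass the cache
            self.chunk_size = chunk_size  # amount fetched on a cache miss
            self.cache = {}               # filename -> (file, lock, chunks)

        def _get(self, fn):
            if fn not in self.cache:
                self.cache[fn] = (self.opener(fn), Lock(), [])
            return self.cache[fn]

        def read(self, fn, offset, size):
            f, lock, chunks = self._get(fn)
            # serve a read wholly contained in an already-fetched chunk
            for chunk in chunks:
                if chunk['start'] <= offset and offset + size <= chunk['end']:
                    start = offset - chunk['start']
                    return chunk['data'][start:start + size]
            # big reads are likely sequential scans: go straight through
            if size > self.cutoff:
                with lock:
                    f.seek(offset)
                    return f.read(size)
            # cache miss: fetch a whole chunk at the offset and keep it
            with lock:
                f.seek(offset)
                data = f.read(max(size, self.chunk_size))
            chunks.append({'start': offset, 'end': offset + len(data),
                           'data': data})
            return data[:size]

With opener=lambda fn: open(fn, 'rb') the same object caches local files; in the patches the opener is gcs.open(fn, 'rb'), and the set of open files is additionally bounded by the LRUDict (nfiles, default 10), which this sketch omits.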
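Patch 31's py2 fix is itself a reusable idiom: Python 2 has no keyword-only argument syntax, so size is popped out of **kwargs before the rest is forwarded. In isolation, showing only the constructor that the patch touches (the eviction logic is unchanged and omitted here):

    from collections import OrderedDict


    class LRUDict(object):
        """Constructor pattern from patch 31, py2- and py3-compatible."""

        DEFAULT_SIZE = 128

        def __init__(self, *args, **kwargs):
            # py2 cannot write `def __init__(self, *args, size=128, **kwargs)`,
            # so the keyword is removed before kwargs reaches OrderedDict
            self.size = kwargs.pop('size', self.DEFAULT_SIZE)
            self.data = OrderedDict(*args, **kwargs)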