diff --git a/tensorboard/compat/tensorflow_stub/io/gfile.py b/tensorboard/compat/tensorflow_stub/io/gfile.py
index 164cf20c6e..950711fe79 100644
--- a/tensorboard/compat/tensorflow_stub/io/gfile.py
+++ b/tensorboard/compat/tensorflow_stub/io/gfile.py
@@ -93,29 +93,40 @@ def join(self, path, *paths):
         """Join paths with path delimiter."""
         return os.path.join(path, *paths)

-    def read(self, filename, binary_mode=False, size=None, offset=None):
+    def read(self, filename, binary_mode=False, size=None, continue_from=None):
         """Reads contents of a file to a string.

         Args:
             filename: string, a path
             binary_mode: bool, read as binary if True, otherwise text
             size: int, number of bytes or characters to read, otherwise
-                read all the contents of the file from the offset
-            offset: int, offset into file to read from, otherwise read
-                from the very beginning
+                read all the contents of the file (from the continuation
+                marker, if present).
+            continue_from: An opaque value returned from a prior invocation of
+                `read(...)` marking the last read position, so that reading
+                may continue from there. Otherwise read from the beginning.

         Returns:
-            Subset of the contents of the file as a string or bytes.
+            A tuple of `(data, continuation_token)` where `data` provides either
+            bytes read from the file (if `binary_mode == True`) or the decoded
+            string representation thereof (otherwise), and `continuation_token`
+            is an opaque value that can be passed to the next invocation of
+            `read(...)` in order to continue from the last read position.
         """
         mode = "rb" if binary_mode else "r"
         encoding = None if binary_mode else "utf8"
+        offset = None
+        if continue_from is not None:
+            offset = continue_from.get("opaque_offset", None)
         with io.open(filename, mode, encoding=encoding) as f:
             if offset is not None:
                 f.seek(offset)
-            if size is not None:
-                return f.read(size)
-            else:
-                return f.read()
+            data = f.read(size)
+            # The new offset may not be `offset + len(data)`, due to decoding
+            # and newline translation.
+            # So, just measure it in whatever terms the underlying stream uses.
+            continuation_token = {"opaque_offset": f.tell()}
+            return (data, continuation_token)

     def write(self, filename, file_content, binary_mode=False):
         """Writes string file contents to a file, overwriting any
@@ -186,10 +197,10 @@ def stat(self, filename):
         # NOTE: Size of the file is given by .st_size as returned from
         # os.stat(), but we convert to .length
         try:
-            len = os.stat(compat.as_bytes(filename)).st_size
+            file_length = os.stat(compat.as_bytes(filename)).st_size
         except OSError:
             raise errors.NotFoundError(None, None, "Could not find file")
-        return StatData(len)
+        return StatData(file_length)


 class S3FileSystem(object):
@@ -222,31 +233,47 @@ def join(self, path, *paths):
         """Join paths with a slash."""
         return "/".join((path,) + paths)

-    def read(self, filename, binary_mode=False, size=None, offset=None):
+    def read(self, filename, binary_mode=False, size=None, continue_from=None):
         """Reads contents of a file to a string.

         Args:
             filename: string, a path
             binary_mode: bool, read as binary if True, otherwise text
             size: int, number of bytes or characters to read, otherwise
-                read all the contents of the file from the offset
-            offset: int, offset into file to read from, otherwise read
-                from the very beginning
+                read all the contents of the file (from the continuation
+                marker, if present).
+            continue_from: An opaque value returned from a prior invocation of
+                `read(...)` marking the last read position, so that reading
+                may continue from there. Otherwise read from the beginning.

         Returns:
-            Subset of the contents of the file as a string or bytes.
+            A tuple of `(data, continuation_token)` where `data` provides either
+            bytes read from the file (if `binary_mode == True`) or the decoded
+            string representation thereof (otherwise), and `continuation_token`
+            is an opaque value that can be passed to the next invocation of
+            `read(...)` in order to continue from the last read position.
         """
         s3 = boto3.resource("s3")
         bucket, path = self.bucket_and_path(filename)
         args = {}
-        endpoint = 0
-        if size is not None or offset is not None:
-            if offset is None:
-                offset = 0
-            endpoint = '' if size is None else (offset + size)
-        if offset != 0 or endpoint != '':
-            # Asked for a range, so modify the request
-            args['Range'] = 'bytes={}-{}'.format(offset, endpoint)
+
+        # For the S3 case, we use continuation tokens of the form
+        # {byte_offset: number}
+        offset = 0
+        if continue_from is not None:
+            offset = continue_from.get("byte_offset", 0)
+
+        endpoint = ''
+        if size is not None:
+            # TODO(orionr): This endpoint risks splitting a multi-byte
+            # character or splitting \r and \n in the case of CRLFs,
+            # producing decoding errors below.
+            endpoint = offset + size
+
+        if offset != 0 or endpoint != '':
+            # Asked for a range, so modify the request
+            args['Range'] = 'bytes={}-{}'.format(offset, endpoint)
+
         try:
             stream = s3.Object(bucket, path).get(**args)['Body'].read()
         except botocore.exceptions.ClientError as exc:
@@ -256,8 +283,8 @@ def read(self, filename, binary_mode=False, size=None, offset=None):
                     # in a second request so we don't check length in all cases.
                     client = boto3.client("s3")
                     obj = client.head_object(Bucket=bucket, Key=path)
-                    len = obj['ContentLength']
-                    endpoint = min(len, offset + size)
+                    content_length = obj['ContentLength']
+                    endpoint = min(content_length, offset + size)
                 if offset == endpoint:
                     # Asked for no bytes, so just return empty
                     stream = b''
@@ -266,10 +293,14 @@ def read(self, filename, binary_mode=False, size=None, offset=None):
                     stream = s3.Object(bucket, path).get(**args)['Body'].read()
             else:
                 raise
+        # `stream` should contain raw bytes here (i.e., there has been neither
+        # decoding nor newline translation), so the byte offset increases by
+        # the expected amount.
+        continuation_token = {'byte_offset': (offset + len(stream))}
         if binary_mode:
-            return bytes(stream)
+            return (bytes(stream), continuation_token)
         else:
-            return stream.decode('utf-8')
+            return (stream.decode('utf-8'), continuation_token)

     def write(self, filename, file_content, binary_mode=False):
         """Writes string file contents to a file.
@@ -384,10 +415,13 @@ def __init__(self, filename, mode):
         self.filename = compat.as_bytes(filename)
         self.fs = get_filesystem(self.filename)
         self.fs_supports_append = hasattr(self.fs, 'append')
-        self.buff_chunk_size = _DEFAULT_BLOCK_SIZE
         self.buff = None
+        # The buffer offset and the buffer chunk size are measured in the
+        # natural units of the underlying stream, i.e. bytes in binary
+        # mode or characters in text mode.
+        self.buff_chunk_size = _DEFAULT_BLOCK_SIZE
         self.buff_offset = 0
-        self.offset = 0
+        self.continuation_token = None
         self.write_temp = None
         self.write_started = False
         self.binary_mode = 'b' in mode
@@ -401,7 +435,7 @@ def __exit__(self, *args):
         self.close()
         self.buff = None
         self.buff_offset = 0
-        self.offset = 0
+        self.continuation_token = None

     def __iter__(self):
         return self
@@ -409,7 +443,6 @@ def __iter__(self):
     def _read_buffer_to_offset(self, new_buff_offset):
         old_buff_offset = self.buff_offset
         read_size = min(len(self.buff), new_buff_offset) - old_buff_offset
-        self.offset += read_size
         self.buff_offset += read_size
         return self.buff[old_buff_offset:old_buff_offset + read_size]

@@ -438,8 +471,8 @@ def read(self, n=None):

         # read from filesystem
         read_size = max(self.buff_chunk_size, n) if n is not None else None
-        self.buff = self.fs.read(self.filename, self.binary_mode,
-                                 read_size, self.offset)
+        (self.buff, self.continuation_token) = self.fs.read(
+            self.filename, self.binary_mode, read_size, self.continuation_token)
         self.buff_offset = 0

         # add from filesystem
diff --git a/tensorboard/compat/tensorflow_stub/io/gfile_test.py b/tensorboard/compat/tensorflow_stub/io/gfile_test.py
index 03e7b75725..3151a9f6f2 100644
--- a/tensorboard/compat/tensorflow_stub/io/gfile_test.py
+++ b/tensorboard/compat/tensorflow_stub/io/gfile_test.py
@@ -165,16 +165,26 @@ def testReadLines(self):
         temp_dir = self.get_temp_dir()
         self._CreateDeepDirectoryStructure(temp_dir)
         ckpt_path = os.path.join(temp_dir, 'model.ckpt')
-        ckpt_lines = (
+
+        # Note \r\n, which io.read will automatically replace with \n.
+        # That substitution desynchronizes character offsets (which omit \r)
+        # from the underlying byte offsets (which count \r). Multibyte
+        # characters would similarly cause desynchronization.
+        raw_ckpt_lines = (
+            [u'\r\n'] + [u'line {}\r\n'.format(i) for i in range(10)] + [u' ']
+        )
+        expected_ckpt_lines = (  # without \r
             [u'\n'] + [u'line {}\n'.format(i) for i in range(10)] + [u' ']
         )
-        # Write out \n as newline even on Windows
+        # Write out newlines as given (i.e., \r\n) regardless of OS, so as to
+        # test translation on read.
         with io.open(ckpt_path, 'w', newline='') as f:
-            f.write(u''.join(ckpt_lines))
+            data = u''.join(raw_ckpt_lines)
+            f.write(data)
         with gfile.GFile(ckpt_path, 'r') as f:
             f.buff_chunk_size = 4  # Test buffering by reducing chunk size
-            ckpt_read_lines = list(f)
-            self.assertEqual(ckpt_lines, ckpt_read_lines)
+            read_ckpt_lines = list(f)
+            self.assertEqual(expected_ckpt_lines, read_ckpt_lines)

     def testReadWithOffset(self):
         temp_dir = self.get_temp_dir()
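
For reviewers, a minimal usage sketch of the continuation-token contract introduced above; it is illustrative only and not part of the patch. `LocalFileSystem` and the patched `read(...)` signature come from the diff; the `follow` helper, its name, and the polling loop are hypothetical.

    # Reviewer sketch (hypothetical; not part of this change): tail a growing
    # file by threading each returned continuation token into the next read.
    import time

    from tensorboard.compat.tensorflow_stub.io.gfile import LocalFileSystem


    def follow(filename, poll_interval_secs=1.0):
        """Yields chunks of `filename` as it grows. Hypothetical helper."""
        fs = LocalFileSystem()
        continuation_token = None
        while True:
            # `size=None` reads to EOF from the last position; the token wraps
            # `f.tell()`, so decoding and newline translation stay consistent
            # across successive reads.
            data, continuation_token = fs.read(
                filename,
                binary_mode=False,
                size=None,
                continue_from=continuation_token)
            if data:
                yield data
            time.sleep(poll_interval_secs)

The same pattern should work against `S3FileSystem`, whose tokens carry a raw `byte_offset` rather than an opaque text-stream position.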