Skip to content

Commit

Permalink
AsyncIO compression part 2 - added async read and asyncio to compress…
Browse files Browse the repository at this point in the history
…ion code (#3022)

* Compression asyncio:
- Added asyncio functionality for compression flow
- Added ReadPool for async reads, implemented in both comp and decomp flows
  • Loading branch information
yoniko authored Jan 31, 2022
1 parent 0b70da6 commit cc0657f
Show file tree
Hide file tree
Showing 8 changed files with 591 additions and 302 deletions.
1 change: 1 addition & 0 deletions programs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ Advanced arguments :
--filelist FILE : read list of files to operate upon from FILE
--output-dir-flat DIR : processed files are stored into DIR
--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure
--[no-]asyncio : use asynchronous IO (default: enabled)
--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled). If specified with -d, decompressor will ignore/validate checksums in compressed frame (default: validate).
-- : All arguments after "--" are treated as files
Expand Down
425 changes: 196 additions & 229 deletions programs/fileio.c

Large diffs are not rendered by default.

353 changes: 305 additions & 48 deletions programs/fileio_asyncio.c

Large diffs are not rendered by default.

85 changes: 73 additions & 12 deletions programs/fileio_asyncio.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
Expand Down Expand Up @@ -28,7 +28,7 @@ typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* threadPool;
int totalIoJobs;
FIO_prefs_t* prefs;
const FIO_prefs_t* prefs;
POOL_function poolFunction;

/* Controls the file we currently write to, make changes only by using provided utility functions */
Expand All @@ -39,8 +39,36 @@ typedef struct {
ZSTD_pthread_mutex_t ioJobsMutex;
void* availableJobs[MAX_IO_JOBS];
int availableJobsCount;
size_t jobBufferSize;
} IOPoolCtx_t;

typedef struct {
IOPoolCtx_t base;

/* State regarding the currently read file */
int reachedEof;
U64 nextReadOffset;
U64 waitingOnOffset;

/* We may hold an IOJob object as needed if we actively expose its buffer. */
void *currentJobHeld;

/* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
* the first of them. Shouldn't be accessed from outside ot utility functions. */
U8 *coalesceBuffer;

/* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
* change when consuming / refilling buffer. */
U8 *srcBuffer;
size_t srcBufferLoaded;

/* We need to know what tasks completed so we can use their buffers when their time comes.
* Should only be accessed after locking base.ioJobsMutex . */
void* completedJobs[MAX_IO_JOBS];
int completedJobsCount;
ZSTD_pthread_cond_t jobCompletedCond;
} ReadPoolCtx_t;

typedef struct {
IOPoolCtx_t base;
unsigned storedSkips;
Expand All @@ -59,15 +87,10 @@ typedef struct {
U64 offset;
} IOJob_t;

/** AIO_fwriteSparse() :
* @return : storedSkips,
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
unsigned AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips);
/* AIO_supported:
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void);

void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips);

/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
Expand Down Expand Up @@ -97,7 +120,7 @@ void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);

/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx);
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);

/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
Expand All @@ -107,12 +130,50 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
/* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs.
* bufferSize should be set to the maximal buffer we want to write to at a time. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize);
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);

/* AIO_WritePool_free:
* Frees and releases a writePool and its resources. Closes destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx);

/* AIO_ReadPool_create:
* Allocates and sets and a new readPool including its included jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
* as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);

/* AIO_ReadPool_free:
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);

/* AIO_ReadPool_consumeBytes:
* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);

/* AIO_ReadPool_fillBuffer:
* Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initalized bufferSize).
* Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file.
* Return value is the number of bytes added to the buffer.
* Note that srcBuffer might have up to 2 times bufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);

/* AIO_ReadPool_consumeAndRefill:
* Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);

/* AIO_ReadPool_setFile:
* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
* Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);

/* AIO_ReadPool_getFile:
* Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);

/* AIO_ReadPool_closeFile:
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);

#if defined (__cplusplus)
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion programs/fileio_common.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
Expand Down
4 changes: 2 additions & 2 deletions programs/fileio_types.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
Expand Down Expand Up @@ -70,4 +70,4 @@ typedef struct FIO_prefs_s {
int allowBlockDevices;
} FIO_prefs_t;

#endif /* FILEIO_TYPES_HEADER */
#endif /* FILEIO_TYPES_HEADER */
8 changes: 4 additions & 4 deletions programs/zstdcli.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
# include "zstdcli_trace.h"
#endif
#include "../lib/zstd.h" /* ZSTD_VERSION_STRING, ZSTD_minCLevel, ZSTD_maxCLevel */
#include "fileio_asyncio.h"


/*-************************************
Expand Down Expand Up @@ -179,7 +180,8 @@ static void usage_advanced(const char* programName)
#ifdef UTIL_HAS_MIRRORFILELIST
DISPLAYOUT( "--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure \n");
#endif

if (AIO_supported())
DISPLAYOUT( "--[no-]asyncio : use asynchronous IO (default: enabled) \n");

#ifndef ZSTD_NOCOMPRESS
DISPLAYOUT( "--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled)");
Expand Down Expand Up @@ -242,9 +244,6 @@ static void usage_advanced(const char* programName)
DISPLAYOUT( " -l : print information about zstd compressed files \n");
DISPLAYOUT( "--test : test compressed file integrity \n");
DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n");
#ifdef ZSTD_MULTITHREAD
DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n");
#endif
# if ZSTD_SPARSE_DEFAULT
DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n");
# else
Expand Down Expand Up @@ -1459,6 +1458,7 @@ int main(int argCount, const char* argv[])
FIO_setTargetCBlockSize(prefs, targetCBlockSize);
FIO_setSrcSizeHint(prefs, srcSizeHint);
FIO_setLiteralCompressionMode(prefs, literalCompressionMode);
FIO_setSparseWrite(prefs, 0);
if (adaptMin > cLevel) cLevel = adaptMin;
if (adaptMax < cLevel) cLevel = adaptMax;

Expand Down
15 changes: 9 additions & 6 deletions tests/playTests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,13 @@ zstd -dc - < tmp.zst > $INTOVOID
zstd -d < tmp.zst > $INTOVOID # implicit stdout when stdin is used
zstd -d - < tmp.zst > $INTOVOID
println "test : impose memory limitation (must fail)"
zstd -d -f tmp.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
zstd -d -f tmp.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmp.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmp.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
datagen -g500K > tmplimit
zstd -f tmplimit
zstd -d -f tmplimit.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
zstd -d -f tmplimit.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmplimit.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmplimit.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
rm -f tmplimit tmplimit.zst
println "test : overwrite protection"
zstd -q tmp && die "overwrite check failed!"
println "test : force overwrite"
Expand Down Expand Up @@ -1596,11 +1599,11 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then
exit 1
fi

println "\n===> zstd asyncio decompression tests "
println "\n===> zstd asyncio tests "

addFrame() {
datagen -g2M -s$2 >> tmp_uncompressed
datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst
datagen -g2M -s$2 | zstd -1 --format=$1 >> tmp_compressed.zst
}

addTwoFrames() {
Expand Down

0 comments on commit cc0657f

Please sign in to comment.