Skip to content

Commit

Permalink
improve Blosc compression operator
Browse files Browse the repository at this point in the history
The current implementation of the Blosc ADIOS2 operator is due to
limitations of Blosc not supporting variables larger than 2GiB.

- Add option `threshold` to set a threshold under which data will only
be copied instead of compressed (equivalent to Blosc in ADIOS1).
- Use chunk format to avoid 2GiB limit for variables (datasets).
- Add support to decompress blosc data written with ADIOS before this
PR.
  • Loading branch information
psychocoderHPC committed Jan 20, 2021
1 parent 5948ca8 commit ae5dc35
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 20 deletions.
236 changes: 217 additions & 19 deletions source/adios2/operator/compress/CompressBlosc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*
* Created on: Jun 18, 2019
* Author: William F Godoy godoywf@ornl.gov
* Rene Widera r.widera@hzdr.de
*/

#include "CompressBlosc.h"
Expand All @@ -16,6 +17,10 @@ extern "C" {

#include "adios2/helper/adiosFunctions.h"

#include <algorithm>
#include <cassert>
#include <cstring>

namespace adios2
{
namespace core
Expand All @@ -36,6 +41,25 @@ CompressBlosc::CompressBlosc(const Params &parameters)
{
}

size_t CompressBlosc::BufferMaxSize(const size_t sizeIn) const
{
const size_t maxInputPerChunk = BLOSC_MAX_BUFFERSIZE;
const size_t numFullChunks = sizeIn / maxInputPerChunk;
const size_t sizeLastChunk = sizeIn % maxInputPerChunk;

const size_t maxOutputPerChunk = maxInputPerChunk + BLOSC_MAX_OVERHEAD;
const size_t maxOutputLastChunk = sizeLastChunk + BLOSC_MAX_OVERHEAD;

/* DataHeader is used to detect of old format which can only handle
* BLOSC_MAX_BUFFERSIZE (<2GiB) or the new adios2 chunked blosc format is
* used.
*/
const size_t maxRquiredDataMem = maxOutputPerChunk * numFullChunks +
maxOutputLastChunk + sizeof(DataHeader);

return maxRquiredDataMem;
}

size_t CompressBlosc::Compress(const void *dataIn, const Dims &dimensions,
const size_t elementSize, DataType type,
void *bufferOut, const Params &parameters,
Expand All @@ -44,6 +68,10 @@ size_t CompressBlosc::Compress(const void *dataIn, const Dims &dimensions,
const size_t sizeIn =
static_cast<size_t>(helper::GetTotalSize(dimensions) * elementSize);

bool useMemcpy = false;
/* input size under this bound would not compressed */
size_t thresholdSize = 128;

blosc_init();

size_t threads = 1; // defaults
Expand Down Expand Up @@ -95,48 +123,218 @@ size_t CompressBlosc::Compress(const void *dataIn, const Dims &dimensions,
{
throw std::invalid_argument(
"ERROR: invalid compressor " + compressor +
" valid values: blosclz (default), lz4, lz4hc, snappy, "
"zlib, or ztsd, in call to ADIOS2 Blosc Compression\n");
" valid values: blosclz (default), lz4, lz4hc, "
"snappy, "
"zlib, or zstd, in call to ADIOS2 Blosc Compression\n");
}
}
else if (key == "blocksize")
{
blockSize = static_cast<size_t>(helper::StringTo<uint64_t>(
value, "when setting Blosc blocksize parameter\n"));
}
else if (key == "threshold")
{
thresholdSize = static_cast<size_t>(helper::StringTo<uint64_t>(
value, "when setting Blosc threshold parameter\n"));
if (thresholdSize < 128u)
thresholdSize = 128u;
}
}

const int result = blosc_set_compressor(compressor.c_str());
if (result == -1)
// write header to detect new compression format (set first 8 byte to zero)
DataHeader *headerPtr = reinterpret_cast<DataHeader *>(bufferOut);

// set default header
*headerPtr = DataHeader{};

const uint8_t *inputDataBuff = reinterpret_cast<const uint8_t *>(dataIn);

int32_t typesize = elementSize;
if (typesize > BLOSC_MAX_TYPESIZE)
typesize = 1;

uint8_t *outputBuff = reinterpret_cast<uint8_t *>(bufferOut);
outputBuff += sizeof(DataHeader);

size_t currentOutputSize = 0u;
size_t inputOffset = 0u;

if (sizeIn < thresholdSize)
{
throw std::invalid_argument("ERROR: invalid compressor " + compressor +
" check if supported by blosc build, in "
"call to ADIOS2 Blosc Compression\n");
/* disable compression */
useMemcpy = true;
}

blosc_set_nthreads(threads);
blosc_set_blocksize(blockSize);
if (!useMemcpy)
{
const int result = blosc_set_compressor(compressor.c_str());
if (result == -1)
{
throw std::invalid_argument(
"ERROR: invalid compressor " + compressor +
" check if supported by blosc build, in "
"call to ADIOS2 Blosc Compression\n");
}
blosc_set_nthreads(threads);
blosc_set_blocksize(blockSize);

uint32_t chunk = 0;
for (; inputOffset < sizeIn; ++chunk)
{
size_t inputChunkSize =
std::min(sizeIn - inputOffset,
static_cast<size_t>(BLOSC_MAX_BUFFERSIZE));
bloscSize_t maxIntputSize =
static_cast<bloscSize_t>(inputChunkSize);

bloscSize_t maxChunkSize = maxIntputSize + BLOSC_MAX_OVERHEAD;

const uint8_t *in_ptr = inputDataBuff + inputOffset;
uint8_t *out_ptr = outputBuff + currentOutputSize;

const int compressedSize =
blosc_compress(compressionLevel, doShuffle, elementSize, sizeIn, dataIn,
bufferOut, sizeIn);
bloscSize_t compressedChunkSize =
blosc_compress(compressionLevel, doShuffle, typesize,
maxIntputSize, in_ptr, out_ptr, maxChunkSize);

if (compressedChunkSize > 0)
currentOutputSize += static_cast<size_t>(compressedChunkSize);
else
{
// something went wrong with the compression switch to memcopy
useMemcpy = true;
break;
}
/* add size to written output data */
inputOffset += static_cast<size_t>(maxIntputSize);
}

if (!useMemcpy)
{
// validate that all bytes are compressed
assert(inputOffset == sizeIn);
headerPtr->SetNumChunks(chunk);
}
}

if (compressedSize <= 0)
if (useMemcpy)
{
throw std::invalid_argument(
"ERROR: from blosc_compress return size: " +
std::to_string(compressedSize) +
", check operator parameters, "
" compression failed in ADIOS2 Blosc Compression\n");
std::memcpy(outputBuff, inputDataBuff, sizeIn);
currentOutputSize = sizeIn;
headerPtr->SetNumChunks(0u);
}

blosc_destroy();
return static_cast<size_t>(compressedSize);
return currentOutputSize + sizeof(DataHeader);
}

size_t CompressBlosc::Decompress(const void *bufferIn, const size_t sizeIn,
void *dataOut, const size_t sizeOut,
Params &info) const
{
assert(sizeIn >= sizeof(DataHeader));
const bool isChunked =
reinterpret_cast<const DataHeader *>(bufferIn)->IsChunked();

size_t decompressedSize = 0u;
if (isChunked)
decompressedSize =
DecompressChunkedFormat(bufferIn, sizeIn, dataOut, sizeOut, info);
else
decompressedSize =
DecompressOldFormat(bufferIn, sizeIn, dataOut, sizeOut, info);

return decompressedSize;
}

size_t CompressBlosc::DecompressChunkedFormat(const void *bufferIn,
const size_t sizeIn,
void *dataOut,
const size_t sizeOut,
Params &info) const
{
const DataHeader *dataPtr = reinterpret_cast<const DataHeader *>(bufferIn);
uint32_t num_chunks = dataPtr->GetNumChunks();
size_t inputDataSize = sizeIn - sizeof(DataHeader);

bool isCompressed = true;
if (num_chunks == 0)
isCompressed = false;

size_t inputOffset = 0u;
size_t currentOutputSize = 0u;

const uint8_t *inputDataBuff =
reinterpret_cast<const uint8_t *>(bufferIn) + sizeof(DataHeader);

size_t uncompressedSize = sizeOut;

if (isCompressed)
{
blosc_init();
uint8_t *outputBuff = reinterpret_cast<uint8_t *>(dataOut);

while (inputOffset < inputDataSize)
{
/* move over the size of the compressed data */
const uint8_t *in_ptr = inputDataBuff + inputOffset;

/** read the size of the compress block from the blosc meta data
*
* blosc meta data format (all little endian):
* - 1 byte blosc format version
* - 1 byte blosclz format version
* - 1 byte flags
* - 1 byte typesize
* - 4 byte uncompressed data size
* - 4 byte block size
* - 4 byte compressed data size
*
* we need only the compressed size ( source address + 12 byte)
*/
bloscSize_t max_inputDataSize =
*reinterpret_cast<const bloscSize_t *>(in_ptr + 12u);

uint8_t *out_ptr = outputBuff + currentOutputSize;

size_t outputChunkSize =
std::min(uncompressedSize - currentOutputSize,
static_cast<size_t>(BLOSC_MAX_BUFFERSIZE));
bloscSize_t max_output_size =
static_cast<bloscSize_t>(outputChunkSize);

bloscSize_t decompressdSize =
blosc_decompress(in_ptr, out_ptr, max_output_size);

if (decompressdSize > 0)
currentOutputSize += static_cast<size_t>(decompressdSize);
else
{
throw std::runtime_error(
"ERROR: ADIOS2 Blosc Decompress failed. Decompressed chunk "
"results in zero decompressed bytes.\n");
}
inputOffset += static_cast<size_t>(max_inputDataSize);
}
blosc_destroy();
}
else
{
std::memcpy(dataOut, inputDataBuff, inputDataSize);
currentOutputSize = inputDataSize;
inputOffset += inputDataSize;
}

assert(currentOutputSize == uncompressedSize);
assert(inputOffset == inputDataSize);

return currentOutputSize;
}

size_t CompressBlosc::DecompressOldFormat(const void *bufferIn,
const size_t sizeIn, void *dataOut,
const size_t sizeOut,
Params &info) const
{
blosc_init();
const int decompressedSize = blosc_decompress(bufferIn, dataOut, sizeOut);
Expand Down
50 changes: 49 additions & 1 deletion source/adios2/operator/compress/CompressBlosc.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*
* Created on: Jun 18, 2019
* Author: William F Godoy godoywf@ornl.gov
* Rene Widera r.widera@hzdr.de
*/

#ifndef ADIOS2_OPERATOR_COMPRESS_COMPRESSBLOSC_H_
Expand Down Expand Up @@ -34,12 +35,15 @@ class CompressBlosc : public Operator

~CompressBlosc() = default;

size_t BufferMaxSize(const size_t sizeIn) const final;

/**
* Compression signature for legacy libraries that use void*
* @param dataIn
* @param dimensions
* @param type
* @param bufferOut
* @param bufferOut format will be: 'DataHeader ; (BloscCompressedChunk |
* UncompressedData), [ BloscCompressedChunk, ...]'
* @param parameters
* @return size of compressed buffer in bytes
*/
Expand All @@ -60,6 +64,50 @@ class CompressBlosc : public Operator
const size_t sizeOut, Params &info) const final;

private:
using bloscSize_t = int32_t;

/** Decompress chunked data */
size_t DecompressChunkedFormat(const void *bufferIn, const size_t sizeIn,
void *dataOut, const size_t sizeOut,
Params &info) const;

/** Decompress data written before ADIOS2 supported large variables larger
* 2GiB. */
size_t DecompressOldFormat(const void *bufferIn, const size_t sizeIn,
void *dataOut, const size_t sizeOut,
Params &info) const;

class DataHeader
{
/** compatible to the first 4 byte of blosc header
*
* blosc meta data format (all little endian):
* - 1 byte blosc format version
* - 1 byte blosclz format version
* - 1 byte flags
* - 1 byte typesize
*
* If zero we writing the new adios blosc format which can handle more
* than 2GiB data chunks.
*/
uint32_t format = 0u;
/** number of blosc chunks within the data blob
*
* If zero the data is not compressed and must be decompressed by using
* 'memcpy'
*/
uint32_t numberOfChunks = 0u;

public:
void SetNumChunks(const uint32_t numChunks)
{
numberOfChunks = numChunks;
}
uint32_t GetNumChunks() const { return numberOfChunks; }

bool IsChunked() const { return format == 0; }
};

static const std::map<std::string, uint32_t> m_Shuffles;
static const std::set<std::string> m_Compressors;
};
Expand Down

0 comments on commit ae5dc35

Please sign in to comment.