From 91a7d86ce994625dbda32cc3a29ffc6a9e9a9a26 Mon Sep 17 00:00:00 2001 From: Jason Wang <jason.ruonan.wang@gmail.com> Date: Mon, 27 Sep 2021 17:16:12 -0400 Subject: [PATCH] make zfp buffer self-contained and backward-compatible --- source/adios2/core/Operator.h | 42 +++++++++ .../operator/compress/CompressBZIP2.cpp | 4 +- .../operator/compress/CompressBlosc.cpp | 2 +- .../operator/compress/CompressMGARD.cpp | 4 +- .../adios2/operator/compress/CompressSZ.cpp | 4 +- .../operator/compress/CompressSirius.cpp | 6 +- .../adios2/operator/compress/CompressZFP.cpp | 85 ++++++++++++++++--- source/adios2/operator/compress/CompressZFP.h | 12 +++ 8 files changed, 139 insertions(+), 20 deletions(-) diff --git a/source/adios2/core/Operator.h b/source/adios2/core/Operator.h index 8a79e2c12e..033759d878 100644 --- a/source/adios2/core/Operator.h +++ b/source/adios2/core/Operator.h @@ -16,6 +16,7 @@ /// \cond EXCLUDE_FROM_DOXYGEN #include <cstring> #include <functional> +#include <iostream> #include <string> #include <vector> /// \endcond @@ -125,6 +126,47 @@ class Operator return ret; } + template <typename U> + void PutParameters(char *buffer, U &pos, const Params &parameters) + { + uint8_t size = static_cast<uint8_t>(parameters.size()); + PutParameter(buffer, pos, size); + for (const auto &p : parameters) + { + size = static_cast<uint8_t>(p.first.size()); + PutParameter(buffer, pos, size); + + std::memcpy(buffer + pos, p.first.data(), size); + pos += size; + + size = static_cast<uint8_t>(p.second.size()); + PutParameter(buffer, pos, size); + + std::memcpy(buffer + pos, p.second.data(), size); + pos += size; + } + } + + template <typename U> + Params GetParameters(const char *buffer, U &pos) + { + Params ret; + uint8_t params = GetParameter<uint8_t>(buffer, pos); + for (uint8_t i = 0; i < params; ++i) + { + uint8_t size = GetParameter<uint8_t>(buffer, pos); + std::string key = + std::string(reinterpret_cast<const char *>(buffer + pos), size); + pos += size; + size = 
GetParameter<uint8_t>(buffer, pos); + std::string value = + std::string(reinterpret_cast<const char *>(buffer + pos), size); + pos += size; + ret[key] = value; + } + return ret; + } + private: void CheckCallbackType(const std::string type) const; }; diff --git a/source/adios2/operator/compress/CompressBZIP2.cpp b/source/adios2/operator/compress/CompressBZIP2.cpp index e861b71fa0..8ea686a761 100644 --- a/source/adios2/operator/compress/CompressBZIP2.cpp +++ b/source/adios2/operator/compress/CompressBZIP2.cpp @@ -126,8 +126,8 @@ size_t CompressBZIP2::DecompressV1(const char *bufferIn, const size_t sizeIn, size_t bufferInOffset = 4; // skip the first four bytes - size_t sizeOut = GetParameter<size_t>(bufferIn, bufferInOffset); - size_t batches = GetParameter<size_t>(bufferIn, bufferInOffset); + size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); + size_t batches = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); int small = 0; int verbosity = 0; diff --git a/source/adios2/operator/compress/CompressBlosc.cpp b/source/adios2/operator/compress/CompressBlosc.cpp index d91ede36c3..2d0b91b570 100644 --- a/source/adios2/operator/compress/CompressBlosc.cpp +++ b/source/adios2/operator/compress/CompressBlosc.cpp @@ -234,7 +234,7 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn, // DecompressV2 and keep this function for decompressing lagacy data. 
size_t bufferInOffset = 0; - size_t sizeOut = GetParameter<size_t>(bufferIn, bufferInOffset); + size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); if (sizeIn - bufferInOffset < sizeof(DataHeader)) { throw("corrupted blosc buffer header"); diff --git a/source/adios2/operator/compress/CompressMGARD.cpp b/source/adios2/operator/compress/CompressMGARD.cpp index cc07122018..1abc929139 100644 --- a/source/adios2/operator/compress/CompressMGARD.cpp +++ b/source/adios2/operator/compress/CompressMGARD.cpp @@ -133,11 +133,11 @@ size_t CompressMGARD::DecompressV1(const char *bufferIn, const size_t sizeIn, size_t bufferInOffset = 0; - const size_t ndims = GetParameter<size_t>(bufferIn, bufferInOffset); + const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); Dims blockCount(ndims); for (size_t i = 0; i < ndims; ++i) { - blockCount[i] = GetParameter<size_t>(bufferIn, bufferInOffset); + blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); } const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset); diff --git a/source/adios2/operator/compress/CompressSZ.cpp b/source/adios2/operator/compress/CompressSZ.cpp index cd4a449b07..d0ecd02811 100644 --- a/source/adios2/operator/compress/CompressSZ.cpp +++ b/source/adios2/operator/compress/CompressSZ.cpp @@ -319,11 +319,11 @@ size_t CompressSZ::DecompressV1(const char *bufferIn, const size_t sizeIn, size_t bufferInOffset = 0; - const size_t ndims = GetParameter<size_t>(bufferIn, bufferInOffset); + const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); Dims blockCount(ndims); for (size_t i = 0; i < ndims; ++i) { - blockCount[i] = GetParameter<size_t>(bufferIn, bufferInOffset); + blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); } const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset); diff --git a/source/adios2/operator/compress/CompressSirius.cpp b/source/adios2/operator/compress/CompressSirius.cpp index 
87ff912a8d..33247d6ec5 100644 --- a/source/adios2/operator/compress/CompressSirius.cpp +++ b/source/adios2/operator/compress/CompressSirius.cpp @@ -101,16 +101,16 @@ size_t CompressSirius::DecompressV1(const char *bufferIn, const size_t sizeIn, // DecompressV2 and keep this function for decompressing lagacy data. size_t bufferInOffset = 0; - const size_t ndims = GetParameter<size_t>(bufferIn, bufferInOffset); + const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); Dims blockStart(ndims); Dims blockCount(ndims); for (size_t i = 0; i < ndims; ++i) { - blockStart[i] = GetParameter<size_t>(bufferIn, bufferInOffset); + blockStart[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); } for (size_t i = 0; i < ndims; ++i) { - blockCount[i] = GetParameter<size_t>(bufferIn, bufferInOffset); + blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); } const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset); diff --git a/source/adios2/operator/compress/CompressZFP.cpp b/source/adios2/operator/compress/CompressZFP.cpp index b1d1affadf..6a7a0a6565 100644 --- a/source/adios2/operator/compress/CompressZFP.cpp +++ b/source/adios2/operator/compress/CompressZFP.cpp @@ -28,13 +28,33 @@ size_t CompressZFP::Compress(const char *dataIn, const Dims &blockStart, Params &info) { - Dims convertedDims = ConvertDims(blockCount, type, 3); + const uint8_t bufferVersion = 1; + size_t bufferOutOffset = 0; + + // Universal operator metadata + PutParameter(bufferOut, bufferOutOffset, OperatorType::Sz); + PutParameter(bufferOut, bufferOutOffset, bufferVersion); + bufferOutOffset += 2; + // Universal operator metadata end + const size_t ndims = blockCount.size(); + + // zfp V1 metadata + PutParameter(bufferOut, bufferOutOffset, ndims); + for (const auto &d : blockCount) + { + PutParameter(bufferOut, bufferOutOffset, d); + } + PutParameter(bufferOut, bufferOutOffset, type); + PutParameters(bufferOut, bufferOutOffset, parameters); + // zfp V1 
metadata end + + Dims convertedDims = ConvertDims(blockCount, type, 3); zfp_field *field = GetZFPField(dataIn, convertedDims, type); zfp_stream *stream = GetZFPStream(convertedDims, type, parameters); size_t maxSize = zfp_stream_maximum_size(stream, field); // associate bitstream - bitstream *bitstream = stream_open(bufferOut, maxSize); + bitstream *bitstream = stream_open(bufferOut + bufferOutOffset, maxSize); zfp_stream_set_bit_stream(stream, bitstream); zfp_stream_rewind(stream); @@ -46,24 +66,41 @@ size_t CompressZFP::Compress(const char *dataIn, const Dims &blockStart, "size is 0, in call to Compress"); } + bufferOutOffset += sizeOut; + zfp_field_free(field); zfp_stream_close(stream); stream_close(bitstream); - return sizeOut; + return bufferOutOffset; } -size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn, - char *dataOut, const DataType type, - const Dims &blockStart, const Dims &blockCount, - const Params &parameters, Params &info) +size_t CompressZFP::DecompressV1(const char *bufferIn, const size_t sizeIn, + char *dataOut) { + // Do NOT remove even if the buffer version is updated. Data might still be + // in legacy formats. This function must be kept for backward compatibility. + // If a newer buffer format is implemented, create another function, e.g. + // DecompressV2 and keep this function for decompressing legacy data. 
+ + size_t bufferInOffset = 0; + + const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); + Dims blockCount(ndims); + for (size_t i = 0; i < ndims; ++i) + { + blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset); + } + const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset); + const Params parameters = GetParameters(bufferIn, bufferInOffset); + Dims convertedDims = ConvertDims(blockCount, type, 3); zfp_field *field = GetZFPField(dataOut, convertedDims, type); zfp_stream *stream = GetZFPStream(convertedDims, type, parameters); // associate bitstream - bitstream *bitstream = stream_open(const_cast<char *>(bufferIn), sizeIn); + bitstream *bitstream = stream_open( + const_cast<char *>(bufferIn + bufferInOffset), sizeIn - bufferInOffset); zfp_stream_set_bit_stream(stream, bitstream); zfp_stream_rewind(stream); @@ -80,13 +117,41 @@ size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn, zfp_stream_close(stream); stream_close(bitstream); - const size_t typeSizeBytes = helper::GetDataTypeSize(type); const size_t dataSizeBytes = - helper::GetTotalSize(convertedDims) * typeSizeBytes; + helper::GetTotalSize(convertedDims, helper::GetDataTypeSize(type)); return dataSizeBytes; } +size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn, + char *dataOut, const DataType /*type*/, + const Dims & /*blockStart*/, + const Dims & /*blockCount*/, + const Params & /*parameters*/, Params &info) +{ + size_t bufferInOffset = 1; // skip operator type + const uint8_t bufferVersion = + GetParameter<uint8_t>(bufferIn, bufferInOffset); + bufferInOffset += 2; // skip two reserved bytes + + if (bufferVersion == 1) + { + return DecompressV1(bufferIn + bufferInOffset, sizeIn - bufferInOffset, + dataOut); + } + else if (bufferVersion == 2) + { + // TODO: if a Version 2 zfp buffer is being implemented, put it here + // and keep the DecompressV1 routine for backward compatibility + } + else + { + throw("unknown 
zfp buffer version"); + } + + return 0; +} + bool CompressZFP::IsDataTypeValid(const DataType type) const { #define declare_type(T) \ diff --git a/source/adios2/operator/compress/CompressZFP.h b/source/adios2/operator/compress/CompressZFP.h index 4887455ac0..e5dfd02731 100644 --- a/source/adios2/operator/compress/CompressZFP.h +++ b/source/adios2/operator/compress/CompressZFP.h @@ -94,6 +94,18 @@ class CompressZFP : public Operator * @param hint extra exception information */ void CheckStatus(const int status, const std::string hint) const; + + /** + * Decompress function for V1 buffer. Do NOT remove even if the buffer + * version is updated. Data might still be in legacy formats. This function + * must be kept for backward compatibility. + * @param bufferIn : compressed data buffer (V1 only) + * @param sizeIn : number of bytes in bufferIn + * @param dataOut : decompressed data buffer + * @return : number of bytes in dataOut + */ + size_t DecompressV1(const char *bufferIn, const size_t sizeIn, + char *dataOut); }; } // end namespace compress