From c5ea7e3dbd9b50c10d9ff2d688b695128f464ed1 Mon Sep 17 00:00:00 2001 From: anagainaru Date: Tue, 28 Feb 2023 16:00:21 -0500 Subject: [PATCH] Move the data copy logic outside the decompression for the inverse operator --- source/adios2/operator/OperatorFactory.cpp | 12 ++++++++++-- source/adios2/operator/OperatorFactory.h | 2 +- .../adios2/operator/compress/CompressBlosc.cpp | 17 +++++++++++++---- source/adios2/operator/compress/CompressBlosc.h | 3 ++- .../toolkit/format/bp/bp3/BP3Deserializer.tcc | 2 +- .../toolkit/format/bp/bp4/BP4Deserializer.tcc | 2 +- .../toolkit/format/bp5/BP5Deserializer.cpp | 2 +- .../format/dataman/DataManSerializer.tcc | 2 +- 8 files changed, 30 insertions(+), 12 deletions(-) diff --git a/source/adios2/operator/OperatorFactory.cpp b/source/adios2/operator/OperatorFactory.cpp index 1979aeeb02..f6a6ab7037 100644 --- a/source/adios2/operator/OperatorFactory.cpp +++ b/source/adios2/operator/OperatorFactory.cpp @@ -169,7 +169,7 @@ std::shared_ptr MakeOperator(const std::string &type, } size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut, - std::shared_ptr op) + MemorySpace memSpace, std::shared_ptr op) { Operator::OperatorType compressorType; std::memcpy(&compressorType, bufferIn, 1); @@ -177,7 +177,15 @@ size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut, { op = MakeOperator(OperatorTypeToString(compressorType), {}); } - return op->InverseOperate(bufferIn, sizeIn, dataOut); + size_t sizeOut = op->InverseOperate(bufferIn, sizeIn, dataOut); + if (sizeOut == 0) // the inverse operator was not applied + { + size_t headerSize = op->GetHeaderSize(); + sizeOut = sizeIn - headerSize; + helper::CopyContiguousMemory(bufferIn + headerSize, sizeOut, dataOut, + /*endianReverse*/ false, memSpace); + } + return sizeOut; } } // end namespace core diff --git a/source/adios2/operator/OperatorFactory.h b/source/adios2/operator/OperatorFactory.h index ccac962c38..ba93b35178 100644 --- a/source/adios2/operator/OperatorFactory.h +++ b/source/adios2/operator/OperatorFactory.h @@ -21,7 +21,7 @@ std::shared_ptr MakeOperator(const std::string &type, const Params ¶meters); size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut, - std::shared_ptr op = nullptr); + MemorySpace memSpace, std::shared_ptr op = nullptr); } // end namespace core } // end namespace adios2 diff --git a/source/adios2/operator/compress/CompressBlosc.cpp b/source/adios2/operator/compress/CompressBlosc.cpp index b2cdf1df7d..407dff9f3e 100644 --- a/source/adios2/operator/compress/CompressBlosc.cpp +++ b/source/adios2/operator/compress/CompressBlosc.cpp @@ -232,6 +232,7 @@ size_t CompressBlosc::InverseOperate(const char *bufferIn, const size_t sizeIn, const uint8_t bufferVersion = GetParameter(bufferIn, bufferInOffset); bufferInOffset += 2; // skip two reserved bytes + headerSize += bufferInOffset; if (bufferVersion == 1) { @@ -265,6 +266,7 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn, size_t bufferInOffset = 0; size_t sizeOut = GetParameter(bufferIn, bufferInOffset); + bool isCompressed = true; m_VersionInfo = " Data is compressed using BLOSC Version " + @@ -296,18 +298,26 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn, DecompressOldFormat(bufferIn + bufferInOffset, sizeIn - bufferInOffset, dataOut, sizeOut); } + if (decompressedSize == 0) // the decompression was not applied + { + isCompressed = false; + decompressedSize = bufferDecompressedSize; + } if (decompressedSize != sizeOut) { helper::Throw("Operator", "CompressBlosc", "DecompressV1", m_VersionInfo); } + headerSize += sizeIn - sizeOut; + if (!isCompressed) + return 0; // indicate that the inverse operator was not applied return sizeOut; } size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn, const size_t sizeIn, char *dataOut, - const size_t sizeOut) const + const size_t sizeOut) { const DataHeader *dataPtr = reinterpret_cast(bufferIn); uint32_t num_chunks = dataPtr->GetNumChunks(); @@ -388,9 +398,8 @@ size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn, } else { - std::memcpy(dataOut, inputDataBuff, inputDataSize); - currentOutputSize = inputDataSize; - inputOffset += inputDataSize; + bufferDecompressedSize = inputDataSize; + return 0; // the inverse operator was not applied } assert(currentOutputSize == uncompressedSize); diff --git a/source/adios2/operator/compress/CompressBlosc.h b/source/adios2/operator/compress/CompressBlosc.h index 0d7c662d6b..3e1d8d273b 100644 --- a/source/adios2/operator/compress/CompressBlosc.h +++ b/source/adios2/operator/compress/CompressBlosc.h @@ -72,10 +72,11 @@ class CompressBlosc : public Operator private: using bloscSize_t = int32_t; size_t headerSize = 0; + size_t bufferDecompressedSize = 0; /** Decompress chunked data */ size_t DecompressChunkedFormat(const char *bufferIn, const size_t sizeIn, - char *dataOut, const size_t sizeOut) const; + char *dataOut, const size_t sizeOut); /** Decompress data written before ADIOS2 supported large variables larger * 2GiB. */ diff --git a/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc b/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc index c07829db6b..afbda7c8c8 100644 --- a/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc +++ b/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc @@ -566,7 +566,7 @@ void BP3Deserializer::PostDataRead( } } core::Decompress(postOpData, blockOperationInfo.PayloadSize, - preOpData, op); + preOpData, blockInfo.MemSpace, op); // clip block to match selection helper::ClipVector(m_ThreadBuffers[threadID][0], diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc index cace837be8..c1494afbfb 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc +++ b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc @@ -570,7 +570,7 @@ void BP4Deserializer::PostDataRead( } } core::Decompress(postOpData, blockOperationInfo.PayloadSize, - preOpData, op); + preOpData, blockInfo.MemSpace, op); // clip block to match selection helper::ClipVector(m_ThreadBuffers[threadID][0], diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp index 8519c50208..857e366d14 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp @@ -1773,7 +1773,7 @@ void BP5Deserializer::FinalizeGet(const ReadRequest &Read, const bool freeAddr) core::Decompress(IncomingData, ((MetaArrayRecOperator *)writer_meta_base) ->DataBlockSize[Read.BlockID], - decompressBuffer.data()); + decompressBuffer.data(), Req.MemSpace); } IncomingData = decompressBuffer.data(); VirtualIncomingData = IncomingData; diff --git a/source/adios2/toolkit/format/dataman/DataManSerializer.tcc b/source/adios2/toolkit/format/dataman/DataManSerializer.tcc index d8da75cdf2..535e0a00cc 100644 --- a/source/adios2/toolkit/format/dataman/DataManSerializer.tcc +++ b/source/adios2/toolkit/format/dataman/DataManSerializer.tcc @@ -252,7 +252,7 @@ int DataManSerializer::GetData(T *outputData, const std::string &varName, decompressBuffer.reserve( helper::GetTotalSize(j.count, sizeof(T))); core::Decompress(j.buffer->data() + j.position, j.size, - decompressBuffer.data()); + decompressBuffer.data(), MemorySpace::Host); decompressed = true; input_data = decompressBuffer.data(); }