Skip to content

Commit

Permalink
Move the data copy logic outside the decompression for the inverse op…
Browse files Browse the repository at this point in the history
…erator
  • Loading branch information
anagainaru committed Feb 28, 2023
1 parent c6fb723 commit c5ea7e3
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 12 deletions.
12 changes: 10 additions & 2 deletions source/adios2/operator/OperatorFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,23 @@ std::shared_ptr<Operator> MakeOperator(const std::string &type,
}

size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut,
std::shared_ptr<Operator> op)
MemorySpace memSpace, std::shared_ptr<Operator> op)
{
Operator::OperatorType compressorType;
std::memcpy(&compressorType, bufferIn, 1);
if (op == nullptr || op->m_TypeEnum != compressorType)
{
op = MakeOperator(OperatorTypeToString(compressorType), {});
}
return op->InverseOperate(bufferIn, sizeIn, dataOut);
size_t sizeOut = op->InverseOperate(bufferIn, sizeIn, dataOut);
if (sizeOut == 0) // the inverse operator was not applied
{
size_t headerSize = op->GetHeaderSize();
sizeOut = sizeIn - headerSize;
helper::CopyContiguousMemory(bufferIn + headerSize, sizeOut, dataOut,
/*endianReverse*/ false, memSpace);
}
return sizeOut;
}

} // end namespace core
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/OperatorFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ std::shared_ptr<Operator> MakeOperator(const std::string &type,
const Params &parameters);

size_t Decompress(const char *bufferIn, const size_t sizeIn, char *dataOut,
std::shared_ptr<Operator> op = nullptr);
MemorySpace memSpace, std::shared_ptr<Operator> op = nullptr);

} // end namespace core
} // end namespace adios2
17 changes: 13 additions & 4 deletions source/adios2/operator/compress/CompressBlosc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ size_t CompressBlosc::InverseOperate(const char *bufferIn, const size_t sizeIn,
const uint8_t bufferVersion =
GetParameter<uint8_t>(bufferIn, bufferInOffset);
bufferInOffset += 2; // skip two reserved bytes
headerSize += bufferInOffset;

if (bufferVersion == 1)
{
Expand Down Expand Up @@ -265,6 +266,7 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 0;
size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
bool isCompressed = true;

m_VersionInfo =
" Data is compressed using BLOSC Version " +
Expand Down Expand Up @@ -296,18 +298,26 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,
DecompressOldFormat(bufferIn + bufferInOffset,
sizeIn - bufferInOffset, dataOut, sizeOut);
}
if (decompressedSize == 0) // the decompression was not applied
{
isCompressed = false;
decompressedSize = bufferDecompressedSize;
}
if (decompressedSize != sizeOut)
{
helper::Throw<std::runtime_error>("Operator", "CompressBlosc",
"DecompressV1", m_VersionInfo);
}
headerSize += sizeIn - sizeOut;
if (!isCompressed)
return 0; // indicate that the inverse operator was not applied
return sizeOut;
}

size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn,
const size_t sizeIn,
char *dataOut,
const size_t sizeOut) const
const size_t sizeOut)
{
const DataHeader *dataPtr = reinterpret_cast<const DataHeader *>(bufferIn);
uint32_t num_chunks = dataPtr->GetNumChunks();
Expand Down Expand Up @@ -388,9 +398,8 @@ size_t CompressBlosc::DecompressChunkedFormat(const char *bufferIn,
}
else
{
std::memcpy(dataOut, inputDataBuff, inputDataSize);
currentOutputSize = inputDataSize;
inputOffset += inputDataSize;
bufferDecompressedSize = inputDataSize;
return 0; // the inverse operator was not applied
}

assert(currentOutputSize == uncompressedSize);
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/operator/compress/CompressBlosc.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ class CompressBlosc : public Operator
private:
using bloscSize_t = int32_t;
size_t headerSize = 0;
size_t bufferDecompressedSize = 0;

/** Decompress chunked data */
size_t DecompressChunkedFormat(const char *bufferIn, const size_t sizeIn,
char *dataOut, const size_t sizeOut) const;
char *dataOut, const size_t sizeOut);

/** Decompress data written before ADIOS2 supported large variables larger
* 2GiB. */
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ void BP3Deserializer::PostDataRead(
}
}
core::Decompress(postOpData, blockOperationInfo.PayloadSize,
preOpData, op);
preOpData, blockInfo.MemSpace, op);

// clip block to match selection
helper::ClipVector(m_ThreadBuffers[threadID][0],
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ void BP4Deserializer::PostDataRead(
}
}
core::Decompress(postOpData, blockOperationInfo.PayloadSize,
preOpData, op);
preOpData, blockInfo.MemSpace, op);

// clip block to match selection
helper::ClipVector(m_ThreadBuffers[threadID][0],
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/format/bp5/BP5Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1773,7 +1773,7 @@ void BP5Deserializer::FinalizeGet(const ReadRequest &Read, const bool freeAddr)
core::Decompress(IncomingData,
((MetaArrayRecOperator *)writer_meta_base)
->DataBlockSize[Read.BlockID],
decompressBuffer.data());
decompressBuffer.data(), Req.MemSpace);
}
IncomingData = decompressBuffer.data();
VirtualIncomingData = IncomingData;
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/format/dataman/DataManSerializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ int DataManSerializer::GetData(T *outputData, const std::string &varName,
decompressBuffer.reserve(
helper::GetTotalSize(j.count, sizeof(T)));
core::Decompress(j.buffer->data() + j.position, j.size,
decompressBuffer.data());
decompressBuffer.data(), MemorySpace::Host);
decompressed = true;
input_data = decompressBuffer.data();
}
Expand Down

0 comments on commit c5ea7e3

Please sign in to comment.