Skip to content

Commit

Permalink
[performance] Compress windows in-memory
Browse files Browse the repository at this point in the history
This reduces maximum memory usage from 22 GB to 12 GB when decompressing
without an index, and from 13 GB to 2.5 GB when using an existing index.
The execution time is mostly unaffected when importing an index, but
when creating an index, it increases from 500 s to 615 s!
This probably can be reduced again by doing the window compression
on the worker threads.
There is also additional overhead when writing the index, which should
shrink again once the compressed windows are written out to the index.
The index CRC32 is also unchanged by this change.

After:

    /usr/bin/time -v src/tools/rapidgzip -v -P 24 \
        --export-index index <( fcat wikidata-20220103-all.json.gz )

        Time: 617.55 s, Bandwidth: 2312.94 MB/s, Max RSS: 11.95 GB
        exportIndex took /dev/shm/: 10.7339 s, QLC SSD: 126.635 s
        CRC32 index: 387ce305
        Index size: 11 161 141 317 B
        Index write bandwidth: /dev/shm/: 1040 MB/s, QLC SSD: 88.1 MB/s

    /usr/bin/time -v src/tools/rapidgzip -v -P 24 -d -o /dev/null \
        --import-index index <( fcat wikidata-20220103-all.json.gz )

        Time: 205.185 s, Bandwidth: 6961.29 MB/s, Max RSS: 2.47 GB
        importIndex took /dev/shm/: 14.2654 s, QLC SSD: 14.5275 s

    Without rpmalloc:
    /usr/bin/time -v src/tools/rapidgzip -v -P 24 -d -o /dev/null \
        --import-index index <( fcat wikidata-20220103-all.json.gz )

        Time: 354.215 s, Bandwidth: 4032.45 MB/s, Max RSS: 2.46 GB
        importIndex took /dev/shm/: 54.2631 s, QLC SSD:

Before:

    /usr/bin/time -v src/tools/rapidgzip -v -P 24 \
        --export-index index <( fcat wikidata-20220103-all.json.gz )

        Time: 501.487 s, Bandwidth: 2848.24 MB/s, Max RSS: 22.11 GB
        exportIndex took /dev/shm/: 4.14887 s, QLC SSD: 106.954 s
        CRC32 index: 387ce305
        Index size: 11 161 141 317 B
        Index write bandwidth: /dev/shm/: 2690 MB/s, QLC SSD: 104.4 MB/s

    /usr/bin/time -v src/tools/rapidgzip -v -P 24 -d -o /dev/null \
        --import-index index <( fcat wikidata-20220103-all.json.gz )

        Time: 202.629 s, Bandwidth: 7049.12 MB/s, Max RSS: 13.22 GB
        importIndex took /dev/shm/: 6.61642 s, QLC SSD: 9.06049 s
  • Loading branch information
mxmlnkn committed Feb 4, 2024
1 parent f378587 commit 9edb3b9
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 70 deletions.
8 changes: 8 additions & 0 deletions src/rapidgzip/ChunkData.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vector>

#include <crc32.hpp>
#include <CompressedVector.hpp>
#include <DecodedData.hpp>
#include <gzip.hpp>

Expand Down Expand Up @@ -155,6 +156,13 @@ struct ChunkData :
ChunkData& operator=( ChunkData&& ) = default;
ChunkData& operator=( const ChunkData& ) = delete;

/**
 * Chooses how windows belonging to this chunk are stored, based on the chunk's own
 * compression ratio (decoded size in bits versus encoded size in bits).
 *
 * @return CompressionType::GZIP when the decoded size in bits exceeds twice the encoded size
 *         in bits, CompressionType::NONE otherwise.
 */
[[nodiscard]] CompressionType
windowCompressionType() const
{
    /* Window compression introduces overhead, so it is only worth it for chunks that
     * themselves compressed by more than a factor of two. */
    const auto decodedSizeInBits = decodedSizeInBytes * 8;
    const auto thresholdInBits = 2 * encodedSizeInBits;
    if ( decodedSizeInBits > thresholdInBits ) {
        return CompressionType::GZIP;
    }
    return CompressionType::NONE;
}

void
append( deflate::DecodedVector&& toAppend )
{
Expand Down
6 changes: 6 additions & 0 deletions src/rapidgzip/CompressedVector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ class CompressedVector
&& ( m_decompressedSize == other.m_decompressedSize );
}

/**
 * Inequality comparison, defined as the exact negation of operator==.
 *
 * @param other The vector to compare against.
 * @return true if operator== reports the two objects as different.
 */
[[nodiscard]] bool
operator!=( const CompressedVector& other ) const
{
    const bool isEqual = ( *this == other );
    return !isEqual;
}

private:
CompressionType m_compressionType{ CompressionType::GZIP };
std::shared_ptr<const Container> m_data;
Expand Down
106 changes: 55 additions & 51 deletions src/rapidgzip/GzipChunkFetcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ class GzipChunkFetcher :
using BaseType = BlockFetcher<GzipBlockFinder, ChunkData, FetchingStrategy>;
using BitReader = rapidgzip::BitReader;
using SharedWindow = WindowMap::SharedWindow;
using SharedDecompressedWindow = std::shared_ptr<const FasterVector<uint8_t> >;
using WindowView = VectorView<uint8_t>;
using BlockFinder = typename BaseType::BlockFinder;
using PostProcessingFutures = std::map</* block offset */ size_t, std::future<void> >;

static constexpr bool REPLACE_MARKERS_IN_PARALLEL = true;

Expand Down Expand Up @@ -118,7 +120,7 @@ class GzipChunkFetcher :
if ( !firstBlockInStream ) {
throw std::logic_error( "The block finder is required to find the first block itself!" );
}
m_windowMap->emplace( *firstBlockInStream, {} );
m_windowMap->emplace( *firstBlockInStream, {}, CompressionType::NONE );
}

/* Choose default value for CRC32 enable flag. Can still be overwritten by the setter. */
Expand Down Expand Up @@ -310,7 +312,7 @@ class GzipChunkFetcher :
<< formatBits( chunkData->encodedOffsetInBits ) << ", "
<< formatBits( chunkData->encodedOffsetInBits + chunkData->encodedSizeInBits ) << "].\n"
<< " Window size for the chunk offset: "
<< ( lastWindow ? std::to_string( lastWindow->size() ) : "no window" ) << ".";
<< ( lastWindow ? std::to_string( lastWindow->decompressedSize() ) : "no window" ) << ".";
throw std::logic_error( std::move( message ).str() );
}

Expand All @@ -337,27 +339,33 @@ class GzipChunkFetcher :

auto chunkData = getBlock( *nextBlockOffset, m_nextUnprocessedBlockIndex );

/* Should only happen when encountering EOF during decodeBlock call. */
if ( chunkData->encodedSizeInBits == 0 ) {
m_blockMap->finalize();
m_blockFinder->finalize();
return {};
}

/* Because this is a new block, it might contain markers that we have to replace with the window
* of the last block. The very first block should not contain any markers, ensuring that we
* can successively propagate the window through all blocks. */
auto sharedLastWindow = m_windowMap->get( chunkData->encodedOffsetInBits );
auto sharedLastWindow = m_windowMap->get( *nextBlockOffset );
if ( !sharedLastWindow ) {
std::stringstream message;
message << "The window of the last block at " << formatBits( chunkData->encodedOffsetInBits )
message << "The window of the last block at " << formatBits( *nextBlockOffset )
<< " should exist at this point!";
throw std::logic_error( std::move( message ).str() );
}
const auto& lastWindow = *sharedLastWindow;
const auto lastWindow = sharedLastWindow->decompress();

postProcessChunk( chunkData, lastWindow );
appendSubchunksToIndexes( chunkData, chunkData->subchunks, lastWindow );

/* Care has to be taken that we store the correct block offset not the speculative possible range!
* This call corrects encodedSizeInBits, which only contains a guess from finalize().
* This should only be called after post-processing has finished because encodedSizeInBits is also
* used in windowCompressionType() during post-processing to compress the windows. */
chunkData->setEncodedOffset( *nextBlockOffset );
/* Should only happen when encountering EOF during decodeBlock call. */
if ( chunkData->encodedSizeInBits == 0 ) {
m_blockMap->finalize();
m_blockFinder->finalize();
return {};
}

appendSubchunksToIndexes( chunkData, chunkData->subchunks, *lastWindow );

m_statistics.merge( *chunkData );

Expand Down Expand Up @@ -427,25 +435,26 @@ class GzipChunkFetcher :
const auto windowOffset = subchunk.encodedOffset + subchunk.encodedSize;
/* Avoid recalculating what we already emplaced in waitForReplacedMarkers when calling getLastWindow. */
if ( !m_windowMap->get( windowOffset ) ) {
m_windowMap->emplace( windowOffset, chunkData->getWindowAt( lastWindow, decodedOffsetInBlock ) );
m_windowMap->emplace( windowOffset, chunkData->getWindowAt( lastWindow, decodedOffsetInBlock ),
chunkData->windowCompressionType() );
}
}
}

void
postProcessChunk( const std::shared_ptr<ChunkData>& chunkData,
const FasterVector<uint8_t>& lastWindow )
const SharedDecompressedWindow& lastWindow )
{
if constexpr ( REPLACE_MARKERS_IN_PARALLEL ) {
waitForReplacedMarkers( chunkData, lastWindow );
} else {
replaceMarkers( chunkData, lastWindow );
replaceMarkers( chunkData, *lastWindow );
}
}

void
waitForReplacedMarkers( const std::shared_ptr<ChunkData>& chunkData,
const WindowView lastWindow )
const SharedDecompressedWindow& lastWindow )
{
using namespace std::chrono_literals;

Expand All @@ -458,17 +467,7 @@ class GzipChunkFetcher :
std::optional<std::future<void> > queuedFuture;
if ( markerReplaceFuture == m_markersBeingReplaced.end() ) {
/* First, we need to emplace the last window or else we cannot queue further blocks. */
const auto windowOffset = chunkData->encodedOffsetInBits + chunkData->encodedSizeInBits;
if ( !m_windowMap->get( windowOffset ) ) {
m_windowMap->emplace( windowOffset, chunkData->getLastWindow( lastWindow ) );
}

markerReplaceFuture = m_markersBeingReplaced.emplace(
chunkData->encodedOffsetInBits,
this->submitTaskWithHighPriority(
[this, chunkData, lastWindow] () { replaceMarkers( chunkData, lastWindow ); }
)
).first;
markerReplaceFuture = queueChunkForPostProcessing( chunkData, lastWindow );
}

/* Check other enqueued marker replacements whether they are finished. */
Expand Down Expand Up @@ -521,26 +520,35 @@ class GzipChunkFetcher :
if ( !sharedPreviousWindow ) {
continue;
}
const auto& previousWindow = *sharedPreviousWindow;

const auto windowOffset = chunkData->encodedOffsetInBits + chunkData->encodedSizeInBits;
if ( !m_windowMap->get( windowOffset ) ) {
m_windowMap->emplace( windowOffset, chunkData->getLastWindow( previousWindow ) );
}
queueChunkForPostProcessing( chunkData, sharedPreviousWindow->decompress() );
}
}

m_markersBeingReplaced.emplace(
chunkData->encodedOffsetInBits,
this->submitTaskWithHighPriority(
[this, chunkData, sharedPreviousWindow] () { replaceMarkers( chunkData, *sharedPreviousWindow ); }
)
);
/**
 * Makes sure the window following @p chunkData exists in the window map and then submits the
 * marker replacement for @p chunkData via submitTaskWithHighPriority.
 *
 * @param chunkData      The chunk whose markers are to be replaced.
 * @param previousWindow The decompressed window preceding the chunk. It is moved into the submitted task.
 * @return Iterator to the entry in m_markersBeingReplaced for the chunk's encoded offset
 *         (either newly inserted or already existing).
 */
PostProcessingFutures::iterator
queueChunkForPostProcessing( const std::shared_ptr<ChunkData>& chunkData,
                             SharedDecompressedWindow          previousWindow )
{
    const auto nextWindowOffset = chunkData->encodedOffsetInBits + chunkData->encodedSizeInBits;
    if ( !m_windowMap->get( nextWindowOffset ) ) {
        /* The last window is always inserted into the window map by the main thread because
         * otherwise it would not be able to queue the next chunk for post-processing in parallel.
         * This is the critical path that cannot be parallelized. Therefore, the last window is
         * stored uncompressed to save time. */
        m_windowMap->emplace( nextWindowOffset,
                              chunkData->getLastWindow( *previousWindow ),
                              CompressionType::NONE );
    }

    /* Read the map key before handing the chunk over to the thread pool. */
    const auto chunkOffset = chunkData->encodedOffsetInBits;

    auto replacementTask = [chunkData, window = std::move( previousWindow )] () {
        replaceMarkers( chunkData, *window );
    };
    auto replacementFuture = this->submitTaskWithHighPriority( std::move( replacementTask ) );

    const auto emplaceResult = m_markersBeingReplaced.emplace( chunkOffset, std::move( replacementFuture ) );
    return emplaceResult.first;
}

/**
* Must be thread-safe because it is submitted to the thread pool.
*/
void
static void
replaceMarkers( const std::shared_ptr<ChunkData>& chunkData,
const WindowView previousWindow )
{
Expand Down Expand Up @@ -642,8 +650,6 @@ class GzipChunkFetcher :
throw std::logic_error( std::move( message ).str() );
}

/* Care has to be taken that we store the correct block offset not the speculative possible range! */
chunkData->setEncodedOffset( blockOffset );
return chunkData;
}

Expand Down Expand Up @@ -710,14 +716,14 @@ class GzipChunkFetcher :
#endif

const auto fileSize = originalBitReader.size();
const auto& window = *initialWindow;
const auto window = initialWindow->decompress();

auto configuration = chunkDataConfiguration;
configuration.encodedOffsetInBits = blockOffset;
auto result = decodeBlockWithInflateWrapper<InflateWrapper>(
originalBitReader,
fileSize ? std::min( untilOffset, *fileSize ) : untilOffset,
window,
*window,
decodedSize,
configuration );

Expand All @@ -731,7 +737,7 @@ class GzipChunkFetcher :
<< " Decoded size : " << result.decodedSizeInBytes << " B\n"
<< " Expected decoded size : " << *decodedSize << " B\n"
<< " Until offset is exact : " << untilOffsetIsExact << "\n"
<< " Initial Window : " << std::to_string( window.size() ) << "\n";
<< " Initial Window : " << std::to_string( window->size() ) << "\n";
throw std::runtime_error( std::move( message ).str() );
}

Expand All @@ -741,8 +747,8 @@ class GzipChunkFetcher :
BitReader bitReader( originalBitReader );
if ( initialWindow ) {
bitReader.seek( blockOffset );
const auto& window = *initialWindow;
return decodeBlockWithRapidgzip( &bitReader, untilOffset, window, maxDecompressedChunkSize,
const auto window = initialWindow->decompress();
return decodeBlockWithRapidgzip( &bitReader, untilOffset, *window, maxDecompressedChunkSize,
chunkDataConfiguration );
}

Expand Down Expand Up @@ -1301,9 +1307,7 @@ class GzipChunkFetcher :

#ifdef WITH_ISAL
if ( cleanDataCount >= deflate::MAX_WINDOW_SIZE ) {
const deflate::DecodedVector window = result.getLastWindow( {} );
return finishDecodeBlockWithIsal( bitReader, untilOffset,
VectorView<uint8_t>( window.data(), window.size() ),
return finishDecodeBlockWithIsal( bitReader, untilOffset, result.getLastWindow( {} ),
maxDecompressedChunkSize, std::move( result ) );
}
#endif
Expand Down Expand Up @@ -1460,6 +1464,6 @@ class GzipChunkFetcher :
/* This is necessary when blocks have been split in order to find and reuse cached unsplit chunks. */
std::unordered_map</* block offset */ size_t, /* block offset of unsplit "parent" chunk */ size_t> m_unsplitBlocks;

std::map</* block offset */ size_t, std::future<void> > m_markersBeingReplaced;
PostProcessingFutures m_markersBeingReplaced;
};
} // namespace rapidgzip
33 changes: 26 additions & 7 deletions src/rapidgzip/IndexFileFormat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <stdexcept>
#include <string>
#include <string_view>
#include <tuple>
#include <vector>

#include <blockfinder/Bgzf.hpp>
Expand Down Expand Up @@ -319,7 +320,7 @@ readGzipIndex( UniqueFileReader indexFile,
}

/* Emplace an empty window to show that the block does not need data. */
index.windows->emplace( checkpoint.compressedOffsetInBits, {} );
index.windows->emplace( checkpoint.compressedOffsetInBits, {}, CompressionType::NONE );
}

try {
Expand Down Expand Up @@ -400,7 +401,8 @@ readGzipIndex( UniqueFileReader indexFile,
}
const auto checkpointCount = readValue<uint32_t>( indexFile.get() );

std::vector<std::pair</* encoded offset */ size_t, /* window size */ size_t> > windowInfos;
std::vector<std::tuple</* encoded offset */ size_t, /* window size */ size_t,
/* compression ratio */ double> > windowInfos;

index.checkpoints.resize( checkpointCount );
for ( uint32_t i = 0; i < checkpointCount; ++i ) {
Expand Down Expand Up @@ -439,17 +441,29 @@ readGzipIndex( UniqueFileReader indexFile,
windowSize = index.windowSizeInBytes;
}
}
windowInfos.emplace_back( checkpoint.compressedOffsetInBits, windowSize );

auto compressionRatio = 1.0;
if ( i >= 1 ) {
const auto& previousCheckpoint = index.checkpoints[i - 1];
compressionRatio = static_cast<double>( checkpoint.uncompressedOffsetInBytes
- previousCheckpoint.uncompressedOffsetInBytes ) * 8
/ static_cast<double>( checkpoint.compressedOffsetInBits
- previousCheckpoint.compressedOffsetInBits );
}
windowInfos.emplace_back( checkpoint.compressedOffsetInBits, windowSize, compressionRatio );
}

index.windows = std::make_shared<WindowMap>();
for ( auto& [offset, windowSize] : windowInfos ) {
for ( auto& [offset, windowSize, compressionRatio] : windowInfos ) {
FasterVector<uint8_t> window;
if ( windowSize > 0 ) {
window.resize( windowSize );
checkedRead( indexFile.get(), window.data(), window.size() );
}
index.windows->emplace( offset, std::move( window ) );

/* Only bother with overhead-introducing compression for large chunk compression ratios. */
index.windows->emplace( offset, std::move( window ),
compressionRatio > 2 ? CompressionType::GZIP : CompressionType::NONE );
}

return index;
Expand Down Expand Up @@ -483,7 +497,7 @@ writeGzipIndex( const GzipIndex& in

if ( !std::all_of( checkpoints.begin(), checkpoints.end(), [&index, windowSizeInBytes] ( const auto& checkpoint ) {
const auto window = index.windows->get( checkpoint.compressedOffsetInBits );
return window && ( window->empty() || ( window->size() >= windowSizeInBytes ) );
return window && ( window->empty() || ( window->decompressedSize() >= windowSizeInBytes ) );
} ) )
{
throw std::invalid_argument( "All window sizes must be at least 32 KiB or empty!" );
Expand Down Expand Up @@ -527,7 +541,12 @@ writeGzipIndex( const GzipIndex& in
continue;
}

const auto window = *result;
const auto windowPointer = result->decompress();
if ( !windowPointer ) {
continue;
}

const auto& window = *windowPointer;
if ( window.empty() ) {
continue;
}
Expand Down
6 changes: 0 additions & 6 deletions src/rapidgzip/ParallelGzipReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,12 +636,6 @@ class ParallelGzipReader final :
Checkpoint checkpoint;
checkpoint.compressedOffsetInBits = compressedOffsetInBits;
checkpoint.uncompressedOffsetInBytes = uncompressedOffsetInBytes;

const auto window = m_windowMap->get( compressedOffsetInBits );
if ( !window ) {
throw std::logic_error( "Did not find window to offset " + formatBits( compressedOffsetInBits ) );
}

index.checkpoints.emplace_back( std::move( checkpoint ) );
}

Expand Down
Loading

0 comments on commit 9edb3b9

Please sign in to comment.