Skip to content

Commit

Permalink
insert_deduplication_token support in non-replicated MergeTree
Browse files Browse the repository at this point in the history
  • Loading branch information
devcrafter committed Jan 9, 2022
1 parent 1e06420 commit 0857a8d
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 13 deletions.
5 changes: 3 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "IMergeTreeDataPart.h"

#include <optional>
#include <string_view>
#include <Core/Defines.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/ReadBufferFromString.h>
Expand Down Expand Up @@ -1615,7 +1616,7 @@ String IMergeTreeDataPart::getUniqueId() const
}


String IMergeTreeDataPart::getZeroLevelPartBlockID(const String& token) const
String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
{
if (info.level != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name);
Expand All @@ -1627,7 +1628,7 @@ String IMergeTreeDataPart::getZeroLevelPartBlockID(const String& token) const
}
else
{
hash.update(token);
hash.update(token.data(), token.size());
}

union
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar

/// Compute part block id for zero level part. Otherwise throws an exception.
/// If token is not empty, block id is calculated based on it instead of block data
String getZeroLevelPartBlockID(const String & token = String()) const;
String getZeroLevelPartBlockID(std::string_view token) const;

const MergeTreeData & storage;

Expand Down
20 changes: 15 additions & 5 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2424,7 +2424,12 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
}


bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log)
bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
SimpleIncrement * increment,
Transaction * out_transaction,
MergeTreeDeduplicationLog * deduplication_log,
std::string_view deduplication_token)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
Expand All @@ -2433,7 +2438,7 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem
DataPartsVector covered_parts;
{
auto lock = lockParts();
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log))
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log, deduplication_token))
return false;
}
if (!covered_parts.empty())
Expand All @@ -2445,8 +2450,13 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem


bool MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction,
std::unique_lock<std::mutex> & lock, DataPartsVector * out_covered_parts, MergeTreeDeduplicationLog * deduplication_log)
MutableDataPartPtr & part,
SimpleIncrement * increment,
Transaction * out_transaction,
std::unique_lock<std::mutex> & lock,
DataPartsVector * out_covered_parts,
MergeTreeDeduplicationLog * deduplication_log,
std::string_view deduplication_token)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
Expand Down Expand Up @@ -2506,7 +2516,7 @@ bool MergeTreeData::renameTempPartAndReplace(
/// deduplication.
if (deduplication_log)
{
String block_id = part->getZeroLevelPartBlockID();
String block_id = part->getZeroLevelPartBlockID(deduplication_token);
auto res = deduplication_log->addPart(block_id, part_info);
if (!res.second)
{
Expand Down
17 changes: 13 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,12 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// active set later with out_transaction->commit()).
/// Else, commits the part immediately.
/// Returns true if part was added. Returns false if part is covered by bigger part.
bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
bool renameTempPartAndAdd(
MutableDataPartPtr & part,
SimpleIncrement * increment = nullptr,
Transaction * out_transaction = nullptr,
MergeTreeDeduplicationLog * deduplication_log = nullptr,
std::string_view deduplication_token = std::string_view());

/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
Expand All @@ -508,9 +513,13 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Low-level version of previous one, doesn't lock mutex
bool renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);

MutableDataPartPtr & part,
SimpleIncrement * increment,
Transaction * out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr,
MergeTreeDeduplicationLog * deduplication_log = nullptr,
std::string_view deduplication_token = std::string_view());

/// Remove parts from working set immediately (without wait for background
/// process). Transfer part state to temporary. Have very limited usage only
Expand Down
15 changes: 14 additions & 1 deletion src/Storages/MergeTree/MergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ void MergeTreeSink::onStart()
void MergeTreeSink::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
String block_dedup_token;

auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
for (auto & current_block : part_blocks)
Expand All @@ -31,8 +32,20 @@ void MergeTreeSink::consume(Chunk chunk)
if (!part)
continue;

if (storage.getDeduplicationLog())
{
const String & dedup_token = context->getSettingsRef().insert_deduplication_token;
if (!dedup_token.empty())
{
/// multiple blocks can be inserted within the same insert query
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
++chunk_dedup_seqnum;
}
}

/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog()))
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog(), block_dedup_token))
{
PartLog::addNewPart(storage.getContext(), part, watch.elapsed());

Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class MergeTreeSink : public SinkToStorage
StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block;
ContextPtr context;
uint64_t chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
create and check deduplication
two inserts with exact data, one inserted, one deduplicated by data digest
0 1000
two inserts with the same dedup token, one inserted, one deduplicated by the token
0 1000
1 1001
update dedup token, two inserts with the same dedup token, one inserted, one deduplicated by the token
0 1000
1 1001
1 1001
reset deduplication token and insert new row
0 1000
1 1001
1 1001
2 1002
34 changes: 34 additions & 0 deletions tests/queries/0_stateless/02124_insert_deduplication_token.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- insert data duplicates by providing deduplication token on insert

DROP TABLE IF EXISTS insert_dedup_token SYNC;

select 'create and check deduplication';
CREATE TABLE insert_dedup_token (
id Int32, val UInt32
) ENGINE=MergeTree() ORDER BY id
SETTINGS non_replicated_deduplication_window=0xFFFFFFFF;

select 'two inserts with exact data, one inserted, one deduplicated by data digest';
INSERT INTO insert_dedup_token VALUES(0, 1000);
INSERT INTO insert_dedup_token VALUES(0, 1000);
SELECT * FROM insert_dedup_token ORDER BY id;

select 'two inserts with the same dedup token, one inserted, one deduplicated by the token';
set insert_deduplication_token = '\x61\x00\x62';
INSERT INTO insert_dedup_token VALUES(1, 1001);
INSERT INTO insert_dedup_token VALUES(2, 1002);
SELECT * FROM insert_dedup_token ORDER BY id;

select 'update dedup token, two inserts with the same dedup token, one inserted, one deduplicated by the token';
set insert_deduplication_token = '\x61\x00\x63';
-- set insert_deduplication_token = '2';
INSERT INTO insert_dedup_token VALUES(1, 1001);
INSERT INTO insert_dedup_token VALUES(2, 1002);
SELECT * FROM insert_dedup_token ORDER BY id;

select 'reset deduplication token and insert new row';
set insert_deduplication_token = '';
INSERT INTO insert_dedup_token VALUES(2, 1002);
SELECT * FROM insert_dedup_token ORDER BY id;

DROP TABLE insert_dedup_token SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
insert 2 blocks with dedup token, 1 row per block
2
1
2
insert deduplicated by token
2
1
2
insert the same data by providing different dedup token
4
1
1
2
2
insert 4 blocks, 2 deduplicated, 2 inserted
6
1
1
2
2
3
4
disable token based deduplication, insert the same data as with token
10
1
1
1
2
2
2
3
3
4
4
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

QUERY_COUNT_ORIGIN_BLOCKS="SELECT COUNT(*) FROM system.parts WHERE table = 'block_dedup_token' AND min_block_number == max_block_number;"
QUERY_SELECT_FROM_TABLE_ORDERED="SELECT * FROM block_dedup_token ORDER BY id;"

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token SYNC"
$CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token (id Int32) ENGINE=MergeTree() ORDER BY id SETTINGS non_replicated_deduplication_window=0xFFFFFFFF;"

$CLICKHOUSE_CLIENT --query="SELECT 'insert 2 blocks with dedup token, 1 row per block'"
DEDUP_TOKEN='dedup1'
echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert deduplicated by token'"
echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert the same data by providing different dedup token'"
DEDUP_TOKEN='dedup2'
echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert 4 blocks, 2 deduplicated, 2 inserted'"
echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'disable token based deduplication, insert the same data as with token'"
DEDUP_TOKEN=''
echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="DROP TABLE block_dedup_token SYNC"

0 comments on commit 0857a8d

Please sign in to comment.