diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 69424c046a02..057d4ddf8dfd 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1644,7 +1644,7 @@ UInt32 IMergeTreeDataPart::getNumberOfRefereneces() const } -String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const +String IMergeTreeDataPart::getZeroLevelPartBlockID(const std::string_view token) const { if (info.level != 0) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 6db507247871..b8f0fc21da92 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -2555,7 +2555,7 @@ bool MergeTreeData::renameTempPartAndReplace( /// deduplication. if (deduplication_log) { - String block_id = part->getZeroLevelPartBlockID(deduplication_token); + const String block_id = part->getZeroLevelPartBlockID(deduplication_token); auto res = deduplication_log->addPart(block_id, part_info); if (!res.second) { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index c14672fe3826..dfa7d8186171 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -140,6 +140,7 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk) checkQuorumPrecondition(zookeeper); auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context); + String block_dedup_token; for (auto & current_block : part_blocks) { @@ -161,12 +162,12 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk) /// We add the hash from the data and partition identifier to deduplication ID. /// That is, do not insert the same data to the same partition twice. - String block_dedup_token = context->getSettingsRef().insert_deduplication_token; - if (!block_dedup_token.empty()) + const String& dedup_token = context->getSettingsRef().insert_deduplication_token; + if (!dedup_token.empty()) { /// multiple blocks can be inserted within the same insert query /// an ordinal number is added to dedup token to generate a distinctive block id for each block - block_dedup_token += fmt::format("_{}", chunk_dedup_seqnum); + block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum); ++chunk_dedup_seqnum; } block_id = part->getZeroLevelPartBlockID(block_dedup_token); diff --git a/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks.sh b/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks.sh index b5f44794c609..04ccbda62354 100755 --- a/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks.sh +++ b/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks.sh @@ -5,31 +5,32 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) QUERY_COUNT_ORIGIN_BLOCKS="SELECT COUNT(*) FROM system.parts WHERE database = currentDatabase() AND table = 'block_dedup_token' AND min_block_number == max_block_number;" QUERY_SELECT_FROM_TABLE_ORDERED="SELECT * FROM block_dedup_token ORDER BY id;" +INSERT_BLOCK_SETTINGS="max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token SYNC" $CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token (id Int32) ENGINE=MergeTree() ORDER BY id SETTINGS non_replicated_deduplication_window=0xFFFFFFFF;" $CLICKHOUSE_CLIENT --query="SELECT 'insert 2 blocks with dedup token, 1 row per block'" DEDUP_TOKEN='dedup1' -echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="SELECT 'insert deduplicated by token'" -echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="SELECT 'insert the same data by providing different dedup token'" DEDUP_TOKEN='dedup2' -echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="SELECT 'insert 4 blocks, 2 deduplicated, 2 inserted'" -echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="SELECT 'disable token based deduplication, insert the same data as with token'" DEDUP_TOKEN='' -echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="DROP TABLE block_dedup_token SYNC" diff --git a/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.sh b/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.sh index 928defd329f4..1c776263f78b 100755 --- a/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.sh +++ b/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.sh @@ -5,31 +5,32 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) QUERY_COUNT_ORIGIN_BLOCKS="SELECT COUNT(*) FROM system.parts WHERE database = currentDatabase() AND table = 'block_dedup_token_replica' AND min_block_number == max_block_number;" QUERY_SELECT_FROM_TABLE_ORDERED="SELECT * FROM block_dedup_token_replica ORDER BY id;" +INSERT_BLOCK_SETTINGS="max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0" $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token_replica SYNC" $CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token_replica (id Int32) ENGINE=ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{table}', '{replica}') ORDER BY id" $CLICKHOUSE_CLIENT --query="SELECT 'insert 2 blocks with dedup token, 1 row per block'" DEDUP_TOKEN='dedup1' -echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="SELECT 'insert deduplicated by token'" -echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="SELECT 'insert the same data by providing different dedup token'" DEDUP_TOKEN='dedup2' -echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="SELECT 'insert 4 blocks, 2 deduplicated, 2 inserted'" -echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="SELECT 'disable token based deduplication, insert the same data as with token'" DEDUP_TOKEN='' -echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&$INSERT_BLOCK_SETTINGS&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" $CLICKHOUSE_CLIENT --query="DROP TABLE block_dedup_token_replica SYNC"