diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 00ab0b73807f..e3cd83248feb 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -548,7 +548,7 @@ class IColumn; /** Experimental functions */ \ M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \ M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \ - + M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \ // End of COMMON_SETTINGS // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS. diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 9da12a2dca23..39045ef0aac1 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1586,13 +1586,21 @@ String IMergeTreeDataPart::getUniqueId() const } -String IMergeTreeDataPart::getZeroLevelPartBlockID() const +String IMergeTreeDataPart::getZeroLevelPartBlockID(const String& token) const { if (info.level != 0) throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name); SipHash hash; - checksums.computeTotalChecksumDataOnly(hash); + if (token.empty()) + { + checksums.computeTotalChecksumDataOnly(hash); + } + else + { + hash.update(token); + } + union { char bytes[16]; diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index a203d45aa25e..b6c160889369 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -170,7 +170,8 @@ class IMergeTreeDataPart : public std::enable_shared_from_thisgetZeroLevelPartBlockID(); + String block_dedup_token = context->getSettingsRef().insert_deduplication_token; + if (!block_dedup_token.empty()) + { + /// multiple blocks can be inserted within the same insert query + /// an ordinal number is added to dedup token to generate a distinctive block id for each block + block_dedup_token += fmt::format("_{}", chunk_dedup_seqnum); + ++chunk_dedup_seqnum; + } + block_id = part->getZeroLevelPartBlockID(block_dedup_token); LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows()); } else diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h index 7df82fd397eb..300791ff25b1 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.h @@ -82,13 +82,14 @@ class ReplicatedMergeTreeSink : public SinkToStorage bool is_attach = false; bool quorum_parallel = false; - bool deduplicate = true; + const bool deduplicate = true; bool last_block_is_duplicate = false; using Logger = Poco::Logger; Poco::Logger * log; ContextPtr context; + UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token }; } diff --git a/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.reference b/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.reference new file mode 100644 index 000000000000..5cf6230fd857 --- /dev/null +++ b/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.reference @@ -0,0 +1,34 @@ +insert 2 blocks with dedup token, 1 row per block +2 +1 +2 +insert deduplicated by token +2 +1 +2 +insert the same data by providing different dedup token +4 +1 +1 +2 +2 +insert 4 blocks, 2 deduplicated, 2 inserted +6 +1 +1 +2 +2 +3 +4 +disable token based deduplication, insert the same data as with token +10 +1 +1 +1 +2 +2 +2 +3 +3 +4 +4 diff --git a/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.sh b/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.sh new file mode 100755 index 000000000000..68b346d75916 --- /dev/null +++ b/tests/queries/0_stateless/02124_insert_deduplication_token_multiple_blocks_replica.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +QUERY_COUNT_ORIGIN_BLOCKS="SELECT COUNT(*) FROM system.parts WHERE table = 'block_dedup_token' AND min_block_number == max_block_number;" +QUERY_SELECT_FROM_TABLE_ORDERED="SELECT * FROM block_dedup_token ORDER BY id;" + +$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token SYNC" +$CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token (id Int32) ENGINE=ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{table}', '{replica}') ORDER BY id" + +$CLICKHOUSE_CLIENT --query="SELECT 'insert 2 blocks with dedup token, 1 row per block'" +DEDUP_TOKEN='dedup1' +echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" + +$CLICKHOUSE_CLIENT --query="SELECT 'insert deduplicated by token'" +echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" + +$CLICKHOUSE_CLIENT --query="SELECT 'insert the same data by providing different dedup token'" +DEDUP_TOKEN='dedup2' +echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" + +$CLICKHOUSE_CLIENT --query="SELECT 'insert 4 blocks, 2 deduplicated, 2 inserted'" +echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" + +$CLICKHOUSE_CLIENT --query="SELECT 'disable token based deduplication, insert the same data as with token'" +DEDUP_TOKEN='' +echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @- +$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED" + +$CLICKHOUSE_CLIENT --query="DROP TABLE block_dedup_token SYNC" diff --git a/tests/queries/0_stateless/02124_insert_deduplication_token_replica.reference b/tests/queries/0_stateless/02124_insert_deduplication_token_replica.reference new file mode 100644 index 000000000000..27691557d467 --- /dev/null +++ b/tests/queries/0_stateless/02124_insert_deduplication_token_replica.reference @@ -0,0 +1,24 @@ +create replica 1 and check deduplication +two inserts with exact data, one inserted, one deduplicated by data digest +1 1001 +two inserts with the same dedup token, one inserted, one deduplicated by the token +1 1001 +1 1001 +reset deduplication token and insert new row +1 1001 +1 1001 +2 1002 +create replica 2 and check deduplication +inserted value deduplicated by data digest, the same result as before +1 1001 +1 1001 +2 1002 +inserted value deduplicated by dedup token, the same result as before +1 1001 +1 1001 +2 1002 +new record inserted by providing new deduplication token +1 1001 +1 1001 +2 1002 +2 1002 diff --git a/tests/queries/0_stateless/02124_insert_deduplication_token_replica.sql b/tests/queries/0_stateless/02124_insert_deduplication_token_replica.sql new file mode 100644 index 000000000000..28631d6d93ae --- /dev/null +++ b/tests/queries/0_stateless/02124_insert_deduplication_token_replica.sql @@ -0,0 +1,48 @@ +-- insert data duplicates by providing deduplication token on insert + +DROP TABLE IF EXISTS insert_dedup_token1 SYNC; +DROP TABLE IF EXISTS insert_dedup_token2 SYNC; + +select 'create replica 1 and check deduplication'; +CREATE TABLE insert_dedup_token1 ( + id Int32, val UInt32 +) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{database}/insert_dedup_token', 'r1') ORDER BY id; + +select 'two inserts with exact data, one inserted, one deduplicated by data digest'; +INSERT INTO insert_dedup_token1 VALUES(1, 1001); +INSERT INTO insert_dedup_token1 VALUES(1, 1001); +SELECT * FROM insert_dedup_token1 ORDER BY id; + +select 'two inserts with the same dedup token, one inserted, one deduplicated by the token'; +set insert_deduplication_token = '1'; +INSERT INTO insert_dedup_token1 VALUES(1, 1001); +INSERT INTO insert_dedup_token1 VALUES(2, 1002); +SELECT * FROM insert_dedup_token1 ORDER BY id; + +select 'reset deduplication token and insert new row'; +set insert_deduplication_token = ''; +INSERT INTO insert_dedup_token1 VALUES(2, 1002); +SELECT * FROM insert_dedup_token1 ORDER BY id; + +select 'create replica 2 and check deduplication'; +CREATE TABLE insert_dedup_token2 ( + id Int32, val UInt32 +) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{database}/insert_dedup_token', 'r2') ORDER BY id; + +select 'inserted value deduplicated by data digest, the same result as before'; +set insert_deduplication_token = ''; +INSERT INTO insert_dedup_token2 VALUES(1, 1001); -- deduplicated by data digest +SELECT * FROM insert_dedup_token2 ORDER BY id; + +select 'inserted value deduplicated by dedup token, the same result as before'; +set insert_deduplication_token = '1'; +INSERT INTO insert_dedup_token2 VALUES(3, 1003); -- deduplicated by dedup token +SELECT * FROM insert_dedup_token2 ORDER BY id; + +select 'new record inserted by providing new deduplication token'; +set insert_deduplication_token = '2'; +INSERT INTO insert_dedup_token2 VALUES(2, 1002); -- inserted +SELECT * FROM insert_dedup_token2 ORDER BY id; + +DROP TABLE insert_dedup_token1 SYNC; +DROP TABLE insert_dedup_token2 SYNC;