Skip to content

Commit

Permalink
insert_deduplication_token setting for INSERT statement
Browse files Browse the repository at this point in the history
The setting allows a user to provide own deduplication semantic in Replicated*MergeTree
If provided, it's used instead of data digest to generate block ID
So, for example, by providing a unique value for the setting in each INSERT statement,
user can avoid the same inserted data being deduplicated

Inserting data within the same INSERT statement are split into blocks
according to the *insert_block_size* settings
(max_insert_block_size, min_insert_block_size_rows, min_insert_block_size_bytes).
Each block with the same INSERT statement will get an ordinal number.
The ordinal number is added to insert_deduplication_token to get block dedup token
i.e. <token>_0, <token>_1, ... Deduplication is done per block
So, to guarantee deduplication for two same INSERT queries,
dedup token and number of blocks to have to be the same

Issue: ClickHouse#7461
  • Loading branch information
devcrafter committed Dec 19, 2021
1 parent fa011c6 commit 100ee92
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ class IColumn;
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \

M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.

Expand Down
12 changes: 10 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1586,13 +1586,21 @@ String IMergeTreeDataPart::getUniqueId() const
}


String IMergeTreeDataPart::getZeroLevelPartBlockID() const
String IMergeTreeDataPart::getZeroLevelPartBlockID(const String& token) const
{
if (info.level != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name);

SipHash hash;
checksums.computeTotalChecksumDataOnly(hash);
if (token.empty())
{
checksums.computeTotalChecksumDataOnly(hash);
}
else
{
hash.update(token);
}

union
{
char bytes[16];
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
bool isEmpty() const { return rows_count == 0; }

/// Compute part block id for zero level part. Otherwise throws an exception.
String getZeroLevelPartBlockID() const;
/// If token is not empty, block id is calculated based on it instead of block data
String getZeroLevelPartBlockID(const String & token = String()) const;

const MergeTreeData & storage;

Expand Down
10 changes: 9 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,16 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
{
/// We add the hash from the data and partition identifier to deduplication ID.
/// That is, do not insert the same data to the same partition twice.
block_id = part->getZeroLevelPartBlockID();

String block_dedup_token = context->getSettingsRef().insert_deduplication_token;
if (!block_dedup_token.empty())
{
/// multiple blocks can be inserted within the same insert query
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
block_dedup_token += fmt::format("_{}", chunk_dedup_seqnum);
++chunk_dedup_seqnum;
}
block_id = part->getZeroLevelPartBlockID(block_dedup_token);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
}
else
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ class ReplicatedMergeTreeSink : public SinkToStorage

bool is_attach = false;
bool quorum_parallel = false;
bool deduplicate = true;
const bool deduplicate = true;
bool last_block_is_duplicate = false;

using Logger = Poco::Logger;
Poco::Logger * log;

ContextPtr context;
UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
insert 2 blocks with dedup token, 1 row per block
2
1
2
insert deduplicated by token
2
1
2
insert the same data by providing different dedup token
4
1
1
2
2
insert 4 blocks, 2 deduplicated, 2 inserted
6
1
1
2
2
3
4
disable token based deduplication, insert the same data as with token
10
1
1
1
2
2
2
3
3
4
4
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

QUERY_COUNT_ORIGIN_BLOCKS="SELECT COUNT(*) FROM system.parts WHERE table = 'block_dedup_token' AND min_block_number == max_block_number;"
QUERY_SELECT_FROM_TABLE_ORDERED="SELECT * FROM block_dedup_token ORDER BY id;"

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token SYNC"
$CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token (id Int32) ENGINE=ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{table}', '{replica}') ORDER BY id"

$CLICKHOUSE_CLIENT --query="SELECT 'insert 2 blocks with dedup token, 1 row per block'"
DEDUP_TOKEN='dedup1'
echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert deduplicated by token'"
echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert the same data by providing different dedup token'"
DEDUP_TOKEN='dedup2'
echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert 4 blocks, 2 deduplicated, 2 inserted'"
echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'disable token based deduplication, insert the same data as with token'"
DEDUP_TOKEN=''
echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="DROP TABLE block_dedup_token SYNC"
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
create replica 1 and check deduplication
two inserts with exact data, one inserted, one deduplicated by data digest
1 1001
two inserts with the same dedup token, one inserted, one deduplicated by the token
1 1001
1 1001
reset deduplication token and insert new row
1 1001
1 1001
2 1002
create replica 2 and check deduplication
inserted value deduplicated by data digest, the same result as before
1 1001
1 1001
2 1002
inserted value deduplicated by dedup token, the same result as before
1 1001
1 1001
2 1002
new record inserted by providing new deduplication token
1 1001
1 1001
2 1002
2 1002
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
-- insert data duplicates by providing deduplication token on insert

DROP TABLE IF EXISTS insert_dedup_token1 SYNC;
DROP TABLE IF EXISTS insert_dedup_token2 SYNC;

select 'create replica 1 and check deduplication';
CREATE TABLE insert_dedup_token1 (
id Int32, val UInt32
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{database}/insert_dedup_token', 'r1') ORDER BY id;

select 'two inserts with exact data, one inserted, one deduplicated by data digest';
INSERT INTO insert_dedup_token1 VALUES(1, 1001);
INSERT INTO insert_dedup_token1 VALUES(1, 1001);
SELECT * FROM insert_dedup_token1 ORDER BY id;

select 'two inserts with the same dedup token, one inserted, one deduplicated by the token';
set insert_deduplication_token = '1';
INSERT INTO insert_dedup_token1 VALUES(1, 1001);
INSERT INTO insert_dedup_token1 VALUES(2, 1002);
SELECT * FROM insert_dedup_token1 ORDER BY id;

select 'reset deduplication token and insert new row';
set insert_deduplication_token = '';
INSERT INTO insert_dedup_token1 VALUES(2, 1002);
SELECT * FROM insert_dedup_token1 ORDER BY id;

select 'create replica 2 and check deduplication';
CREATE TABLE insert_dedup_token2 (
id Int32, val UInt32
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{database}/insert_dedup_token', 'r2') ORDER BY id;

select 'inserted value deduplicated by data digest, the same result as before';
set insert_deduplication_token = '';
INSERT INTO insert_dedup_token2 VALUES(1, 1001); -- deduplicated by data digest
SELECT * FROM insert_dedup_token2 ORDER BY id;

select 'inserted value deduplicated by dedup token, the same result as before';
set insert_deduplication_token = '1';
INSERT INTO insert_dedup_token2 VALUES(3, 1003); -- deduplicated by dedup token
SELECT * FROM insert_dedup_token2 ORDER BY id;

select 'new record inserted by providing new deduplication token';
set insert_deduplication_token = '2';
INSERT INTO insert_dedup_token2 VALUES(2, 1002); -- inserted
SELECT * FROM insert_dedup_token2 ORDER BY id;

DROP TABLE insert_dedup_token1 SYNC;
DROP TABLE insert_dedup_token2 SYNC;

0 comments on commit 100ee92

Please sign in to comment.