Skip to content

Commit

Permalink
Merge pull request ClickHouse#32304 from devcrafter/deduplication_token_7461
Browse files Browse the repository at this point in the history

insert_deduplication_token setting for INSERT statement
  • Loading branch information
alexey-milovidov authored Jan 28, 2022
2 parents 484743e + 6bcac01 commit f6684db
Show file tree
Hide file tree
Showing 17 changed files with 327 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ class IColumn;
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \

M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.

Expand Down
13 changes: 11 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "IMergeTreeDataPart.h"

#include <optional>
#include <string_view>
#include <Core/Defines.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/ReadBufferFromString.h>
Expand Down Expand Up @@ -1630,13 +1631,21 @@ UInt32 IMergeTreeDataPart::getNumberOfRefereneces() const
}


String IMergeTreeDataPart::getZeroLevelPartBlockID() const
String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
{
if (info.level != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name);

SipHash hash;
checksums.computeTotalChecksumDataOnly(hash);
if (token.empty())
{
checksums.computeTotalChecksumDataOnly(hash);
}
else
{
hash.update(token.data(), token.size());
}

union
{
char bytes[16];
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
bool isEmpty() const { return rows_count == 0; }

/// Compute part block id for zero level part. Otherwise throws an exception.
String getZeroLevelPartBlockID() const;
/// If token is not empty, block id is calculated based on it instead of block data
String getZeroLevelPartBlockID(std::string_view token) const;

const MergeTreeData & storage;

Expand Down
20 changes: 15 additions & 5 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2442,7 +2442,12 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
}


bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log)
bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
SimpleIncrement * increment,
Transaction * out_transaction,
MergeTreeDeduplicationLog * deduplication_log,
std::string_view deduplication_token)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
Expand All @@ -2451,7 +2456,7 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem
DataPartsVector covered_parts;
{
auto lock = lockParts();
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log))
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log, deduplication_token))
return false;
}
if (!covered_parts.empty())
Expand All @@ -2463,8 +2468,13 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem


bool MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction,
std::unique_lock<std::mutex> & lock, DataPartsVector * out_covered_parts, MergeTreeDeduplicationLog * deduplication_log)
MutableDataPartPtr & part,
SimpleIncrement * increment,
Transaction * out_transaction,
std::unique_lock<std::mutex> & lock,
DataPartsVector * out_covered_parts,
MergeTreeDeduplicationLog * deduplication_log,
std::string_view deduplication_token)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
Expand Down Expand Up @@ -2526,7 +2536,7 @@ bool MergeTreeData::renameTempPartAndReplace(
/// deduplication.
if (deduplication_log)
{
String block_id = part->getZeroLevelPartBlockID();
String block_id = part->getZeroLevelPartBlockID(deduplication_token);
auto res = deduplication_log->addPart(block_id, part_info);
if (!res.second)
{
Expand Down
17 changes: 13 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,12 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// active set later with out_transaction->commit()).
/// Else, commits the part immediately.
/// Returns true if part was added. Returns false if part is covered by bigger part.
bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
bool renameTempPartAndAdd(
MutableDataPartPtr & part,
SimpleIncrement * increment = nullptr,
Transaction * out_transaction = nullptr,
MergeTreeDeduplicationLog * deduplication_log = nullptr,
std::string_view deduplication_token = std::string_view());

/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
Expand All @@ -502,9 +507,13 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Low-level version of previous one, doesn't lock mutex
bool renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);

MutableDataPartPtr & part,
SimpleIncrement * increment,
Transaction * out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr,
MergeTreeDeduplicationLog * deduplication_log = nullptr,
std::string_view deduplication_token = std::string_view());

/// Remove parts from working set immediately (without wait for background
/// process). Transfer part state to temporary. Have very limited usage only
Expand Down
15 changes: 14 additions & 1 deletion src/Storages/MergeTree/MergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ void MergeTreeSink::onStart()
void MergeTreeSink::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
String block_dedup_token;

auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
for (auto & current_block : part_blocks)
Expand All @@ -31,8 +32,20 @@ void MergeTreeSink::consume(Chunk chunk)
if (!part)
continue;

if (storage.getDeduplicationLog())
{
const String & dedup_token = context->getSettingsRef().insert_deduplication_token;
if (!dedup_token.empty())
{
/// multiple blocks can be inserted within the same insert query
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
++chunk_dedup_seqnum;
}
}

/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog()))
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog(), block_dedup_token))
{
PartLog::addNewPart(storage.getContext(), part, watch.elapsed());

Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class MergeTreeSink : public SinkToStorage
StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block;
ContextPtr context;
uint64_t chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
};

}
10 changes: 9 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,16 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
{
/// We add the hash from the data and partition identifier to deduplication ID.
/// That is, do not insert the same data to the same partition twice.
block_id = part->getZeroLevelPartBlockID();

String block_dedup_token = context->getSettingsRef().insert_deduplication_token;
if (!block_dedup_token.empty())
{
/// multiple blocks can be inserted within the same insert query
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
block_dedup_token += fmt::format("_{}", chunk_dedup_seqnum);
++chunk_dedup_seqnum;
}
block_id = part->getZeroLevelPartBlockID(block_dedup_token);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
}
else
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ class ReplicatedMergeTreeSink : public SinkToStorage

bool is_attach = false;
bool quorum_parallel = false;
bool deduplicate = true;
const bool deduplicate = true;
bool last_block_is_duplicate = false;

using Logger = Poco::Logger;
Poco::Logger * log;

ContextPtr context;
UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
create and check deduplication
two inserts with exact data, one inserted, one deduplicated by data digest
0 1000
two inserts with the same dedup token, one inserted, one deduplicated by the token
0 1000
1 1001
update dedup token, two inserts with the same dedup token, one inserted, one deduplicated by the token
0 1000
1 1001
1 1001
reset deduplication token and insert new row
0 1000
1 1001
1 1001
2 1002
33 changes: 33 additions & 0 deletions tests/queries/0_stateless/02124_insert_deduplication_token.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-- Checks that, for a non-replicated MergeTree table, duplicate inserts are detected
-- by the insert_deduplication_token setting when it is set, and by the data digest
-- when it is empty.

DROP TABLE IF EXISTS insert_dedup_token SYNC;

select 'create and check deduplication';
-- non_replicated_deduplication_window is set to 0xFFFFFFFF; presumably this keeps every
-- block id inserted by this test inside the deduplication window (confirm against the
-- MergeTree settings documentation).
CREATE TABLE insert_dedup_token (
id Int32, val UInt32
) ENGINE=MergeTree() ORDER BY id
SETTINGS non_replicated_deduplication_window=0xFFFFFFFF;

-- Stage 1: no token is set, identical data is inserted twice.
-- Expected result: a single row (0, 1000).
select 'two inserts with exact data, one inserted, one deduplicated by data digest';
INSERT INTO insert_dedup_token VALUES(0, 1000);
INSERT INTO insert_dedup_token VALUES(0, 1000);
SELECT * FROM insert_dedup_token ORDER BY id;

-- Stage 2: both inserts share one token but carry different data.
-- Expected result: only (1, 1001) is added; (2, 1002) is dropped as a duplicate.
-- NOTE(review): the token literal contains a \x00 escape, presumably to check that
-- tokens with an embedded NUL byte are handled; confirm the intent.
select 'two inserts with the same dedup token, one inserted, one deduplicated by the token';
set insert_deduplication_token = '\x61\x00\x62';
INSERT INTO insert_dedup_token VALUES(1, 1001);
INSERT INTO insert_dedup_token VALUES(2, 1002);
SELECT * FROM insert_dedup_token ORDER BY id;

-- Stage 3: same data as stage 2 under a new token (only the last escape differs).
-- Expected result: (1, 1001) is added a second time, (2, 1002) is dropped again.
select 'update dedup token, two inserts with the same dedup token, one inserted, one deduplicated by the token';
set insert_deduplication_token = '\x61\x00\x63';
INSERT INTO insert_dedup_token VALUES(1, 1001);
INSERT INTO insert_dedup_token VALUES(2, 1002);
SELECT * FROM insert_dedup_token ORDER BY id;

-- Stage 4: the token is cleared. (2, 1002) was never actually stored by the earlier
-- stages, so it is expected to be inserted now.
select 'reset deduplication token and insert new row';
set insert_deduplication_token = '';
INSERT INTO insert_dedup_token VALUES(2, 1002);
SELECT * FROM insert_dedup_token ORDER BY id;

DROP TABLE insert_dedup_token SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
insert 2 blocks with dedup token, 1 row per block
2
1
2
insert deduplicated by token
2
1
2
insert the same data by providing different dedup token
4
1
1
2
2
insert 4 blocks, 2 deduplicated, 2 inserted
6
1
1
2
2
3
4
disable token based deduplication, insert the same data as with token
10
1
1
1
2
2
2
3
3
4
4
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Checks token-based insert deduplication on a non-replicated MergeTree table when a
# single INSERT statement is split into several blocks.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Counts the parts of the table whose min_block_number equals max_block_number.
QUERY_COUNT_ORIGIN_BLOCKS="SELECT COUNT(*) FROM system.parts WHERE database = currentDatabase() AND table = 'block_dedup_token' AND min_block_number == max_block_number;"
# Dumps the table content in a stable order.
QUERY_SELECT_FROM_TABLE_ORDERED="SELECT * FROM block_dedup_token ORDER BY id;"

# Runs one stage of the test: prints the stage title, sends the INSERT over HTTP and
# then prints the part count followed by the table content.
#   $1 - stage title (echoed through a SELECT)
#   $2 - value passed as insert_deduplication_token (wrapped in single quotes in the URL)
#   $3 - VALUES list of the INSERT statement
# The block size settings in the URL match the "1 row per block" wording of the first
# stage title: max_insert_block_size=1 with both min_insert_block_size_* set to 0.
function run_stage()
{
    local stage_title=$1
    local dedup_token=$2
    local rows=$3

    $CLICKHOUSE_CLIENT --query="SELECT '$stage_title'"
    echo "INSERT INTO block_dedup_token VALUES $rows" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$dedup_token'&query=" --data-binary @-
    $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"
}

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token SYNC"
$CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token (id Int32) ENGINE=MergeTree() ORDER BY id SETTINGS non_replicated_deduplication_window=0xFFFFFFFF;"

run_stage 'insert 2 blocks with dedup token, 1 row per block' 'dedup1' '(1), (2)'

# Same token and same data again: nothing new is expected to appear.
run_stage 'insert deduplicated by token' 'dedup1' '(1), (2)'

run_stage 'insert the same data by providing different dedup token' 'dedup2' '(1), (2)'

# The token of the previous stage is reused with two extra rows appended.
run_stage 'insert 4 blocks, 2 deduplicated, 2 inserted' 'dedup2' '(1), (2), (3), (4)'

# Empty token value.
run_stage 'disable token based deduplication, insert the same data as with token' '' '(1), (2), (3), (4)'

$CLICKHOUSE_CLIENT --query="DROP TABLE block_dedup_token SYNC"
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
insert 2 blocks with dedup token, 1 row per block
2
1
2
insert deduplicated by token
2
1
2
insert the same data by providing different dedup token
4
1
1
2
2
insert 4 blocks, 2 deduplicated, 2 inserted
6
1
1
2
2
3
4
disable token based deduplication, insert the same data as with token
10
1
1
1
2
2
2
3
3
4
4
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Checks token-based insert deduplication on a ReplicatedMergeTree table when a single
# INSERT statement is split into several blocks.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Counts the parts of the table whose min_block_number equals max_block_number.
QUERY_COUNT_ORIGIN_BLOCKS="SELECT COUNT(*) FROM system.parts WHERE database = currentDatabase() AND table = 'block_dedup_token_replica' AND min_block_number == max_block_number;"
# Dumps the table content in a stable order.
QUERY_SELECT_FROM_TABLE_ORDERED="SELECT * FROM block_dedup_token_replica ORDER BY id;"

# Token sent with every INSERT; reassigned between stages.
DEDUP_TOKEN=''

# Prints the title of a stage ($1) through the client.
announce()
{
    $CLICKHOUSE_CLIENT --query="SELECT '$1'"
}

# Sends an INSERT with the VALUES list given in $1 over HTTP, using the current
# DEDUP_TOKEN (wrapped in single quotes in the URL). The URL sets
# max_insert_block_size=1 and both min_insert_block_size_* settings to 0, matching the
# "1 row per block" wording of the first stage title.
insert_values()
{
    echo "INSERT INTO block_dedup_token_replica VALUES $1" | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
}

# Prints the part count followed by the ordered table content.
report()
{
    $CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"
}

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token_replica SYNC"
$CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token_replica (id Int32) ENGINE=ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{table}', '{replica}') ORDER BY id"

announce 'insert 2 blocks with dedup token, 1 row per block'
DEDUP_TOKEN='dedup1'
insert_values '(1), (2)'
report

# Token unchanged, same data: nothing new is expected to appear.
announce 'insert deduplicated by token'
insert_values '(1), (2)'
report

announce 'insert the same data by providing different dedup token'
DEDUP_TOKEN='dedup2'
insert_values '(1), (2)'
report

# Token unchanged, two extra rows appended to the previous VALUES list.
announce 'insert 4 blocks, 2 deduplicated, 2 inserted'
insert_values '(1), (2), (3), (4)'
report

# Empty token value.
announce 'disable token based deduplication, insert the same data as with token'
DEDUP_TOKEN=''
insert_values '(1), (2), (3), (4)'
report

$CLICKHOUSE_CLIENT --query="DROP TABLE block_dedup_token_replica SYNC"
Loading

0 comments on commit f6684db

Please sign in to comment.