Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

insert_deduplication_token setting for INSERT statement #1

Merged
merged 7 commits into from
Jan 29, 2022
Merged
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ class IColumn;
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \

M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.

Expand Down
13 changes: 11 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "IMergeTreeDataPart.h"

#include <optional>
#include <string_view>
#include <Core/Defines.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/ReadBufferFromString.h>
Expand Down Expand Up @@ -1635,13 +1636,21 @@ UInt32 IMergeTreeDataPart::getNumberOfRefereneces() const
}


String IMergeTreeDataPart::getZeroLevelPartBlockID() const
String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
{
if (info.level != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name);

SipHash hash;
checksums.computeTotalChecksumDataOnly(hash);
if (token.empty())
{
checksums.computeTotalChecksumDataOnly(hash);
}
else
{
hash.update(token.data(), token.size());
}

union
{
char bytes[16];
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/IMergeTreeDataPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
bool isEmpty() const { return rows_count == 0; }

/// Compute part block id for zero level part. Otherwise throws an exception.
String getZeroLevelPartBlockID() const;
/// If token is not empty, block id is calculated based on it instead of block data
String getZeroLevelPartBlockID(std::string_view token) const;

const MergeTreeData & storage;

Expand Down
20 changes: 15 additions & 5 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2426,7 +2426,12 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
}


bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log)
bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
SimpleIncrement * increment,
Transaction * out_transaction,
MergeTreeDeduplicationLog * deduplication_log,
std::string_view deduplication_token)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
Expand All @@ -2435,7 +2440,7 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem
DataPartsVector covered_parts;
{
auto lock = lockParts();
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log))
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log, deduplication_token))
return false;
}
if (!covered_parts.empty())
Expand All @@ -2447,8 +2452,13 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem


bool MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction,
std::unique_lock<std::mutex> & lock, DataPartsVector * out_covered_parts, MergeTreeDeduplicationLog * deduplication_log)
MutableDataPartPtr & part,
SimpleIncrement * increment,
Transaction * out_transaction,
std::unique_lock<std::mutex> & lock,
DataPartsVector * out_covered_parts,
MergeTreeDeduplicationLog * deduplication_log,
std::string_view deduplication_token)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
Expand Down Expand Up @@ -2510,7 +2520,7 @@ bool MergeTreeData::renameTempPartAndReplace(
/// deduplication.
if (deduplication_log)
{
String block_id = part->getZeroLevelPartBlockID();
String block_id = part->getZeroLevelPartBlockID(deduplication_token);
auto res = deduplication_log->addPart(block_id, part_info);
if (!res.second)
{
Expand Down
17 changes: 13 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,12 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// active set later with out_transaction->commit()).
/// Else, commits the part immediately.
/// Returns true if part was added. Returns false if part is covered by bigger part.
bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
bool renameTempPartAndAdd(
MutableDataPartPtr & part,
SimpleIncrement * increment = nullptr,
Transaction * out_transaction = nullptr,
MergeTreeDeduplicationLog * deduplication_log = nullptr,
std::string_view deduplication_token = std::string_view());

/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
Expand All @@ -508,9 +513,13 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Low-level version of previous one, doesn't lock mutex
bool renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);

MutableDataPartPtr & part,
SimpleIncrement * increment,
Transaction * out_transaction,
DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr,
MergeTreeDeduplicationLog * deduplication_log = nullptr,
std::string_view deduplication_token = std::string_view());

/// Remove parts from working set immediately (without wait for background
/// process). Transfer part state to temporary. Have very limited usage only
Expand Down
15 changes: 14 additions & 1 deletion src/Storages/MergeTree/MergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ void MergeTreeSink::onStart()
void MergeTreeSink::consume(Chunk chunk)
{
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
String block_dedup_token;

auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
for (auto & current_block : part_blocks)
Expand All @@ -31,8 +32,20 @@ void MergeTreeSink::consume(Chunk chunk)
if (!part)
continue;

if (storage.getDeduplicationLog())
{
const String & dedup_token = context->getSettingsRef().insert_deduplication_token;
if (!dedup_token.empty())
{
/// multiple blocks can be inserted within the same insert query
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
block_dedup_token = fmt::format("{}_{}", dedup_token, chunk_dedup_seqnum);
++chunk_dedup_seqnum;
}
}

/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog()))
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog(), block_dedup_token))
{
PartLog::addNewPart(storage.getContext(), part, watch.elapsed());

Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class MergeTreeSink : public SinkToStorage
StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block;
ContextPtr context;
uint64_t chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
};

}
10 changes: 9 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,16 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
{
/// We add the hash from the data and partition identifier to deduplication ID.
/// That is, do not insert the same data to the same partition twice.
block_id = part->getZeroLevelPartBlockID();

String block_dedup_token = context->getSettingsRef().insert_deduplication_token;
if (!block_dedup_token.empty())
{
/// multiple blocks can be inserted within the same insert query
/// an ordinal number is added to dedup token to generate a distinctive block id for each block
block_dedup_token += fmt::format("_{}", chunk_dedup_seqnum);
++chunk_dedup_seqnum;
}
block_id = part->getZeroLevelPartBlockID(block_dedup_token);
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
}
else
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ class ReplicatedMergeTreeSink : public SinkToStorage

bool is_attach = false;
bool quorum_parallel = false;
bool deduplicate = true;
const bool deduplicate = true;
bool last_block_is_duplicate = false;

using Logger = Poco::Logger;
Poco::Logger * log;

ContextPtr context;
UInt64 chunk_dedup_seqnum = 0; /// input chunk ordinal number in case of dedup token
};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
create and check deduplication
two inserts with exact data, one inserted, one deduplicated by data digest
0 1000
two inserts with the same dedup token, one inserted, one deduplicated by the token
0 1000
1 1001
update dedup token, two inserts with the same dedup token, one inserted, one deduplicated by the token
0 1000
1 1001
1 1001
reset deduplication token and insert new row
0 1000
1 1001
1 1001
2 1002
33 changes: 33 additions & 0 deletions tests/queries/0_stateless/02124_insert_deduplication_token.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-- insert data duplicates by providing deduplication token on insert

DROP TABLE IF EXISTS insert_dedup_token SYNC;

select 'create and check deduplication';
CREATE TABLE insert_dedup_token (
id Int32, val UInt32
) ENGINE=MergeTree() ORDER BY id
SETTINGS non_replicated_deduplication_window=0xFFFFFFFF;

select 'two inserts with exact data, one inserted, one deduplicated by data digest';
INSERT INTO insert_dedup_token VALUES(0, 1000);
INSERT INTO insert_dedup_token VALUES(0, 1000);
SELECT * FROM insert_dedup_token ORDER BY id;

select 'two inserts with the same dedup token, one inserted, one deduplicated by the token';
set insert_deduplication_token = '\x61\x00\x62';
INSERT INTO insert_dedup_token VALUES(1, 1001);
INSERT INTO insert_dedup_token VALUES(2, 1002);
SELECT * FROM insert_dedup_token ORDER BY id;

select 'update dedup token, two inserts with the same dedup token, one inserted, one deduplicated by the token';
set insert_deduplication_token = '\x61\x00\x63';
INSERT INTO insert_dedup_token VALUES(1, 1001);
INSERT INTO insert_dedup_token VALUES(2, 1002);
SELECT * FROM insert_dedup_token ORDER BY id;

select 'reset deduplication token and insert new row';
set insert_deduplication_token = '';
INSERT INTO insert_dedup_token VALUES(2, 1002);
SELECT * FROM insert_dedup_token ORDER BY id;

DROP TABLE insert_dedup_token SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
insert 2 blocks with dedup token, 1 row per block
2
1
2
insert deduplicated by token
2
1
2
insert the same data by providing different dedup token
4
1
1
2
2
insert 4 blocks, 2 deduplicated, 2 inserted
6
1
1
2
2
3
4
disable token based deduplication, insert the same data as with token
10
1
1
1
2
2
2
3
3
4
4
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

QUERY_COUNT_ORIGIN_BLOCKS="SELECT COUNT(*) FROM system.parts WHERE database = currentDatabase() AND table = 'block_dedup_token' AND min_block_number == max_block_number;"
QUERY_SELECT_FROM_TABLE_ORDERED="SELECT * FROM block_dedup_token ORDER BY id;"

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token SYNC"
$CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token (id Int32) ENGINE=MergeTree() ORDER BY id SETTINGS non_replicated_deduplication_window=0xFFFFFFFF;"

$CLICKHOUSE_CLIENT --query="SELECT 'insert 2 blocks with dedup token, 1 row per block'"
DEDUP_TOKEN='dedup1'
echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert deduplicated by token'"
echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert the same data by providing different dedup token'"
DEDUP_TOKEN='dedup2'
echo 'INSERT INTO block_dedup_token VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert 4 blocks, 2 deduplicated, 2 inserted'"
echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'disable token based deduplication, insert the same data as with token'"
DEDUP_TOKEN=''
echo 'INSERT INTO block_dedup_token VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="DROP TABLE block_dedup_token SYNC"
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
insert 2 blocks with dedup token, 1 row per block
2
1
2
insert deduplicated by token
2
1
2
insert the same data by providing different dedup token
4
1
1
2
2
insert 4 blocks, 2 deduplicated, 2 inserted
6
1
1
2
2
3
4
disable token based deduplication, insert the same data as with token
10
1
1
1
2
2
2
3
3
4
4
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

QUERY_COUNT_ORIGIN_BLOCKS="SELECT COUNT(*) FROM system.parts WHERE database = currentDatabase() AND table = 'block_dedup_token_replica' AND min_block_number == max_block_number;"
QUERY_SELECT_FROM_TABLE_ORDERED="SELECT * FROM block_dedup_token_replica ORDER BY id;"

$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS block_dedup_token_replica SYNC"
$CLICKHOUSE_CLIENT --query="CREATE TABLE block_dedup_token_replica (id Int32) ENGINE=ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{table}', '{replica}') ORDER BY id"

$CLICKHOUSE_CLIENT --query="SELECT 'insert 2 blocks with dedup token, 1 row per block'"
DEDUP_TOKEN='dedup1'
echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert deduplicated by token'"
echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert the same data by providing different dedup token'"
DEDUP_TOKEN='dedup2'
echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'insert 4 blocks, 2 deduplicated, 2 inserted'"
echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="SELECT 'disable token based deduplication, insert the same data as with token'"
DEDUP_TOKEN=''
echo 'INSERT INTO block_dedup_token_replica VALUES (1), (2), (3), (4)' | ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_insert_block_size=1&min_insert_block_size_rows=0&min_insert_block_size_bytes=0&insert_deduplication_token='$DEDUP_TOKEN'&query=" --data-binary @-
$CLICKHOUSE_CLIENT --multiquery --query "$QUERY_COUNT_ORIGIN_BLOCKS;$QUERY_SELECT_FROM_TABLE_ORDERED"

$CLICKHOUSE_CLIENT --query="DROP TABLE block_dedup_token_replica SYNC"
Loading