
Request: Support MergeTree with version timestamp. #445

Open
subkanthi opened this issue Jan 22, 2024 · 8 comments
Labels
kafka Issues related to Kafka version lightweight Issues related to Lightweight version
Milestone

Comments

@subkanthi
Collaborator

Hi team, is there a way to use the lightweight Altinity connector with a MergeTree engine and keep the version timestamp permanently? We want to keep the time series and avoid losing that information after merges happen on ReplacingMergeTree.

@aadant
Collaborator

aadant commented Jan 22, 2024

That's on the roadmap. Please note that there is a risk of duplicated data.
Actually, no data should be lost with the RMT; only the history is not kept, just like in MySQL.

If the version timestamp is part of the sort key, rows will not be merged away. Another idea is to create a history table with a materialized view, but materialized views have issues with RMT.

@aadant
Collaborator

aadant commented Jan 23, 2024

I tested the below and it works fine if you write to the history table and use a MV to send the updates to the current table.

CREATE TABLE user_history
(
    `user_id` Int32,
    `username` String,
    `full_name` Nullable(String),
    `_version` UInt64 DEFAULT 0,
    `is_deleted` UInt8 DEFAULT 0
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY (user_id, _version)
SETTINGS index_granularity = 8192;

CREATE TABLE user
(
    `user_id` Int32,
    `username` String,
    `full_name` Nullable(String),
    `_version` UInt64 DEFAULT 0,
    `is_deleted` UInt8 DEFAULT 0
)
ENGINE = ReplacingMergeTree(_version, is_deleted)
ORDER BY (user_id)
SETTINGS index_granularity = 8192;

-- the target table must exist before the MV pointing to it is created
CREATE MATERIALIZED VIEW user_mv TO user AS SELECT * FROM user_history;

@subkanthi subkanthi added the 2.0.2 label Mar 1, 2024
@svb-alt svb-alt added this to the 2.2.0 milestone Apr 4, 2024
@svb-alt svb-alt added lightweight Issues related to Lightweight version kafka Issues related to Kafka version and removed 2.0.2 labels Apr 4, 2024
@vpol

vpol commented May 14, 2024

@aadant could you please elaborate on what you've done?

I'm very interested in keeping the history of the objects, and would like to implement this workaround.

@aadant
Collaborator

aadant commented May 15, 2024

@vpol currently table overrides are not supported by the connector, but you can stop it, apply the changes below, then restart the connector:

-- the sink connector writes into the user table coming from the source

RENAME TABLE user TO user_current;

-- recreate user with _version in the sort key

CREATE TABLE user
(
    `user_id` Int32,
    `username` String,
    `full_name` Nullable(String),
    `_version` UInt64 DEFAULT 0,
    `is_deleted` UInt8 DEFAULT 0
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY (user_id, _version)
SETTINGS index_granularity = 8192;

-- replicate changes from user (history) to user_current (latest version)
CREATE MATERIALIZED VIEW user_mv TO user_current AS SELECT * FROM user;
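With that layout, a rough sketch of how the two tables would be queried (column names and the `user_id = 42` filter are illustrative, taken from the examples above):

```sql
-- full change history of one row, newest version first
SELECT user_id, username, full_name, _version, is_deleted
FROM user
WHERE user_id = 42
ORDER BY _version DESC;

-- latest state; FINAL forces ReplacingMergeTree deduplication at query time
SELECT *
FROM user_current FINAL
WHERE user_id = 42 AND is_deleted = 0;
```

Since `_version` is in the sort key of the history table, different versions of the same `user_id` are distinct keys and are never collapsed by merges.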

@subkanthi subkanthi added the dev-complete Development completed label May 28, 2024
@subkanthi subkanthi modified the milestones: 2.2.0, 2.3.0 May 29, 2024
@subkanthi
Collaborator, Author

Moving to 2.3.0, need to discuss the scope of the implementation.

@BorisTyshkevich

Why do we need ReplacingMergeTree for the history table? Why not use plain MergeTree (or ReplicatedMergeTree) and beat duplicates with argMax? There are not too many rows in the table (not 10M), so ReplacingMergeTree has no positive effect.
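The argMax approach mentioned here could look roughly like this, assuming the column names from the earlier examples (a query-time deduplication sketch, not connector behavior):

```sql
-- deduplicate a plain MergeTree history table at query time:
-- for each user_id, take each column's value at the highest _version
SELECT
    user_id,
    argMax(username, _version)   AS username,
    argMax(full_name, _version)  AS full_name,
    argMax(is_deleted, _version) AS is_deleted,
    max(_version)                AS _version
FROM user_history
GROUP BY user_id;
```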

@subkanthi
Collaborator, Author

Possible implementation for handling DDLs:

  • Stop replication.
  • Rename the history table to history_#.
  • Apply the DDL to table_history.
  • Create the materialized view.
  • Start replication.
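On the ClickHouse side, those steps might look roughly like the following. The table names, the `_1` suffix standing in for the `#`, and the `email` column are all hypothetical; start/stop of replication happens in the connector, outside ClickHouse:

```sql
-- replication stopped externally before this point

-- snapshot the current history table under a versioned name
RENAME TABLE user_history TO user_history_1;

-- apply the incoming DDL (illustrative column) to the history table
ALTER TABLE user_history_1 ADD COLUMN IF NOT EXISTS `email` Nullable(String);

-- recreate the materialized view against the altered history table
DROP VIEW IF EXISTS user_mv;
CREATE MATERIALIZED VIEW user_mv TO user AS SELECT * FROM user_history_1;

-- replication restarted externally after this point
```

The `IF EXISTS`/`IF NOT EXISTS` clauses make the sequence safe to re-run, which matters for the idempotency concern raised below.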

@BorisTyshkevich

  1. A slight change:
  • Stop replication.
  • Apply the DDL to the main table - this could be a long process. How long can the connector wait? Should we change the mutations_sync mode and timeouts? Possibly the connector should go into a restart loop.
  • Rename the history table to history_# - quite fast, but in some cases with ON CLUSTER the DDL worker could get stuck.
  • Create the NEW table_history.
  • Drop the materialized view.
  • Create the NEW materialized view.
  • Start replication.

In case of failure, the connector should restart the operation from the beginning in an IDEMPOTENT way - not failing if some steps have already been processed. IF EXISTS/IF NOT EXISTS clauses should be added to all DDLs at the TABLE and COLUMN levels.

  2. The most complicated thing about _history tables is the ORDER BY/PARTITION BY. It makes no sense to keep the same order as for the MySQL tables, so the interesting discussion will be how to configure such history tables. The existing config is unsuitable for setting ORDER BY/PARTITION BY for hundreds of tables.

My bet is creating a special ClickHouse table with rules for every history table, with columns as in system.tables:

  • db.table (primary key)
  • partition by
  • order by
  • primary key
  • engine
  • additional columns (String or Array)
  • suffix (_history by default)
  • dest database (the same db by default)
  • cluster name (no ON CLUSTER by default for CREATE/RENAME statements)

If the connector finds the table in the list while processing DDL update, it can do the mentioned processing. If not - do nothing, no history tables need to be created.
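A minimal sketch of such a rules table, using the KeeperMap engine suggested just below (the table name, column names, and ZooKeeper path are all illustrative, mapped from the bullet list above):

```sql
-- hypothetical per-table rules for generating history tables
CREATE TABLE IF NOT EXISTS sink_connector_history_rules
(
    `db_table`           String,                       -- db.table, primary key
    `partition_by`       String DEFAULT '',
    `order_by`           String DEFAULT '',
    `primary_key`        String DEFAULT '',
    `engine`             String DEFAULT 'MergeTree',
    `additional_columns` Array(String) DEFAULT [],
    `suffix`             String DEFAULT '_history',
    `dest_database`      String DEFAULT '',            -- empty = same database
    `cluster_name`       String DEFAULT ''             -- empty = no ON CLUSTER
)
ENGINE = KeeperMap('/sink_connector_history_rules')
PRIMARY KEY db_table;
```

On DDL processing, the connector would look up the incoming `db.table` here and either build the history table from these rules or skip history handling entirely.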

  3. The rules table itself is a KV structure, so the EmbeddedRocksDB engine (for a single server) or KeeperMap (for replicated setups) will work fine here.

  4. All ClickHouse DDL statements (RENAME, CREATE TABLE, CREATE MATERIALIZED VIEW) should be cluster-ready, tested with the ON CLUSTER clause (if needed), and work in an idempotent way. We need to test all operations with failures after each step.

  5. The same approach of a special connector CONFIG table could also be used for the MAIN table. Many clients want to configure ORDER BY/PARTITION BY and the ENGINE for the main tables. So we can add more columns:

  • db.table (primary key)
  • main table partition by
  • main table order by
  • main table primary key
  • main table engine
  • main table additional columns (String or Array)
  • history table partition by
  • history table order by
  • history table primary key
  • history table engine
  • history table additional columns (String or Array)
  • history suffix (_history by default)
  • history database (the same db by default)
  • cluster name (no ON CLUSTER by default for CREATE/RENAME statements)

Such a table could also filter the incoming table list instead of the Debezium variables: all table updates would go to the connector, with no need to change text configs when a new table is added in MySQL.

Boris.

@subkanthi subkanthi added the p1 label Jul 31, 2024
@subkanthi subkanthi removed the dev-complete Development completed label Aug 22, 2024
@subkanthi subkanthi modified the milestones: 2.3.0, 2.4.0 Aug 23, 2024
@svb-alt svb-alt modified the milestones: 2.4.0, 2.5.0 Oct 1, 2024
@subkanthi subkanthi modified the milestones: 2.5.0, 2.6.0 Nov 6, 2024
@svb-alt svb-alt removed the p1 label Dec 17, 2024
@svb-alt svb-alt modified the milestones: 2.6.0, 2.7.0 Dec 17, 2024
5 participants