diff --git a/cdc/changefeed.go b/cdc/changefeed.go index f0a53463e94..9181a753871 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -465,10 +465,7 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { } // ProcessorInfos don't contains the whole set table id now. - if len(c.orphanTables) > 0 { - return nil - } - if len(c.waitingConfirmTables) > 0 { + if len(c.orphanTables) > 0 || len(c.waitingConfirmTables) > 0 { return nil } diff --git a/cdc/processor.go b/cdc/processor.go index f28ee9ad3ce..a856ed3e7b7 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -377,7 +377,10 @@ func (p *processor) updateInfo(ctx context.Context) error { } p.handleTables(ctx, oldStatus, p.status, p.position.CheckPointTs) syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.status.TableInfos))) - + err = updatePosition() + if err != nil { + return errors.Trace(err) + } err = retry.Run(500*time.Millisecond, 5, func() error { err = p.tsRWriter.WriteInfoIntoStorage(ctx) switch errors.Cause(err) { @@ -393,7 +396,7 @@ func (p *processor) updateInfo(ctx context.Context) error { if err != nil { return errors.Trace(err) } - return updatePosition() + return nil } func diffProcessTableInfos(oldInfo, newInfo []*model.ProcessTableInfo) (removed, added []*model.ProcessTableInfo) { @@ -603,7 +606,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64) defer p.tablesMu.Unlock() ctx = util.PutTableIDInCtx(ctx, tableID) - log.Debug("Add table", zap.Int64("tableID", tableID)) + log.Debug("Add table", zap.Int64("tableID", tableID), zap.Uint64("startTs", startTs)) if _, ok := p.tables[tableID]; ok { log.Warn("Ignore existing table", zap.Int64("ID", tableID)) } @@ -681,6 +684,9 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64) if p.position.CheckPointTs > startTs { p.position.CheckPointTs = startTs } + if p.position.ResolvedTs > startTs { + p.position.ResolvedTs = startTs + } syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Inc() p.collectMetrics(ctx, tableID) } diff --git a/scripts/jenkins_ci/integration_test_common.groovy b/scripts/jenkins_ci/integration_test_common.groovy index e0bda0ca5b4..a5597d368a8 100644 --- a/scripts/jenkins_ci/integration_test_common.groovy +++ b/scripts/jenkins_ci/integration_test_common.groovy @@ -1,4 +1,4 @@ -test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability"] +test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability", "ddl_sequence"] def prepare_binaries() { stage('Prepare Binaries') { diff --git a/tests/ddl_sequence/conf/diff_config.toml b/tests/ddl_sequence/conf/diff_config.toml new file mode 100644 index 00000000000..390a9f3128c --- /dev/null +++ b/tests/ddl_sequence/conf/diff_config.toml @@ -0,0 +1,27 @@ +# diff Configuration. + +log-level = "info" +chunk-size = 10 +check-thread-count = 4 +sample-percent = 100 +use-rowid = false +use-checksum = true +fix-sql-file = "fix.sql" + +# tables need to check. +[[check-tables]] + schema = "ddl_sequence" + tables = ["~.*"] + +[[source-db]] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + instance-id = "source-1" + +[target-db] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/ddl_sequence/data/prepare.sql b/tests/ddl_sequence/data/prepare.sql new file mode 100644 index 00000000000..fbff85bacb5 --- /dev/null +++ b/tests/ddl_sequence/data/prepare.sql @@ -0,0 +1,45 @@ +drop database if exists `ddl_sequence`; +create database `ddl_sequence`; +use `ddl_sequence`; + +CREATE TABLE many_cols1 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); +ALTER TABLE many_cols1 DROP COLUMN col0; +INSERT INTO many_cols1 (val) VALUES (1); + +CREATE TABLE many_cols2 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); +ALTER TABLE many_cols2 DROP COLUMN col0; +INSERT INTO many_cols2 (val) VALUES (1); + +CREATE TABLE many_cols3 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); +ALTER TABLE many_cols3 DROP COLUMN col0; +INSERT INTO many_cols3 (val) VALUES (1); + +CREATE TABLE many_cols4 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); +ALTER TABLE many_cols4 DROP COLUMN col0; +INSERT INTO many_cols4 (val) VALUES (1); + +CREATE TABLE many_cols5 ( + id INT AUTO_INCREMENT PRIMARY KEY, + val INT DEFAULT 0, + col0 INT NOT NULL +); +ALTER TABLE many_cols5 DROP COLUMN col0; +INSERT INTO many_cols5 (val) VALUES (1); + +CREATE TABLE finish_mark(a int primary key) \ No newline at end of file diff --git a/tests/ddl_sequence/run.sh b/tests/ddl_sequence/run.sh new file mode 100644 index 00000000000..6cd2ddc90a1 --- /dev/null +++ b/tests/ddl_sequence/run.sh @@ -0,0 +1,43 @@ +#!/bin/bash + +set -e + +CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT) + + run_cdc_server $WORK_DIR $CDC_BINARY + + TOPIC_NAME="ticdc-ddl-sequence-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + mysql) ;& + *) SINK_URI="mysql://root@127.0.0.1:3306/";; + esac + cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" + fi + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first + check_table_exists ddl_sequence.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"