Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner: fix a bug about wrong timing sequence of DDL when adding a table to a processor #450

Merged
merged 20 commits into from
Apr 10, 2020
5 changes: 1 addition & 4 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,7 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
}

// ProcessorInfos don't contain the whole set of table ids now.
if len(c.orphanTables) > 0 {
return nil
}
if len(c.waitingConfirmTables) > 0 {
if len(c.orphanTables) > 0 || len(c.waitingConfirmTables) > 0 {
return nil
}

Expand Down
12 changes: 9 additions & 3 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,10 @@ func (p *processor) updateInfo(ctx context.Context) error {
}
p.handleTables(ctx, oldStatus, p.status, p.position.CheckPointTs)
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.status.TableInfos)))

err = updatePosition()
if err != nil {
return errors.Trace(err)
}
err = retry.Run(500*time.Millisecond, 5, func() error {
err = p.tsRWriter.WriteInfoIntoStorage(ctx)
switch errors.Cause(err) {
Expand All @@ -393,7 +396,7 @@ func (p *processor) updateInfo(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
return updatePosition()
return nil
}

func diffProcessTableInfos(oldInfo, newInfo []*model.ProcessTableInfo) (removed, added []*model.ProcessTableInfo) {
Expand Down Expand Up @@ -603,7 +606,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
defer p.tablesMu.Unlock()
ctx = util.PutTableIDInCtx(ctx, tableID)

log.Debug("Add table", zap.Int64("tableID", tableID))
log.Debug("Add table", zap.Int64("tableID", tableID), zap.Uint64("startTs", startTs))
if _, ok := p.tables[tableID]; ok {
log.Warn("Ignore existing table", zap.Int64("ID", tableID))
}
Expand Down Expand Up @@ -681,6 +684,9 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
if p.position.CheckPointTs > startTs {
p.position.CheckPointTs = startTs
}
if p.position.ResolvedTs > startTs {
p.position.ResolvedTs = startTs
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Inc()
p.collectMetrics(ctx, tableID)
}
Expand Down
2 changes: 1 addition & 1 deletion scripts/jenkins_ci/integration_test_common.groovy
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability"]
test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability", "ddl_sequence"]

def prepare_binaries() {
stage('Prepare Binaries') {
Expand Down
27 changes: 27 additions & 0 deletions tests/ddl_sequence/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# diff Configuration.

log-level = "info"
chunk-size = 10
check-thread-count = 4
sample-percent = 100
use-rowid = false
use-checksum = true
fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "ddl_sequence"
tables = ["~.*"]

[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
45 changes: 45 additions & 0 deletions tests/ddl_sequence/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
drop database if exists `ddl_sequence`;
create database `ddl_sequence`;
use `ddl_sequence`;

CREATE TABLE many_cols1 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols1 DROP COLUMN col0;
INSERT INTO many_cols1 (val) VALUES (1);

CREATE TABLE many_cols2 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols2 DROP COLUMN col0;
INSERT INTO many_cols2 (val) VALUES (1);

CREATE TABLE many_cols3 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols3 DROP COLUMN col0;
INSERT INTO many_cols3 (val) VALUES (1);

CREATE TABLE many_cols4 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols4 DROP COLUMN col0;
INSERT INTO many_cols4 (val) VALUES (1);

CREATE TABLE many_cols5 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols5 DROP COLUMN col0;
INSERT INTO many_cols5 (val) VALUES (1);

CREATE TABLE finish_mark(a int primary key)
43 changes: 43 additions & 0 deletions tests/ddl_sequence/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/bin/bash

set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT)

run_cdc_server $WORK_DIR $CDC_BINARY

TOPIC_NAME="ticdc-ddl-sequence-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
mysql) ;&
*) SINK_URI="mysql://root@127.0.0.1:3306/";;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
check_table_exists ddl_sequence.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"