Skip to content

Commit

Permalink
owner: fix a bug about wrong timing sequence of DDL when adding a table …
Browse files Browse the repository at this point in the history
…to processor (pingcap#450)
  • Loading branch information
leoppro authored Apr 10, 2020
1 parent f7f9435 commit 0f4980b
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 8 deletions.
5 changes: 1 addition & 4 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,7 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error {
}

// ProcessorInfos don't contain the whole table ID set now.
if len(c.orphanTables) > 0 {
return nil
}
if len(c.waitingConfirmTables) > 0 {
if len(c.orphanTables) > 0 || len(c.waitingConfirmTables) > 0 {
return nil
}

Expand Down
12 changes: 9 additions & 3 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,10 @@ func (p *processor) updateInfo(ctx context.Context) error {
}
p.handleTables(ctx, oldStatus, p.status, p.position.CheckPointTs)
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.status.TableInfos)))

err = updatePosition()
if err != nil {
return errors.Trace(err)
}
err = retry.Run(500*time.Millisecond, 5, func() error {
err = p.tsRWriter.WriteInfoIntoStorage(ctx)
switch errors.Cause(err) {
Expand All @@ -393,7 +396,7 @@ func (p *processor) updateInfo(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
return updatePosition()
return nil
}

func diffProcessTableInfos(oldInfo, newInfo []*model.ProcessTableInfo) (removed, added []*model.ProcessTableInfo) {
Expand Down Expand Up @@ -603,7 +606,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
defer p.tablesMu.Unlock()
ctx = util.PutTableIDInCtx(ctx, tableID)

log.Debug("Add table", zap.Int64("tableID", tableID))
log.Debug("Add table", zap.Int64("tableID", tableID), zap.Uint64("startTs", startTs))
if _, ok := p.tables[tableID]; ok {
log.Warn("Ignore existing table", zap.Int64("ID", tableID))
}
Expand Down Expand Up @@ -681,6 +684,9 @@ func (p *processor) addTable(ctx context.Context, tableID int64, startTs uint64)
if p.position.CheckPointTs > startTs {
p.position.CheckPointTs = startTs
}
if p.position.ResolvedTs > startTs {
p.position.ResolvedTs = startTs
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Inc()
p.collectMetrics(ctx, tableID)
}
Expand Down
2 changes: 1 addition & 1 deletion scripts/jenkins_ci/integration_test_common.groovy
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability"]
test_case_names = ["simple", "cdc", "multi_capture", "split_region", "row_format", "tiflash", "availability", "ddl_sequence"]

def prepare_binaries() {
stage('Prepare Binaries') {
Expand Down
27 changes: 27 additions & 0 deletions tests/ddl_sequence/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# sync_diff_inspector configuration for the ddl_sequence integration test.

log-level = "info"
# Number of rows compared per chunk.
chunk-size = 10
# Number of concurrent checker threads.
check-thread-count = 4
# Percentage of rows sampled for comparison; 100 means a full check.
sample-percent = 100
use-rowid = false
# Compare chunks by checksum first instead of row-by-row.
use-checksum = true
# SQL that would repair detected inconsistencies is written to this file.
fix-sql-file = "fix.sql"

# Tables to check: every table in the ddl_sequence schema (regex match).
[[check-tables]]
schema = "ddl_sequence"
tables = ["~.*"]

# Upstream TiDB — the replication source.
[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

# Downstream MySQL — the replication target being verified.
[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
45 changes: 45 additions & 0 deletions tests/ddl_sequence/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
-- Prepare data for the ddl_sequence test: each many_colsN table is created
-- and then immediately altered (DROP COLUMN) right before the first INSERT.
-- The INSERT only supplies `val`, so it is valid ONLY after `col0`
-- (INT NOT NULL, no default) has been dropped — the changefeed must replay
-- the DDL before the DML downstream or the INSERT fails.
drop database if exists `ddl_sequence`;
create database `ddl_sequence`;
use `ddl_sequence`;

CREATE TABLE many_cols1 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols1 DROP COLUMN col0;
INSERT INTO many_cols1 (val) VALUES (1);

CREATE TABLE many_cols2 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols2 DROP COLUMN col0;
INSERT INTO many_cols2 (val) VALUES (1);

CREATE TABLE many_cols3 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols3 DROP COLUMN col0;
INSERT INTO many_cols3 (val) VALUES (1);

CREATE TABLE many_cols4 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols4 DROP COLUMN col0;
INSERT INTO many_cols4 (val) VALUES (1);

CREATE TABLE many_cols5 (
id INT AUTO_INCREMENT PRIMARY KEY,
val INT DEFAULT 0,
col0 INT NOT NULL
);
ALTER TABLE many_cols5 DROP COLUMN col0;
INSERT INTO many_cols5 (val) VALUES (1);

-- Marker table: once it exists downstream, all preceding DDL/DML has synced.
-- Terminated with ';' for consistency with every other statement (the bare
-- final statement relied on the mysql client executing it at EOF).
CREATE TABLE finish_mark(a int primary key);
43 changes: 43 additions & 0 deletions tests/ddl_sequence/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/bin/bash
# Integration test "ddl_sequence": verifies that TiCDC replays DDL in the
# correct order relative to DML when tables are added to a processor.
# NOTE(review): cluster/helper functions (start_tidb_cluster, run_cdc_server,
# run_sql_file, check_table_exists, check_sync_diff, cleanup_process,
# stop_tidb_cluster, run_kafka_consumer) come from the sourced test_prepare.

# Abort the test on the first failing command.
set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
# First CLI argument selects the sink backend (e.g. "mysql" or "kafka").
SINK_TYPE=$1

function run() {
# Start from a clean working directory for this test case.
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST:$UP_PD_PORT)

run_cdc_server $WORK_DIR $CDC_BINARY

# Build the sink URI; kafka gets a randomized topic, everything else
# (including explicit "mysql", via the ;& fall-through) uses MySQL.
TOPIC_NAME="ticdc-ddl-sequence-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
mysql) ;&
*) SINK_URI="mysql://root@127.0.0.1:3306/";;
esac
cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
# For kafka sinks, a consumer is needed to apply events to the downstream.
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi
# Apply the upstream workload (CREATE/ALTER/INSERT sequences).
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-existent tables, so we check expected tables are created in downstream first
check_table_exists ddl_sequence.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

# Always tear the cluster down, even when the test fails mid-way.
trap stop_tidb_cluster EXIT
run $*
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit 0f4980b

Please sign in to comment.