Skip to content

Commit

Permalink
finish
Browse files Browse the repository at this point in the history
Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 committed Sep 7, 2023
1 parent b92713f commit 287e2f2
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 23 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func (rc *Controller) Close() {

// Run starts the restore task.
func (rc *Controller) Run(ctx context.Context) error {
failpoint.Inject("beforeImportTables", func() {})
failpoint.Inject("beforeRun", func() {})

opts := []func(context.Context) error{
rc.setGlobalVariables,
Expand Down
45 changes: 28 additions & 17 deletions br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ func DefaultExpectPDCfgGenerators() map[string]pauseConfigGenerator {

// PdController manage get/update config from pd.
type PdController struct {
remove below!!!
addrs []string
cli *http.Client
pdClient pd.Client
Expand Down Expand Up @@ -330,6 +329,16 @@ func parseVersion(versionBytes []byte) *semver.Version {
return version
}

// getAllPDAddrs returns the set of PD addresses worth trying for an HTTP
// request: the current leader address first (when a PD client is attached),
// followed by the statically configured addresses.
// TODO: always read latest PD nodes from PD client
func (p *PdController) getAllPDAddrs() []string {
	addrs := make([]string, 0, len(p.addrs)+1)
	if cli := p.pdClient; cli != nil {
		// Prefer the leader reported by the live PD client over the
		// possibly stale static list.
		addrs = append(addrs, cli.GetLeaderAddr())
	}
	return append(addrs, p.addrs...)
}

func (p *PdController) isPauseConfigEnabled() bool {
return p.version.Compare(pauseConfigVersion) >= 0
}
Expand Down Expand Up @@ -357,7 +366,7 @@ func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) {

func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) {
var err error
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
v, e := get(ctx, addr, clusterVersionPrefix, p.cli, http.MethodGet, nil)
if e != nil {
err = e
Expand All @@ -384,7 +393,7 @@ func (p *PdController) getRegionCountWith(
end = url.QueryEscape(string(codec.EncodeBytes(nil, endKey)))
}
var err error
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
query := fmt.Sprintf(
"%s?start_key=%s&end_key=%s",
regionCountPrefix, start, end)
Expand All @@ -411,7 +420,7 @@ func (p *PdController) GetStoreInfo(ctx context.Context, storeID uint64) (*pdtyp
func (p *PdController) getStoreInfoWith(
ctx context.Context, get pdHTTPRequest, storeID uint64) (*pdtypes.StoreInfo, error) {
var err error
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
query := fmt.Sprintf(
"%s/%d",
storePrefix, storeID)
Expand Down Expand Up @@ -441,7 +450,7 @@ func (p *PdController) doPauseSchedulers(ctx context.Context,
removedSchedulers := make([]string, 0, len(schedulers))
for _, scheduler := range schedulers {
prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
if err == nil {
removedSchedulers = append(removedSchedulers, scheduler)
Expand Down Expand Up @@ -524,7 +533,7 @@ func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []str
}
for _, scheduler := range schedulers {
prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
if err == nil {
break
Expand All @@ -548,7 +557,7 @@ func (p *PdController) ListSchedulers(ctx context.Context) ([]string, error) {

func (p *PdController) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]string, error) {
var err error
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
v, e := get(ctx, addr, schedulerPrefix, p.cli, http.MethodGet, nil)
if e != nil {
err = e
Expand All @@ -570,7 +579,7 @@ func (p *PdController) GetPDScheduleConfig(
ctx context.Context,
) (map[string]interface{}, error) {
var err error
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
v, e := pdRequest(
ctx, addr, scheduleConfigPrefix, p.cli, http.MethodGet, nil)
if e != nil {
Expand Down Expand Up @@ -608,7 +617,7 @@ func (p *PdController) doUpdatePDScheduleConfig(
newCfg[sc] = v
}

for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
reqData, err := json.Marshal(newCfg)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -836,7 +845,7 @@ func (p *PdController) doRemoveSchedulersWith(
// GetMinResolvedTS get min-resolved-ts from pd
func (p *PdController) GetMinResolvedTS(ctx context.Context) (uint64, error) {
var err error
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
v, e := pdRequest(ctx, addr, minResolvedTSPrefix, p.cli, http.MethodGet, nil)
if e != nil {
log.Warn("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
Expand Down Expand Up @@ -870,7 +879,7 @@ func (p *PdController) RecoverBaseAllocID(ctx context.Context, id uint64) error
ID: fmt.Sprintf("%d", id),
})
var err error
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
_, e := pdRequest(ctx, addr, baseAllocIDPrefix, p.cli, http.MethodPost, bytes.NewBuffer(reqData))
if e != nil {
log.Warn("failed to recover base alloc id", zap.String("addr", addr), zap.Error(e))
Expand All @@ -894,7 +903,7 @@ func (p *PdController) ResetTS(ctx context.Context, ts uint64) error {
ForceUseLarger: true,
})
var err error
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
code, _, e := pdRequestWithCode(ctx, addr, resetTSPrefix, p.cli, http.MethodPost, bytes.NewBuffer(reqData))
if e != nil {
// for pd version <= 6.2, if the given ts < current ts of pd, pd returns StatusForbidden.
Expand Down Expand Up @@ -924,7 +933,7 @@ func (p *PdController) UnmarkRecovering(ctx context.Context) error {

func (p *PdController) operateRecoveringMark(ctx context.Context, method string) error {
var err error
for _, addr := range p.addrs {
for _, addr := range p.getAllPDAddrs() {
_, e := pdRequest(ctx, addr, recoveringMarkPrefix, p.cli, method, nil)
if e != nil {
log.Warn("failed to operate recovering mark", zap.String("method", method),
Expand Down Expand Up @@ -969,7 +978,8 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L
panic(err)
}
var lastErr error
for i, addr := range p.addrs {
addrs := p.getAllPDAddrs()
for i, addr := range addrs {
_, lastErr = pdRequest(ctx, addr, regionLabelPrefix,
p.cli, http.MethodPost, bytes.NewBuffer(reqData))
if lastErr == nil {
Expand All @@ -979,7 +989,7 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L
return errors.Trace(lastErr)
}

if i < len(p.addrs) {
if i < len(addrs) {
log.Warn("failed to create or update region label rule, will try next pd address",
zap.Error(lastErr), zap.String("pdAddr", addr))
}
Expand All @@ -990,7 +1000,8 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L
// DeleteRegionLabelRule deletes a region label rule.
func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string) error {
var lastErr error
for i, addr := range p.addrs {
addrs := p.getAllPDAddrs()
for i, addr := range addrs {
_, lastErr = pdRequest(ctx, addr, fmt.Sprintf("%s/%s", regionLabelPrefix, ruleID),
p.cli, http.MethodDelete, nil)
if lastErr == nil {
Expand All @@ -1000,7 +1011,7 @@ func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string)
return errors.Trace(lastErr)
}

if i < len(p.addrs) {
if i < len(addrs) {
log.Warn("failed to delete region label rule, will try next pd address",
zap.Error(lastErr), zap.String("pdAddr", addr))
}
Expand Down
18 changes: 15 additions & 3 deletions br/tests/lightning_pd_leader_switch/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ bin/pd-server --join "https://$PD_ADDR" \
--name pd2 \
--config $PD_CONFIG &

# strangely, a new PD server cannot join the cluster if started too soon after the previous one; wait a bit
sleep 10

bin/pd-server --join "https://$PD_ADDR" \
Expand All @@ -42,14 +43,25 @@ bin/pd-server --join "https://$PD_ADDR" \

# restart TiDB to let TiDB load new PD nodes
killall tidb-server
# wait for TiDB to exit
# wait for TiDB to exit to release file lock
sleep 5
start_tidb

export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/importer/beforeImportTables=sleep(60000)'
# Delay lightning's Run for 60s so the PD leader can be killed while the
# import task is still pending, exercising PD member re-discovery.
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/importer/beforeRun=sleep(60000)'
run_lightning --backend local --enable-checkpoint=0 &
lightning_pid=$!
# in many libraries, etcd client's auto-sync-interval is 30s, so we need to wait at least 30s before kill PD leader
sleep 45
kill $(cat /tmp/backup_restore_test/pd_pid.txt)

# NOTE: removed leftover debugging statement `read -p 123`, which would block
# the unattended test forever waiting for terminal input.
# Check that everything is correctly imported
wait $lightning_pid
run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
check_contains 'count(*): 4'
check_contains 'sum(c): 10'

run_sql 'SELECT count(*), sum(c) FROM cpeng.b'
check_contains 'count(*): 4'
check_contains 'sum(c): 46'

# Bring all services back up for subsequent tests.
restart_services
4 changes: 2 additions & 2 deletions br/tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ groups=(
["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full"
["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history"
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index'
["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index'
["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index'
["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn'
Expand All @@ -37,7 +37,7 @@ groups=(
["G14"]='lightning_issue_40657 lightning_issue_410 lightning_issue_519 lightning_local_backend lightning_max_incr lightning_max_random lightning_multi_valued_index lightning_new_collation lightning_no_schema'
["G15"]='lightning_parquet lightning_partition_incremental lightning_partitioned-table lightning_record_network lightning_reload_cert lightning_restore lightning_routes lightning_routes_panic lightning_row-format-v2 lightning_s3'
["G16"]='lightning_shard_rowid lightning_source_linkfile lightning_sqlmode lightning_tidb_duplicate_data lightning_tidb_rowid lightning_tiflash lightning_tikv_multi_rocksdb lightning_too_many_columns lightning_tool_135'
["G17"]='lightning_tool_1420 lightning_tool_1472 lightning_tool_241 lightning_ttl lightning_unused_config_keys lightning_various_types lightning_view lightning_write_batch lightning_write_limit'
["G17"]='lightning_tool_1420 lightning_tool_1472 lightning_tool_241 lightning_ttl lightning_unused_config_keys lightning_various_types lightning_view lightning_write_batch lightning_write_limit lightning_pd_leader_switch'
)

# Get other cases not in groups, to avoid missing any case
Expand Down

0 comments on commit 287e2f2

Please sign in to comment.