diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go
index 0caeb3d3e956f..1c2149b84bf84 100644
--- a/ddl/ddl_db_test.go
+++ b/ddl/ddl_db_test.go
@@ -458,7 +458,7 @@ func (s *testDBSuite) testAlterLock(c *C) {
 func (s *testDBSuite) TestAddIndex(c *C) {
     s.tk = testkit.NewTestKit(c, s.store)
     s.tk.MustExec("use " + s.schemaName)
-    s.tk.MustExec("create table test_add_index (c1 int, c2 int, c3 int, primary key(c1))")
+    s.tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
 
     done := make(chan error, 1)
     start := -10
@@ -483,6 +483,11 @@ func (s *testDBSuite) TestAddIndex(c *C) {
             otherKeys = append(otherKeys, n)
         }
     }
+    // Encounter the value of math.MaxInt64 in the middle of adding the index.
+    v := math.MaxInt64 - defaultBatchSize/2
+    sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
+    s.mustExec(c, sql)
+    otherKeys = append(otherKeys, v)
 
     sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
 
diff --git a/ddl/index.go b/ddl/index.go
index c4316829c6668..0ce973f22ca38 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -473,7 +473,9 @@ func (w *worker) getIndexRecord(t table.Table, colMap map[int64]*types.FieldType
 }
 
 const (
+    minTaskHandledCnt    = 32      // minTaskHandledCnt is the minimum number of handles per batch.
     defaultTaskHandleCnt = 128
+    maxTaskHandleCnt     = 1 << 20 // maxTaskHandleCnt is the maximum number of handles per batch.
     defaultWorkers       = 16
 )
 
@@ -500,6 +502,7 @@ type worker struct {
     idxRecords  []*indexRecord // It's used to reduce the number of new slice.
     taskRange   handleInfo     // Every task's handle range.
     taskRet     *taskResult
+    batchSize   int
     rowMap      map[int64]types.Datum // It's the index column values map. It is used to reduce the number of making map.
 }
 
@@ -507,6 +510,7 @@ func newWorker(ctx context.Context, id, batch, colsLen, indexColsLen int) *worke
     return &worker{
         id:          id,
         ctx:         ctx,
+        batchSize:   batch,
         idxRecords:  make([]*indexRecord, 0, batch),
         defaultVals: make([]types.Datum, colsLen),
         rowMap:      make(map[int64]types.Datum, indexColsLen),
@@ -524,6 +528,13 @@ type handleInfo struct {
     endHandle   int64
 }
 
+func getEndHandle(baseHandle, batch int64) int64 {
+    if baseHandle >= math.MaxInt64-batch {
+        return math.MaxInt64
+    }
+    return baseHandle + batch
+}
+
 // addTableIndex adds index into table.
 // TODO: Move this to doc or wiki.
 // How to add index in reorganization state?
@@ -550,25 +561,27 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
         colMap[col.ID] = &col.FieldType
     }
     workerCnt := defaultWorkers
-    taskBatch := int64(defaultTaskHandleCnt)
     addedCount := job.GetRowCount()
-    baseHandle := reorgInfo.Handle
+    baseHandle, logStartHandle := reorgInfo.Handle, reorgInfo.Handle
 
     workers := make([]*worker, workerCnt)
     for i := 0; i < workerCnt; i++ {
         ctx := d.newContext()
-        workers[i] = newWorker(ctx, i, int(taskBatch), len(cols), len(colMap))
+        workers[i] = newWorker(ctx, i, defaultTaskHandleCnt, len(cols), len(colMap))
         // Make sure every worker has its own index buffer.
         workers[i].index = tables.NewIndexWithBuffer(t.Meta(), indexInfo)
     }
     for {
         startTime := time.Now()
         wg := sync.WaitGroup{}
+        currentBatchSize := int64(workers[0].batchSize)
         for i := 0; i < workerCnt; i++ {
             wg.Add(1)
-            workers[i].setTaskNewRange(baseHandle+int64(i)*taskBatch, baseHandle+int64(i+1)*taskBatch)
+            endHandle := getEndHandle(baseHandle, currentBatchSize)
+            workers[i].setTaskNewRange(baseHandle, endHandle)
             // TODO: Consider one worker to one goroutine.
             go workers[i].doBackfillIndexTask(t, colMap, &wg)
+            baseHandle = endHandle
         }
         wg.Wait()
 
@@ -583,19 +596,19 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
             err1 := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
                 return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
             })
-            log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, take time %v, update handle err %v",
-                addedCount, baseHandle, nextHandle, taskAddedCount, err, sub, err1)
+            log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, batch %d, take time %v, update handle err %v",
+                addedCount, logStartHandle, nextHandle, taskAddedCount, err, currentBatchSize, sub, err1)
             return errors.Trace(err)
         }
 
         d.reorgCtx.setRowCountAndHandle(addedCount, nextHandle)
         batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
-        log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, take time %v",
-            addedCount, baseHandle, nextHandle, taskAddedCount, sub)
+        log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, batch %d, take time %v",
+            addedCount, logStartHandle, nextHandle, taskAddedCount, currentBatchSize, sub)
         if isEnd {
             return nil
         }
-        baseHandle = nextHandle
+        baseHandle, logStartHandle = nextHandle, nextHandle
     }
 }
 
@@ -603,6 +616,8 @@ func getCountAndHandle(workers []*worker) (int64, int64, bool, error) {
     taskAddedCount, nextHandle := int64(0), workers[0].taskRange.startHandle
     var err error
     var isEnd bool
+    starvingWorkers := 0
+    largerDefaultWorkers := 0
     for _, worker := range workers {
         ret := worker.taskRet
         if ret.err != nil {
@@ -610,9 +625,29 @@
             break
         }
         taskAddedCount += int64(ret.count)
+        if ret.count < minTaskHandledCnt {
+            starvingWorkers++
+        } else if ret.count > defaultTaskHandleCnt {
+            largerDefaultWorkers++
+        }
         nextHandle = ret.outOfRangeHandle
         isEnd = ret.isAllDone
     }
+
+    // Adjust the worker's batch size.
+    halfWorkers := len(workers) / 2
+    if starvingWorkers >= halfWorkers && workers[0].batchSize < maxTaskHandleCnt {
+        // If the index data is discrete, we need to increase the batch size to speed up.
+        for _, worker := range workers {
+            worker.batchSize *= 2
+        }
+    } else if largerDefaultWorkers >= halfWorkers && workers[0].batchSize > defaultTaskHandleCnt {
+        // If the batch size exceeds the limit after we increase it,
+        // we need to decrease the batch size to reduce write conflict.
+        for _, worker := range workers {
+            worker.batchSize /= 2
+        }
+    }
     return taskAddedCount, nextHandle, isEnd, errors.Trace(err)
 }
 
diff --git a/domain/domain.go b/domain/domain.go
index d2db522d3a23e..2c2dfc1adb72b 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -409,10 +409,10 @@ type EtcdBackend interface {
 }
 
 // NewDomain creates a new domain. Should not create multiple domains for the same store.
-func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, factory pools.Factory, sysFactory func(*Domain) (pools.Resource, error)) (d *Domain, err error) {
+func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, factory pools.Factory, sysFactory func(*Domain) (pools.Resource, error)) (*Domain, error) {
     capacity := 200                // capacity of the sysSessionPool size
     idleTimeout := 3 * time.Minute // sessions in the sysSessionPool will be recycled after idleTimeout
-    d = &Domain{
+    d := &Domain{
         store:           store,
         SchemaValidator: NewSchemaValidator(ddlLease),
         exit:            make(chan struct{}),
@@ -422,8 +422,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
 
     if ebd, ok := store.(EtcdBackend); ok {
         if addrs := ebd.EtcdAddrs(); addrs != nil {
-            var cli *clientv3.Client
-            cli, err = clientv3.New(clientv3.Config{
+            cli, err := clientv3.New(clientv3.Config{
                 Endpoints:   addrs,
                 DialTimeout: 5 * time.Second,
                 DialOptions: []grpc.DialOption{
@@ -453,11 +452,22 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
     }
     sysCtxPool := pools.NewResourcePool(sysFac, 2, 2, idleTimeout)
     d.ddl = ddl.NewDDL(ctx, d.etcdClient, d.store, d.infoHandle, callback, ddlLease, sysCtxPool)
+    var err error
+    defer func() {
+        // Clean up the domain when initializing the syncer or reloading fails.
+        // If we don't clean it up, dirty data is left behind when this function is retried.
+        if err != nil {
+            d.Close()
+            log.Errorf("[ddl] new domain failed %v", errors.ErrorStack(errors.Trace(err)))
+        }
+    }()
 
-    if err = d.ddl.SchemaSyncer().Init(ctx); err != nil {
+    err = d.ddl.SchemaSyncer().Init(ctx)
+    if err != nil {
         return nil, errors.Trace(err)
     }
-    if err = d.Reload(); err != nil {
+    err = d.Reload()
+    if err != nil {
         return nil, errors.Trace(err)
     }
 
diff --git a/table/tables/index_test.go b/table/tables/index_test.go
index 0e0a9c0d3a15f..6c6cf1511d904 100644
--- a/table/tables/index_test.go
+++ b/table/tables/index_test.go
@@ -33,6 +33,7 @@ type testIndexSuite struct {
 }
 
 func (s *testIndexSuite) SetUpSuite(c *C) {
+    testleak.BeforeTest()
     store, err := tikv.NewMockTikvStore()
     c.Assert(err, IsNil)
     s.s = store
@@ -41,10 +42,10 @@
 func (s *testIndexSuite) TearDownSuite(c *C) {
     err := s.s.Close()
     c.Assert(err, IsNil)
+    testleak.AfterTest(c)()
 }
 
 func (s *testIndexSuite) TestIndex(c *C) {
-    defer testleak.AfterTest(c)()
     tblInfo := &model.TableInfo{
         ID: 1,
         Indices: []*model.IndexInfo{
@@ -183,7 +184,6 @@
 }
 
 func (s *testIndexSuite) TestCombineIndexSeek(c *C) {
-    defer testleak.AfterTest(c)()
     tblInfo := &model.TableInfo{
         ID: 1,
         Indices: []*model.IndexInfo{
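Below is a minimal, self-contained Go sketch, not part of the patch and not TiDB code, of the two mechanisms the ddl/index.go changes introduce: clamping a batch's end handle at math.MaxInt64 so the range arithmetic cannot overflow, and doubling or halving the per-worker batch size based on how many handles each worker actually processed. The names simWorker and adjustBatchSize are hypothetical stand-ins.

```go
// Sketch of the clamped end-handle computation and the adaptive batch size
// used by the add-index backfill loop. Constants mirror the ones in the patch.
package main

import (
	"fmt"
	"math"
)

const (
	minTaskHandledCnt    = 32      // below this, a worker is considered starving
	defaultTaskHandleCnt = 128     // initial batch size per worker
	maxTaskHandleCnt     = 1 << 20 // upper bound for the adaptive batch size
)

// getEndHandle returns baseHandle+batch, clamped so it never wraps past
// math.MaxInt64 when baseHandle is near the top of the int64 range.
func getEndHandle(baseHandle, batch int64) int64 {
	if baseHandle >= math.MaxInt64-batch {
		return math.MaxInt64
	}
	return baseHandle + batch
}

type simWorker struct {
	batchSize int
	count     int // rows actually handled in the last task
}

// adjustBatchSize doubles every worker's batch when most workers handled very
// few rows (sparse handles) and halves it when most handled more than the
// default, staying within (defaultTaskHandleCnt, maxTaskHandleCnt).
func adjustBatchSize(workers []*simWorker) {
	starving, larger := 0, 0
	for _, w := range workers {
		if w.count < minTaskHandledCnt {
			starving++
		} else if w.count > defaultTaskHandleCnt {
			larger++
		}
	}
	half := len(workers) / 2
	switch {
	case starving >= half && workers[0].batchSize < maxTaskHandleCnt:
		for _, w := range workers {
			w.batchSize *= 2
		}
	case larger >= half && workers[0].batchSize > defaultTaskHandleCnt:
		for _, w := range workers {
			w.batchSize /= 2
		}
	}
}

func main() {
	// Near the top of the int64 range, the end handle is clamped instead of overflowing.
	fmt.Println(getEndHandle(math.MaxInt64-10, 128)) // 9223372036854775807

	// Sparse data: every worker handled only a handful of rows, so the batch doubles.
	workers := []*simWorker{{batchSize: defaultTaskHandleCnt, count: 3}, {batchSize: defaultTaskHandleCnt, count: 5}}
	adjustBatchSize(workers)
	fmt.Println(workers[0].batchSize) // 256
}
```

The clamp is exactly what the new test case exercises: it inserts a row whose handle is math.MaxInt64 - defaultBatchSize/2, so the final batch range must end at math.MaxInt64 rather than wrapping to a negative value.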
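A second sketch, also not part of the patch, illustrates the cleanup pattern the domain/domain.go change switches to: NewDomain drops its named return values, declares a local err, and a deferred closure tears down the half-built object whenever err is non-nil on return, so a retry of the constructor does not see dirty state. The resource type and newResource function here are hypothetical stand-ins, not TiDB APIs.

```go
// Sketch of cleanup-on-error via a deferred closure over a local err variable.
package main

import (
	"errors"
	"fmt"
)

type resource struct{ closed bool }

func (r *resource) Close() { r.closed = true }

func newResource(failInit bool) (*resource, error) {
	r := &resource{}

	var err error
	defer func() {
		// Runs after every return below; err is captured by the closure,
		// not evaluated at defer time, so it sees the final value.
		if err != nil {
			r.Close()
		}
	}()

	if failInit {
		err = errors.New("init failed")
		return nil, err
	}
	return r, nil
}

func main() {
	r, err := newResource(false)
	fmt.Println(err, r.closed) // <nil> false

	_, err = newResource(true)
	fmt.Println(err) // init failed
}
```

In the patch the same idea drives d.Close() plus a log.Errorf in the deferred function; it works because every failing path after the defer assigns to the same err variable before returning.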