diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index 08d3c5e3843..de9ee876f74 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/owner"
 	"github.com/pingcap/tiflow/cdc/processor"
 	"github.com/pingcap/tiflow/cdc/processor/pipeline/system"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory"
 	ssystem "github.com/pingcap/tiflow/cdc/sorter/db/system"
 	"github.com/pingcap/tiflow/pkg/config"
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
@@ -36,7 +37,6 @@ import (
 	"github.com/pingcap/tiflow/pkg/migrate"
 	"github.com/pingcap/tiflow/pkg/orchestrator"
 	"github.com/pingcap/tiflow/pkg/p2p"
-	sortmgr "github.com/pingcap/tiflow/pkg/sorter/manager"
 	"github.com/pingcap/tiflow/pkg/upstream"
 	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/pingcap/tiflow/pkg/version"
@@ -92,12 +92,12 @@ type captureImpl struct {
 	EtcdClient       etcd.CDCEtcdClient
 	tableActorSystem *system.System

-	// useEventSortEngine indicates whether to use the new pull based sort engine or
+	// useSortEngine indicates whether to use the new pull based sort engine or
 	// the old push based sorter system. the latter will be removed after all sorter
 	// have been transformed into pull based sort engine.
-	useEventSortEngine bool
-	sorterSystem       *ssystem.System
-	sortEngineManager  *sortmgr.EventSortEngineManager
+	useSortEngine     bool
+	sorterSystem      *ssystem.System
+	sortEngineFactory *factory.SortEngineFactory

 	// MessageServer is the receiver of the messages from the other nodes.
 	// It should be recreated each time the capture is restarted.
@@ -130,7 +130,7 @@ func NewCapture(pdEndpoints []string,
 	etcdClient etcd.CDCEtcdClient,
 	grpcService *p2p.ServerWrapper,
 	tableActorSystem *system.System,
-	sortEngineManager *sortmgr.EventSortEngineManager,
+	sortEngineManagerFactory *factory.SortEngineFactory,
 	sorterSystem *ssystem.System,
 ) Capture {
 	conf := config.GetGlobalServerConfig()
@@ -146,9 +146,9 @@ func NewCapture(pdEndpoints []string,
 		newOwner:     owner.NewOwner,
 		info:         &model.CaptureInfo{},

-		useEventSortEngine: sortEngineManager != nil,
-		sortEngineManager:  sortEngineManager,
-		sorterSystem:       sorterSystem,
+		useSortEngine:     sortEngineManagerFactory != nil,
+		sortEngineFactory: sortEngineManagerFactory,
+		sorterSystem:      sorterSystem,

 		migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
 	}
@@ -311,14 +311,13 @@ func (c *captureImpl) run(stdCtx context.Context) error {
 	g, stdCtx := errgroup.WithContext(stdCtx)
 	ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
-		CaptureInfo:      c.info,
-		EtcdClient:       c.EtcdClient,
-		TableActorSystem: c.tableActorSystem,
-		MessageServer:    c.MessageServer,
-		MessageRouter:    c.MessageRouter,
-
+		CaptureInfo:       c.info,
+		EtcdClient:        c.EtcdClient,
+		TableActorSystem:  c.tableActorSystem,
+		MessageServer:     c.MessageServer,
+		MessageRouter:     c.MessageRouter,
 		SorterSystem:      c.sorterSystem,
-		SortEngineManager: c.sortEngineManager,
+		SortEngineFactory: c.sortEngineFactory,
 	})

 	g.Go(func() error {
diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go
index 84888e39223..c98be3a6aeb 100644
--- a/cdc/processor/pipeline/puller.go
+++ b/cdc/processor/pipeline/puller.go
@@ -23,7 +23,6 @@ import (
 	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/pipeline"
 	"github.com/pingcap/tiflow/pkg/regionspan"
-	"github.com/pingcap/tiflow/pkg/sorter"
 	"github.com/pingcap/tiflow/pkg/upstream"
 	"github.com/pingcap/tiflow/pkg/util"
 	"golang.org/x/sync/errgroup"
@@ -107,52 +106,3 @@ func (n *pullerNode)
startWithSorterNode(ctx pipeline.NodeContext, n.cancel = cancel return nil } - -func (n *pullerNode) startWithEventSortEngine(ctx pipeline.NodeContext, - up *upstream.Upstream, wg *errgroup.Group, - eventSortEngine sorter.EventSortEngine, -) error { - n.wg = wg - ctxC, cancel := context.WithCancel(ctx) - ctxC = contextutil.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr) - ctxC = contextutil.PutRoleInCtx(ctxC, util.RoleProcessor) - kvCfg := config.GetGlobalServerConfig().KVClient - // NOTICE: always pull the old value internally - // See also: https://github.com/pingcap/tiflow/issues/2301. - n.plr = puller.New( - ctxC, - up.PDClient, - up.GrpcPool, - up.RegionCache, - up.KVStorage, - up.PDClock, - n.startTs, - n.tableSpan(), - kvCfg, - n.changefeed, - n.tableID, - n.tableName, - ) - n.wg.Go(func() error { - ctx.Throw(errors.Trace(n.plr.Run(ctxC))) - return nil - }) - n.wg.Go(func() error { - for { - select { - case <-ctxC.Done(): - return nil - case rawKV := <-n.plr.Output(): - if rawKV == nil { - continue - } - pEvent := model.NewPolymorphicEvent(rawKV) - if err := eventSortEngine.Add(n.tableID, pEvent); err != nil { - return err - } - } - } - }) - n.cancel = cancel - return nil -} diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 1c0d6206c00..f55174d7a63 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -34,7 +34,6 @@ import ( cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" pmessage "github.com/pingcap/tiflow/pkg/pipeline/message" - "github.com/pingcap/tiflow/pkg/sorter" "github.com/pingcap/tiflow/pkg/upstream" "github.com/tikv/client-go/v2/oracle" uberatomic "go.uber.org/atomic" @@ -78,12 +77,7 @@ type tableActor struct { // contains all nodes except pullerNode nodes []*ActorNode - // If useEventSortEngine is true eventSortEngine will be used, otherwise sortNode will be used. - // - // TODO(qupeng): adjust it after all sorters are transformed to EventSortEngine. - useEventSortEngine bool - eventSortEngine sorter.EventSortEngine - sortNode *sorterNode + sortNode *sorterNode // states of table actor started bool @@ -137,10 +131,6 @@ func NewTableActor( // All sub-goroutines should be spawn in this wait group. wg, cctx := errgroup.WithContext(ctx) - // TODO(qupeng): adjust it after all sorters are transformed to EventSortEngine. 
- debugConfig := serverConfig.GetGlobalServerConfig().Debug - useEventSortEngine := debugConfig.EnablePullBasedSink && debugConfig.EnableDBSorter - table := &tableActor{ // all errors in table actor will be reported to processor reportErr: cdcCtx.Throw, @@ -162,9 +152,7 @@ func NewTableActor( targetTs: targetTs, started: false, - useEventSortEngine: useEventSortEngine, - eventSortEngine: nil, - sortNode: nil, + sortNode: nil, changefeedID: changefeedVars.ID, changefeedVars: changefeedVars, @@ -263,9 +251,7 @@ func (t *tableActor) handleDataMsg(ctx context.Context) error { } func (t *tableActor) handleBarrierMsg(ctx context.Context, barrierTs model.Ts) error { - if !t.useEventSortEngine { - t.sortNode.updateBarrierTs(barrierTs) - } + t.sortNode.updateBarrierTs(barrierTs) return t.sinkNode.updateBarrierTs(ctx, barrierTs) } @@ -311,24 +297,12 @@ func (t *tableActor) start(sdtTableContext context.Context) error { flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoManager.Enabled(), splitTxn) - if !t.useEventSortEngine { - sorterNode := newSorterNode(t.tableName, t.tableID, - t.replicaInfo.StartTs, flowController, - t.mg, &t.state, t.changefeedID, t.redoManager.Enabled(), - t.upstream.PDClient, - ) - t.sortNode = sorterNode - } else { - engine, err := t.globalVars.SortEngineManager.Create(t.changefeedVars.ID) - if err != nil { - log.Error("create sort engine fail", - zap.String("namespace", t.changefeedID.Namespace), - zap.String("changefeed", t.changefeedID.ID), - zap.Error(err)) - return err - } - t.eventSortEngine = engine - } + sorterNode := newSorterNode(t.tableName, t.tableID, + t.replicaInfo.StartTs, flowController, + t.mg, &t.state, t.changefeedID, t.redoManager.Enabled(), + t.upstream.PDClient, + ) + t.sortNode = sorterNode sortActorNodeContext := newContext(sdtTableContext, t.tableName, t.globalVars.TableActorSystem.Router(), @@ -397,13 +371,10 @@ func (t *tableActor) stop() { } atomic.StoreUint32(&t.stopped, stopped) - if !t.useEventSortEngine && t.sortNode != nil { + if t.sortNode != nil { // releaseResource will send a message to sorter router t.sortNode.releaseResource() } - if t.useEventSortEngine && t.eventSortEngine != nil { - t.eventSortEngine.RemoveTable(t.tableID) - } t.cancel() if t.sinkNode != nil && !t.sinkStopped.Load() { @@ -445,9 +416,6 @@ func (t *tableActor) ResolvedTs() model.Ts { if t.redoManager.Enabled() { return t.redoManager.GetResolvedTs(t.tableID) } - if t.useEventSortEngine { - return t.eventSortEngine.GetResolvedTs(t.tableID) - } return t.sortNode.ResolvedTs() } @@ -518,16 +486,14 @@ func (t *tableActor) Stats() tablepb.Stats { }, } - if !t.useEventSortEngine { - sorterStats := t.sortNode.sorter.Stats() - stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{ - CheckpointTs: sorterStats.CheckpointTsIngress, - ResolvedTs: sorterStats.ResolvedTsIngress, - } - stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{ - CheckpointTs: sorterStats.CheckpointTsEgress, - ResolvedTs: sorterStats.ResolvedTsEgress, - } + sorterStats := t.sortNode.sorter.Stats() + stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{ + CheckpointTs: sorterStats.CheckpointTsIngress, + ResolvedTs: sorterStats.ResolvedTsIngress, + } + stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{ + CheckpointTs: sorterStats.CheckpointTsEgress, + ResolvedTs: sorterStats.ResolvedTsEgress, } return stats @@ -570,48 +536,29 @@ func (t *tableActor) Wait() { // MemoryConsumption return the memory consumption in bytes func (t *tableActor) 
MemoryConsumption() uint64 { - if !t.useEventSortEngine { - return t.sortNode.flowController.GetConsumption() - } - // TODO(qupeng): sink manager should handle this. - return 0 + return t.sortNode.flowController.GetConsumption() } func (t *tableActor) Start(ts model.Ts) { - if !t.useEventSortEngine { - if atomic.CompareAndSwapInt32(&t.sortNode.started, 0, 1) { - t.sortNode.startTsCh <- ts - close(t.sortNode.startTsCh) - } - } else { - t.eventSortEngine.AddTable(t.tableID) + if atomic.CompareAndSwapInt32(&t.sortNode.started, 0, 1) { + t.sortNode.startTsCh <- ts + close(t.sortNode.startTsCh) } } func (t *tableActor) RemainEvents() int64 { - if !t.useEventSortEngine { - return t.sortNode.remainEvent() - } - // TODO(qupeng): record it in sort engine and sinkmanager. - return 0 + return t.sortNode.remainEvent() } // for ut var startPuller = func(t *tableActor, ctx *actorNodeContext) error { - if !t.useEventSortEngine { - return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode) - } - return t.pullerNode.startWithEventSortEngine(ctx, t.upstream, t.wg, t.eventSortEngine) + return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode) } var startSorter = func(t *tableActor, ctx *actorNodeContext) error { - if !t.useEventSortEngine { - eventSorter, err := createSorter(ctx, t.tableName, t.tableID) - if err != nil { - return errors.Trace(err) - } - return t.sortNode.start(ctx, t.wg, t.actorID, t.router, eventSorter) + eventSorter, err := createSorter(ctx, t.tableName, t.tableID) + if err != nil { + return errors.Trace(err) } - t.eventSortEngine.AddTable(t.tableID) - return nil + return t.sortNode.start(ctx, t.wg, t.actorID, t.router, eventSorter) } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 33360e0713c..1f80b26e764 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -1086,9 +1086,9 @@ func (p *processor) Close(ctx cdcContext.Context) error { zap.Duration("duration", time.Since(start))) } - sortEngineManager := ctx.GlobalVars().SortEngineManager - if sortEngineManager != nil { - if err := sortEngineManager.Drop(p.changefeedID); err != nil { + engineFactory := ctx.GlobalVars().SortEngineFactory + if engineFactory != nil { + if err := engineFactory.Drop(p.changefeedID); err != nil { log.Error("drop event sort engine fail", zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), diff --git a/cdc/processor/sinkmanager/manager_impl.go b/cdc/processor/sinkmanager/manager_impl.go index 39359aa1706..95328b1e19d 100644 --- a/cdc/processor/sinkmanager/manager_impl.go +++ b/cdc/processor/sinkmanager/manager_impl.go @@ -23,10 +23,10 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/pipeline" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory" - "github.com/pingcap/tiflow/pkg/sorter" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -61,13 +61,12 @@ type ManagerImpl struct { // if redo log is enabled. redoManager redo.LogManager // sortEngine is used by the sink manager to fetch data. - sortEngine sorter.EventSortEngine + sortEngine engine.SortEngine // sinkFactory used to create table sink. sinkFactory *factory.SinkFactory // tableSinks is a map from tableID to tableSink. tableSinks sync.Map - // lastBarrierTs is the last barrier ts. 
lastBarrierTs atomic.Uint64 @@ -95,7 +94,7 @@ func New( changefeedID model.ChangeFeedID, changefeedInfo *model.ChangeFeedInfo, redoManager redo.LogManager, - sortEngine sorter.EventSortEngine, + sortEngine engine.SortEngine, errChan chan error, metricsTableSinkTotalRows prometheus.Counter, ) (Manager, error) { @@ -287,7 +286,7 @@ func (m *ManagerImpl) generateSinkTasks() error { // Because if the redo log is enabled and the table just scheduled to this node, // the resolved ts from sorter may be smaller than the barrier ts. // So we use the min value of the barrier ts and the resolved ts from sorter. - getUpperBound := func() sorter.Position { + getUpperBound := func() engine.Position { barrierTs := m.lastBarrierTs.Load() resolvedTs := tableSink.(*tableSinkWrapper).getReceivedSorterResolvedTs() var upperBoundTs model.Ts @@ -297,7 +296,7 @@ func (m *ManagerImpl) generateSinkTasks() error { upperBoundTs = resolvedTs } - return sorter.Position{ + return engine.Position{ StartTs: upperBoundTs - 1, CommitTs: upperBoundTs, } @@ -315,7 +314,7 @@ func (m *ManagerImpl) generateSinkTasks() error { zap.Int64("tableID", tableID), zap.Uint64("memory", requestMemSize), ) - callback := func(lastWrittenPos sorter.Position) { + callback := func(lastWrittenPos engine.Position) { p := &progress{ tableID: tableID, nextLowerBoundPos: lastWrittenPos.Next(), @@ -367,9 +366,9 @@ func (m *ManagerImpl) generateRedoTasks() error { continue } // We use the table's resolved ts as the upper bound to fetch events. - getUpperBound := func() sorter.Position { + getUpperBound := func() engine.Position { upperBoundTs := tableSink.(*tableSinkWrapper).getReceivedSorterResolvedTs() - return sorter.Position{ + return engine.Position{ StartTs: upperBoundTs - 1, CommitTs: upperBoundTs, } @@ -380,7 +379,7 @@ func (m *ManagerImpl) generateRedoTasks() error { // Next time. continue } - callback := func(lastWrittenPos sorter.Position) { + callback := func(lastWrittenPos engine.Position) { p := &progress{ tableID: tableID, nextLowerBoundPos: lastWrittenPos.Next(), @@ -454,12 +453,12 @@ func (m *ManagerImpl) StartTable(tableID model.TableID) { startTs := tableSink.(*tableSinkWrapper).startTs m.sinkProgressHeap.push(&progress{ tableID: tableID, - nextLowerBoundPos: sorter.Position{StartTs: startTs - 1, CommitTs: startTs}, + nextLowerBoundPos: engine.Position{StartTs: startTs - 1, CommitTs: startTs}, }) if m.redoManager != nil { m.redoProgressHeap.push(&progress{ tableID: tableID, - nextLowerBoundPos: sorter.Position{StartTs: startTs - 1, CommitTs: startTs}, + nextLowerBoundPos: engine.Position{StartTs: startTs - 1, CommitTs: startTs}, }) } } @@ -486,7 +485,7 @@ func (m *ManagerImpl) RemoveTable(tableID model.TableID) error { ) // NOTICE: It is safe to only remove the table sink from the map. // Because if we found the table sink is closed, we will not add it back to the heap. - // Also, no need to GC the SorterEngine. Because the SorterEngine also removes this table. + // Also, no need to GC the SortEngine. Because the SortEngine also removes this table. 
m.tableSinks.Delete(tableID) if m.eventCache != nil { @@ -507,7 +506,7 @@ func (m *ManagerImpl) GetTableStats(tableID model.TableID) (pipeline.Stats, erro } checkpointTs := tableSink.(*tableSinkWrapper).getCheckpointTs() m.memQuota.release(tableID, checkpointTs) - cleanPos := sorter.Position{ + cleanPos := engine.Position{ StartTs: checkpointTs.Ts - 1, CommitTs: checkpointTs.Ts, } diff --git a/cdc/processor/sinkmanager/manager_impl_test.go b/cdc/processor/sinkmanager/manager_impl_test.go index eb617814d08..b095a9c8ee7 100644 --- a/cdc/processor/sinkmanager/manager_impl_test.go +++ b/cdc/processor/sinkmanager/manager_impl_test.go @@ -19,9 +19,9 @@ import ( "time" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/sorter" - "github.com/pingcap/tiflow/pkg/sorter/memory" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -35,8 +35,8 @@ func createManager( changefeedInfo *model.ChangeFeedInfo, errChan chan error, ) *ManagerImpl { - sorterEngine := memory.New(context.Background()) - manager, err := New(ctx, changefeedID, changefeedInfo, nil, sorterEngine, errChan, prometheus.NewCounter(prometheus.CounterOpts{})) + sortEngine := memory.New(context.Background()) + manager, err := New(ctx, changefeedID, changefeedInfo, nil, sortEngine, errChan, prometheus.NewCounter(prometheus.CounterOpts{})) require.NoError(t, err) return manager.(*ManagerImpl) } @@ -53,9 +53,9 @@ func getChangefeedInfo() *model.ChangeFeedInfo { // nolint:unparam // It is ok to use the same tableID in test. -func addTableAndAddEventsToSorterEngine( +func addTableAndAddEventsToSortEngine( t *testing.T, - engine sorter.EventSortEngine, + engine engine.SortEngine, tableID model.TableID, ) { engine.AddTable(tableID) @@ -135,7 +135,7 @@ func TestAddTable(t *testing.T) { manager.StartTable(tableID) require.Equal(t, &progress{ tableID: tableID, - nextLowerBoundPos: sorter.Position{ + nextLowerBoundPos: engine.Position{ StartTs: 0, CommitTs: 1, }, @@ -160,7 +160,7 @@ func TestRemoveTable(t *testing.T) { require.True(t, ok) require.NotNil(t, tableSink) manager.StartTable(tableID) - addTableAndAddEventsToSorterEngine(t, manager.sortEngine, tableID) + addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID) manager.UpdateBarrierTs(4) manager.UpdateReceivedSorterResolvedTs(tableID, 5) @@ -209,7 +209,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) { }() tableID := model.TableID(1) manager.AddTable(tableID, 1, 100) - addTableAndAddEventsToSorterEngine(t, manager.sortEngine, tableID) + addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID) manager.UpdateBarrierTs(4) manager.UpdateReceivedSorterResolvedTs(tableID, 5) manager.StartTable(tableID) @@ -236,7 +236,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) { }() tableID := model.TableID(1) manager.AddTable(tableID, 1, 100) - addTableAndAddEventsToSorterEngine(t, manager.sortEngine, tableID) + addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID) // This would happen when the table just added to this node and redo log is enabled. // So there is possibility that the resolved ts is smaller than the global barrier ts. 
 	manager.UpdateBarrierTs(4)
@@ -265,7 +265,8 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) {
 	}()
 	tableID := model.TableID(1)
 	manager.AddTable(tableID, 1, 100)
-	addTableAndAddEventsToSorterEngine(t, manager.sortEngine, tableID)
+	addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
+
 	manager.UpdateBarrierTs(4)
 	manager.UpdateReceivedSorterResolvedTs(tableID, 5)
 	manager.StartTable(tableID)
@@ -287,7 +288,7 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
 	manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
 	tableID := model.TableID(1)
 	manager.AddTable(tableID, 1, 100)
-	addTableAndAddEventsToSorterEngine(t, manager.sortEngine, tableID)
+	addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
 	manager.UpdateBarrierTs(4)
 	manager.UpdateReceivedSorterResolvedTs(tableID, 5)
diff --git a/cdc/processor/sinkmanager/redo_cache.go b/cdc/processor/sinkmanager/redo_cache.go
index 3a20dfeabd8..9e71fd10909 100644
--- a/cdc/processor/sinkmanager/redo_cache.go
+++ b/cdc/processor/sinkmanager/redo_cache.go
@@ -19,7 +19,7 @@ import (
 	"sync/atomic"

 	"github.com/pingcap/tiflow/cdc/model"
-	"github.com/pingcap/tiflow/pkg/sorter"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
 )

 // redoEventCache caches events fetched from EventSortEngine.
@@ -56,26 +56,26 @@ func (r *redoEventCache) getAppender(tableID model.TableID) *eventAppender {
 // pop some events from the cache.
 func (r *redoEventCache) pop(
 	tableID model.TableID,
-	upperBound ...sorter.Position,
-) ([]*model.RowChangedEvent, uint64, sorter.Position) {
+	upperBound ...engine.Position,
+) ([]*model.RowChangedEvent, uint64, engine.Position) {
 	r.mu.Lock()
 	item, exists := r.tables[tableID]
 	if !exists {
 		r.mu.Unlock()
-		return nil, 0, sorter.Position{}
+		return nil, 0, engine.Position{}
 	}
 	r.mu.Unlock()

 	item.mu.RLock()
 	defer item.mu.RUnlock()
 	if len(item.events) == 0 || item.readyCount == 0 {
-		return nil, 0, sorter.Position{}
+		return nil, 0, engine.Position{}
 	}

-	var fetchCount int = item.readyCount
+	fetchCount := item.readyCount
 	if len(upperBound) > 0 {
 		fetchCount = sort.Search(item.readyCount, func(i int) bool {
-			pos := sorter.Position{
+			pos := engine.Position{
 				CommitTs: item.events[i].CommitTs,
 				StartTs:  item.events[i].StartTs,
 			}
@@ -88,7 +88,7 @@ func (r *redoEventCache) pop(
 	for _, x := range item.sizes[0:fetchCount] {
 		size += x
 	}
-	pos := sorter.Position{
+	pos := engine.Position{
 		CommitTs: item.events[fetchCount-1].CommitTs,
 		StartTs:  item.events[fetchCount-1].StartTs,
 	}
diff --git a/cdc/processor/sinkmanager/redo_cache_test.go b/cdc/processor/sinkmanager/redo_cache_test.go
index 3a13133095d..49f0d202dd6 100644
--- a/cdc/processor/sinkmanager/redo_cache_test.go
+++ b/cdc/processor/sinkmanager/redo_cache_test.go
@@ -17,7 +17,7 @@ import (
 	"testing"

 	"github.com/pingcap/tiflow/cdc/model"
-	"github.com/pingcap/tiflow/pkg/sorter"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
 	"github.com/stretchr/testify/require"
 )

@@ -28,7 +28,7 @@ func TestRedoEventCache(t *testing.T) {
 	require.True(t, appender.push(&model.RowChangedEvent{StartTs: 1, CommitTs: 2}, 100, false))
 	require.True(t, appender.push(&model.RowChangedEvent{StartTs: 1, CommitTs: 2}, 200, true))
 	require.True(t, appender.push(&model.RowChangedEvent{StartTs: 3, CommitTs: 4}, 300, true))
-	// Append a unfinished transaction, which can't be poped.
+	// Append an unfinished transaction, which can't be popped.
require.True(t, appender.push(&model.RowChangedEvent{StartTs: 5, CommitTs: 6}, 400, false)) // Append beyond capacity, can't success. require.False(t, appender.push(&model.RowChangedEvent{StartTs: 5, CommitTs: 6}, 100, false)) @@ -36,7 +36,7 @@ func TestRedoEventCache(t *testing.T) { events, size, pos := cache.pop(3) require.Equal(t, 3, len(events)) require.Equal(t, uint64(600), size) - require.Equal(t, 0, pos.Compare(sorter.Position{StartTs: 3, CommitTs: 4})) + require.Equal(t, 0, pos.Compare(engine.Position{StartTs: 3, CommitTs: 4})) require.Equal(t, uint64(400), cache.allocated) diff --git a/cdc/processor/sinkmanager/table_progress_heap.go b/cdc/processor/sinkmanager/table_progress_heap.go index eced91f8b2b..60e4aba263b 100644 --- a/cdc/processor/sinkmanager/table_progress_heap.go +++ b/cdc/processor/sinkmanager/table_progress_heap.go @@ -18,13 +18,13 @@ import ( "sync" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/sorter" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" ) // progress is the fetch progress of a table. type progress struct { tableID model.TableID - nextLowerBoundPos sorter.Position + nextLowerBoundPos engine.Position } // Assert progressHeap implements heap.Interface diff --git a/cdc/processor/sinkmanager/table_progress_heap_test.go b/cdc/processor/sinkmanager/table_progress_heap_test.go index b5057720558..2b8affdbde7 100644 --- a/cdc/processor/sinkmanager/table_progress_heap_test.go +++ b/cdc/processor/sinkmanager/table_progress_heap_test.go @@ -16,7 +16,7 @@ package sinkmanager import ( "testing" - "github.com/pingcap/tiflow/pkg/sorter" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/stretchr/testify/require" ) @@ -26,21 +26,21 @@ func TestTableProgresses(t *testing.T) { p := newTableProgresses() p.push(&progress{ tableID: 1, - nextLowerBoundPos: sorter.Position{ + nextLowerBoundPos: engine.Position{ StartTs: 1, CommitTs: 2, }, }) p.push(&progress{ tableID: 3, - nextLowerBoundPos: sorter.Position{ + nextLowerBoundPos: engine.Position{ StartTs: 2, CommitTs: 2, }, }) p.push(&progress{ tableID: 2, - nextLowerBoundPos: sorter.Position{ + nextLowerBoundPos: engine.Position{ StartTs: 2, CommitTs: 3, }, diff --git a/cdc/processor/sinkmanager/worker.go b/cdc/processor/sinkmanager/worker.go index 855d33edd55..9708d8d5293 100644 --- a/cdc/processor/sinkmanager/worker.go +++ b/cdc/processor/sinkmanager/worker.go @@ -17,14 +17,14 @@ import ( "context" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/sorter" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" ) // Used to record the progress of the table. -type writeSuccessCallback func(lastWrittenPos sorter.Position) +type writeSuccessCallback func(lastWrittenPos engine.Position) // Used to get an upper bound. -type upperBoundGetter func() sorter.Position +type upperBoundGetter func() engine.Position // Used to abort the task processing of the table. type isCanceled func() bool @@ -35,7 +35,7 @@ type sinkTask struct { tableID model.TableID // lowerBound indicates the lower bound of the task. // It is a closed interval. - lowerBound sorter.Position + lowerBound engine.Position // getUpperBound is used to get the upper bound of the task. // It is a closed interval. // Use a method to get the latest value, because the upper bound may change(only can increase). 
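The sinkTask and redoTask bounds above are closed intervals over engine.Position values: a task resumes from lastWrittenPos.Next() and stops at whatever getUpperBound returns at run time. The plain-Go sketch below illustrates the ordering semantics these hunks rely on; the method bodies are inferred from test expectations elsewhere in this diff (for example, lastWritePos.Next() turning {StartTs: 1, CommitTs: 3} into {StartTs: 2, CommitTs: 3}), not copied from engine.go, so treat it as an illustration rather than the canonical implementation.

```go
package main

import "fmt"

// Position mirrors engine.Position from this diff: a (StartTs, CommitTs)
// pair ordered first by CommitTs, then by StartTs.
type Position struct {
	StartTs  uint64
	CommitTs uint64
}

// Next returns the position immediately after p within the same commit,
// matching require.Equal(..., lastWritePos.Next()) in worker_impl_test.go.
func (p Position) Next() Position {
	return Position{StartTs: p.StartTs + 1, CommitTs: p.CommitTs}
}

// Compare returns -1, 0, or 1, ordering by (CommitTs, StartTs); redo_cache.go
// uses the real method inside sort.Search to cut fetches at an upper bound.
func (p Position) Compare(o Position) int {
	switch {
	case p.CommitTs < o.CommitTs:
		return -1
	case p.CommitTs > o.CommitTs:
		return 1
	case p.StartTs < o.StartTs:
		return -1
	case p.StartTs > o.StartTs:
		return 1
	default:
		return 0
	}
}

func main() {
	last := Position{StartTs: 1, CommitTs: 3}
	// After writing `last`, the next closed-interval task starts at Next().
	fmt.Println(last.Next())                                     // {2 3}
	fmt.Println(last.Compare(Position{StartTs: 3, CommitTs: 4})) // -1
}
```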
@@ -53,7 +53,7 @@ type sinkWorker interface { type redoTask struct { tableID model.TableID - lowerBound sorter.Position + lowerBound engine.Position getUpperBound upperBoundGetter tableSink *tableSinkWrapper callback writeSuccessCallback diff --git a/cdc/processor/sinkmanager/worker_impl.go b/cdc/processor/sinkmanager/worker_impl.go index 8e215390570..fdcdb583ad9 100644 --- a/cdc/processor/sinkmanager/worker_impl.go +++ b/cdc/processor/sinkmanager/worker_impl.go @@ -19,8 +19,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/redo" - "github.com/pingcap/tiflow/pkg/sorter" "go.uber.org/zap" ) @@ -48,7 +48,7 @@ var ( type sinkWorkerImpl struct { changefeedID model.ChangeFeedID - sortEngine sorter.EventSortEngine + sortEngine engine.SortEngine memQuota *memQuota eventCache *redoEventCache // splitTxn indicates whether to split the transaction into multiple batches. @@ -61,7 +61,7 @@ type sinkWorkerImpl struct { // newWorker creates a new worker. func newSinkWorker( changefeedID model.ChangeFeedID, - sortEngine sorter.EventSortEngine, + sortEngine engine.SortEngine, quota *memQuota, eventCache *redoEventCache, splitTxn bool, @@ -99,7 +99,7 @@ func (w *sinkWorkerImpl) handleTasks(ctx context.Context, taskChan <-chan *sinkT // Used to record the last written position. // We need to use it to update the lower bound of the table sink. - var lastPos sorter.Position + var lastPos engine.Position lastCommitTs := uint64(0) currentTotalSize := uint64(0) batchID := uint64(1) @@ -273,9 +273,9 @@ func (w *sinkWorkerImpl) handleTasks(ctx context.Context, taskChan <-chan *sinkT func (w *sinkWorkerImpl) fetchFromCache( task *sinkTask, // task is read-only here. - lowerBound sorter.Position, - upperBound sorter.Position, -) (sorter.Position, error) { + lowerBound engine.Position, + upperBound engine.Position, +) (engine.Position, error) { // Is it possible that after fetching something from cache, more events are // pushed into cache immediately? It's unlikely and if it happens, new events // are only available after resolvedTs has been advanced. 
So, here just pop one @@ -286,7 +286,7 @@ func (w *sinkWorkerImpl) fetchFromCache( w.memQuota.record(task.tableID, model.ResolvedTs{Ts: pos.CommitTs}, size) err := task.tableSink.updateResolvedTs(model.ResolvedTs{Ts: pos.CommitTs}) if err != nil { - return sorter.Position{}, err + return engine.Position{}, err } return pos.Next(), nil } @@ -314,7 +314,7 @@ func (w *sinkWorkerImpl) advanceTableSink(t *sinkTask, commitTs model.Ts, size u type redoWorkerImpl struct { changefeedID model.ChangeFeedID - sortEngine sorter.EventSortEngine + sortEngine engine.SortEngine memQuota *memQuota redoManager redo.LogManager eventCache *redoEventCache @@ -324,7 +324,7 @@ type redoWorkerImpl struct { func newRedoWorker( changefeedID model.ChangeFeedID, - sortEngine sorter.EventSortEngine, + sortEngine engine.SortEngine, quota *memQuota, redoManager redo.LogManager, eventCache *redoEventCache, @@ -367,7 +367,7 @@ func (w *redoWorkerImpl) handleTask(ctx context.Context, task *redoTask) error { memAllocated := true - var lastPos sorter.Position + var lastPos engine.Position maybeEmitBatchEvents := func(allFinished, txnFinished bool) error { if batchSize == 0 || (!allFinished && batchSize < requestMemSize) { return nil diff --git a/cdc/processor/sinkmanager/worker_impl_test.go b/cdc/processor/sinkmanager/worker_impl_test.go index 8a964ec32e8..46add798263 100644 --- a/cdc/processor/sinkmanager/worker_impl_test.go +++ b/cdc/processor/sinkmanager/worker_impl_test.go @@ -19,25 +19,25 @@ import ( "testing" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" cerrors "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/sorter" - "github.com/pingcap/tiflow/pkg/sorter/memory" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) func createWorker(changefeedID model.ChangeFeedID, memQuota uint64, splitTxn bool) sinkWorker { - sorterEngine := memory.New(context.Background()) + sortEngine := memory.New(context.Background()) quota := newMemQuota(changefeedID, memQuota) - return newSinkWorker(changefeedID, sorterEngine, quota, nil, splitTxn, false) + return newSinkWorker(changefeedID, sortEngine, quota, nil, splitTxn, false) } // nolint:unparam // It is ok to use the same tableID in test. 
-func addEventsToSorterEngine(t *testing.T, events []*model.PolymorphicEvent, sorterEngine sorter.EventSortEngine, tableID model.TableID) { - sorterEngine.AddTable(tableID) +func addEventsToSortEngine(t *testing.T, events []*model.PolymorphicEvent, sortEngine engine.SortEngine, tableID model.TableID) { + sortEngine.AddTable(tableID) for _, event := range events { - err := sorterEngine.Add(tableID, event) + err := sortEngine.Add(tableID, event) require.NoError(t, err) } } @@ -143,7 +143,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndAbortWhenNoMemA } w := createWorker(changefeedID, eventSize, true) - addEventsToSorterEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -155,22 +155,22 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndAbortWhenNoMemA }() wrapper, sink := createTableSinkWrapper(changefeedID, tableID) - lowerBoundPos := sorter.Position{ + lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, } - upperBoundGetter := func() sorter.Position { - return sorter.Position{ + upperBoundGetter := func() engine.Position { + return engine.Position{ StartTs: 3, CommitTs: 4, } } - callback := func(lastWritePos sorter.Position) { - require.Equal(suite.T(), sorter.Position{ + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), engine.Position{ StartTs: 1, CommitTs: 3, }, lastWritePos) - require.Equal(suite.T(), sorter.Position{ + require.Equal(suite.T(), engine.Position{ StartTs: 2, CommitTs: 3, }, lastWritePos.Next()) @@ -258,7 +258,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndAbortWhenNoMemA }, } w := createWorker(changefeedID, eventSize, true) - addEventsToSorterEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -270,22 +270,22 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndAbortWhenNoMemA }() wrapper, sink := createTableSinkWrapper(changefeedID, tableID) - lowerBoundPos := sorter.Position{ + lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, } - upperBoundGetter := func() sorter.Position { - return sorter.Position{ + upperBoundGetter := func() engine.Position { + return engine.Position{ StartTs: 3, CommitTs: 4, } } - callback := func(lastWritePos sorter.Position) { - require.Equal(suite.T(), sorter.Position{ + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), engine.Position{ StartTs: 1, CommitTs: 2, }, lastWritePos) - require.Equal(suite.T(), sorter.Position{ + require.Equal(suite.T(), engine.Position{ StartTs: 2, CommitTs: 2, }, lastWritePos.Next()) @@ -385,7 +385,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndOnlyAdvanceTabl }, } w := createWorker(changefeedID, eventSize, true) - addEventsToSorterEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -397,22 +397,22 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithSplitTxnAndOnlyAdvanceTabl }() wrapper, sink := createTableSinkWrapper(changefeedID, tableID) - lowerBoundPos := sorter.Position{ + lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, } - upperBoundGetter := func() sorter.Position { 
- return sorter.Position{ + upperBoundGetter := func() engine.Position { + return engine.Position{ StartTs: 1, CommitTs: 2, } } - callback := func(lastWritePos sorter.Position) { - require.Equal(suite.T(), sorter.Position{ + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), engine.Position{ StartTs: 1, CommitTs: 2, }, lastWritePos) - require.Equal(suite.T(), sorter.Position{ + require.Equal(suite.T(), engine.Position{ StartTs: 2, CommitTs: 2, }, lastWritePos.Next()) @@ -511,7 +511,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithoutSplitTxnAndAbortWhenNoM }, } w := createWorker(changefeedID, eventSize, false) - addEventsToSorterEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -523,22 +523,22 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithoutSplitTxnAndAbortWhenNoM }() wrapper, sink := createTableSinkWrapper(changefeedID, tableID) - lowerBoundPos := sorter.Position{ + lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, } - upperBoundGetter := func() sorter.Position { - return sorter.Position{ + upperBoundGetter := func() engine.Position { + return engine.Position{ StartTs: 3, CommitTs: 4, } } - callback := func(lastWritePos sorter.Position) { - require.Equal(suite.T(), sorter.Position{ + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), engine.Position{ StartTs: 1, CommitTs: 3, }, lastWritePos) - require.Equal(suite.T(), sorter.Position{ + require.Equal(suite.T(), engine.Position{ StartTs: 2, CommitTs: 3, }, lastWritePos.Next()) @@ -636,7 +636,7 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithoutSplitTxnOnlyAdvanceTabl }, } w := createWorker(changefeedID, eventSize, false) - addEventsToSorterEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) + addEventsToSortEngine(suite.T(), events, w.(*sinkWorkerImpl).sortEngine, tableID) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -648,22 +648,22 @@ func (suite *workerSuite) TestReceiveTableSinkTaskWithoutSplitTxnOnlyAdvanceTabl }() wrapper, sink := createTableSinkWrapper(changefeedID, tableID) - lowerBoundPos := sorter.Position{ + lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, } - upperBoundGetter := func() sorter.Position { - return sorter.Position{ + upperBoundGetter := func() engine.Position { + return engine.Position{ StartTs: 3, CommitTs: 4, } } - callback := func(lastWritePos sorter.Position) { - require.Equal(suite.T(), sorter.Position{ + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), engine.Position{ StartTs: 1, CommitTs: 3, }, lastWritePos) - require.Equal(suite.T(), sorter.Position{ + require.Equal(suite.T(), engine.Position{ StartTs: 2, CommitTs: 3, }, lastWritePos.Next()) diff --git a/pkg/sorter/sorter.go b/cdc/processor/sourcemanager/engine/engine.go similarity index 88% rename from pkg/sorter/sorter.go rename to cdc/processor/sourcemanager/engine/engine.go index 0c2333edaa5..da39938f321 100644 --- a/pkg/sorter/sorter.go +++ b/cdc/processor/sourcemanager/engine/engine.go @@ -11,16 +11,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sorter +package engine import ( "github.com/pingcap/tiflow/cdc/model" ) -// EventSortEngine is a storage engine to store and sort CDC events. -// Every changefeed will have one EventSortEngine instance. 
+// SortEngine is a storage engine to store and sort CDC events. +// Every changefeed will have one SortEngine instance. // NOTE: All interfaces are thread-safe. -type EventSortEngine interface { +type SortEngine interface { // IsTableBased tells whether the sort engine is based on table or not. // If it's based on table, fetching events by table is preferred. IsTableBased() bool @@ -40,7 +40,7 @@ type EventSortEngine interface { // GetResolvedTs gets resolved timestamp of the given table. GetResolvedTs(tableID model.TableID) model.Ts - // OnResolve pushes action into EventSortEngine's hook list, which + // OnResolve pushes action into SortEngine's hook list, which // will be called after any events are resolved. OnResolve(action func(model.TableID, model.Ts)) @@ -58,14 +58,14 @@ type EventSortEngine interface { // CleanByTable tells the engine events of the given table in the given range // (unlimited, upperBound] are committed and not necessary any more. - // The EventSortEngine instance can GC them later. + // The SortEngine instance can GC them later. // // NOTE: CleanByTable is always available even if IsTableBased returns false. CleanByTable(tableID model.TableID, upperBound Position) error // CleanAllTables tells the engine events of all tables in the given range // (unlimited, upperBound] are committed and not necessary any more. - // The EventSortEngine instance can GC them later. + // The SortEngine instance can GC them later. // // NOTE: It's only available if IsTableBased returns false. CleanAllTables(upperBound Position) error @@ -76,7 +76,7 @@ type EventSortEngine interface { Close() error } -// EventIterator is an iterator to fetch events from EventSortEngine. +// EventIterator is an iterator to fetch events from SortEngine. // It's unnecessary to be thread-safe. type EventIterator interface { // Next is used to fetch one event. nil indicates it reaches the stop point. @@ -93,7 +93,7 @@ type EventIterator interface { } // Position is used to -// 1. fetch or clear events from an engine, for example, see EventSortEngine.FetchByTable. +// 1. fetch or clear events from an engine, for example, see SortEngine.FetchByTable. // 2. calculate the next position with method Next. type Position struct { StartTs model.Ts diff --git a/pkg/sorter/manager/manager.go b/cdc/processor/sourcemanager/engine/factory/factory.go similarity index 73% rename from pkg/sorter/manager/manager.go rename to cdc/processor/sourcemanager/engine/factory/factory.go index 929b8837003..e2044b3178b 100644 --- a/pkg/sorter/manager/manager.go +++ b/cdc/processor/sourcemanager/engine/factory/factory.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package manager +package factory import ( "strconv" @@ -21,10 +21,10 @@ import ( "github.com/cockroachdb/pebble" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" + epebble "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/pebble" metrics "github.com/pingcap/tiflow/cdc/sorter" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/sorter" - ngpebble "github.com/pingcap/tiflow/pkg/sorter/pebble" "go.uber.org/multierr" ) @@ -34,18 +34,18 @@ const ( // pebbleEngine details are in package document of pkg/sorter/pebble. 
pebbleEngine sortEngineType = iota + 1 - metricsCollectInterval time.Duration = 15 * time.Second + metricsCollectInterval = 15 * time.Second ) -// EventSortEngineManager is a manager to create or drop EventSortEngine. -type EventSortEngineManager struct { +// SortEngineFactory is a manager to create or drop SortEngine. +type SortEngineFactory struct { // Read-only fields. engineType sortEngineType dir string memQuotaInBytes uint64 mu sync.Mutex - engines map[model.ChangeFeedID]sorter.EventSortEngine + engines map[model.ChangeFeedID]engine.SortEngine wg sync.WaitGroup closed chan struct{} @@ -56,17 +56,17 @@ type EventSortEngineManager struct { writeStalls []writeStall } -// Create creates an EventSortEngine. If an engine with same ID already exists, +// Create creates a SortEngine. If an engine with same ID already exists, // it will be returned directly. -func (f *EventSortEngineManager) Create(ID model.ChangeFeedID) (engine sorter.EventSortEngine, err error) { +func (f *SortEngineFactory) Create(ID model.ChangeFeedID) (e engine.SortEngine, err error) { f.mu.Lock() defer f.mu.Unlock() switch f.engineType { case pebbleEngine: exists := false - if engine, exists = f.engines[ID]; exists { - return engine, nil + if e, exists = f.engines[ID]; exists { + return e, nil } if len(f.dbs) == 0 { f.dbs, f.writeStalls, err = createPebbleDBs(f.dir, f.pebbleConfig, f.memQuotaInBytes) @@ -74,8 +74,8 @@ func (f *EventSortEngineManager) Create(ID model.ChangeFeedID) (engine sorter.Ev return } } - engine = ngpebble.New(ID, f.dbs) - f.engines[ID] = engine + e = epebble.New(ID, f.dbs) + f.engines[ID] = e default: log.Panic("not implemented") } @@ -83,7 +83,7 @@ func (f *EventSortEngineManager) Create(ID model.ChangeFeedID) (engine sorter.Ev } // Drop cleans the given event sort engine. -func (f *EventSortEngineManager) Drop(ID model.ChangeFeedID) error { +func (f *SortEngineFactory) Drop(ID model.ChangeFeedID) error { f.mu.Lock() defer f.mu.Unlock() @@ -96,7 +96,7 @@ func (f *EventSortEngineManager) Drop(ID model.ChangeFeedID) error { } // Close will close all created engines and release all resources. -func (f *EventSortEngineManager) Close() (err error) { +func (f *SortEngineFactory) Close() (err error) { f.mu.Lock() defer f.mu.Unlock() @@ -112,9 +112,9 @@ func (f *EventSortEngineManager) Close() (err error) { return } -// NewForPebble will create a EventSortEngineManager for the pebble implementation. -func NewForPebble(dir string, memQuotaInBytes uint64, cfg *config.DBConfig) *EventSortEngineManager { - manager := &EventSortEngineManager{ +// NewForPebble will create a SortEngineFactory for the pebble implementation. 
+func NewForPebble(dir string, memQuotaInBytes uint64, cfg *config.DBConfig) *SortEngineFactory { + manager := &SortEngineFactory{ engineType: pebbleEngine, memQuotaInBytes: memQuotaInBytes, pebbleConfig: cfg, @@ -124,7 +124,7 @@ func NewForPebble(dir string, memQuotaInBytes uint64, cfg *config.DBConfig) *Eve return manager } -func (f *EventSortEngineManager) startMetricsCollector() { +func (f *SortEngineFactory) startMetricsCollector() { f.wg.Add(1) ticker := time.NewTicker(metricsCollectInterval) go func() { @@ -141,7 +141,7 @@ func (f *EventSortEngineManager) startMetricsCollector() { }() } -func (f *EventSortEngineManager) collectMetrics() { +func (f *SortEngineFactory) collectMetrics() { if f.engineType == pebbleEngine { for i, db := range f.dbs { stats := db.Metrics() diff --git a/pkg/sorter/manager/pebble.go b/cdc/processor/sourcemanager/engine/factory/pebble.go similarity index 92% rename from pkg/sorter/manager/pebble.go rename to cdc/processor/sourcemanager/engine/factory/pebble.go index 3ab7aa0cf74..f9c5376163a 100644 --- a/pkg/sorter/manager/pebble.go +++ b/cdc/processor/sourcemanager/engine/factory/pebble.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package manager +package factory import ( "fmt" @@ -20,8 +20,8 @@ import ( "github.com/cockroachdb/pebble" "github.com/pingcap/log" + epebble "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/pebble" "github.com/pingcap/tiflow/pkg/config" - ngpebble "github.com/pingcap/tiflow/pkg/sorter/pebble" "go.uber.org/zap" ) @@ -50,7 +50,7 @@ func createPebbleDBs( } } - db, err := ngpebble.OpenPebble(id, dir, cfg, memQuotaInBytes/uint64(cfg.Count), adjust) + db, err := epebble.OpenPebble(id, dir, cfg, memQuotaInBytes/uint64(cfg.Count), adjust) if err != nil { log.Error("create pebble fails", zap.String("dir", dir), zap.Int("id", id), zap.Error(err)) for _, db := range dbs { diff --git a/pkg/sorter/memory/doc.go b/cdc/processor/sourcemanager/engine/memory/doc.go similarity index 100% rename from pkg/sorter/memory/doc.go rename to cdc/processor/sourcemanager/engine/memory/doc.go diff --git a/pkg/sorter/memory/event_sorter.go b/cdc/processor/sourcemanager/engine/memory/event_sorter.go similarity index 92% rename from pkg/sorter/memory/event_sorter.go rename to cdc/processor/sourcemanager/engine/memory/event_sorter.go index fc840fccec5..5e05c94106c 100644 --- a/pkg/sorter/memory/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/memory/event_sorter.go @@ -21,13 +21,13 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/sorter" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "go.uber.org/zap" ) var ( - _ sorter.EventSortEngine = (*EventSorter)(nil) - _ sorter.EventIterator = (*EventIter)(nil) + _ engine.SortEngine = (*EventSorter)(nil) + _ engine.EventIterator = (*EventIter)(nil) ) // EventSorter accepts out-of-order raw kv entries and output sorted entries. @@ -105,7 +105,7 @@ func (s *EventSorter) OnResolve(action func(model.TableID, model.Ts)) { } // FetchByTable implements sorter.EventSortEngine. 
-func (s *EventSorter) FetchByTable(tableID model.TableID, lowerBound, upperBound sorter.Position) sorter.EventIterator { +func (s *EventSorter) FetchByTable(tableID model.TableID, lowerBound, upperBound engine.Position) engine.EventIterator { value, exists := s.tables.Load(tableID) if !exists { log.Panic("fetch events from an unexist table", zap.Int64("tableID", tableID)) @@ -115,13 +115,13 @@ func (s *EventSorter) FetchByTable(tableID model.TableID, lowerBound, upperBound } // FetchAllTables implements sorter.EventSortEngine. -func (s *EventSorter) FetchAllTables(lowerBound sorter.Position) sorter.EventIterator { +func (s *EventSorter) FetchAllTables(lowerBound engine.Position) engine.EventIterator { log.Panic("FetchAllTables should never be called") return nil } // CleanByTable implements sorter.EventSortEngine. -func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound sorter.Position) error { +func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Position) error { value, exists := s.tables.Load(tableID) if !exists { log.Panic("clean an unexist table", zap.Int64("tableID", tableID)) @@ -132,7 +132,7 @@ func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound sorter.Posi } // CleanAllTables implements sorter.EventSortEngine. -func (s *EventSorter) CleanAllTables(upperBound sorter.Position) error { +func (s *EventSorter) CleanAllTables(upperBound engine.Position) error { log.Panic("CleanAllTables should never be called") return nil } @@ -144,7 +144,7 @@ func (s *EventSorter) Close() error { } // Next implements sorter.EventIterator. -func (s *EventIter) Next() (event *model.PolymorphicEvent, txnFinished sorter.Position, err error) { +func (s *EventIter) Next() (event *model.PolymorphicEvent, txnFinished engine.Position, err error) { if len(s.resolved) == 0 { return } @@ -216,7 +216,7 @@ func (s *tableSorter) getResolvedTs() model.Ts { return *s.resolvedTs } -func (s *tableSorter) fetch(tableID model.TableID, lowerBound, upperBound sorter.Position) sorter.EventIterator { +func (s *tableSorter) fetch(tableID model.TableID, lowerBound, upperBound engine.Position) engine.EventIterator { s.mu.RLock() defer s.mu.RUnlock() @@ -239,7 +239,7 @@ func (s *tableSorter) fetch(tableID model.TableID, lowerBound, upperBound sorter return iter } -func (s *tableSorter) clean(tableID model.TableID, upperBound sorter.Position) { +func (s *tableSorter) clean(tableID model.TableID, upperBound engine.Position) { s.mu.Lock() defer s.mu.Unlock() if s.resolvedTs == nil || upperBound.CommitTs > *s.resolvedTs { diff --git a/pkg/sorter/memory/event_sorter_test.go b/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go similarity index 96% rename from pkg/sorter/memory/event_sorter_test.go rename to cdc/processor/sourcemanager/engine/memory/event_sorter_test.go index 1efa82132ce..490b47ad5bc 100644 --- a/pkg/sorter/memory/event_sorter_test.go +++ b/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go @@ -18,7 +18,7 @@ import ( "testing" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/sorter" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/stretchr/testify/require" ) @@ -92,13 +92,13 @@ func TestEventSorter(t *testing.T) { es := New(context.Background()) es.AddTable(1) - var nextToFetch sorter.Position + var nextToFetch engine.Position for _, tc := range testCases { for _, entry := range tc.input { es.Add(1, model.NewPolymorphicEvent(entry)) } es.Add(1, model.NewResolvedPolymorphicEvent(0, 
tc.resolvedTs)) - iter := es.FetchByTable(1, nextToFetch, sorter.Position{CommitTs: tc.resolvedTs, StartTs: tc.resolvedTs}) + iter := es.FetchByTable(1, nextToFetch, engine.Position{CommitTs: tc.resolvedTs, StartTs: tc.resolvedTs}) for _, expect := range tc.expect { event, pos, _ := iter.Next() require.NotNil(t, event) diff --git a/pkg/sorter/memory/main_test.go b/cdc/processor/sourcemanager/engine/memory/main_test.go similarity index 100% rename from pkg/sorter/memory/main_test.go rename to cdc/processor/sourcemanager/engine/memory/main_test.go diff --git a/pkg/sorter/pebble/db.go b/cdc/processor/sourcemanager/engine/pebble/db.go similarity index 95% rename from pkg/sorter/pebble/db.go rename to cdc/processor/sourcemanager/engine/pebble/db.go index 74e81ae15a2..3c48ff1aa07 100644 --- a/pkg/sorter/pebble/db.go +++ b/cdc/processor/sourcemanager/engine/pebble/db.go @@ -24,9 +24,9 @@ import ( "github.com/cockroachdb/pebble/bloom" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/pebble/encoding" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/sorter" - "github.com/pingcap/tiflow/pkg/sorter/pebble/encoding" "go.uber.org/zap" ) @@ -66,7 +66,7 @@ func (t *tableCRTsCollector) Name() string { func iterTable( db *pebble.DB, uniqueID uint32, tableID model.TableID, - lowerBound, upperBound sorter.Position, + lowerBound, upperBound engine.Position, ) *pebble.Iterator { // Pebble's iterator range is left-included but right-excluded. upperBoundNext := upperBound.Next() diff --git a/pkg/sorter/pebble/db_test.go b/cdc/processor/sourcemanager/engine/pebble/db_test.go similarity index 93% rename from pkg/sorter/pebble/db_test.go rename to cdc/processor/sourcemanager/engine/pebble/db_test.go index 4d94352a23c..16fbcc938a1 100644 --- a/pkg/sorter/pebble/db_test.go +++ b/cdc/processor/sourcemanager/engine/pebble/db_test.go @@ -20,9 +20,9 @@ import ( "github.com/cockroachdb/pebble" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/pebble/encoding" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/sorter" - "github.com/pingcap/tiflow/pkg/sorter/pebble/encoding" "github.com/stretchr/testify/require" ) @@ -93,8 +93,8 @@ func TestIteratorWithTableFilter(t *testing.T) { count := 0 for tableID := 0; tableID <= 9; tableID++ { iter := iterTable(db, 1, model.TableID(tableID), - sorter.Position{CommitTs: x.lowerTs}, - sorter.Position{CommitTs: x.upperTs}.Next()) + engine.Position{CommitTs: x.lowerTs}, + engine.Position{CommitTs: x.upperTs}.Next()) valid := iter.Valid() for valid { count += 1 diff --git a/pkg/sorter/pebble/doc.go b/cdc/processor/sourcemanager/engine/pebble/doc.go similarity index 100% rename from pkg/sorter/pebble/doc.go rename to cdc/processor/sourcemanager/engine/pebble/doc.go diff --git a/pkg/sorter/pebble/encoding/key.go b/cdc/processor/sourcemanager/engine/pebble/encoding/key.go similarity index 100% rename from pkg/sorter/pebble/encoding/key.go rename to cdc/processor/sourcemanager/engine/pebble/encoding/key.go diff --git a/pkg/sorter/pebble/encoding/key_test.go b/cdc/processor/sourcemanager/engine/pebble/encoding/key_test.go similarity index 100% rename from pkg/sorter/pebble/encoding/key_test.go rename to cdc/processor/sourcemanager/engine/pebble/encoding/key_test.go diff --git 
a/pkg/sorter/pebble/encoding/value.go b/cdc/processor/sourcemanager/engine/pebble/encoding/value.go similarity index 100% rename from pkg/sorter/pebble/encoding/value.go rename to cdc/processor/sourcemanager/engine/pebble/encoding/value.go diff --git a/pkg/sorter/pebble/event_sorter.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go similarity index 93% rename from pkg/sorter/pebble/event_sorter.go rename to cdc/processor/sourcemanager/engine/pebble/event_sorter.go index 684f73d0641..7447ff026b9 100644 --- a/pkg/sorter/pebble/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go @@ -23,15 +23,15 @@ import ( "github.com/cockroachdb/pebble" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/pebble/encoding" "github.com/pingcap/tiflow/pkg/chann" - "github.com/pingcap/tiflow/pkg/sorter" - "github.com/pingcap/tiflow/pkg/sorter/pebble/encoding" "go.uber.org/zap" ) var ( - _ sorter.EventSortEngine = (*EventSorter)(nil) - _ sorter.EventIterator = (*EventIter)(nil) + _ engine.SortEngine = (*EventSorter)(nil) + _ engine.EventIterator = (*EventIter)(nil) ) // EventSorter is an event sort engine. @@ -165,8 +165,8 @@ func (s *EventSorter) OnResolve(action func(model.TableID, model.Ts)) { // FetchByTable implements sorter.EventSortEngine. func (s *EventSorter) FetchByTable( tableID model.TableID, - lowerBound, upperBound sorter.Position, -) sorter.EventIterator { + lowerBound, upperBound engine.Position, +) engine.EventIterator { s.mu.RLock() state, exists := s.tables[tableID] s.mu.RUnlock() @@ -189,7 +189,7 @@ func (s *EventSorter) FetchByTable( } // FetchAllTables implements sorter.EventSortEngine. -func (s *EventSorter) FetchAllTables(lowerBound sorter.Position) sorter.EventIterator { +func (s *EventSorter) FetchAllTables(lowerBound engine.Position) engine.EventIterator { log.Panic("FetchAllTables should never be called", zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID)) @@ -197,7 +197,7 @@ func (s *EventSorter) FetchAllTables(lowerBound sorter.Position) sorter.EventIte } // CleanByTable implements sorter.EventSortEngine. -func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound sorter.Position) error { +func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Position) error { s.mu.RLock() state, exists := s.tables[tableID] s.mu.RUnlock() @@ -212,7 +212,7 @@ func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound sorter.Posi } // CleanAllTables implements sorter.EventSortEngine. -func (s *EventSorter) CleanAllTables(upperBound sorter.Position) error { +func (s *EventSorter) CleanAllTables(upperBound engine.Position) error { log.Panic("CleanAllTables should never be called", zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID)) @@ -240,7 +240,7 @@ func (s *EventSorter) Close() error { } // Next implements sorter.EventIterator. -func (s *EventIter) Next() (event *model.PolymorphicEvent, pos sorter.Position, err error) { +func (s *EventIter) Next() (event *model.PolymorphicEvent, pos engine.Position, err error) { valid := s.iter != nil && s.iter.Valid() var value []byte for valid { @@ -284,7 +284,7 @@ type tableState struct { // Following fields are protected by mu. 
 	mu      sync.RWMutex
-	cleaned sorter.Position
+	cleaned engine.Position
 }

 func (s *EventSorter) handleEvents(db *pebble.DB, inputCh <-chan eventWithTableID) {
@@ -354,14 +354,14 @@ func (s *EventSorter) handleEvents(db *pebble.DB, inputCh <-chan eventWithTableI
 }

 // cleanTable uses DeleteRange to clean data of the given table.
-func (s *EventSorter) cleanTable(state *tableState, tableID model.TableID, upperBound ...sorter.Position) error {
-	var toClean sorter.Position
+func (s *EventSorter) cleanTable(state *tableState, tableID model.TableID, upperBound ...engine.Position) error {
+	var toClean engine.Position
 	var start, end []byte

 	if len(upperBound) == 1 {
 		toClean = upperBound[0]
 	} else {
-		toClean = sorter.Position{CommitTs: math.MaxUint64, StartTs: math.MaxUint64 - 1}
+		toClean = engine.Position{CommitTs: math.MaxUint64, StartTs: math.MaxUint64 - 1}
 	}

 	state.mu.RLock()
diff --git a/pkg/sorter/pebble/event_sorter_test.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go
similarity index 90%
rename from pkg/sorter/pebble/event_sorter_test.go
rename to cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go
index cf483735c22..8a0b62a4122 100644
--- a/pkg/sorter/pebble/event_sorter_test.go
+++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go
@@ -21,8 +21,8 @@ import (
 	"github.com/cockroachdb/pebble"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
 	"github.com/pingcap/tiflow/pkg/config"
-	"github.com/pingcap/tiflow/pkg/sorter"
 	"github.com/stretchr/testify/require"
 )
@@ -68,7 +68,7 @@ func TestNoResolvedTs(t *testing.T) {
 	timer := time.NewTimer(100 * time.Millisecond)
 	select {
 	case ts := <-resolvedTs:
-		iter := s.FetchByTable(model.TableID(1), sorter.Position{}, sorter.Position{CommitTs: ts})
+		iter := s.FetchByTable(model.TableID(1), engine.Position{}, engine.Position{CommitTs: ts})
 		event, _, err := iter.Next()
 		require.Nil(t, event)
 		require.Nil(t, err)
@@ -125,12 +125,12 @@ func TestEventFetch(t *testing.T) {
 	s.Add(model.TableID(1), model.NewResolvedPolymorphicEvent(0, 4))

 	sortedEvents := make([]*model.PolymorphicEvent, 0, len(inputEvents))
-	sortedPositions := make([]sorter.Position, 0, len(inputEvents))
+	sortedPositions := make([]engine.Position, 0, len(inputEvents))

 	timer := time.NewTimer(100 * time.Millisecond)
 	select {
 	case ts := <-resolvedTs:
-		iter := s.FetchByTable(model.TableID(1), sorter.Position{}, sorter.Position{CommitTs: ts, StartTs: ts - 1})
+		iter := s.FetchByTable(model.TableID(1), engine.Position{}, engine.Position{CommitTs: ts, StartTs: ts - 1})
 		for {
 			event, pos, err := iter.Next()
 			require.Nil(t, err)
@@ -150,7 +150,7 @@ func TestEventFetch(t *testing.T) {

 	require.Equal(t, inputEvents, sortedEvents)

-	expectPositions := []sorter.Position{
+	expectPositions := []engine.Position{
 		{CommitTs: 0, StartTs: 0},
 		{CommitTs: 2, StartTs: 1},
 		{CommitTs: 4, StartTs: 2},
@@ -172,6 +172,6 @@ func TestCleanData(t *testing.T) {
 	require.True(t, s.IsTableBased())

 	s.AddTable(1)
-	require.Panics(t, func() { s.CleanByTable(2, sorter.Position{}) })
-	require.Nil(t, s.CleanByTable(1, sorter.Position{}))
+	require.Panics(t, func() { s.CleanByTable(2, engine.Position{}) })
+	require.Nil(t, s.CleanByTable(1, engine.Position{}))
 }
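TestEventFetch above shows the consumption contract of the engine: wait for a resolved timestamp, fetch an iterator bounded by it, and call Next until the returned event is nil. Below is a minimal, self-contained sketch of that drain loop against a stand-in iterator interface; every type and name is illustrative, not the tiflow API.

package main

import "fmt"

type Ts = uint64
type TableID = int64

// Position and Event stand in for engine.Position and *model.PolymorphicEvent.
type Position struct{ CommitTs, StartTs Ts }
type Event struct{ CRTs Ts }

// EventIterator mirrors the Next contract seen in the tests:
// a nil event means the iterator is exhausted.
type EventIterator interface {
	Next() (*Event, Position, error)
}

// sliceIter is a toy iterator over pre-sorted events.
type sliceIter struct {
	events []*Event
	idx    int
}

func (it *sliceIter) Next() (*Event, Position, error) {
	if it.idx >= len(it.events) {
		return nil, Position{}, nil
	}
	ev := it.events[it.idx]
	it.idx++
	return ev, Position{CommitTs: ev.CRTs}, nil
}

// drainTable fetches everything for one table up to resolvedTs, in commit
// order, using the same loop shape as TestEventFetch.
func drainTable(
	fetch func(TableID, Position, Position) EventIterator,
	table TableID, resolvedTs Ts,
) ([]*Event, error) {
	iter := fetch(table, Position{}, Position{CommitTs: resolvedTs, StartTs: resolvedTs - 1})
	var events []*Event
	for {
		event, _, err := iter.Next()
		if err != nil {
			return nil, err
		}
		if event == nil { // exhausted
			return events, nil
		}
		events = append(events, event)
	}
}

func main() {
	fetch := func(TableID, Position, Position) EventIterator {
		return &sliceIter{events: []*Event{{CRTs: 2}, {CRTs: 4}}}
	}
	events, _ := drainTable(fetch, 1, 4)
	fmt.Println(len(events)) // 2
}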
diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go
new file mode 100644
index 00000000000..e8e71814b74
--- /dev/null
+++ b/cdc/processor/sourcemanager/manager.go
@@ -0,0 +1,114 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sourcemanager
+
+import (
+	"sync"
+
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/puller"
+	cdccontext "github.com/pingcap/tiflow/pkg/context"
+	cerrors "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/upstream"
+)
+
+// SourceManager is the manager of the source engine and puller.
+type SourceManager struct {
+	// changefeedID is the changefeed ID.
+	// We use it to create the puller and log.
+	changefeedID model.ChangeFeedID
+	// up is the upstream of the puller.
+	up *upstream.Upstream
+	// engine is the source engine.
+	engine engine.SortEngine
+	// pullers is the puller wrapper map.
+	pullers sync.Map
+	// Used to report errors to the processor.
+	errChan chan error
+}
+
+// New creates a new source manager.
+func New(
+	changefeedID model.ChangeFeedID,
+	up *upstream.Upstream,
+	engine engine.SortEngine,
+	errChan chan error,
+) *SourceManager {
+	return &SourceManager{
+		changefeedID: changefeedID,
+		up:           up,
+		engine:       engine,
+		errChan:      errChan,
+	}
+}
+
+// IsTableBased just wraps the engine's IsTableBased method.
+func (m *SourceManager) IsTableBased() bool {
+	return m.engine.IsTableBased()
+}
+
+// AddTable adds a table to the source manager. It starts the puller and registers the table with the engine.
+func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID, tableName string, startTs model.Ts) {
+	p := puller.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs)
+	p.Start(ctx, m.up, m.engine, m.errChan)
+	m.pullers.Store(tableID, p)
+	m.engine.AddTable(tableID)
+}
+
+// RemoveTable removes a table from the source manager. It stops the puller and unregisters the table from the engine.
+func (m *SourceManager) RemoveTable(tableID model.TableID) {
+	if wrapper, ok := m.pullers.Load(tableID); ok {
+		wrapper.(*puller.Wrapper).Close()
+		m.pullers.Delete(tableID)
+	}
+	m.engine.RemoveTable(tableID)
+}
+
+// OnResolve just wraps the engine's OnResolve method.
+func (m *SourceManager) OnResolve(action func(model.TableID, model.Ts)) {
+	m.engine.OnResolve(action)
+}
+
+// FetchByTable just wraps the engine's FetchByTable method.
+func (m *SourceManager) FetchByTable(tableID model.TableID, lowerBound, upperBound engine.Position) engine.EventIterator {
+	return m.engine.FetchByTable(tableID, lowerBound, upperBound)
+}
+
+// FetchAllTables just wraps the engine's FetchAllTables method.
+func (m *SourceManager) FetchAllTables(lowerBound engine.Position) engine.EventIterator {
+	return m.engine.FetchAllTables(lowerBound)
+}
+
+// CleanByTable just wraps the engine's CleanByTable method.
+func (m *SourceManager) CleanByTable(tableID model.TableID, upperBound engine.Position) error {
+	return m.engine.CleanByTable(tableID, upperBound)
+}
+
+// CleanAllTables just wraps the engine's CleanAllTables method.
+func (m *SourceManager) CleanAllTables(upperBound engine.Position) error {
+	return m.engine.CleanAllTables(upperBound)
+}
+
+// Close closes the source manager. It stops all pullers and closes the engine.
+func (m *SourceManager) Close() error {
+	m.pullers.Range(func(key, value interface{}) bool {
+		value.(*puller.Wrapper).Close()
+		return true
+	})
+	if err := m.engine.Close(); err != nil {
+		return cerrors.Trace(err)
+	}
+	return nil
+}
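SourceManager is a thin facade: one puller per table tracked in a sync.Map, plus a shared sort engine that most methods delegate to directly. The sketch below isolates the lifecycle bookkeeping (add, remove, close) with toy types, assuming only what the diff shows; sync.Map fits because tables can be added and removed concurrently from different goroutines.

package main

import (
	"fmt"
	"sync"
)

type TableID = int64

// closer stands in for *puller.Wrapper: anything we must stop on removal.
type closer interface{ Close() }

type fakePuller struct{ id TableID }

func (p *fakePuller) Close() { fmt.Println("puller stopped for table", p.id) }

// manager mirrors SourceManager's lifecycle logic with toy types.
type manager struct {
	pullers sync.Map // TableID -> closer
}

func (m *manager) addTable(id TableID) {
	p := &fakePuller{id: id}
	// The real code starts the puller here, then remembers it for removal.
	m.pullers.Store(id, p)
}

func (m *manager) removeTable(id TableID) {
	if v, ok := m.pullers.Load(id); ok {
		v.(closer).Close()
		m.pullers.Delete(id)
	}
}

func (m *manager) close() {
	m.pullers.Range(func(_, v interface{}) bool {
		v.(closer).Close()
		return true
	})
}

func main() {
	m := &manager{}
	m.addTable(1)
	m.addTable(2)
	m.removeTable(1)
	m.close() // stops the remaining puller for table 2
}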
diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go
new file mode 100644
index 00000000000..a01b0d2dedb
--- /dev/null
+++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go
@@ -0,0 +1,128 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package puller
+
+import (
+	"context"
+	"sync"
+
+	"github.com/pingcap/tiflow/cdc/contextutil"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
+	"github.com/pingcap/tiflow/cdc/puller"
+	"github.com/pingcap/tiflow/pkg/config"
+	cdccontext "github.com/pingcap/tiflow/pkg/context"
+	"github.com/pingcap/tiflow/pkg/regionspan"
+	"github.com/pingcap/tiflow/pkg/upstream"
+	"github.com/pingcap/tiflow/pkg/util"
+)
+
+// Wrapper is a wrapper of puller used by the source manager.
+type Wrapper struct {
+	changefeed model.ChangeFeedID
+	tableID    model.TableID
+	tableName  string // quoted schema and table, used in metrics only
+	p          puller.Puller
+	startTs    model.Ts
+	// cancel is used to cancel the puller when the table is removed or closed.
+	cancel context.CancelFunc
+	// wg is used to wait for the puller to exit.
+	wg sync.WaitGroup
+}
+
+// NewPullerWrapper creates a new puller wrapper.
+func NewPullerWrapper(
+	changefeed model.ChangeFeedID,
+	tableID model.TableID,
+	tableName string,
+	startTs model.Ts,
+) *Wrapper {
+	return &Wrapper{
+		changefeed: changefeed,
+		tableID:    tableID,
+		tableName:  tableName,
+		startTs:    startTs,
+	}
+}
+
+// tableSpan returns the spans of the table with the given table ID.
+func (n *Wrapper) tableSpan() []regionspan.Span {
+	// start table puller
+	spans := make([]regionspan.Span, 0, 4)
+	spans = append(spans, regionspan.GetTableSpan(n.tableID))
+	return spans
+}
+
+// Start starts the puller wrapper.
+// We use the cdc context to put the capture info and role into the context.
+func (n *Wrapper) Start(
+	ctx cdccontext.Context,
+	up *upstream.Upstream,
+	eventSortEngine engine.SortEngine,
+	errChan chan<- error,
+) {
+	ctxC, cancel := context.WithCancel(ctx)
+	ctxC = contextutil.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
+	ctxC = contextutil.PutRoleInCtx(ctxC, util.RoleProcessor)
+	kvCfg := config.GetGlobalServerConfig().KVClient
+	// NOTICE: always pull the old value internally
+	// See also: https://github.com/pingcap/tiflow/issues/2301.
+	n.p = puller.New(
+		ctxC,
+		up.PDClient,
+		up.GrpcPool,
+		up.RegionCache,
+		up.KVStorage,
+		up.PDClock,
+		n.startTs,
+		n.tableSpan(),
+		kvCfg,
+		n.changefeed,
+		n.tableID,
+		n.tableName,
+	)
+	n.wg.Add(1)
+	go func() {
+		defer n.wg.Done()
+		err := n.p.Run(ctxC)
+		if err != nil {
+			errChan <- err
+		}
+	}()
+	n.wg.Add(1)
+	go func() {
+		defer n.wg.Done()
+		for {
+			select {
+			case <-ctxC.Done():
+				return
+			case rawKV := <-n.p.Output():
+				if rawKV == nil {
+					continue
+				}
+				pEvent := model.NewPolymorphicEvent(rawKV)
+				if err := eventSortEngine.Add(n.tableID, pEvent); err != nil {
+					errChan <- err
+				}
+			}
+		}
+	}()
+	n.cancel = cancel
+}
+
+// Close closes the puller wrapper.
+func (n *Wrapper) Close() {
+	n.cancel()
+	n.wg.Wait()
+}
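Wrapper.Start spawns two goroutines: one runs the puller, the other drains its output channel into the sort engine, and Close cancels the shared context before waiting on the WaitGroup, which is why both loops must watch ctx.Done(). Below is a self-contained sketch of the drain half with stand-in types; only the shape is taken from the diff.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// forward mirrors the wrapper's second goroutine: drain an output channel
// into a sink until the context is canceled, reporting sink errors on errCh.
func forward(
	ctx context.Context,
	wg *sync.WaitGroup,
	output <-chan string,
	add func(string) error,
	errCh chan<- error,
) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case ev := <-output:
				if ev == "" { // skip empty events, like the nil check in the wrapper
					continue
				}
				if err := add(ev); err != nil {
					errCh <- err
				}
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	out := make(chan string, 2)
	errCh := make(chan error, 1)
	var wg sync.WaitGroup

	forward(ctx, &wg, out, func(ev string) error {
		fmt.Println("added to engine:", ev)
		return nil
	}, errCh)

	out <- "row-1"
	out <- "row-2"
	time.Sleep(10 * time.Millisecond) // let the goroutine drain
	cancel()                          // Close() in the diff does cancel + wg.Wait()
	wg.Wait()
}

Note the wg field is a plain sync.WaitGroup value, so it is usable without explicit initialization; a nil *sync.WaitGroup would panic on the first Add.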
diff --git a/cdc/server/server.go b/cdc/server/server.go
index 04b23eac4ae..ad8c8faac4a 100644
--- a/cdc/server/server.go
+++ b/cdc/server/server.go
@@ -31,6 +31,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/capture"
 	"github.com/pingcap/tiflow/cdc/kv"
 	"github.com/pingcap/tiflow/cdc/processor/pipeline/system"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory"
 	ssystem "github.com/pingcap/tiflow/cdc/sorter/db/system"
 	"github.com/pingcap/tiflow/cdc/sorter/unified"
 	"github.com/pingcap/tiflow/pkg/config"
@@ -39,7 +40,6 @@ import (
 	"github.com/pingcap/tiflow/pkg/fsutil"
 	"github.com/pingcap/tiflow/pkg/p2p"
 	"github.com/pingcap/tiflow/pkg/pdutil"
-	sortmgr "github.com/pingcap/tiflow/pkg/sorter/manager"
 	"github.com/pingcap/tiflow/pkg/tcpserver"
 	p2pProto "github.com/pingcap/tiflow/proto/p2p"
 	pd "github.com/tikv/pd/client"
@@ -89,7 +89,7 @@ type server struct {
 	// If it's true sortEngineManager will be used, otherwise sorterSystem will be used.
 	useEventSortEngine bool
-	sortEngineManager  *sortmgr.EventSortEngineManager
+	sortEngineFactory  *factory.SortEngineFactory
 	sorterSystem       *ssystem.System
 }
@@ -197,7 +197,7 @@ func (s *server) prepare(ctx context.Context) error {
 	s.capture = capture.NewCapture(
 		s.pdEndpoints, cdcEtcdClient, s.grpcService,
-		s.tableActorSystem, s.sortEngineManager, s.sorterSystem)
+		s.tableActorSystem, s.sortEngineFactory, s.sorterSystem)

 	return nil
 }
@@ -214,11 +214,11 @@ func (s *server) startActorSystems(ctx context.Context) error {
 		return nil
 	}

-	if s.useEventSortEngine && s.sortEngineManager != nil {
-		if err := s.sortEngineManager.Close(); err != nil {
+	if s.useEventSortEngine && s.sortEngineFactory != nil {
+		if err := s.sortEngineFactory.Close(); err != nil {
 			log.Error("fails to close sort engine manager", zap.Error(err))
 		}
-		s.sortEngineManager = nil
+		s.sortEngineFactory = nil
 	}
 	if !s.useEventSortEngine && s.sorterSystem != nil {
 		s.sorterSystem.Stop()
@@ -236,7 +236,7 @@ func (s *server) startActorSystems(ctx context.Context) error {
 		memPercentage := float64(conf.Sorter.MaxMemoryPercentage) / 100
 		memInBytes := uint64(float64(totalMemory) * memPercentage)
 		if config.GetGlobalServerConfig().Debug.EnableDBSorter {
-			s.sortEngineManager = sortmgr.NewForPebble(sortDir, memInBytes, conf.Debug.DB)
+			s.sortEngineFactory = factory.NewForPebble(sortDir, memInBytes, conf.Debug.DB)
 		} else {
 			panic("only pebble is transformed to EventSortEngine")
 		}
@@ -430,8 +430,8 @@ func (s *server) stopActorSystems() {
 	log.Info("table actor system closed", zap.Duration("duration", time.Since(start)))

 	start = time.Now()
-	if s.useEventSortEngine && s.sortEngineManager != nil {
-		if err := s.sortEngineManager.Close(); err != nil {
+	if s.useEventSortEngine && s.sortEngineFactory != nil {
+		if err := s.sortEngineFactory.Close(); err != nil {
 			log.Error("fails to close sort engine manager", zap.Error(err))
 		}
 		log.Info("sort engine manager closed", zap.Duration("duration", time.Since(start)))
diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go
index 6002a91205d..40920aab451 100644
--- a/docs/swagger/docs.go
+++ b/docs/swagger/docs.go
@@ -1049,6 +1049,7 @@ var doc = `{
             "type": "object",
             "properties": {
                 "table_ids": {
+                    "description": "All table IDs that this processor is replicating.",
                     "type": "array",
                     "items": {
                         "type": "integer"
diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json
index 778bd6dfd5b..84097dcf0c0 100644
--- a/docs/swagger/swagger.json
+++ b/docs/swagger/swagger.json
@@ -1030,6 +1030,7 @@
         "type": "object",
         "properties": {
             "table_ids": {
+                "description": "All table IDs that this processor is replicating.",
                 "type": "array",
                 "items": {
                     "type": "integer"
diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml
index a871810ca34..06b39b4cb45 100644
--- a/docs/swagger/swagger.yaml
+++ b/docs/swagger/swagger.yaml
@@ -206,6 +206,7 @@ definitions:
   model.ProcessorDetail:
     properties:
       table_ids:
+        description: All table IDs that this processor is replicating.
         items:
           type: integer
         type: array
diff --git a/pkg/context/context.go b/pkg/context/context.go
index 53b7e9ca97c..0712a5e3f8d 100644
--- a/pkg/context/context.go
+++ b/pkg/context/context.go
@@ -20,11 +20,11 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/pipeline/system"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory"
 	ssystem "github.com/pingcap/tiflow/cdc/sorter/db/system"
 	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/etcd"
 	"github.com/pingcap/tiflow/pkg/p2p"
-	sortmgr "github.com/pingcap/tiflow/pkg/sorter/manager"
 	"github.com/tikv/client-go/v2/oracle"
 	"go.uber.org/zap"
 )
@@ -42,7 +42,7 @@ type GlobalVars struct {
 	// TODO(qupeng): remove SorterSystem after all sorters are transformed
 	// to adapt pull-based sinks.
 	SorterSystem      *ssystem.System
-	SortEngineManager *sortmgr.EventSortEngineManager
+	SortEngineFactory *factory.SortEngineFactory

 	// OwnerRevision is the Etcd revision when the owner got elected.
 	OwnerRevision int64
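For reference, the pebble engine's memory quota in server.go above is a straight percentage of total system memory, truncated to whole bytes before being passed to factory.NewForPebble. A short sketch of that arithmetic (the helper name here is made up):

package main

import "fmt"

// sortEngineMemQuota mirrors the computation in (*server).startActorSystems:
// MaxMemoryPercentage percent of total memory, truncated to bytes.
func sortEngineMemQuota(totalMemory uint64, maxMemoryPercentage int) uint64 {
	memPercentage := float64(maxMemoryPercentage) / 100
	return uint64(float64(totalMemory) * memPercentage)
}

func main() {
	const gib = 1 << 30
	// e.g. 16 GiB of RAM with Sorter.MaxMemoryPercentage = 30.
	quota := sortEngineMemQuota(16*gib, 30)
	fmt.Println(quota / gib) // prints 4: a 4.8 GiB quota, shown here in whole GiB
}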