Skip to content

Commit

Permalink
eventService: don't drop real scan task (#586)
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <414110582@qq.com>
  • Loading branch information
asddongmen authored Nov 23, 2024
1 parent 948db4c commit baa5960
Show file tree
Hide file tree
Showing 12 changed files with 442 additions and 70 deletions.
5 changes: 4 additions & 1 deletion logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,10 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []kvEvents) error {
}
}
metrics.EventStoreWriteBytes.Add(float64(batch.Len()))
return batch.Commit(pebble.NoSync)
start := time.Now()
err := batch.Commit(pebble.NoSync)
metrics.EventStoreWriteDurationHistogram.Observe(float64(time.Since(start).Milliseconds()))
return err
}

func (e *eventStore) deleteEvents(dbIndex int, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error {
Expand Down
96 changes: 90 additions & 6 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3085,7 +3085,7 @@
"targets": [
{
"exemplar": true,
"expr": "ticdc_event_store_max_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}",
"expr": "ticdc_event_store_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}",
"interval": "",
"legendFormat": "{{instance}}",
"queryType": "randomWalk",
Expand Down Expand Up @@ -3744,6 +3744,90 @@
"align": false,
"alignLevel": null
}
},
{
"cards": {
"cardPadding": 0,
"cardRound": 0
},
"color": {
"cardColor": "#FF9830",
"colorScale": "linear",
"colorScheme": "interpolateSpectral",
"exponent": 0.5,
"min": 0,
"mode": "spectrum"
},
"dataFormat": "tsbuckets",
"datasource": "${DS_C1}",
"description": "The time of sorter write",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 38
},
"heatmap": {},
"hideZeroBuckets": true,
"highlightCards": true,
"id": 274,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sort": "current",
"sortDesc": true,
"total": false,
"values": true
},
"links": [],
"maxPerRow": 3,
"repeatDirection": "h",
"reverseYBuckets": false,
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_event_store_write_duration_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (le)",
"format": "heatmap",
"instant": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{le}}",
"refId": "A"
}
],
"title": "Write duration",
"tooltip": {
"show": true,
"showHistogram": true
},
"tooltipDecimals": 1,
"type": "heatmap",
"xAxis": {
"show": true
},
"xBucketNumber": null,
"xBucketSize": null,
"yAxis": {
"decimals": 1,
"format": "s",
"logBase": 1,
"max": null,
"min": null,
"show": true,
"splitFactor": null
},
"yBucketBound": "upper",
"yBucketNumber": null,
"yBucketSize": null
}
],
"title": "Event Store",
Expand Down Expand Up @@ -4562,7 +4646,7 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_event_service_drop_notification_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", instance=~\"$ticdc_instance\"}[1m])) by (namespace, instance)",
"expr": "sum(rate(ticdc_event_service_pending_scan_task_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", namespace=~\"$namespace\", instance=~\"$ticdc_instance\"}[1m])) by (namespace, instance)",
"hide": false,
"interval": "",
"legendFormat": "dropped-notification-{{instance}}",
Expand All @@ -4574,7 +4658,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Event Service Dropped Notification",
"title": "Event Service Pending Scan Task",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down Expand Up @@ -4660,10 +4744,10 @@
"targets": [
{
"exemplar": true,
"expr": "sum(rate(ticdc_event_service_drop_scan_task_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (instance)",
"expr": "sum(rate(ticdc_event_service_scan_task_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$ticdc_instance\"}[1m])) by (instance)",
"hide": false,
"interval": "",
"legendFormat": "dropped-scan-task-{{instance}}",
"legendFormat": "finished-scan-task-{{instance}}",
"queryType": "randomWalk",
"refId": "A"
}
Expand All @@ -4672,7 +4756,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Droped scan task count",
"title": "Event Service Finished Scan Task Count",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type DebugConfig struct {
Puller *PullerConfig `toml:"puller" json:"puller"`

SchemaStore *SchemaStoreConfig `toml:"schema-store" json:"schema-store"`

EventService *EventServiceConfig `toml:"event-service" json:"event-service"`
}

// ValidateAndAdjust validates and adjusts the debug configuration
Expand Down Expand Up @@ -79,3 +81,15 @@ func NewDefaultSchemaStoreConfig() *SchemaStoreConfig {
EnableGC: false,
}
}

// EventServiceConfig represents config for event service
type EventServiceConfig struct {
ScanTaskQueueSize int `toml:"scan-task-queue-size" json:"scan-task-queue-size"`
}

// NewDefaultEventServiceConfig return the default event service configuration
func NewDefaultEventServiceConfig() *EventServiceConfig {
return &EventServiceConfig{
ScanTaskQueueSize: 1024 * 8,
}
}
7 changes: 4 additions & 3 deletions pkg/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ var defaultServerConfig = &ServerConfig{
DB: NewDefaultDBConfig(),
Messages: defaultMessageConfig.Clone(),

Scheduler: NewDefaultSchedulerConfig(),
Puller: NewDefaultPullerConfig(),
SchemaStore: NewDefaultSchemaStoreConfig(),
Scheduler: NewDefaultSchedulerConfig(),
Puller: NewDefaultPullerConfig(),
SchemaStore: NewDefaultSchemaStoreConfig(),
EventService: NewDefaultEventServiceConfig(),
},
ClusterID: "default",
GcTunerMemoryThreshold: DisableMemoryLimit,
Expand Down
78 changes: 29 additions & 49 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/ticdc/pkg/apperror"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/utils/dynstream"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/eventstore"
"github.com/pingcap/ticdc/logservice/schemastore"
"github.com/pingcap/ticdc/pkg/apperror"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
Expand All @@ -35,10 +35,10 @@ const (

var metricEventServiceSendEventDuration = metrics.EventServiceSendEventDuration.WithLabelValues("txn")
var metricEventBrokerDropTaskCount = metrics.EventServiceDropScanTaskCount
var metricEventBrokerDropResolvedTsCount = metrics.EventServiceDropResolvedTsCount
var metricEventBrokerScanTaskCount = metrics.EventServiceScanTaskCount
var metricScanTaskQueueDuration = metrics.EventServiceScanTaskQueueDuration
var metricEventBrokerTaskHandleDuration = metrics.EventServiceTaskHandleDuration
var metricEventBrokerDropNotificationCount = metrics.EventServiceDropNotificationCount
var metricEventBrokerPendingScanTaskCount = metrics.EventServicePendingScanTaskCount
var metricEventBrokerDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("event-broker")
var metricEventBrokerDSChannelSize = metrics.DynamicStreamEventChanSize.WithLabelValues("event-broker")

Expand All @@ -62,7 +62,7 @@ type eventBroker struct {
tableTriggerDispatchers sync.Map
// taskPool is used to store the scan tasks and merge the tasks of same dispatcher.
// TODO: Make it support merge the tasks of the same table span, even if the tasks are from different dispatchers.
taskPool *scanTaskPool
taskQueue *chann.UnlimitedChannel[scanTask]

// GID here is the internal changefeedID, use to identify the area of the dispatcher.
ds dynstream.DynamicStream[common.GID, common.DispatcherID, scanTask, *eventBroker, *dispatcherEventsHandler]
Expand Down Expand Up @@ -114,6 +114,8 @@ func newEventBroker(
messageWorkerCount = streamCount
}

//conf := config.GetGlobalServerConfig().Debug.EventService

c := &eventBroker{
tidbClusterID: id,
eventStore: eventStore,
Expand All @@ -122,7 +124,7 @@ func newEventBroker(
dispatchers: sync.Map{},
tableTriggerDispatchers: sync.Map{},
msgSender: mc,
taskPool: newScanTaskPool(),
taskQueue: chann.NewUnlimitedChannel[scanTask](),
scanWorkerCount: defaultScanWorkerCount,
ds: ds,
messageCh: make([]chan wrapEvent, messageWorkerCount),
Expand Down Expand Up @@ -205,16 +207,17 @@ func (c *eventBroker) getMessageCh(workerIndex int) chan wrapEvent {

func (c *eventBroker) runScanWorker(ctx context.Context) {
for i := 0; i < c.scanWorkerCount; i++ {
taskCh := c.taskPool.popTask()
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case <-ctx.Done():
return
case task := <-taskCh:
c.doScan(ctx, task)
default:
task, ok := c.taskQueue.Get()
if ok {
c.doScan(ctx, task)
}
}
}
}()
Expand Down Expand Up @@ -440,16 +443,6 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
log.Panic("get ddl events failed", zap.Error(err))
}

//2. Get event iterator from eventStore.
iter, err := c.eventStore.GetIterator(dispatcherID, dataRange)
if err != nil {
log.Panic("read events failed", zap.Error(err))
}
// TODO: use error to indicate the dispatcher is removed
if iter == nil {
return
}

// After all the events are sent, we need to
// drain the ddlEvents and wake up the dispatcher.
defer func() {
Expand All @@ -464,11 +457,22 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
task.metricEventServiceSendResolvedTsCount)
}()

//2. Get event iterator from eventStore.
iter, err := c.eventStore.GetIterator(dispatcherID, dataRange)
if err != nil {
log.Panic("read events failed", zap.Error(err))
}
// TODO: use error to indicate the dispatcher is removed
if iter == nil {
return
}

defer func() {
eventCount, _ := iter.Close()
if eventCount != 0 {
task.metricSorterOutputEventCountKV.Add(float64(eventCount))
}
metricEventBrokerScanTaskCount.Inc()
}()

sendDML := func(dml *pevent.DMLEvent) {
Expand Down Expand Up @@ -663,6 +667,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
dsMetrics := c.ds.GetMetrics()
metricEventBrokerDSChannelSize.Set(float64(dsMetrics.EventChanSize))
metricEventBrokerDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen))
metricEventBrokerPendingScanTaskCount.Set(float64(c.taskQueue.Len()))
}
}
}()
Expand Down Expand Up @@ -946,33 +951,8 @@ func (t scanTask) handle() {
//metricScanTaskQueueDuration.Observe(float64(time.Since(t.createTime).Milliseconds()))
}

type scanTaskPool struct {
// pendingTaskQueue is used to store the tasks that are waiting to be handled by the scan workers.
// The length of the pendingTaskQueue is equal to the number of the scan workers.
pendingTaskQueue chan scanTask
}

func newScanTaskPool() *scanTaskPool {
return &scanTaskPool{
pendingTaskQueue: make(chan scanTask, defaultChannelSize),
}
}

// pushTask pushes a task to the pool,
// and merge the task if the task is overlapped with the existing tasks.
func (p *scanTaskPool) pushTask(task scanTask) bool {
select {
case p.pendingTaskQueue <- task:
return true
default:
metricEventBrokerDropTaskCount.Inc()
// If the queue is full, we just drop the task
return false
}
}

func (p *scanTaskPool) popTask() <-chan scanTask {
return p.pendingTaskQueue
func (t scanTask) GetKey() common.DispatcherID {
return t.id
}

type wrapEvent struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/eventservice/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func (h *dispatcherEventsHandler) Handle(broker *eventBroker, tasks ...scanTask)
return false
}
// The dispatcher has new events. We need to push the task to the task pool.
return broker.taskPool.pushTask(task)
broker.taskQueue.Push(task)
return true
}

func (h *dispatcherEventsHandler) GetType(event scanTask) dynstream.EventType {
Expand Down
Loading

0 comments on commit baa5960

Please sign in to comment.