Rollup PR #5499, #5269, #5558, #4643, #4375, #4235 and #5477 #5542

Merged
2 changes: 2 additions & 0 deletions Makefile
@@ -172,6 +172,8 @@ fmt: tools/bin/gofumports tools/bin/shfmt
tools/bin/gofumports -l -w $(FILES) 2>&1 | $(FAIL_ON_STDOUT)
@echo "run shfmt"
tools/bin/shfmt -d -w .
@echo "check log style"
scripts/check-log-style.sh

errdoc: tools/bin/errdoc-gen
@echo "generator errors.toml"
10 changes: 5 additions & 5 deletions cdc/capture/capture.go
@@ -218,8 +218,8 @@ func (c *Capture) reset(ctx context.Context) error {
}

log.Info("init capture",
zap.String("capture-id", c.info.ID),
zap.String("capture-addr", c.info.AdvertiseAddr))
zap.String("captureID", c.info.ID),
zap.String("captureAddr", c.info.AdvertiseAddr))
return nil
}

@@ -254,7 +254,7 @@ func (c *Capture) Run(ctx context.Context) error {
// 2. the parent context canceled, which means that the caller of the capture expects the capture to exit, and this loop will return in the above `select` block
// TODO: make sure the internal cancel should return the real error instead of context.Canceled
if cerror.ErrCaptureSuicide.Equal(err) || context.Canceled == errors.Cause(err) {
log.Info("capture recovered", zap.String("capture-id", c.info.ID))
log.Info("capture recovered", zap.String("captureID", c.info.ID))
continue
}
return errors.Trace(err)
@@ -422,8 +422,8 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}

log.Info("campaign owner successfully",
zap.String("capture-id", c.info.ID),
zap.Int64("owner-rev", ownerRev))
zap.String("captureID", c.info.ID),
zap.Int64("ownerRev", ownerRev))

owner := c.newOwner(c.pdClient)
c.setOwner(owner)
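
The renames in this file follow the camelCase key convention for structured log fields that the new scripts/check-log-style.sh step in the Makefile is intended to enforce. A minimal sketch of the target style (the wrapper function below is illustrative, not part of this PR):

    package main

    import (
        "github.com/pingcap/log"
        "go.uber.org/zap"
    )

    // reportCapture logs capture metadata with camelCase zap keys
    // ("captureID", "captureAddr") rather than hyphenated ones ("capture-id").
    func reportCapture(id, addr string) {
        log.Info("init capture",
            zap.String("captureID", id),
            zap.String("captureAddr", addr))
    }

    func main() {
        reportCapture("capture-1", "127.0.0.1:8300")
    }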
5 changes: 4 additions & 1 deletion cdc/entry/mounter.go
@@ -179,6 +179,9 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
if err != nil {
return nil, err
}
if len(raw.OldValue) == 0 && len(raw.Value) == 0 {
log.Warn("empty value and old value", zap.Any("row", raw))
}
baseInfo := baseKVEntry{
StartTs: raw.StartTs,
CRTs: raw.CRTs,
@@ -538,7 +541,7 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, e
default:
d = table.GetZeroValue(col)
if d.IsNull() {
log.Error("meet unsupported column type", zap.String("column info", col.String()))
log.Error("meet unsupported column type", zap.String("columnInfo", col.String()))
}
}
}
2 changes: 1 addition & 1 deletion cdc/entry/schema_storage.go
@@ -795,7 +795,7 @@ func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema
logTime = now
}
return err
}, retry.WithBackoffBaseDelay(10), retry.WithInfiniteTries(), retry.WithIsRetryableErr(isRetryable))
}, retry.WithBackoffBaseDelay(10), retry.WithIsRetryableErr(isRetryable))

return snap, err
}
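
The hunk above removes retry.WithInfiniteTries(), so the snapshot lookup now falls back to the retry package's default, bounded retry budget. A rough sketch of the calling pattern, with a hypothetical stand-in operation (fetchSnapshot) in place of the real schema lookup:

    package main

    import (
        "context"
        "errors"

        "github.com/pingcap/tiflow/pkg/retry"
    )

    var errSnapshotNotReady = errors.New("snapshot not ready")

    // fetchSnapshot is a hypothetical stand-in for the schema snapshot lookup.
    func fetchSnapshot() error { return errSnapshotNotReady }

    func main() {
        // Without WithInfiniteTries, retry.Do gives up after its default number of tries.
        _ = retry.Do(context.Background(), fetchSnapshot,
            retry.WithBackoffBaseDelay(10), // base backoff in milliseconds, as in the diff
            retry.WithIsRetryableErr(func(err error) bool {
                return errors.Is(err, errSnapshotNotReady)
            }))
    }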
2 changes: 1 addition & 1 deletion cdc/http_router.go
@@ -153,7 +153,7 @@ func logMiddleware() gin.HandlerFunc {
zap.String("ip", c.ClientIP()),
zap.String("user-agent", c.Request.UserAgent()),
zap.Error(stdErr),
zap.Duration("cost", cost),
zap.Duration("duration", cost),
)
}
}
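
The "cost" field becomes "duration" in the HTTP access log. A simplified, self-contained sketch of such a timing middleware (the real logMiddleware in this file logs more fields):

    package main

    import (
        "time"

        "github.com/gin-gonic/gin"
        "github.com/pingcap/log"
        "go.uber.org/zap"
    )

    // timingMiddleware records each request with a "duration" field,
    // mirroring the field name introduced in this PR.
    func timingMiddleware() gin.HandlerFunc {
        return func(c *gin.Context) {
            start := time.Now()
            c.Next()
            log.Info("http request",
                zap.String("ip", c.ClientIP()),
                zap.String("user-agent", c.Request.UserAgent()),
                zap.Duration("duration", time.Since(start)))
        }
    }

    func main() {
        r := gin.New()
        r.Use(timingMiddleware())
        _ = r.Run(":8300") // listen address is illustrative
    }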
48 changes: 23 additions & 25 deletions cdc/kv/client.go
@@ -293,7 +293,6 @@ type CDCKVClient interface {
ctx context.Context,
span regionspan.ComparableSpan,
ts uint64,
enableOldValue bool,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
eventCh chan<- model.RegionFeedEvent,
@@ -307,6 +306,7 @@ var NewCDCKVClient = NewCDCClient
type CDCClient struct {
pd pd.Client

config *config.KVClientConfig
clusterID uint64

grpcPool GrpcPool
@@ -318,7 +318,14 @@ type CDCClient struct {
}

// NewCDCClient creates a CDCClient instance
func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grpcPool GrpcPool, regionCache *tikv.RegionCache) (c CDCKVClient) {
func NewCDCClient(
ctx context.Context,
pd pd.Client,
kvStorage tikv.Storage,
grpcPool GrpcPool,
regionCache *tikv.RegionCache,
cfg *config.KVClientConfig,
) (c CDCKVClient) {
clusterID := pd.GetClusterID(ctx)

var store TiKVStorage
@@ -333,6 +340,7 @@ func NewCDCClient(ctx context.Context, pd pd.Client, kvStorage tikv.Storage, grp

c = &CDCClient{
clusterID: clusterID,
config: cfg,
pd: pd,
kvStorage: store,
grpcPool: grpcPool,
@@ -393,14 +401,12 @@ type PullerInitialization interface {
// The `Start` and `End` field in input span must be memcomparable encoded.
func (c *CDCClient) EventFeed(
ctx context.Context, span regionspan.ComparableSpan, ts uint64,
enableOldValue bool,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
eventCh chan<- model.RegionFeedEvent,
) error {
s := newEventFeedSession(ctx, c, c.regionCache, c.kvStorage, span,
lockResolver, isPullerInit,
enableOldValue, ts, eventCh)
lockResolver, isPullerInit, ts, eventCh)
return s.eventFeed(ctx, ts)
}
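
Taken together with the constructor change above, callers now inject the KV client configuration into NewCDCClient and no longer pass an enableOldValue flag to EventFeed. A sketch of just the two call sites, with imports and setup elided (the full wiring appears in the client_bench_test.go changes below):

    // The KV client configuration is injected explicitly instead of being read
    // from the global server config inside newEventFeedSession.
    cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache,
        config.GetDefaultServerConfig().KVClient)

    // EventFeed no longer takes an enableOldValue argument; old values are always requested.
    err := cdcClient.EventFeed(ctx,
        regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")},
        100, // startTs, illustrative
        lockResolver, isPullerInit, eventCh)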

@@ -441,8 +447,7 @@ type eventFeedSession struct {
// The queue is used to store region that reaches limit
rateLimitQueue []regionErrorInfo

rangeLock *regionspan.RegionRangeLock
enableOldValue bool
rangeLock *regionspan.RegionRangeLock

// To identify metrics of different eventFeedSession
id string
@@ -468,25 +473,22 @@ func newEventFeedSession(
totalSpan regionspan.ComparableSpan,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
enableOldValue bool,
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
kvClientCfg := config.GetGlobalServerConfig().KVClient
return &eventFeedSession{
client: client,
regionCache: regionCache,
kvStorage: kvStorage,
totalSpan: totalSpan,
eventCh: eventCh,
regionRouter: NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit),
regionRouter: NewSizedRegionRouter(ctx, client.config.RegionScanLimit),
regionCh: make(chan singleRegionInfo, defaultRegionChanSize),
errCh: make(chan regionErrorInfo, defaultRegionChanSize),
requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize),
rateLimitQueue: make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize),
rangeLock: regionspan.NewRegionRangeLock(totalSpan.Start, totalSpan.End, startTs),
enableOldValue: enableOldValue,
lockResolver: lockResolver,
isPullerInit: isPullerInit,
id: id,
@@ -693,11 +695,8 @@ func (s *eventFeedSession) requestRegionToStore(
}
requestID := allocID()

extraOp := kvrpcpb.ExtraOp_Noop
if s.enableOldValue {
extraOp = kvrpcpb.ExtraOp_ReadOldValue
}

// Always read old value.
extraOp := kvrpcpb.ExtraOp_ReadOldValue
rpcCtx := sri.rpcCtx
regionID := rpcCtx.Meta.GetId()
req := &cdcpb.ChangeDataRequest{
@@ -897,6 +896,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
nextSpan := span
captureAddr := util.CaptureAddrFromCtx(ctx)

// Max backoff 500ms.
scanRegionMaxBackoff := int64(500)
for {
var (
regions []*tikv.Region
@@ -926,7 +927,8 @@ }
}
log.Debug("ScanRegions", zap.Stringer("span", nextSpan), zap.Reflect("regions", metas))
return nil
}, retry.WithBackoffMaxDelay(50), retry.WithMaxTries(100), retry.WithIsRetryableErr(cerror.IsRetryableError))
}, retry.WithBackoffMaxDelay(scanRegionMaxBackoff),
retry.WithTotalRetryDuratoin(time.Duration(s.client.config.RegionRetryDuration)))
if retryErr != nil {
return retryErr
}
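
Compared with the previous fixed retry.WithMaxTries(100), the region-scan retry is now bounded by a per-attempt backoff ceiling and a total retry window taken from the KV client config. A small sketch of the same option combination with a hypothetical stand-in operation (the WithTotalRetryDuratoin spelling is copied verbatim from this diff):

    package main

    import (
        "context"
        "errors"
        "time"

        "github.com/pingcap/tiflow/pkg/retry"
    )

    var errRegionsNotReady = errors.New("regions not ready")

    // scanOnce is a hypothetical stand-in for a single ScanRegions attempt.
    func scanOnce() error { return errRegionsNotReady }

    func main() {
        _ = retry.Do(context.Background(), scanOnce,
            retry.WithBackoffMaxDelay(500),               // cap a single backoff at 500ms, as above
            retry.WithTotalRetryDuratoin(10*time.Second)) // illustrative window; the real value comes from RegionRetryDuration
    }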
@@ -1105,7 +1107,7 @@ func (s *eventFeedSession) receiveFromStream(
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(captureAddr, changefeedID)

// always create a new region worker, because `receiveFromStreamV2` is ensured
// always create a new region worker, because `receiveFromStream` is ensured
// to be called exactly once from outer code logic
worker := newRegionWorker(s, addr)

@@ -1180,7 +1182,7 @@
regionCount = len(cevent.ResolvedTs.Regions)
}
log.Warn("change data event size too large",
zap.Int("size", size), zap.Int("event length", len(cevent.Events)),
zap.Int("size", size), zap.Int("eventLen", len(cevent.Events)),
zap.Int("resolved region count", regionCount))
}

@@ -1330,7 +1332,7 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
return
}

func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bool) (model.RegionFeedEvent, error) {
func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
case cdcpb.Event_Row_DELETE:
@@ -1350,14 +1352,10 @@ func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row, enableOldValue bo
StartTs: entry.StartTs,
CRTs: entry.CommitTs,
RegionID: regionID,
OldValue: entry.GetOldValue(),
},
}

// when old-value is disabled, it is still possible for the tikv to send a event containing the old value
// we need avoid a old-value sent to downstream when old-value is disabled
if enableOldValue {
revent.Val.OldValue = entry.GetOldValue()
}
return revent, nil
}
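
With old values always requested and unconditionally copied into the event, downstream code can branch on OldValue directly. A hedged consumer-side sketch (field and constant names follow the RegionFeedEvent/RawKVEntry usage in this diff; the classification itself is illustrative):

    package main

    import (
        "fmt"

        "github.com/pingcap/tiflow/cdc/model"
    )

    // classify distinguishes row events by whether a previous value is attached.
    func classify(ev model.RegionFeedEvent) string {
        switch {
        case ev.Val == nil:
            return "not a row event"
        case ev.Val.OpType == model.OpTypeDelete:
            return "delete"
        case len(ev.Val.OldValue) > 0:
            return "update (old value present)"
        default:
            return "insert"
        }
    }

    func main() {
        fmt.Println(classify(model.RegionFeedEvent{}))
    }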

9 changes: 5 additions & 4 deletions cdc/kv/client_bench_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/store/mockstore/mockcopr"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
@@ -192,11 +193,11 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
defer grpcPool.Close()
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh)
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, lockresolver, isPullInit, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}
@@ -283,11 +284,11 @@ func prepareBench(b *testing.B, regionNum int) (
defer grpcPool.Close()
regionCache := tikv.NewRegionCache(pdClient)
defer regionCache.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache)
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool, regionCache, config.GetDefaultServerConfig().KVClient)
eventCh := make(chan model.RegionFeedEvent, 1000000)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, false, lockresolver, isPullInit, eventCh)
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")}, 100, lockresolver, isPullInit, eventCh)
if errors.Cause(err) != context.Canceled {
b.Error(err)
}