Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9691
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
sdojjy authored and ti-chi-bot committed Sep 11, 2023
1 parent 5fe84bb commit 4f8aed2
Show file tree
Hide file tree
Showing 17 changed files with 1,148 additions and 15 deletions.
7 changes: 6 additions & 1 deletion cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}
<<<<<<< HEAD

infoStr, err := info.Marshal()
if err != nil {
Expand All @@ -322,6 +323,8 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

=======
>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691))
upstreamInfo := &model.UpstreamInfo{
ID: up.ID,
PDEndpoints: strings.Join(up.PdEndpoints, ","),
Expand All @@ -338,7 +341,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr))
log.Info("Create changefeed successfully!",
zap.String("id", changefeedConfig.ID),
zap.String("changefeed", info.String()))
c.Status(http.StatusAccepted)
}

Expand Down
8 changes: 7 additions & 1 deletion cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,19 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CAPath: cfg.CAPath,
CertAllowedCN: cfg.CertAllowedCN,
}
<<<<<<< HEAD
infoStr, err := info.Marshal()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
o, err := h.capture.GetOwner()
=======

// cannot create changefeed if there are running lightning/restore tasks
tlsCfg, err := credential.ToTLSConfig()
>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691))
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
Expand All @@ -147,7 +153,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {

log.Info("Create changefeed successfully!",
zap.String("id", info.ID),
zap.String("changefeed", infoStr))
zap.String("changefeed", info.String()))
c.JSON(http.StatusOK, toAPIModel(info,
info.StartTs, info.StartTs,
nil, true))
Expand Down
12 changes: 6 additions & 6 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ func (info *ChangeFeedInfo) String() (str string) {
return
}

clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI)
if err != nil {
log.Error("failed to marshal changefeed info", zap.Error(err))
clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI)
if clone.Config != nil {
clone.Config.MaskSensitiveData()
}

str, err = clone.Marshal()
Expand Down Expand Up @@ -476,11 +476,11 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() {
}

func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) {
oldRawQuery := uri.RawQuery
newRawQuery := newQuery.Encode()
maskedURI, _ := util.MaskSinkURI(uri.String())
log.Info("handle incompatible protocol from sink URI",
zap.String("oldUriQuery", oldRawQuery),
zap.String("fixedUriQuery", newQuery.Encode()))
zap.String("oldURI", maskedURI),
zap.String("newProtocol", newProtocol))

uri.RawQuery = newRawQuery
fixedSinkURI := uri.String()
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ LOOP2:
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.resolvedTs),
zap.Stringer("info", c.state.Info))
zap.String("info", c.state.Info.String()))

return nil
}
Expand Down Expand Up @@ -688,7 +688,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Any("status", c.state.Status),
zap.Stringer("info", c.state.Info),
zap.String("info", c.state.Info.String()),
zap.Bool("isRemoved", c.isRemoved))
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/avro/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (m *schemaManager) ClearRegistry(ctx context.Context, topicName string) err
)
req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil)
if err != nil {
log.Error("Could not construct request for clearRegistry", zap.String("uri", uri))
log.Error("Could not construct request for clearRegistry", zap.Error(err))
return cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
}
req.Header.Add(
Expand Down
108 changes: 108 additions & 0 deletions cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddlproducer

import (
"context"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
)

// Assert DDLEventSink implementation
var _ DDLProducer = (*PulsarMockProducers)(nil)

// PulsarMockProducers is a mock pulsar producer
type PulsarMockProducers struct {
events map[string][]*pulsar.ProducerMessage
}

// SyncBroadcastMessage pulsar consume all partitions
func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string,
totalPartitionsNum int32, message *common.Message,
) error {
return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message)
}

// SyncSendMessage sends a message
// partitionNum is not used,pulsar consume all partitions
func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string,
partitionNum int32, message *common.Message,
) error {
data := &pulsar.ProducerMessage{
Payload: message.Value,
Key: message.GetPartitionKey(),
}
p.events[topic] = append(p.events[topic], data)

return nil
}

// NewMockPulsarProducer creates a pulsar producer
func NewMockPulsarProducer(
ctx context.Context,
changefeedID model.ChangeFeedID,
pConfig *config.PulsarConfig,
client pulsar.Client,
) (*PulsarMockProducers, error) {
return &PulsarMockProducers{
events: map[string][]*pulsar.ProducerMessage{},
}, nil
}

// NewMockPulsarProducerDDL creates a pulsar producer for DDLProducer
func NewMockPulsarProducerDDL(
ctx context.Context,
changefeedID model.ChangeFeedID,
pConfig *config.PulsarConfig,
client pulsar.Client,
sinkConfig *config.SinkConfig,
) (DDLProducer, error) {
return NewMockPulsarProducer(ctx, changefeedID, pConfig, client)
}

// GetProducerByTopic returns a producer by topic name
func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) {
return producer, nil
}

// Close close all producers
func (p *PulsarMockProducers) Close() {
p.events = make(map[string][]*pulsar.ProducerMessage)
}

// Flush waits for all the messages in the async producer to be sent to Pulsar.
// Notice: this method is not thread-safe.
// Do not try to call AsyncSendMessage and Flush functions in different threads,
// otherwise Flush will not work as expected. It may never finish or flush the wrong message.
// Because inflight will be modified by mistake.
func (p *PulsarMockProducers) Flush(ctx context.Context) error {
return nil
}

// GetAllEvents returns the events received by the mock producer.
func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage {
var events []*pulsar.ProducerMessage
for _, v := range p.events {
events = append(events, v...)
}
return events
}

// GetEvents returns the event filtered by the key.
func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage {
return p.events[topic]
}
106 changes: 106 additions & 0 deletions cdc/sink/ddlsink/mq/pulsar_ddl_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mq

import (
"context"
"net/url"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
tiflowutil "github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

// NewPulsarDDLSink will verify the config and create a Pulsar DDL Sink.
func NewPulsarDDLSink(
ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
pulsarTopicManagerCreator manager.PulsarTopicManager,
clientCreator pulsarConfig.FactoryCreator,
producerCreator ddlproducer.PulsarFactory,
) (_ *DDLSink, err error) {
log.Info("Starting pulsar DDL producer ...",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeed", changefeedID.ID))

defaultTopic, err := util.GetTopic(sinkURI)
if err != nil {
return nil, errors.Trace(err)
}

protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol))
if err != nil {
return nil, errors.Trace(err)
}

pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI, replicaConfig.Sink.PulsarConfig)
if err != nil {
return nil, errors.Trace(err)
}

log.Info("Try to create a DDL sink producer",
zap.String("changefeed", changefeedID.String()))

// NewEventRouter
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme)
if err != nil {
return nil, errors.Trace(err)
}

encoderConfig, err := util.GetEncoderConfig(changefeedID,
sinkURI, protocol, replicaConfig, config.DefaultMaxMessageBytes)
if err != nil {
return nil, errors.Trace(err)
}

encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

start := time.Now()
client, err := clientCreator(pConfig, changefeedID, replicaConfig.Sink)
if err != nil {
log.Error("DDL sink producer client create fail", zap.Error(err))
return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err)
}

p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
if err != nil {
return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
}

topicManager, err := pulsarTopicManagerCreator(pConfig, client)
if err != nil {
return nil, errors.Trace(err)
}

s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)

return s, nil
}
Loading

0 comments on commit 4f8aed2

Please sign in to comment.