diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index e8d607aed9c..0bd61a75967 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -304,6 +304,7 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { _ = c.Error(err) return } +<<<<<<< HEAD infoStr, err := info.Marshal() if err != nil { @@ -322,6 +323,8 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { return } +======= +>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) upstreamInfo := &model.UpstreamInfo{ ID: up.ID, PDEndpoints: strings.Join(up.PdEndpoints, ","), @@ -338,7 +341,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { return } - log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr)) + log.Info("Create changefeed successfully!", + zap.String("id", changefeedConfig.ID), + zap.String("changefeed", info.String())) c.Status(http.StatusAccepted) } diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index e91efe355aa..2194265c977 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -116,6 +116,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { CAPath: cfg.CAPath, CertAllowedCN: cfg.CertAllowedCN, } +<<<<<<< HEAD infoStr, err := info.Marshal() if err != nil { needRemoveGCSafePoint = true @@ -123,6 +124,11 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { return } o, err := h.capture.GetOwner() +======= + + // cannot create changefeed if there are running lightning/restore tasks + tlsCfg, err := credential.ToTLSConfig() +>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) if err != nil { needRemoveGCSafePoint = true _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) @@ -147,7 +153,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { log.Info("Create changefeed successfully!", zap.String("id", info.ID), - zap.String("changefeed", infoStr)) + zap.String("changefeed", info.String())) c.JSON(http.StatusOK, toAPIModel(info, info.StartTs, info.StartTs, nil, true)) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 359f5a19f19..265bcf322cb 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -233,9 +233,9 @@ func (info *ChangeFeedInfo) String() (str string) { return } - clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI) - if err != nil { - log.Error("failed to marshal changefeed info", zap.Error(err)) + clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI) + if clone.Config != nil { + clone.Config.MaskSensitiveData() } str, err = clone.Marshal() @@ -476,11 +476,11 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() { } func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) { - oldRawQuery := uri.RawQuery newRawQuery := newQuery.Encode() + maskedURI, _ := util.MaskSinkURI(uri.String()) log.Info("handle incompatible protocol from sink URI", - zap.String("oldUriQuery", oldRawQuery), - zap.String("fixedUriQuery", newQuery.Encode())) + zap.String("oldURI", maskedURI), + zap.String("newProtocol", newProtocol)) uri.RawQuery = newRawQuery fixedSinkURI := uri.String() diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 4954a9afd50..80a32391a07 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -615,7 +615,7 @@ LOOP2: zap.Uint64("changefeedEpoch", epoch), zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", c.resolvedTs), - zap.Stringer("info", c.state.Info)) + zap.String("info", c.state.Info.String())) return nil } 
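Note on the hunks above: the v1/v2 API handlers and the owner now log `info.String()` instead of the raw `Marshal` output, because `ChangeFeedInfo.String()` masks credentials on a clone before marshaling. A minimal, self-contained sketch of that clone-then-mask pattern follows; the struct and methods here are illustrative stand-ins, not the real `model.ChangeFeedInfo`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// changeFeedInfo is an illustrative stand-in for model.ChangeFeedInfo.
type changeFeedInfo struct {
	SinkURI string `json:"sink-uri"`
}

// marshal mimics the old logging path: raw JSON, password included.
func (i *changeFeedInfo) marshal() string {
	b, _ := json.Marshal(i)
	return string(b)
}

// string mimics the new ChangeFeedInfo.String(): mask a clone, then marshal,
// so the live configuration is never mutated.
func (i *changeFeedInfo) string() string {
	clone := *i
	if u, err := url.Parse(clone.SinkURI); err == nil {
		clone.SinkURI = u.Redacted() // "user:xxxxx@host" instead of "user:pass@host"
	}
	b, _ := json.Marshal(&clone)
	return string(b)
}

func main() {
	info := &changeFeedInfo{SinkURI: "mysql://root:s3cret@127.0.0.1:3306/"}
	fmt.Println(info.marshal()) // would leak s3cret into the log
	fmt.Println(info.string())  // {"sink-uri":"mysql://root:xxxxx@127.0.0.1:3306/"}
}
```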
@@ -688,7 +688,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
 		zap.String("namespace", c.id.Namespace),
 		zap.String("changefeed", c.id.ID),
 		zap.Any("status", c.state.Status),
-		zap.Stringer("info", c.state.Info),
+		zap.String("info", c.state.Info.String()),
 		zap.Bool("isRemoved", c.isRemoved))
 }
diff --git a/cdc/sink/codec/avro/schema_registry.go b/cdc/sink/codec/avro/schema_registry.go
index cdc366adca7..db179789c1f 100644
--- a/cdc/sink/codec/avro/schema_registry.go
+++ b/cdc/sink/codec/avro/schema_registry.go
@@ -368,7 +368,7 @@ func (m *schemaManager) ClearRegistry(ctx context.Context, topicName string) err
 	)
 	req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil)
 	if err != nil {
-		log.Error("Could not construct request for clearRegistry", zap.String("uri", uri))
+		log.Error("Could not construct request for clearRegistry", zap.Error(err))
 		return cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
 	}
 	req.Header.Add(
diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go
new file mode 100644
index 00000000000..561d5515a5c
--- /dev/null
+++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go
@@ -0,0 +1,108 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddlproducer
+
+import (
+	"context"
+
+	"github.com/apache/pulsar-client-go/pulsar"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/config"
+	"github.com/pingcap/tiflow/pkg/sink/codec/common"
+)
+
+// Assert DDLProducer implementation
+var _ DDLProducer = (*PulsarMockProducers)(nil)
+
+// PulsarMockProducers is a mock pulsar producer
+type PulsarMockProducers struct {
+	events map[string][]*pulsar.ProducerMessage
+}
+
+// SyncBroadcastMessage broadcasts a message; pulsar consumes all partitions
+func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string,
+	totalPartitionsNum int32, message *common.Message,
+) error {
+	return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message)
+}
+
+// SyncSendMessage sends a message.
+// partitionNum is not used; pulsar consumes all partitions.
+func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string,
+	partitionNum int32, message *common.Message,
+) error {
+	data := &pulsar.ProducerMessage{
+		Payload: message.Value,
+		Key:     message.GetPartitionKey(),
+	}
+	p.events[topic] = append(p.events[topic], data)
+
+	return nil
+}
+
+// NewMockPulsarProducer creates a pulsar producer
+func NewMockPulsarProducer(
+	ctx context.Context,
+	changefeedID model.ChangeFeedID,
+	pConfig *config.PulsarConfig,
+	client pulsar.Client,
+) (*PulsarMockProducers, error) {
+	return &PulsarMockProducers{
+		events: map[string][]*pulsar.ProducerMessage{},
+	}, nil
+}
+
+// NewMockPulsarProducerDDL creates a pulsar producer for DDLProducer
+func NewMockPulsarProducerDDL(
+	ctx context.Context,
+	changefeedID model.ChangeFeedID,
+	pConfig *config.PulsarConfig,
+	client pulsar.Client,
+	sinkConfig *config.SinkConfig,
+) (DDLProducer, error) {
+	return NewMockPulsarProducer(ctx,
+		changefeedID, pConfig, client)
+}
+
+// GetProducerByTopic returns a producer by topic name
+func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) {
+	return producer, nil
+}
+
+// Close closes all producers
+func (p *PulsarMockProducers) Close() {
+	p.events = make(map[string][]*pulsar.ProducerMessage)
+}
+
+// Flush waits for all the messages in the async producer to be sent to Pulsar.
+// Notice: this method is not thread-safe.
+// Do not try to call AsyncSendMessage and Flush functions in different threads,
+// otherwise Flush will not work as expected. It may never finish or may flush
+// the wrong messages, because the inflight counter would be modified concurrently.
+func (p *PulsarMockProducers) Flush(ctx context.Context) error {
+	return nil
+}
+
+// GetAllEvents returns the events received by the mock producer.
+func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage {
+	var events []*pulsar.ProducerMessage
+	for _, v := range p.events {
+		events = append(events, v...)
+	}
+	return events
+}
+
+// GetEvents returns the events received for the given topic.
+func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage {
+	return p.events[topic]
+}
diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go
new file mode 100644
index 00000000000..6015ad46e02
--- /dev/null
+++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go
@@ -0,0 +1,106 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mq
+
+import (
+	"context"
+	"net/url"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer"
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
+	"github.com/pingcap/tiflow/cdc/sink/util"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/sink/codec/builder"
+	pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
+	tiflowutil "github.com/pingcap/tiflow/pkg/util"
+	"go.uber.org/zap"
+)
+
+// NewPulsarDDLSink will verify the config and create a Pulsar DDL Sink.
+func NewPulsarDDLSink(
+	ctx context.Context,
+	changefeedID model.ChangeFeedID,
+	sinkURI *url.URL,
+	replicaConfig *config.ReplicaConfig,
+	pulsarTopicManagerCreator manager.PulsarTopicManager,
+	clientCreator pulsarConfig.FactoryCreator,
+	producerCreator ddlproducer.PulsarFactory,
+) (_ *DDLSink, err error) {
+	log.Info("Starting pulsar DDL producer ...",
+		zap.String("namespace", changefeedID.Namespace),
+		zap.String("changefeed", changefeedID.ID))
+
+	defaultTopic, err := util.GetTopic(sinkURI)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol))
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI, replicaConfig.Sink.PulsarConfig)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	log.Info("Try to create a DDL sink producer",
+		zap.String("changefeed", changefeedID.String()))
+
+	// create an event router to dispatch events to the proper topics
+	eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	encoderConfig, err := util.GetEncoderConfig(changefeedID,
+		sinkURI, protocol, replicaConfig, config.DefaultMaxMessageBytes)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
+	}
+
+	start := time.Now()
+	client, err := clientCreator(pConfig, changefeedID, replicaConfig.Sink)
+	if err != nil {
+		log.Error("DDL sink producer client create fail", zap.Error(err))
+		return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err)
+	}
+
+	p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink)
+	log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
+	}
+
+	topicManager, err := pulsarTopicManagerCreator(pConfig, client)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)
+
+	return s, nil
+}
diff --git a/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go b/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go
new file mode 100644
index 00000000000..80594ddb5ab
--- /dev/null
+++ b/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go
@@ -0,0 +1,322 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dmlproducer
+
+import (
+	"context"
+	"encoding/json"
+	"sync"
+	"time"
+
+	"github.com/apache/pulsar-client-go/pulsar"
+	lru "github.com/hashicorp/golang-lru"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sink/metrics/mq"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/sink/codec/common"
+	"go.uber.org/zap"
+)
+
+var _ DMLProducer = (*pulsarDMLProducer)(nil)
+
+// pulsarDMLProducer is used to send messages to pulsar.
+type pulsarDMLProducer struct {
+	// id indicates which processor (changefeed) this sink belongs to.
+	id model.ChangeFeedID
+	// We hold the client to make close operation faster.
+	// Please see the comment of Close().
+	client pulsar.Client
+	// producers is used to send messages to pulsar.
+	// Each topic uses exactly one producer, so an LRU cache bounds memory
+	// usage while still supporting many topics.
+	producers *lru.Cache
+
+	// closedMu is used to protect `closed`.
+	// We need to ensure that closed producers are never written to.
+	closedMu sync.RWMutex
+	// closed is used to indicate whether the producer is closed.
+	// We also use it to guard against double closes.
+	closed bool
+
+	// failpointCh is used to inject failpoints into the run loop.
+	// Only used in tests.
+	failpointCh chan error
+	// errChan is used to report errors to the sink.
+	errChan chan error
+
+	pConfig *config.PulsarConfig
+}
+
+// NewPulsarDMLProducer creates a new pulsar producer.
+func NewPulsarDMLProducer(
+	ctx context.Context,
+	changefeedID model.ChangeFeedID,
+	client pulsar.Client,
+	sinkConfig *config.SinkConfig,
+	errCh chan error,
+	failpointCh chan error,
+) (DMLProducer, error) {
+	log.Info("Creating pulsar DML producer ...",
+		zap.String("namespace", changefeedID.Namespace),
+		zap.String("changefeed", changefeedID.ID))
+	start := time.Now()
+
+	var pulsarConfig *config.PulsarConfig
+	if sinkConfig.PulsarConfig == nil {
+		log.Error("new pulsar DML producer fail, sink: pulsar config is empty")
+		return nil, cerror.ErrPulsarInvalidConfig.
+			GenWithStackByArgs("pulsar config is empty")
+	}
+
+	pulsarConfig = sinkConfig.PulsarConfig
+	defaultTopicName := pulsarConfig.GetDefaultTopicName()
+	defaultProducer, err := newProducer(pulsarConfig, client, defaultTopicName)
+	if err != nil {
+		go client.Close()
+		return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
+	}
+	producerCacheSize := config.DefaultPulsarProducerCacheSize
+	if pulsarConfig != nil && pulsarConfig.PulsarProducerCacheSize != nil {
+		producerCacheSize = int(*pulsarConfig.PulsarProducerCacheSize)
+	}
+
+	producers, err := lru.NewWithEvict(producerCacheSize, func(key interface{}, value interface{}) {
+		// called when the LRU cache evicts or explicitly removes a producer
+		pulsarProducer, ok := value.(pulsar.Producer)
+		if ok && pulsarProducer != nil {
+			pulsarProducer.Close()
+		}
+	})
+	if err != nil {
+		go client.Close()
+		return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
+	}
+
+	producers.Add(defaultTopicName, defaultProducer)
+
+	p := &pulsarDMLProducer{
+		id:          changefeedID,
+		client:      client,
+		producers:   producers,
+		pConfig:     pulsarConfig,
+		closed:      false,
+		failpointCh: failpointCh,
+		errChan:     errCh,
+	}
+	log.Info("Pulsar DML producer created", zap.Stringer("changefeed", p.id),
+		zap.Duration("duration", time.Since(start)))
+	return p, nil
+}
+
+// AsyncSendMessage sends one message asynchronously.
+func (p *pulsarDMLProducer) AsyncSendMessage(
+	ctx context.Context, topic string,
+	partition int32, message *common.Message,
+) error {
+	wrapperSchemaAndTopic(message)
+
+	// We have to hold the lock to avoid writing to a closed producer.
+	// Close may be blocked for a long time.
+	p.closedMu.RLock()
+	defer p.closedMu.RUnlock()
+
+	// If producers are closed, we should skip the message and return an error.
+	if p.closed {
+		return cerror.ErrPulsarProducerClosed.GenWithStackByArgs()
+	}
+	failpoint.Inject("PulsarSinkAsyncSendError", func() {
+		// simulate sending message to input channel successfully but flushing
+		// message to Pulsar meets error
+		log.Info("PulsarSinkAsyncSendError error injected", zap.String("namespace", p.id.Namespace),
+			zap.String("changefeed", p.id.ID))
+		p.failpointCh <- errors.New("pulsar sink injected error")
+		failpoint.Return(nil)
+	})
+	data := &pulsar.ProducerMessage{
+		Payload: message.Value,
+		Key:     message.GetPartitionKey(),
+	}
+
+	producer, err := p.GetProducerByTopic(topic)
+	if err != nil {
+		return err
+	}
+
+	// the send result is recorded in the message callback below
+	producer.SendAsync(ctx, data,
+		func(id pulsar.MessageID, m *pulsar.ProducerMessage, err error) {
+			// fail
+			if err != nil {
+				e := cerror.WrapError(cerror.ErrPulsarAsyncSendMessage, err)
+				log.Error("Pulsar DML producer async send error",
+					zap.String("namespace", p.id.Namespace),
+					zap.String("changefeed", p.id.ID),
+					zap.Error(err))
+				mq.IncPublishedDMLFail(topic, p.id.ID, message.GetSchema())
+				// use this select to avoid sending an error to a closed channel;
+				// ctx is always canceled before errChan is closed
+				select {
+				case <-ctx.Done():
+					return
+				case p.errChan <- e:
+				default:
+					log.Warn("Error channel is full in pulsar DML producer",
+						zap.Stringer("changefeed", p.id), zap.Error(e))
+				}
+			} else if message.Callback != nil {
+				// success
+				message.Callback()
+				mq.IncPublishedDMLSuccess(topic, p.id.ID, message.GetSchema())
+			}
+		})
+
+	mq.IncPublishedDMLCount(topic, p.id.ID, message.GetSchema())
+
+	return nil
+}
+
+func (p *pulsarDMLProducer) Close() {
+	// We have to hold the lock to synchronize closing with writing.
+	p.closedMu.Lock()
+	defer p.closedMu.Unlock()
+	// If the producer has already been closed, we should skip this close operation.
+	if p.closed {
+		// We need to guard against double closing the clients,
+		// which could lead to panic.
+		log.Warn("Pulsar DML producer already closed",
+			zap.String("namespace", p.id.Namespace),
+			zap.String("changefeed", p.id.ID))
+		return
+	}
+	close(p.failpointCh)
+	p.closed = true
+
+	start := time.Now()
+	keys := p.producers.Keys()
+	for _, topic := range keys {
+		p.producers.Remove(topic) // the evict callback closes the producer
+		topicName, _ := topic.(string)
+		log.Info("Async client closed in pulsar DML producer",
+			zap.Duration("duration", time.Since(start)),
+			zap.String("namespace", p.id.Namespace),
+			zap.String("changefeed", p.id.ID), zap.String("topic", topicName))
+	}
+
+	p.client.Close()
+}
+
+// newProducer creates a pulsar producer.
+// Each topic is served by exactly one producer.
+func newProducer(
+	pConfig *config.PulsarConfig,
+	client pulsar.Client,
+	topicName string,
+) (pulsar.Producer, error) {
+	po := pulsar.ProducerOptions{
+		Topic: topicName,
+	}
+	if pConfig.BatchingMaxMessages != nil {
+		po.BatchingMaxMessages = *pConfig.BatchingMaxMessages
+	}
+	if pConfig.BatchingMaxPublishDelay != nil {
+		po.BatchingMaxPublishDelay = pConfig.BatchingMaxPublishDelay.Duration()
+	}
+	if pConfig.CompressionType != nil {
+		po.CompressionType = pConfig.CompressionType.Value()
+		po.CompressionLevel = pulsar.Default
+	}
+	if pConfig.SendTimeout != nil {
+		po.SendTimeout = pConfig.SendTimeout.Duration()
+	}
+
+	producer, err := client.CreateProducer(po)
+	if err != nil {
+		return nil, err
+	}
+
+	log.Info("create pulsar producer success", zap.String("topic", topicName))
+
+	return producer, nil
+}
+
+func (p *pulsarDMLProducer) getProducer(topic string) (pulsar.Producer, bool) {
+	target, ok := p.producers.Get(topic)
+	if ok {
+		producer, ok := target.(pulsar.Producer)
+		if ok {
+			return producer, true
+		}
+	}
+	return nil, false
+}
+
+// GetProducerByTopic returns the producer for topicName.
+// If it does not exist, a new producer is created for topicName and stored
+// in the LRU cache; see pulsarDMLProducer.producers for details.
+func (p *pulsarDMLProducer) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) {
+	getProducer, ok := p.getProducer(topicName)
+	if ok && getProducer != nil {
+		return getProducer, nil
+	}
+
+	if !ok { // create a new producer for the topicName
+		producer, err = newProducer(p.pConfig, p.client, topicName)
+		if err != nil {
+			return nil, err
+		}
+		p.producers.Add(topicName, producer)
+	}
+
+	return producer, nil
+}
+
+// wrapperSchemaAndTopic fills in the message's schema and table for protocols
+// that carry them in the payload rather than on the message itself.
+func wrapperSchemaAndTopic(m *common.Message) {
+	if m.Schema == nil {
+		if m.Protocol == config.ProtocolMaxwell {
+			mx := &maxwellMessage{}
+			err := json.Unmarshal(m.Value, mx)
+			if err != nil {
+				log.Error("unmarshal maxwell message failed", zap.Error(err))
+				return
+			}
+			if len(mx.Database) > 0 {
+				m.Schema = &mx.Database
+			}
+			if len(mx.Table) > 0 {
+				m.Table = &mx.Table
+			}
+		}
+		if m.Protocol == config.ProtocolCanal { // the canal protocol puts multiple schemas in one topic
+			m.Schema = str2Pointer("multi_schema")
+		}
+	}
+}
+
+// maxwellMessage is the message format of maxwell
+type maxwellMessage struct {
+	Database string `json:"database"`
+	Table    string `json:"table"`
+}
+
+// str2Pointer returns the pointer of the string.
+func str2Pointer(str string) *string {
+	return &str
+}
diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go
new file mode 100644
index 00000000000..b29688d5bca
--- /dev/null
+++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go
@@ -0,0 +1,125 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mq
+
+import (
+	"context"
+	"net/url"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
+	"github.com/pingcap/tiflow/cdc/sink/util"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/sink/codec"
+	"github.com/pingcap/tiflow/pkg/sink/codec/builder"
+	"github.com/pingcap/tiflow/pkg/sink/kafka"
+	tiflowutil "github.com/pingcap/tiflow/pkg/util"
+	"go.uber.org/zap"
+)
+
+// NewKafkaDMLSink will verify the config and create a KafkaSink.
+func NewKafkaDMLSink(
+	ctx context.Context,
+	changefeedID model.ChangeFeedID,
+	sinkURI *url.URL,
+	replicaConfig *config.ReplicaConfig,
+	errCh chan error,
+	factoryCreator kafka.FactoryCreator,
+	producerCreator dmlproducer.Factory,
+) (_ *dmlSink, err error) {
+	topic, err := util.GetTopic(sinkURI)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	options := kafka.NewOptions()
+	if err := options.Apply(changefeedID, sinkURI, replicaConfig); err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
+	}
+
+	factory, err := factoryCreator(options, changefeedID)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
+	}
+
+	adminClient, err := factory.AdminClient(ctx)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
+	}
+	// We must close adminClient when this function returns with an error;
+	// otherwise the adminClient will never be closed, leading to a goroutine leak.
+	defer func() {
+		if err != nil && adminClient != nil {
+			adminClient.Close()
+		}
+	}()
+
+	// adjust the option configuration before creating the kafka client
+	if err = kafka.AdjustOptions(ctx, adminClient, options, topic); err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
+	}
+
+	protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol))
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	topicManager, err := util.GetTopicManagerAndTryCreateTopic(
+		ctx,
+		changefeedID,
+		topic,
+		options.DeriveTopicConfig(),
+		adminClient,
+	)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, sinkURI.Scheme)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, options.MaxMessageBytes)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
+	}
+
+	failpointCh := make(chan error, 1)
+	asyncProducer, err := factory.AsyncProducer(ctx, failpointCh)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
+	}
+
+	metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient)
+	dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh)
+	concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
+	encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)
+	s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager,
+		eventRouter, encoderGroup, protocol, errCh)
+	log.Info("DML sink producer created",
+		zap.String("namespace", changefeedID.Namespace),
+		zap.String("changefeedID", changefeedID.ID))
+
+	return s, nil
+}
diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go
new file mode 100644
index 00000000000..6a11fc512a6
--- /dev/null
+++ b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go
@@ -0,0 +1,117 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mq
+
+import (
+	"context"
+	"net/url"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
+	"github.com/pingcap/tiflow/cdc/sink/util"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/sink/codec"
+	"github.com/pingcap/tiflow/pkg/sink/codec/builder"
+	pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
+	tiflowutil "github.com/pingcap/tiflow/pkg/util"
+	"go.uber.org/zap"
+)
+
+// NewPulsarDMLSink will verify the config and create a PulsarSink.
+func NewPulsarDMLSink(
+	ctx context.Context,
+	changefeedID model.ChangeFeedID,
+	sinkURI *url.URL,
+	replicaConfig *config.ReplicaConfig,
+	errCh chan error,
+	pulsarTopicManagerCreator manager.PulsarTopicManager,
+	clientCreator pulsarConfig.FactoryCreator,
+	producerCreator dmlproducer.PulsarFactory,
+) (_ *dmlSink, err error) {
+	log.Info("Starting pulsar DML producer ...",
+		zap.String("namespace", changefeedID.Namespace),
+		zap.String("changefeed", changefeedID.ID))
+
+	defaultTopic, err := util.GetTopic(sinkURI)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol))
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI, replicaConfig.Sink.PulsarConfig)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	client, err := clientCreator(pConfig, changefeedID, replicaConfig.Sink)
+	if err != nil {
+		log.Error("DML sink producer client create fail", zap.Error(err))
+		return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err)
+	}
+
+	failpointCh := make(chan error, 1)
+	log.Info("Try to create a DML sink producer", zap.String("changefeed", changefeedID.String()))
+	start := time.Now()
+	p, err := producerCreator(ctx, changefeedID, client, replicaConfig.Sink, errCh, failpointCh)
+	log.Info("DML sink producer created",
+		zap.String("changefeed", changefeedID.String()),
+		zap.Duration("duration", time.Since(start)))
+	if err != nil {
+		defer func() {
+			if p != nil {
+				p.Close()
+			}
+		}()
+		return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
+	}
+
+	// The topicManager is not actually used in pulsar; it is only needed to create the dmlSink.
+	// TODO: Find a way to remove it in newDMLSink.
+	topicManager, err := pulsarTopicManagerCreator(pConfig, client)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig,
+		config.DefaultMaxMessageBytes)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrPulsarInvalidConfig, err)
+	}
+
+	concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency)
+	encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID)
+
+	s := newDMLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderGroup, protocol, errCh)
+
+	return s, nil
+}
diff --git a/cdc/sink/mysql/mysql_syncpoint_store.go b/cdc/sink/mysql/mysql_syncpoint_store.go
index 879a97d67bf..e187979b17c 100644
--- a/cdc/sink/mysql/mysql_syncpoint_store.go
+++ b/cdc/sink/mysql/mysql_syncpoint_store.go
@@ -144,7 +144,7 @@ func newMySQLSyncPointStore(
 		return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
 	}
 
-	log.Info("Start mysql syncpoint sink")
+	log.Info("Start mysql syncpoint sink", zap.String("changefeed", id.String()))
 
 	return &mysqlSyncPointStore{
 		db: syncDB,
@@ -213,7 +213,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
 	var secondaryTs string
 	err = row.Scan(&secondaryTs)
 	if err != nil {
-		log.Info("sync table: get tidb_current_ts err")
+		log.Info("sync table: get tidb_current_ts err", zap.String("changefeed", id.String()))
 		err2 := tx.Rollback()
 		if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err)) diff --git a/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go b/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go index 2603965effc..d02b309546f 100644 --- a/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go @@ -119,7 +119,19 @@ func NewKafkaDDLSink( return nil, errors.Trace(err) } +<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go s, err := newDDLSink(ctx, p, topicManager, eventRouter, encoderConfig) +======= + encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + start := time.Now() + log.Info("Try to create a DDL sink producer", + zap.String("changefeed", changefeedID.String())) + syncProducer, err := factory.SyncProducer(ctx) +>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)):cdc/sink/ddlsink/mq/kafka_ddl_sink.go if err != nil { return nil, errors.Trace(err) } diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index c57d4694252..48582905789 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -868,7 +868,7 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti func openDB(ctx context.Context, dsn string) (*sql.DB, error) { db, err := sql.Open("mysql", dsn) if err != nil { - log.Error("open db failed", zap.String("dsn", dsn), zap.Error(err)) + log.Error("open db failed", zap.Error(err)) return nil, errors.Trace(err) } @@ -879,7 +879,7 @@ func openDB(ctx context.Context, dsn string) (*sql.DB, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err = db.PingContext(ctx); err != nil { - log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err)) + log.Error("ping db failed", zap.Error(err)) return nil, errors.Trace(err) } log.Info("open db success", zap.String("dsn", dsn)) diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index 85baaf5673f..636edcf865f 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" + "github.com/pingcap/tiflow/pkg/util" ) // ConsistentConfig represents replication consistency config for a changefeed. 
@@ -56,3 +57,8 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { } return redo.ValidateStorage(uri) } + +// MaskSensitiveData masks sensitive data in ConsistentConfig +func (c *ConsistentConfig) MaskSensitiveData() { + c.Storage = util.MaskSensitiveDataInURI(c.Storage) +} diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 090db4cd7c2..a602056300b 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -279,3 +279,13 @@ func (c *ReplicaConfig) AdjustEnableOldValueAndVerifyForceReplicate(sinkURI *url return nil } + +// MaskSensitiveData masks sensitive data in ReplicaConfig +func (c *ReplicaConfig) MaskSensitiveData() { + if c.Sink != nil { + c.Sink.MaskSensitiveData() + } + if c.Consistent != nil { + c.Consistent.MaskSensitiveData() + } +} diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 5b80f46a299..04964bdc98a 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -18,6 +18,11 @@ import ( "net/url" "strings" +<<<<<<< HEAD +======= + "github.com/apache/pulsar-client-go/pulsar" + "github.com/aws/aws-sdk-go-v2/aws" +>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) "github.com/pingcap/errors" "github.com/pingcap/log" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -154,6 +159,19 @@ type KafkaConfig struct { LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` } +// MaskSensitiveData masks sensitive data in SinkConfig +func (s *SinkConfig) MaskSensitiveData() { + if s.SchemaRegistry != nil { + s.SchemaRegistry = aws.String(util.MaskSensitiveDataInURI(*s.SchemaRegistry)) + } + if s.KafkaConfig != nil { + s.KafkaConfig.MaskSensitiveData() + } + if s.PulsarConfig != nil { + s.PulsarConfig.MaskSensitiveData() + } +} + // CSVConfig defines a series of configuration items for csv codec. 
type CSVConfig struct { // delimiter between fields @@ -273,6 +291,275 @@ type ColumnSelector struct { Columns []string `toml:"columns" json:"columns"` } +<<<<<<< HEAD +======= +// CodecConfig represents a MQ codec configuration +type CodecConfig struct { + EnableTiDBExtension *bool `toml:"enable-tidb-extension" json:"enable-tidb-extension,omitempty"` + MaxBatchSize *int `toml:"max-batch-size" json:"max-batch-size,omitempty"` + AvroEnableWatermark *bool `toml:"avro-enable-watermark" json:"avro-enable-watermark"` + AvroDecimalHandlingMode *string `toml:"avro-decimal-handling-mode" json:"avro-decimal-handling-mode,omitempty"` + AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"` +} + +// KafkaConfig represents a kafka sink configuration +type KafkaConfig struct { + PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` + ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` + KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` + MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` + Compression *string `toml:"compression" json:"compression,omitempty"` + KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` + AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` + DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` + WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` + ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` + RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` + SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` + SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` + SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` + SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` + SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` + SASLGssAPIKerberosConfigPath *string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` + SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` + SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` + SASLGssAPIPassword *string `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` + SASLGssAPIRealm *string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` + SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"` + SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` + SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` + SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` + SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` + SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` + SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` + EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` + CA *string `toml:"ca" json:"ca,omitempty"` + Cert *string `toml:"cert" json:"cert,omitempty"` + Key *string `toml:"key" 
json:"key,omitempty"`
+	InsecureSkipVerify           *bool                     `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"`
+	CodecConfig                  *CodecConfig              `toml:"codec-config" json:"codec-config,omitempty"`
+	LargeMessageHandle           *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"`
+	GlueSchemaRegistryConfig     *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"`
+}
+
+// MaskSensitiveData masks sensitive data in KafkaConfig
+func (k *KafkaConfig) MaskSensitiveData() {
+	k.SASLPassword = aws.String("********")
+	k.SASLGssAPIPassword = aws.String("********")
+	k.SASLOAuthClientSecret = aws.String("********")
+	k.Key = aws.String("********")
+	if k.GlueSchemaRegistryConfig != nil {
+		k.GlueSchemaRegistryConfig.AccessKey = "********"
+		k.GlueSchemaRegistryConfig.Token = "********"
+		k.GlueSchemaRegistryConfig.SecretAccessKey = "********"
+	}
+}
+
+// PulsarCompressionType is the compression type for pulsar
+type PulsarCompressionType string
+
+// Value returns the pulsar compression type
+func (p *PulsarCompressionType) Value() pulsar.CompressionType {
+	if p == nil {
+		return 0
+	}
+	switch strings.ToLower(string(*p)) {
+	case "lz4":
+		return pulsar.LZ4
+	case "zlib":
+		return pulsar.ZLib
+	case "zstd":
+		return pulsar.ZSTD
+	default:
+		return 0 // default is no compression
+	}
+}
+
+// TimeMill is the time in milliseconds
+type TimeMill int
+
+// Duration returns the time in milliseconds as a duration
+func (t *TimeMill) Duration() time.Duration {
+	if t == nil {
+		return 0
+	}
+	return time.Duration(*t) * time.Millisecond
+}
+
+// NewTimeMill returns a new time in milliseconds
+func NewTimeMill(x int) *TimeMill {
+	t := TimeMill(x)
+	return &t
+}
+
+// TimeSec is the time in seconds
+type TimeSec int
+
+// Duration returns the time in seconds as a duration
+func (t *TimeSec) Duration() time.Duration {
+	if t == nil {
+		return 0
+	}
+	return time.Duration(*t) * time.Second
+}
+
+// NewTimeSec returns a new time in seconds
+func NewTimeSec(x int) *TimeSec {
+	t := TimeSec(x)
+	return &t
+}
+
+// OAuth2 is the configuration for OAuth2
+type OAuth2 struct {
+	// OAuth2IssuerURL the URL of the authorization server.
+	OAuth2IssuerURL string `toml:"oauth2-issuer-url" json:"oauth2-issuer-url,omitempty"`
+	// OAuth2Audience the URL of the resource server.
+	OAuth2Audience string `toml:"oauth2-audience" json:"oauth2-audience,omitempty"`
+	// OAuth2PrivateKey the private key used to sign requests to the authorization server.
+	OAuth2PrivateKey string `toml:"oauth2-private-key" json:"oauth2-private-key,omitempty"`
+	// OAuth2ClientID the client ID of the application.
+	OAuth2ClientID string `toml:"oauth2-client-id" json:"oauth2-client-id,omitempty"`
+	// OAuth2Scope scope
+	OAuth2Scope string `toml:"oauth2-scope" json:"oauth2-scope,omitempty"`
+}
+
+func (o *OAuth2) validate() (err error) {
+	if o == nil {
+		return nil
+	}
+	if len(o.OAuth2IssuerURL) == 0 || len(o.OAuth2ClientID) == 0 || len(o.OAuth2PrivateKey) == 0 ||
+		len(o.OAuth2Audience) == 0 {
+		return fmt.Errorf("issuer-url, audience, private-key and client-id must not be empty")
+	}
+	return nil
+}
+
+// PulsarConfig pulsar sink configuration
+type PulsarConfig struct {
+	TLSKeyFilePath        *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"`
+	TLSCertificateFile    *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"`
+	TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"`
+
+	// PulsarProducerCacheSize is the size of the cache of pulsar producers
+	PulsarProducerCacheSize *int32 `toml:"pulsar-producer-cache-size" json:"pulsar-producer-cache-size,omitempty"`
+
+	// PulsarVersion print the version of pulsar
+	PulsarVersion *string `toml:"pulsar-version" json:"pulsar-version,omitempty"`
+
+	// pulsar client compression
+	CompressionType *PulsarCompressionType `toml:"compression-type" json:"compression-type,omitempty"`
+
+	// AuthenticationToken the token for the Pulsar server
+	AuthenticationToken *string `toml:"authentication-token" json:"authentication-token,omitempty"`
+
+	// ConnectionTimeout Timeout for the establishment of a TCP connection (default: 5 seconds)
+	ConnectionTimeout *TimeSec `toml:"connection-timeout" json:"connection-timeout,omitempty"`
+
+	// Set the operation timeout (default: 30 seconds)
+	// Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
+	// operation will be marked as failed
+	OperationTimeout *TimeSec `toml:"operation-timeout" json:"operation-timeout,omitempty"`
+
+	// BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000)
+	BatchingMaxMessages *uint `toml:"batching-max-messages" json:"batching-max-messages,omitempty"`
+
+	// BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
+	// if batch messages are enabled. If set to a non-zero value, messages will be queued until this time
+	// interval elapses or the batch fills up.
+	BatchingMaxPublishDelay *TimeMill `toml:"batching-max-publish-delay" json:"batching-max-publish-delay,omitempty"`
+
+	// SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent.
+	// Send and SendAsync return an error after timeout.
+	// default: 30s
+	SendTimeout *TimeSec `toml:"send-timeout" json:"send-timeout,omitempty"`
+
+	// TokenFromFile authentication token read from a file;
+	// the value is the path of the file (the third priority authentication method)
+	TokenFromFile *string `toml:"token-from-file" json:"token-from-file,omitempty"`
+
+	// BasicUserName Account name for pulsar basic authentication (the second priority authentication method)
+	BasicUserName *string `toml:"basic-user-name" json:"basic-user-name,omitempty"`
+	// BasicPassword the password of the account
+	BasicPassword *string `toml:"basic-password" json:"basic-password,omitempty"`
+
+	// AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key
+	AuthTLSCertificatePath *string `toml:"auth-tls-certificate-path" json:"auth-tls-certificate-path,omitempty"`
+	// AuthTLSPrivateKeyPath private key
+	AuthTLSPrivateKeyPath *string `toml:"auth-tls-private-key-path" json:"auth-tls-private-key-path,omitempty"`
+
+	// OAuth2 includes oauth2-issuer-url, oauth2-audience, oauth2-private-key and oauth2-client-id,
+	// and 'type' always uses 'client_credentials'
+	OAuth2 *OAuth2 `toml:"oauth2" json:"oauth2,omitempty"`
+
+	// BrokerURL is used to configure service brokerUrl for the Pulsar service.
+	// This parameter is a part of the `sink-uri`. Internal use only.
+	BrokerURL string `toml:"-" json:"-"`
+	// SinkURI is the parsed sinkURI. Internal use only.
+	SinkURI *url.URL `toml:"-" json:"-"`
+}
+
+// MaskSensitiveData masks sensitive data in PulsarConfig
+func (c *PulsarConfig) MaskSensitiveData() {
+	if c.AuthenticationToken != nil {
+		c.AuthenticationToken = aws.String("******")
+	}
+	if c.BasicPassword != nil {
+		c.BasicPassword = aws.String("******")
+	}
+	if c.OAuth2 != nil {
+		c.OAuth2.OAuth2PrivateKey = "******"
+	}
+}
+
+// validate checks the PulsarConfig for invalid combinations
+func (c *PulsarConfig) validate() (err error) {
+	if c.OAuth2 != nil {
+		if err = c.OAuth2.validate(); err != nil {
+			return err
+		}
+		if c.TLSTrustCertsFilePath == nil {
+			return fmt.Errorf("oauth2 is not empty but tls-trust-certs-file-path is empty")
+		}
+	}
+
+	return nil
+}
+
+// GetDefaultTopicName gets the default topic name
+func (c *PulsarConfig) GetDefaultTopicName() string {
+	topicName := c.SinkURI.Path
+	return topicName[1:]
+}
+
+// MySQLConfig represents a MySQL sink configuration
+type MySQLConfig struct {
+	WorkerCount                  *int    `toml:"worker-count" json:"worker-count,omitempty"`
+	MaxTxnRow                    *int    `toml:"max-txn-row" json:"max-txn-row,omitempty"`
+	MaxMultiUpdateRowSize        *int    `toml:"max-multi-update-row-size" json:"max-multi-update-row-size,omitempty"`
+	MaxMultiUpdateRowCount       *int    `toml:"max-multi-update-row" json:"max-multi-update-row,omitempty"`
+	TiDBTxnMode                  *string `toml:"tidb-txn-mode" json:"tidb-txn-mode,omitempty"`
+	SSLCa                        *string `toml:"ssl-ca" json:"ssl-ca,omitempty"`
+	SSLCert                      *string `toml:"ssl-cert" json:"ssl-cert,omitempty"`
+	SSLKey                       *string `toml:"ssl-key" json:"ssl-key,omitempty"`
+	TimeZone                     *string `toml:"time-zone" json:"time-zone,omitempty"`
+	WriteTimeout                 *string `toml:"write-timeout" json:"write-timeout,omitempty"`
+	ReadTimeout                  *string `toml:"read-timeout" json:"read-timeout,omitempty"`
+	Timeout                      *string `toml:"timeout" json:"timeout,omitempty"`
+	EnableBatchDML               *bool   `toml:"enable-batch-dml" json:"enable-batch-dml,omitempty"`
+	EnableMultiStatement         *bool   `toml:"enable-multi-statement" json:"enable-multi-statement,omitempty"`
+	EnableCachePreparedStatement *bool   `toml:"enable-cache-prepared-statement" json:"enable-cache-prepared-statement,omitempty"`
+}
+
+// CloudStorageConfig represents a cloud storage sink configuration
+type CloudStorageConfig struct {
+	WorkerCount   *int    `toml:"worker-count" json:"worker-count,omitempty"`
+	FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"`
+	FileSize      *int    `toml:"file-size" json:"file-size,omitempty"`
+
+	OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"`
+}
+
+>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691))
 func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
 	if err := s.validateAndAdjustSinkURI(sinkURI); err != nil {
 		return err
diff --git a/pkg/util/uri.go b/pkg/util/uri.go
index dfe2b66f7b8..d033dd4d303 100644
--- a/pkg/util/uri.go
+++ b/pkg/util/uri.go
@@ -80,3 +80,32 @@ func MaskSinkURI(uri string) (string, error) {
 	}
 	return uriParsed.Redacted(), nil
 }
+
+var name = []string{
+	"password",
+	"sasl-password",
+	"access-key",
+	"secret-access-key",
+	"access_token",
+	"token",
+	"secret",
+	"passwd",
+	"pwd",
+}
+
+// MaskSensitiveDataInURI returns a URI with sensitive information masked.
+func MaskSensitiveDataInURI(uri string) string {
+	uriParsed, err := url.Parse(uri)
+	if err != nil {
+		log.Error("failed to parse sink URI", zap.Error(err))
+		return ""
+	}
+	queries := uriParsed.Query()
+	for _, secretKey := range name {
+		if queries.Has(secretKey) {
+			queries.Set(secretKey, "******")
+		}
+	}
+	uriParsed.RawQuery = queries.Encode()
+	return uriParsed.Redacted()
+}
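For reference, a quick sanity check of the new helper. This is a sketch only: the exact output depends on `url.Values.Encode`, which sorts query keys and percent-encodes the `******` mask, and on `url.URL.Redacted`, which replaces a set password with `xxxxx`.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/util"
)

func main() {
	uri := "kafka://user:pass@broker:9092/topic?kafka-version=2.4.0&sasl-password=abc"
	fmt.Println(util.MaskSensitiveDataInURI(uri))
	// Expected shape:
	// kafka://user:xxxxx@broker:9092/topic?kafka-version=2.4.0&sasl-password=%2A%2A%2A%2A%2A%2A
}
```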