From 4f8aed24c387a54356463e85cc2c4265d5c7e7ef Mon Sep 17 00:00:00 2001 From: Jianyuan Jiang Date: Mon, 11 Sep 2023 14:28:49 +0800 Subject: [PATCH 1/4] This is an automated cherry-pick of #9691 Signed-off-by: ti-chi-bot --- cdc/api/v1/api.go | 7 +- cdc/api/v2/changefeed.go | 8 +- cdc/model/changefeed.go | 12 +- cdc/owner/changefeed.go | 4 +- cdc/sink/codec/avro/schema_registry.go | 2 +- .../ddlproducer/pulsar_ddl_mock_producer.go | 108 ++++++ cdc/sink/ddlsink/mq/pulsar_ddl_sink.go | 106 ++++++ .../mq/dmlproducer/pulsar_dml_producer.go | 322 ++++++++++++++++++ cdc/sink/dmlsink/mq/kafka_dml_sink.go | 125 +++++++ cdc/sink/dmlsink/mq/pulsar_dml_sink.go | 117 +++++++ cdc/sink/mysql/mysql_syncpoint_store.go | 4 +- cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go | 12 + cmd/kafka-consumer/main.go | 4 +- pkg/config/consistent.go | 6 + pkg/config/replica_config.go | 10 + pkg/config/sink.go | 287 ++++++++++++++++ pkg/util/uri.go | 29 ++ 17 files changed, 1148 insertions(+), 15 deletions(-) create mode 100644 cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go create mode 100644 cdc/sink/ddlsink/mq/pulsar_ddl_sink.go create mode 100644 cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go create mode 100644 cdc/sink/dmlsink/mq/kafka_dml_sink.go create mode 100644 cdc/sink/dmlsink/mq/pulsar_dml_sink.go diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index e8d607aed9c..0bd61a75967 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -304,6 +304,7 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { _ = c.Error(err) return } +<<<<<<< HEAD infoStr, err := info.Marshal() if err != nil { @@ -322,6 +323,8 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { return } +======= +>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) upstreamInfo := &model.UpstreamInfo{ ID: up.ID, PDEndpoints: strings.Join(up.PdEndpoints, ","), @@ -338,7 +341,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { return } - log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr)) + log.Info("Create changefeed successfully!", + zap.String("id", changefeedConfig.ID), + zap.String("changefeed", info.String())) c.Status(http.StatusAccepted) } diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index e91efe355aa..2194265c977 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -116,6 +116,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { CAPath: cfg.CAPath, CertAllowedCN: cfg.CertAllowedCN, } +<<<<<<< HEAD infoStr, err := info.Marshal() if err != nil { needRemoveGCSafePoint = true @@ -123,6 +124,11 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { return } o, err := h.capture.GetOwner() +======= + + // cannot create changefeed if there are running lightning/restore tasks + tlsCfg, err := credential.ToTLSConfig() +>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) if err != nil { needRemoveGCSafePoint = true _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) @@ -147,7 +153,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { log.Info("Create changefeed successfully!", zap.String("id", info.ID), - zap.String("changefeed", infoStr)) + zap.String("changefeed", info.String())) c.JSON(http.StatusOK, toAPIModel(info, info.StartTs, info.StartTs, nil, true)) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 359f5a19f19..265bcf322cb 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -233,9 +233,9 @@ func (info 
*ChangeFeedInfo) String() (str string) { return } - clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI) - if err != nil { - log.Error("failed to marshal changefeed info", zap.Error(err)) + clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI) + if clone.Config != nil { + clone.Config.MaskSensitiveData() } str, err = clone.Marshal() @@ -476,11 +476,11 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() { } func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) { - oldRawQuery := uri.RawQuery newRawQuery := newQuery.Encode() + maskedURI, _ := util.MaskSinkURI(uri.String()) log.Info("handle incompatible protocol from sink URI", - zap.String("oldUriQuery", oldRawQuery), - zap.String("fixedUriQuery", newQuery.Encode())) + zap.String("oldURI", maskedURI), + zap.String("newProtocol", newProtocol)) uri.RawQuery = newRawQuery fixedSinkURI := uri.String() diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 4954a9afd50..80a32391a07 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -615,7 +615,7 @@ LOOP2: zap.Uint64("changefeedEpoch", epoch), zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", c.resolvedTs), - zap.Stringer("info", c.state.Info)) + zap.String("info", c.state.Info.String())) return nil } @@ -688,7 +688,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) { zap.String("namespace", c.id.Namespace), zap.String("changefeed", c.id.ID), zap.Any("status", c.state.Status), - zap.Stringer("info", c.state.Info), + zap.String("info", c.state.Info.String()), zap.Bool("isRemoved", c.isRemoved)) } diff --git a/cdc/sink/codec/avro/schema_registry.go b/cdc/sink/codec/avro/schema_registry.go index cdc366adca7..db179789c1f 100644 --- a/cdc/sink/codec/avro/schema_registry.go +++ b/cdc/sink/codec/avro/schema_registry.go @@ -368,7 +368,7 @@ func (m *schemaManager) ClearRegistry(ctx context.Context, topicName string) err ) req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil) if err != nil { - log.Error("Could not construct request for clearRegistry", zap.String("uri", uri)) + log.Error("Could not construct request for clearRegistry", zap.Error(err)) return cerror.WrapError(cerror.ErrAvroSchemaAPIError, err) } req.Header.Add( diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go new file mode 100644 index 00000000000..561d5515a5c --- /dev/null +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go @@ -0,0 +1,108 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ddlproducer + +import ( + "context" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/sink/codec/common" +) + +// Assert DDLEventSink implementation +var _ DDLProducer = (*PulsarMockProducers)(nil) + +// PulsarMockProducers is a mock pulsar producer +type PulsarMockProducers struct { + events map[string][]*pulsar.ProducerMessage +} + +// SyncBroadcastMessage pulsar consume all partitions +func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string, + totalPartitionsNum int32, message *common.Message, +) error { + return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) +} + +// SyncSendMessage sends a message +// partitionNum is not used,pulsar consume all partitions +func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string, + partitionNum int32, message *common.Message, +) error { + data := &pulsar.ProducerMessage{ + Payload: message.Value, + Key: message.GetPartitionKey(), + } + p.events[topic] = append(p.events[topic], data) + + return nil +} + +// NewMockPulsarProducer creates a pulsar producer +func NewMockPulsarProducer( + ctx context.Context, + changefeedID model.ChangeFeedID, + pConfig *config.PulsarConfig, + client pulsar.Client, +) (*PulsarMockProducers, error) { + return &PulsarMockProducers{ + events: map[string][]*pulsar.ProducerMessage{}, + }, nil +} + +// NewMockPulsarProducerDDL creates a pulsar producer for DDLProducer +func NewMockPulsarProducerDDL( + ctx context.Context, + changefeedID model.ChangeFeedID, + pConfig *config.PulsarConfig, + client pulsar.Client, + sinkConfig *config.SinkConfig, +) (DDLProducer, error) { + return NewMockPulsarProducer(ctx, changefeedID, pConfig, client) +} + +// GetProducerByTopic returns a producer by topic name +func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { + return producer, nil +} + +// Close close all producers +func (p *PulsarMockProducers) Close() { + p.events = make(map[string][]*pulsar.ProducerMessage) +} + +// Flush waits for all the messages in the async producer to be sent to Pulsar. +// Notice: this method is not thread-safe. +// Do not try to call AsyncSendMessage and Flush functions in different threads, +// otherwise Flush will not work as expected. It may never finish or flush the wrong message. +// Because inflight will be modified by mistake. +func (p *PulsarMockProducers) Flush(ctx context.Context) error { + return nil +} + +// GetAllEvents returns the events received by the mock producer. +func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage { + var events []*pulsar.ProducerMessage + for _, v := range p.events { + events = append(events, v...) + } + return events +} + +// GetEvents returns the event filtered by the key. +func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage { + return p.events[topic] +} diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go new file mode 100644 index 00000000000..6015ad46e02 --- /dev/null +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -0,0 +1,106 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mq + +import ( + "context" + "net/url" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" + "github.com/pingcap/tiflow/cdc/sink/util" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/builder" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + tiflowutil "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" +) + +// NewPulsarDDLSink will verify the config and create a Pulsar DDL Sink. +func NewPulsarDDLSink( + ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, + pulsarTopicManagerCreator manager.PulsarTopicManager, + clientCreator pulsarConfig.FactoryCreator, + producerCreator ddlproducer.PulsarFactory, +) (_ *DDLSink, err error) { + log.Info("Starting pulsar DDL producer ...", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID)) + + defaultTopic, err := util.GetTopic(sinkURI) + if err != nil { + return nil, errors.Trace(err) + } + + protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol)) + if err != nil { + return nil, errors.Trace(err) + } + + pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI, replicaConfig.Sink.PulsarConfig) + if err != nil { + return nil, errors.Trace(err) + } + + log.Info("Try to create a DDL sink producer", + zap.String("changefeed", changefeedID.String())) + + // NewEventRouter + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme) + if err != nil { + return nil, errors.Trace(err) + } + + encoderConfig, err := util.GetEncoderConfig(changefeedID, + sinkURI, protocol, replicaConfig, config.DefaultMaxMessageBytes) + if err != nil { + return nil, errors.Trace(err) + } + + encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + start := time.Now() + client, err := clientCreator(pConfig, changefeedID, replicaConfig.Sink) + if err != nil { + log.Error("DDL sink producer client create fail", zap.Error(err)) + return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) + } + + p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink) + log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) + } + + topicManager, err := pulsarTopicManagerCreator(pConfig, client) + if err != nil { + return nil, errors.Trace(err) + } + + s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol) + + return s, nil +} diff --git a/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go b/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go new file mode 100644 index 00000000000..80594ddb5ab --- /dev/null +++ 
b/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go @@ -0,0 +1,322 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dmlproducer + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + lru "github.com/hashicorp/golang-lru" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/metrics/mq" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "go.uber.org/zap" +) + +var _ DMLProducer = (*pulsarDMLProducer)(nil) + +// pulsarDMLProducer is used to send messages to pulsar. +type pulsarDMLProducer struct { + // id indicates which processor (changefeed) this sink belongs to. + id model.ChangeFeedID + // We hold the client to make close operation faster. + // Please see the comment of Close(). + client pulsar.Client + // producers is used to send messages to pulsar. + // One topic only use one producer , so we want to have many topics but use less memory, + // lru is a good idea to solve this question. + // support multiple topics + producers *lru.Cache + + // closedMu is used to protect `closed`. + // We need to ensure that closed producers are never written to. + closedMu sync.RWMutex + // closed is used to indicate whether the producer is closed. + // We also use it to guard against double closes. + closed bool + + // failpointCh is used to inject failpoints to the run loop. + // Only used in test. + failpointCh chan error + // closeCh is send error + errChan chan error + + pConfig *config.PulsarConfig +} + +// NewPulsarDMLProducer creates a new pulsar producer. +func NewPulsarDMLProducer( + ctx context.Context, + changefeedID model.ChangeFeedID, + client pulsar.Client, + sinkConfig *config.SinkConfig, + errCh chan error, + failpointCh chan error, +) (DMLProducer, error) { + log.Info("Creating pulsar DML producer ...", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID)) + start := time.Now() + + var pulsarConfig *config.PulsarConfig + if sinkConfig.PulsarConfig == nil { + log.Error("new pulsar DML producer fail,sink:pulsar config is empty") + return nil, cerror.ErrPulsarInvalidConfig. 
+		GenWithStackByArgs("pulsar config is empty")
+	}
+
+	pulsarConfig = sinkConfig.PulsarConfig
+	defaultTopicName := pulsarConfig.GetDefaultTopicName()
+	defaultProducer, err := newProducer(pulsarConfig, client, defaultTopicName)
+	if err != nil {
+		go client.Close()
+		return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
+	}
+	producerCacheSize := config.DefaultPulsarProducerCacheSize
+	if pulsarConfig != nil && pulsarConfig.PulsarProducerCacheSize != nil {
+		producerCacheSize = int(*pulsarConfig.PulsarProducerCacheSize)
+	}
+
+	producers, err := lru.NewWithEvict(producerCacheSize, func(key interface{}, value interface{}) {
+		// this is called when the LRU cache removes a producer, either explicitly or by eviction
+		pulsarProducer, ok := value.(pulsar.Producer)
+		if ok && pulsarProducer != nil {
+			pulsarProducer.Close()
+		}
+	})
+	if err != nil {
+		go client.Close()
+		return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err)
+	}
+
+	producers.Add(defaultTopicName, defaultProducer)
+
+	p := &pulsarDMLProducer{
+		id:          changefeedID,
+		client:      client,
+		producers:   producers,
+		pConfig:     pulsarConfig,
+		closed:      false,
+		failpointCh: failpointCh,
+		errChan:     errCh,
+	}
+	log.Info("Pulsar DML producer created", zap.Stringer("changefeed", p.id),
+		zap.Duration("duration", time.Since(start)))
+	return p, nil
+}
+
+// AsyncSendMessage sends one message asynchronously.
+func (p *pulsarDMLProducer) AsyncSendMessage(
+	ctx context.Context, topic string,
+	partition int32, message *common.Message,
+) error {
+	wrapperSchemaAndTopic(message)
+
+	// We have to hold the lock to avoid writing to a closed producer.
+	// Close may be blocked for a long time.
+	p.closedMu.RLock()
+	defer p.closedMu.RUnlock()
+
+	// If producers are closed, we should skip the message and return an error.
+	if p.closed {
+		return cerror.ErrPulsarProducerClosed.GenWithStackByArgs()
+	}
+	failpoint.Inject("PulsarSinkAsyncSendError", func() {
+		// simulate sending the message to the input channel successfully while
+		// flushing it to Pulsar fails
+		log.Info("PulsarSinkAsyncSendError error injected", zap.String("namespace", p.id.Namespace),
+			zap.String("changefeed", p.id.ID))
+		p.failpointCh <- errors.New("pulsar sink injected error")
+		failpoint.Return(nil)
+	})
+	data := &pulsar.ProducerMessage{
+		Payload: message.Value,
+		Key:     message.GetPartitionKey(),
+	}
+
+	producer, err := p.GetProducerByTopic(topic)
+	if err != nil {
+		return err
+	}
+
+	// for stress-test recording, counts are added in the message callback below
+
+	producer.SendAsync(ctx, data,
+		func(id pulsar.MessageID, m *pulsar.ProducerMessage, err error) {
+			// fail
+			if err != nil {
+				e := cerror.WrapError(cerror.ErrPulsarAsyncSendMessage, err)
+				log.Error("Pulsar DML producer async send error",
+					zap.String("namespace", p.id.Namespace),
+					zap.String("changefeed", p.id.ID),
+					zap.Error(err))
+				mq.IncPublishedDMLFail(topic, p.id.ID, message.GetSchema())
+				// use this select to avoid sending an error to a closed channel;
+				// the ctx will always be canceled before errChan is closed
+				select {
+				case <-ctx.Done():
+					return
+				case p.errChan <- e:
+				default:
+					log.Warn("Error channel is full in pulsar DML producer",
+						zap.Stringer("changefeed", p.id), zap.Error(e))
+				}
+			} else if message.Callback != nil {
+				// success
+				message.Callback()
+				mq.IncPublishedDMLSuccess(topic, p.id.ID, message.GetSchema())
+			}
+		})
+
+	mq.IncPublishedDMLCount(topic, p.id.ID, message.GetSchema())
+
+	return nil
+}
+
+func (p *pulsarDMLProducer) Close() {
+	// We have to hold the lock to synchronize closing with writing.
+ p.closedMu.Lock() + defer p.closedMu.Unlock() + // If the producer has already been closed, we should skip this close operation. + if p.closed { + // We need to guard against double closing the clients, + // which could lead to panic. + log.Warn("Pulsar DML producer already closed", + zap.String("namespace", p.id.Namespace), + zap.String("changefeed", p.id.ID)) + return + } + close(p.failpointCh) + p.closed = true + + start := time.Now() + keys := p.producers.Keys() + for _, topic := range keys { + p.producers.Remove(topic) // callback func will be called + topicName, _ := topic.(string) + log.Info("Async client closed in pulsar DML producer", + zap.Duration("duration", time.Since(start)), + zap.String("namespace", p.id.Namespace), + zap.String("changefeed", p.id.ID), zap.String("topic", topicName)) + } + + p.client.Close() +} + +// newProducer creates a pulsar producer +// One topic is used by one producer +func newProducer( + pConfig *config.PulsarConfig, + client pulsar.Client, + topicName string, +) (pulsar.Producer, error) { + po := pulsar.ProducerOptions{ + Topic: topicName, + } + if pConfig.BatchingMaxMessages != nil { + po.BatchingMaxMessages = *pConfig.BatchingMaxMessages + } + if pConfig.BatchingMaxPublishDelay != nil { + po.BatchingMaxPublishDelay = pConfig.BatchingMaxPublishDelay.Duration() + } + if pConfig.CompressionType != nil { + po.CompressionType = pConfig.CompressionType.Value() + po.CompressionLevel = pulsar.Default + } + if pConfig.SendTimeout != nil { + po.SendTimeout = pConfig.SendTimeout.Duration() + } + + producer, err := client.CreateProducer(po) + if err != nil { + return nil, err + } + + log.Info("create pulsar producer success", zap.String("topic", topicName)) + + return producer, nil +} + +func (p *pulsarDMLProducer) getProducer(topic string) (pulsar.Producer, bool) { + target, ok := p.producers.Get(topic) + if ok { + producer, ok := target.(pulsar.Producer) + if ok { + return producer, true + } + } + return nil, false +} + +// GetProducerByTopic get producer by topicName, +// if not exist, it will create a producer with topicName, and set in LRU cache +// more meta info at pulsarDMLProducer's producers +func (p *pulsarDMLProducer) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { + getProducer, ok := p.getProducer(topicName) + if ok && getProducer != nil { + return getProducer, nil + } + + if !ok { // create a new producer for the topicName + producer, err = newProducer(p.pConfig, p.client, topicName) + if err != nil { + return nil, err + } + p.producers.Add(topicName, producer) + } + + return producer, nil +} + +// wrapperSchemaAndTopic wrapper schema and topic +func wrapperSchemaAndTopic(m *common.Message) { + if m.Schema == nil { + if m.Protocol == config.ProtocolMaxwell { + mx := &maxwellMessage{} + err := json.Unmarshal(m.Value, mx) + if err != nil { + log.Error("unmarshal maxwell message failed", zap.Error(err)) + return + } + if len(mx.Database) > 0 { + m.Schema = &mx.Database + } + if len(mx.Table) > 0 { + m.Table = &mx.Table + } + } + if m.Protocol == config.ProtocolCanal { // canal protocol set multi schemas in one topic + m.Schema = str2Pointer("multi_schema") + } + } +} + +// maxwellMessage is the message format of maxwell +type maxwellMessage struct { + Database string `json:"database"` + Table string `json:"table"` +} + +// str2Pointer returns the pointer of the string. 
+func str2Pointer(str string) *string {
+	return &str
+}
diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go
new file mode 100644
index 00000000000..b29688d5bca
--- /dev/null
+++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go
@@ -0,0 +1,125 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mq
+
+import (
+	"context"
+	"net/url"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
+	"github.com/pingcap/tiflow/cdc/sink/util"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/sink/codec"
+	"github.com/pingcap/tiflow/pkg/sink/codec/builder"
+	"github.com/pingcap/tiflow/pkg/sink/kafka"
+	tiflowutil "github.com/pingcap/tiflow/pkg/util"
+	"go.uber.org/zap"
+)
+
+// NewKafkaDMLSink will verify the config and create a KafkaSink.
+func NewKafkaDMLSink(
+	ctx context.Context,
+	changefeedID model.ChangeFeedID,
+	sinkURI *url.URL,
+	replicaConfig *config.ReplicaConfig,
+	errCh chan error,
+	factoryCreator kafka.FactoryCreator,
+	producerCreator dmlproducer.Factory,
+) (_ *dmlSink, err error) {
+	topic, err := util.GetTopic(sinkURI)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	options := kafka.NewOptions()
+	if err := options.Apply(changefeedID, sinkURI, replicaConfig); err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
+	}
+
+	factory, err := factoryCreator(options, changefeedID)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
+	}
+
+	adminClient, err := factory.AdminClient(ctx)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
+	}
+	// We must close adminClient if this function returns with an error,
+	// otherwise the adminClient will never be closed, leading to a goroutine leak.
+ defer func() { + if err != nil && adminClient != nil { + adminClient.Close() + } + }() + + // adjust the option configuration before creating the kafka client + if err = kafka.AdjustOptions(ctx, adminClient, options, topic); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err) + } + + protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol)) + if err != nil { + return nil, errors.Trace(err) + } + + topicManager, err := util.GetTopicManagerAndTryCreateTopic( + ctx, + changefeedID, + topic, + options.DeriveTopicConfig(), + adminClient, + ) + if err != nil { + return nil, errors.Trace(err) + } + + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, sinkURI.Scheme) + if err != nil { + return nil, errors.Trace(err) + } + + encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, options.MaxMessageBytes) + if err != nil { + return nil, errors.Trace(err) + } + + encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + failpointCh := make(chan error, 1) + asyncProducer, err := factory.AsyncProducer(ctx, failpointCh) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err) + } + + metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient) + dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh) + concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency) + encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID) + s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, + eventRouter, encoderGroup, protocol, errCh) + log.Info("DML sink producer created", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeedID", changefeedID.ID)) + + return s, nil +} diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go new file mode 100644 index 00000000000..6a11fc512a6 --- /dev/null +++ b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go @@ -0,0 +1,117 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mq + +import ( + "context" + "net/url" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" + "github.com/pingcap/tiflow/cdc/sink/util" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec" + "github.com/pingcap/tiflow/pkg/sink/codec/builder" + pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" + tiflowutil "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" +) + +// NewPulsarDMLSink will verify the config and create a PulsarSink. 
+func NewPulsarDMLSink( + ctx context.Context, + changefeedID model.ChangeFeedID, + sinkURI *url.URL, + replicaConfig *config.ReplicaConfig, + errCh chan error, + pulsarTopicManagerCreator manager.PulsarTopicManager, + clientCreator pulsarConfig.FactoryCreator, + producerCreator dmlproducer.PulsarFactory, +) (_ *dmlSink, err error) { + log.Info("Starting pulsar DML producer ...", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID)) + + defaultTopic, err := util.GetTopic(sinkURI) + if err != nil { + return nil, errors.Trace(err) + } + + protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol)) + if err != nil { + return nil, errors.Trace(err) + } + + pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI, replicaConfig.Sink.PulsarConfig) + if err != nil { + return nil, errors.Trace(err) + } + + client, err := clientCreator(pConfig, changefeedID, replicaConfig.Sink) + if err != nil { + log.Error("DML sink producer client create fail", zap.Error(err)) + return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) + } + + failpointCh := make(chan error, 1) + log.Info("Try to create a DML sink producer", zap.String("changefeed", changefeedID.String())) + start := time.Now() + p, err := producerCreator(ctx, changefeedID, client, replicaConfig.Sink, errCh, failpointCh) + log.Info("DML sink producer created", + zap.String("changefeed", changefeedID.String()), + zap.Duration("duration", time.Since(start))) + if err != nil { + defer func() { + if p != nil { + p.Close() + } + }() + return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) + } + + // The topicManager is not actually used in pulsar , it is only used to create dmlSink. + // TODO: Find a way to remove it in newDMLSink. + topicManager, err := pulsarTopicManagerCreator(pConfig, client) + if err != nil { + return nil, errors.Trace(err) + } + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme) + if err != nil { + return nil, errors.Trace(err) + } + + encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, + config.DefaultMaxMessageBytes) + if err != nil { + return nil, errors.Trace(err) + } + + encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrPulsarInvalidConfig, err) + } + + concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency) + encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID) + + s := newDMLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderGroup, protocol, errCh) + + return s, nil +} diff --git a/cdc/sink/mysql/mysql_syncpoint_store.go b/cdc/sink/mysql/mysql_syncpoint_store.go index 879a97d67bf..e187979b17c 100644 --- a/cdc/sink/mysql/mysql_syncpoint_store.go +++ b/cdc/sink/mysql/mysql_syncpoint_store.go @@ -144,7 +144,7 @@ func newMySQLSyncPointStore( return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } - log.Info("Start mysql syncpoint sink") + log.Info("Start mysql syncpoint sink", zap.String("changefeed", id.String())) return &mysqlSyncPointStore{ db: syncDB, @@ -213,7 +213,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, var secondaryTs string err = row.Scan(&secondaryTs) if err != nil { - log.Info("sync table: get tidb_current_ts err") + log.Info("sync table: get tidb_current_ts err", zap.String("changefeed", id.String())) err2 := tx.Rollback() if err2 != nil { 
log.Error("failed to write syncpoint table", zap.Error(err)) diff --git a/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go b/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go index 2603965effc..d02b309546f 100644 --- a/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go @@ -119,7 +119,19 @@ func NewKafkaDDLSink( return nil, errors.Trace(err) } +<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go s, err := newDDLSink(ctx, p, topicManager, eventRouter, encoderConfig) +======= + encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + start := time.Now() + log.Info("Try to create a DDL sink producer", + zap.String("changefeed", changefeedID.String())) + syncProducer, err := factory.SyncProducer(ctx) +>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)):cdc/sink/ddlsink/mq/kafka_ddl_sink.go if err != nil { return nil, errors.Trace(err) } diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index c57d4694252..48582905789 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -868,7 +868,7 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti func openDB(ctx context.Context, dsn string) (*sql.DB, error) { db, err := sql.Open("mysql", dsn) if err != nil { - log.Error("open db failed", zap.String("dsn", dsn), zap.Error(err)) + log.Error("open db failed", zap.Error(err)) return nil, errors.Trace(err) } @@ -879,7 +879,7 @@ func openDB(ctx context.Context, dsn string) (*sql.DB, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err = db.PingContext(ctx); err != nil { - log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err)) + log.Error("ping db failed", zap.Error(err)) return nil, errors.Trace(err) } log.Info("open db success", zap.String("dsn", dsn)) diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index 85baaf5673f..636edcf865f 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" + "github.com/pingcap/tiflow/pkg/util" ) // ConsistentConfig represents replication consistency config for a changefeed. 
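
The Storage field masked by the next hunk is a URI. The helper it calls, util.MaskSensitiveDataInURI (added to pkg/util/uri.go at the end of this patch), does two things: url.URL.Redacted hides the userinfo password, and a fixed list of well-known sensitive query parameters is overwritten before the query is re-encoded. Below is a minimal, self-contained sketch of the same behavior, not the patch's actual code; maskURI and the trimmed sensitiveKeys list are illustrative, and url.Values.Has assumes Go 1.17 or later.

package main

import (
	"fmt"
	"net/url"
)

// sensitiveKeys mirrors the query parameters masked by pkg/util/uri.go
// in this patch (password, sasl-password, access-key, ...); trimmed here.
var sensitiveKeys = []string{"password", "sasl-password", "access-key", "secret-access-key", "token"}

// maskURI is a simplified stand-in for util.MaskSensitiveDataInURI:
// Redacted hides the userinfo password, and known sensitive query
// parameters are overwritten before re-encoding the query string.
func maskURI(uri string) string {
	parsed, err := url.Parse(uri)
	if err != nil {
		// the real helper logs the parse error and returns "" as well
		return ""
	}
	q := parsed.Query()
	for _, k := range sensitiveKeys {
		if q.Has(k) {
			q.Set(k, "******")
		}
	}
	parsed.RawQuery = q.Encode()
	return parsed.Redacted()
}

func main() {
	// Prints kafka://user:xxxxx@broker:9092/topic?sasl-password=%2A%2A%2A%2A%2A%2A
	fmt.Println(maskURI("kafka://user:secret@broker:9092/topic?sasl-password=abc"))
}
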
@@ -56,3 +57,8 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { } return redo.ValidateStorage(uri) } + +// MaskSensitiveData masks sensitive data in ConsistentConfig +func (c *ConsistentConfig) MaskSensitiveData() { + c.Storage = util.MaskSensitiveDataInURI(c.Storage) +} diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 090db4cd7c2..a602056300b 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -279,3 +279,13 @@ func (c *ReplicaConfig) AdjustEnableOldValueAndVerifyForceReplicate(sinkURI *url return nil } + +// MaskSensitiveData masks sensitive data in ReplicaConfig +func (c *ReplicaConfig) MaskSensitiveData() { + if c.Sink != nil { + c.Sink.MaskSensitiveData() + } + if c.Consistent != nil { + c.Consistent.MaskSensitiveData() + } +} diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 5b80f46a299..04964bdc98a 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -18,6 +18,11 @@ import ( "net/url" "strings" +<<<<<<< HEAD +======= + "github.com/apache/pulsar-client-go/pulsar" + "github.com/aws/aws-sdk-go-v2/aws" +>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) "github.com/pingcap/errors" "github.com/pingcap/log" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -154,6 +159,19 @@ type KafkaConfig struct { LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` } +// MaskSensitiveData masks sensitive data in SinkConfig +func (s *SinkConfig) MaskSensitiveData() { + if s.SchemaRegistry != nil { + s.SchemaRegistry = aws.String(util.MaskSensitiveDataInURI(*s.SchemaRegistry)) + } + if s.KafkaConfig != nil { + s.KafkaConfig.MaskSensitiveData() + } + if s.PulsarConfig != nil { + s.PulsarConfig.MaskSensitiveData() + } +} + // CSVConfig defines a series of configuration items for csv codec. 
type CSVConfig struct { // delimiter between fields @@ -273,6 +291,275 @@ type ColumnSelector struct { Columns []string `toml:"columns" json:"columns"` } +<<<<<<< HEAD +======= +// CodecConfig represents a MQ codec configuration +type CodecConfig struct { + EnableTiDBExtension *bool `toml:"enable-tidb-extension" json:"enable-tidb-extension,omitempty"` + MaxBatchSize *int `toml:"max-batch-size" json:"max-batch-size,omitempty"` + AvroEnableWatermark *bool `toml:"avro-enable-watermark" json:"avro-enable-watermark"` + AvroDecimalHandlingMode *string `toml:"avro-decimal-handling-mode" json:"avro-decimal-handling-mode,omitempty"` + AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"` +} + +// KafkaConfig represents a kafka sink configuration +type KafkaConfig struct { + PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` + ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` + KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` + MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` + Compression *string `toml:"compression" json:"compression,omitempty"` + KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` + AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` + DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` + WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` + ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` + RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` + SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` + SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` + SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` + SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` + SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` + SASLGssAPIKerberosConfigPath *string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` + SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` + SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` + SASLGssAPIPassword *string `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` + SASLGssAPIRealm *string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` + SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"` + SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` + SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` + SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` + SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` + SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` + SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` + EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` + CA *string `toml:"ca" json:"ca,omitempty"` + Cert *string `toml:"cert" json:"cert,omitempty"` + Key *string `toml:"key" 
json:"key,omitempty"` + InsecureSkipVerify *bool `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"` + CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` + LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` + GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"` +} + +// MaskSensitiveData masks sensitive data in KafkaConfig +func (k *KafkaConfig) MaskSensitiveData() { + k.SASLPassword = aws.String("********") + k.SASLGssAPIPassword = aws.String("********") + k.SASLOAuthClientSecret = aws.String("********") + k.Key = aws.String("********") + if k.GlueSchemaRegistryConfig != nil { + k.GlueSchemaRegistryConfig.AccessKey = "********" + k.GlueSchemaRegistryConfig.Token = "********" + k.GlueSchemaRegistryConfig.SecretAccessKey = "********" + } +} + +// PulsarCompressionType is the compression type for pulsar +type PulsarCompressionType string + +// Value returns the pulsar compression type +func (p *PulsarCompressionType) Value() pulsar.CompressionType { + if p == nil { + return 0 + } + switch strings.ToLower(string(*p)) { + case "lz4": + return pulsar.LZ4 + case "zlib": + return pulsar.ZLib + case "zstd": + return pulsar.ZSTD + default: + return 0 // default is no compression + } +} + +// TimeMill is the time in milliseconds +type TimeMill int + +// Duration returns the time in seconds as a duration +func (t *TimeMill) Duration() time.Duration { + if t == nil { + return 0 + } + return time.Duration(*t) * time.Millisecond +} + +// NewTimeMill returns a new time in milliseconds +func NewTimeMill(x int) *TimeMill { + t := TimeMill(x) + return &t +} + +// TimeSec is the time in seconds +type TimeSec int + +// Duration returns the time in seconds as a duration +func (t *TimeSec) Duration() time.Duration { + if t == nil { + return 0 + } + return time.Duration(*t) * time.Second +} + +// NewTimeSec returns a new time in seconds +func NewTimeSec(x int) *TimeSec { + t := TimeSec(x) + return &t +} + +// OAuth2 is the configuration for OAuth2 +type OAuth2 struct { + // OAuth2IssuerURL the URL of the authorization server. + OAuth2IssuerURL string `toml:"oauth2-issuer-url" json:"oauth2-issuer-url,omitempty"` + // OAuth2Audience the URL of the resource server. + OAuth2Audience string `toml:"oauth2-audience" json:"oauth2-audience,omitempty"` + // OAuth2PrivateKey the private key used to sign the server. + OAuth2PrivateKey string `toml:"oauth2-private-key" json:"oauth2-private-key,omitempty"` + // OAuth2ClientID the client ID of the application. 
+	OAuth2ClientID string `toml:"oauth2-client-id" json:"oauth2-client-id,omitempty"`
+	// OAuth2Scope the requested OAuth2 scope
+	OAuth2Scope string `toml:"oauth2-scope" json:"oauth2-scope,omitempty"`
+}
+
+func (o *OAuth2) validate() (err error) {
+	if o == nil {
+		return nil
+	}
+	if len(o.OAuth2IssuerURL) == 0 || len(o.OAuth2ClientID) == 0 || len(o.OAuth2PrivateKey) == 0 ||
+		len(o.OAuth2Audience) == 0 {
+		return fmt.Errorf("issuer-url, audience, private-key and client-id must not be empty")
+	}
+	return nil
+}
+
+// PulsarConfig is the pulsar sink configuration
+type PulsarConfig struct {
+	TLSKeyFilePath        *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"`
+	TLSCertificateFile    *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"`
+	TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"`
+
+	// PulsarProducerCacheSize is the size of the cache of pulsar producers
+	PulsarProducerCacheSize *int32 `toml:"pulsar-producer-cache-size" json:"pulsar-producer-cache-size,omitempty"`
+
+	// PulsarVersion prints the version of pulsar
+	PulsarVersion *string `toml:"pulsar-version" json:"pulsar-version,omitempty"`
+
+	// CompressionType is the pulsar client compression
+	CompressionType *PulsarCompressionType `toml:"compression-type" json:"compression-type,omitempty"`
+
+	// AuthenticationToken is the token for the Pulsar server
+	AuthenticationToken *string `toml:"authentication-token" json:"authentication-token,omitempty"`
+
+	// ConnectionTimeout is the timeout for establishing a TCP connection (default: 5 seconds)
+	ConnectionTimeout *TimeSec `toml:"connection-timeout" json:"connection-timeout,omitempty"`
+
+	// OperationTimeout sets the operation timeout (default: 30 seconds).
+	// Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
+	// operation will be marked as failed
+	OperationTimeout *TimeSec `toml:"operation-timeout" json:"operation-timeout,omitempty"`
+
+	// BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000)
+	BatchingMaxMessages *uint `toml:"batching-max-messages" json:"batching-max-messages,omitempty"`
+
+	// BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms)
+	// if batch messages are enabled. If set to a non-zero value, messages will be queued until this time
+	// interval elapses or until the batch is full
+	BatchingMaxPublishDelay *TimeMill `toml:"batching-max-publish-delay" json:"batching-max-publish-delay,omitempty"`
+
+	// SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent.
+	// Send and SendAsync return an error after the timeout.
+ // default: 30s + SendTimeout *TimeSec `toml:"send-timeout" json:"send-timeout,omitempty"` + + // TokenFromFile Authentication from the file token, + // the path name of the file (the third priority authentication method) + TokenFromFile *string `toml:"token-from-file" json:"token-from-file,omitempty"` + + // BasicUserName Account name for pulsar basic authentication (the second priority authentication method) + BasicUserName *string `toml:"basic-user-name" json:"basic-user-name,omitempty"` + // BasicPassword with account + BasicPassword *string `toml:"basic-password" json:"basic-password,omitempty"` + + // AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key + AuthTLSCertificatePath *string `toml:"auth-tls-certificate-path" json:"auth-tls-certificate-path,omitempty"` + // AuthTLSPrivateKeyPath private key + AuthTLSPrivateKeyPath *string `toml:"auth-tls-private-key-path" json:"auth-tls-private-key-path,omitempty"` + + // Oauth2 include oauth2-issuer-url oauth2-audience oauth2-private-key oauth2-client-id + // and 'type' always use 'client_credentials' + OAuth2 *OAuth2 `toml:"oauth2" json:"oauth2,omitempty"` + + // BrokerURL is used to configure service brokerUrl for the Pulsar service. + // This parameter is a part of the `sink-uri`. Internal use only. + BrokerURL string `toml:"-" json:"-"` + // SinkURI is the parsed sinkURI. Internal use only. + SinkURI *url.URL `toml:"-" json:"-"` +} + +// MaskSensitiveData masks sensitive data in PulsarConfig +func (c *PulsarConfig) MaskSensitiveData() { + if c.AuthenticationToken != nil { + c.AuthenticationToken = aws.String("******") + } + if c.BasicPassword != nil { + c.BasicPassword = aws.String("******") + } + if c.OAuth2 != nil { + c.OAuth2.OAuth2PrivateKey = "******" + } +} + +// Check get broker url +func (c *PulsarConfig) validate() (err error) { + if c.OAuth2 != nil { + if err = c.OAuth2.validate(); err != nil { + return err + } + if c.TLSTrustCertsFilePath == nil { + return fmt.Errorf("oauth2 is not empty but tls-trust-certs-file-path is empty") + } + } + + return nil +} + +// GetDefaultTopicName get default topic name +func (c *PulsarConfig) GetDefaultTopicName() string { + topicName := c.SinkURI.Path + return topicName[1:] +} + +// MySQLConfig represents a MySQL sink configuration +type MySQLConfig struct { + WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` + MaxTxnRow *int `toml:"max-txn-row" json:"max-txn-row,omitempty"` + MaxMultiUpdateRowSize *int `toml:"max-multi-update-row-size" json:"max-multi-update-row-size,omitempty"` + MaxMultiUpdateRowCount *int `toml:"max-multi-update-row" json:"max-multi-update-row,omitempty"` + TiDBTxnMode *string `toml:"tidb-txn-mode" json:"tidb-txn-mode,omitempty"` + SSLCa *string `toml:"ssl-ca" json:"ssl-ca,omitempty"` + SSLCert *string `toml:"ssl-cert" json:"ssl-cert,omitempty"` + SSLKey *string `toml:"ssl-key" json:"ssl-key,omitempty"` + TimeZone *string `toml:"time-zone" json:"time-zone,omitempty"` + WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` + ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` + Timeout *string `toml:"timeout" json:"timeout,omitempty"` + EnableBatchDML *bool `toml:"enable-batch-dml" json:"enable-batch-dml,omitempty"` + EnableMultiStatement *bool `toml:"enable-multi-statement" json:"enable-multi-statement,omitempty"` + EnableCachePreparedStatement *bool `toml:"enable-cache-prepared-statement" json:"enable-cache-prepared-statement,omitempty"` +} + +// 
CloudStorageConfig represents a cloud storage sink configuration +type CloudStorageConfig struct { + WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` + FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` + FileSize *int `toml:"file-size" json:"file-size,omitempty"` + + OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` +} + +>>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { if err := s.validateAndAdjustSinkURI(sinkURI); err != nil { return err diff --git a/pkg/util/uri.go b/pkg/util/uri.go index dfe2b66f7b8..d033dd4d303 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -80,3 +80,32 @@ func MaskSinkURI(uri string) (string, error) { } return uriParsed.Redacted(), nil } + +var name = []string{ + "password", + "sasl-password", + "access-key", + "secret-access-key", + "access_token", + "token", + "secret", + "passwd", + "pwd", +} + +// MaskSensitiveDataInURI returns an uri that sensitive infos has been masked. +func MaskSensitiveDataInURI(uri string) string { + uriParsed, err := url.Parse(uri) + if err != nil { + log.Error("failed to parse sink URI", zap.Error(err)) + return "" + } + queries := uriParsed.Query() + for _, secretKey := range name { + if queries.Has(secretKey) { + queries.Set(secretKey, "******") + } + } + uriParsed.RawQuery = queries.Encode() + return uriParsed.Redacted() +} From 6728de4763c3934068ebb51f5fddedd195febd8e Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Mon, 11 Sep 2023 14:48:45 +0800 Subject: [PATCH 2/4] mask password in changefeed config --- cdc/api/v1/api.go | 10 - cdc/api/v2/changefeed.go | 10 - cdc/model/changefeed.go | 5 + .../ddlproducer/pulsar_ddl_mock_producer.go | 108 ------ cdc/sink/ddlsink/mq/pulsar_ddl_sink.go | 106 ------ .../mq/dmlproducer/pulsar_dml_producer.go | 322 ------------------ cdc/sink/dmlsink/mq/kafka_dml_sink.go | 125 ------- cdc/sink/dmlsink/mq/pulsar_dml_sink.go | 117 ------- cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go | 12 - pkg/config/sink.go | 269 +-------------- 10 files changed, 9 insertions(+), 1075 deletions(-) delete mode 100644 cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go delete mode 100644 cdc/sink/ddlsink/mq/pulsar_ddl_sink.go delete mode 100644 cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go delete mode 100644 cdc/sink/dmlsink/mq/kafka_dml_sink.go delete mode 100644 cdc/sink/dmlsink/mq/pulsar_dml_sink.go diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index 0bd61a75967..333250f2800 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -304,14 +304,6 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { _ = c.Error(err) return } -<<<<<<< HEAD - - infoStr, err := info.Marshal() - if err != nil { - _ = c.Error(err) - return - } - o, err := h.capture.GetOwner() if err != nil { _ = c.Error(err) @@ -323,8 +315,6 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { return } -======= ->>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) upstreamInfo := &model.UpstreamInfo{ ID: up.ID, PDEndpoints: strings.Join(up.PdEndpoints, ","), diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 2194265c977..265cbde9faf 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -116,19 +116,9 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { CAPath: cfg.CAPath, CertAllowedCN: cfg.CertAllowedCN, } -<<<<<<< HEAD - infoStr, err := info.Marshal() - if err != nil { - needRemoveGCSafePoint = true - 
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) - return - } o, err := h.capture.GetOwner() -======= // cannot create changefeed if there are running lightning/restore tasks - tlsCfg, err := credential.ToTLSConfig() ->>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) if err != nil { needRemoveGCSafePoint = true _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 265bcf322cb..3493e3c1e6f 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -46,6 +46,11 @@ type ChangeFeedID struct { ID string } +// String implements fmt.Stringer interface +func (c ChangeFeedID) String() string { + return c.Namespace + "/" + c.ID +} + // DefaultChangeFeedID returns `ChangeFeedID` with default namespace func DefaultChangeFeedID(id string) ChangeFeedID { return ChangeFeedID{ diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go deleted file mode 100644 index 561d5515a5c..00000000000 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package ddlproducer - -import ( - "context" - - "github.com/apache/pulsar-client-go/pulsar" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/sink/codec/common" -) - -// Assert DDLEventSink implementation -var _ DDLProducer = (*PulsarMockProducers)(nil) - -// PulsarMockProducers is a mock pulsar producer -type PulsarMockProducers struct { - events map[string][]*pulsar.ProducerMessage -} - -// SyncBroadcastMessage pulsar consume all partitions -func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string, - totalPartitionsNum int32, message *common.Message, -) error { - return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) -} - -// SyncSendMessage sends a message -// partitionNum is not used,pulsar consume all partitions -func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string, - partitionNum int32, message *common.Message, -) error { - data := &pulsar.ProducerMessage{ - Payload: message.Value, - Key: message.GetPartitionKey(), - } - p.events[topic] = append(p.events[topic], data) - - return nil -} - -// NewMockPulsarProducer creates a pulsar producer -func NewMockPulsarProducer( - ctx context.Context, - changefeedID model.ChangeFeedID, - pConfig *config.PulsarConfig, - client pulsar.Client, -) (*PulsarMockProducers, error) { - return &PulsarMockProducers{ - events: map[string][]*pulsar.ProducerMessage{}, - }, nil -} - -// NewMockPulsarProducerDDL creates a pulsar producer for DDLProducer -func NewMockPulsarProducerDDL( - ctx context.Context, - changefeedID model.ChangeFeedID, - pConfig *config.PulsarConfig, - client pulsar.Client, - sinkConfig *config.SinkConfig, -) (DDLProducer, error) { - return NewMockPulsarProducer(ctx, changefeedID, pConfig, client) -} - -// GetProducerByTopic returns a 
producer by topic name -func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { - return producer, nil -} - -// Close close all producers -func (p *PulsarMockProducers) Close() { - p.events = make(map[string][]*pulsar.ProducerMessage) -} - -// Flush waits for all the messages in the async producer to be sent to Pulsar. -// Notice: this method is not thread-safe. -// Do not try to call AsyncSendMessage and Flush functions in different threads, -// otherwise Flush will not work as expected. It may never finish or flush the wrong message. -// Because inflight will be modified by mistake. -func (p *PulsarMockProducers) Flush(ctx context.Context) error { - return nil -} - -// GetAllEvents returns the events received by the mock producer. -func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage { - var events []*pulsar.ProducerMessage - for _, v := range p.events { - events = append(events, v...) - } - return events -} - -// GetEvents returns the event filtered by the key. -func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage { - return p.events[topic] -} diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go deleted file mode 100644 index 6015ad46e02..00000000000 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package mq - -import ( - "context" - "net/url" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" - "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" - "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" - "github.com/pingcap/tiflow/cdc/sink/util" - "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/sink/codec/builder" - pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" - tiflowutil "github.com/pingcap/tiflow/pkg/util" - "go.uber.org/zap" -) - -// NewPulsarDDLSink will verify the config and create a Pulsar DDL Sink. 
-func NewPulsarDDLSink( - ctx context.Context, - changefeedID model.ChangeFeedID, - sinkURI *url.URL, - replicaConfig *config.ReplicaConfig, - pulsarTopicManagerCreator manager.PulsarTopicManager, - clientCreator pulsarConfig.FactoryCreator, - producerCreator ddlproducer.PulsarFactory, -) (_ *DDLSink, err error) { - log.Info("Starting pulsar DDL producer ...", - zap.String("namespace", changefeedID.Namespace), - zap.String("changefeed", changefeedID.ID)) - - defaultTopic, err := util.GetTopic(sinkURI) - if err != nil { - return nil, errors.Trace(err) - } - - protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol)) - if err != nil { - return nil, errors.Trace(err) - } - - pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI, replicaConfig.Sink.PulsarConfig) - if err != nil { - return nil, errors.Trace(err) - } - - log.Info("Try to create a DDL sink producer", - zap.String("changefeed", changefeedID.String())) - - // NewEventRouter - eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme) - if err != nil { - return nil, errors.Trace(err) - } - - encoderConfig, err := util.GetEncoderConfig(changefeedID, - sinkURI, protocol, replicaConfig, config.DefaultMaxMessageBytes) - if err != nil { - return nil, errors.Trace(err) - } - - encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } - - start := time.Now() - client, err := clientCreator(pConfig, changefeedID, replicaConfig.Sink) - if err != nil { - log.Error("DDL sink producer client create fail", zap.Error(err)) - return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) - } - - p, err := producerCreator(ctx, changefeedID, pConfig, client, replicaConfig.Sink) - log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start))) - if err != nil { - return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) - } - - topicManager, err := pulsarTopicManagerCreator(pConfig, client) - if err != nil { - return nil, errors.Trace(err) - } - - s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol) - - return s, nil -} diff --git a/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go b/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go deleted file mode 100644 index 80594ddb5ab..00000000000 --- a/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go +++ /dev/null @@ -1,322 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
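// The DML producer defined in this file keeps one pulsar.Producer per
// topic behind an LRU cache, so producers evicted from the cache are
// closed automatically. The core idiom, sketched with
// hashicorp/golang-lru (the cache size here is illustrative):
//
//	cache, _ := lru.NewWithEvict(8, func(key, value interface{}) {
//		if prod, ok := value.(pulsar.Producer); ok {
//			prod.Close() // release the evicted producer's resources
//		}
//	})
//	cache.Add("topic-a", producerA)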
- -package dmlproducer - -import ( - "context" - "encoding/json" - "sync" - "time" - - "github.com/apache/pulsar-client-go/pulsar" - lru "github.com/hashicorp/golang-lru" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/metrics/mq" - "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/sink/codec/common" - "go.uber.org/zap" -) - -var _ DMLProducer = (*pulsarDMLProducer)(nil) - -// pulsarDMLProducer is used to send messages to pulsar. -type pulsarDMLProducer struct { - // id indicates which processor (changefeed) this sink belongs to. - id model.ChangeFeedID - // We hold the client to make close operation faster. - // Please see the comment of Close(). - client pulsar.Client - // producers is used to send messages to pulsar. - // Each topic uses exactly one producer; an LRU cache lets us support - // many topics while keeping memory usage bounded. - producers *lru.Cache - - // closedMu is used to protect `closed`. - // We need to ensure that closed producers are never written to. - closedMu sync.RWMutex - // closed is used to indicate whether the producer is closed. - // We also use it to guard against double closes. - closed bool - - // failpointCh is used to inject failpoints into the run loop. - // Only used in test. - failpointCh chan error - // errChan receives asynchronous send errors. - errChan chan error - - pConfig *config.PulsarConfig -} - -// NewPulsarDMLProducer creates a new pulsar producer. -func NewPulsarDMLProducer( - ctx context.Context, - changefeedID model.ChangeFeedID, - client pulsar.Client, - sinkConfig *config.SinkConfig, - errCh chan error, - failpointCh chan error, -) (DMLProducer, error) { - log.Info("Creating pulsar DML producer ...", - zap.String("namespace", changefeedID.Namespace), - zap.String("changefeed", changefeedID.ID)) - start := time.Now() - - var pulsarConfig *config.PulsarConfig - if sinkConfig.PulsarConfig == nil { - log.Error("new pulsar DML producer failed: sink pulsar config is empty") - return nil, cerror.ErrPulsarInvalidConfig.
- GenWithStackByArgs("pulsar config is empty") - } - - pulsarConfig = sinkConfig.PulsarConfig - defaultTopicName := pulsarConfig.GetDefaultTopicName() - defaultProducer, err := newProducer(pulsarConfig, client, defaultTopicName) - if err != nil { - go client.Close() - return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) - } - producerCacheSize := config.DefaultPulsarProducerCacheSize - if pulsarConfig != nil && pulsarConfig.PulsarProducerCacheSize != nil { - producerCacheSize = int(*pulsarConfig.PulsarProducerCacheSize) - } - - producers, err := lru.NewWithEvict(producerCacheSize, func(key interface{}, value interface{}) { - // this is called when the LRU cache removes or evicts a producer - pulsarProducer, ok := value.(pulsar.Producer) - if ok && pulsarProducer != nil { - pulsarProducer.Close() - } - }) - if err != nil { - go client.Close() - return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) - } - - producers.Add(defaultTopicName, defaultProducer) - - p := &pulsarDMLProducer{ - id: changefeedID, - client: client, - producers: producers, - pConfig: pulsarConfig, - closed: false, - failpointCh: failpointCh, - errChan: errCh, - } - log.Info("Pulsar DML producer created", zap.Stringer("changefeed", p.id), - zap.Duration("duration", time.Since(start))) - return p, nil -} - -// AsyncSendMessage sends one message asynchronously -func (p *pulsarDMLProducer) AsyncSendMessage( - ctx context.Context, topic string, - partition int32, message *common.Message, -) error { - wrapperSchemaAndTopic(message) - - // We have to hold the lock to avoid writing to a closed producer. - // Close may be blocked for a long time. - p.closedMu.RLock() - defer p.closedMu.RUnlock() - - // If producers are closed, we should skip the message and return an error. - if p.closed { - return cerror.ErrPulsarProducerClosed.GenWithStackByArgs() - } - failpoint.Inject("PulsarSinkAsyncSendError", func() { - // simulate sending the message to the input channel successfully while - // flushing it to Pulsar fails - log.Info("PulsarSinkAsyncSendError error injected", zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID)) - p.failpointCh <- errors.New("pulsar sink injected error") - failpoint.Return(nil) - }) - data := &pulsar.ProducerMessage{ - Payload: message.Value, - Key: message.GetPartitionKey(), - } - - producer, err := p.GetProducerByTopic(topic) - if err != nil { - return err - } - - // for stress tests, the message callback below also records the sent count - - producer.SendAsync(ctx, data, - func(id pulsar.MessageID, m *pulsar.ProducerMessage, err error) { - // fail - if err != nil { - e := cerror.WrapError(cerror.ErrPulsarAsyncSendMessage, err) - log.Error("Pulsar DML producer async send error", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), - zap.Error(err)) - mq.IncPublishedDMLFail(topic, p.id.ID, message.GetSchema()) - // use this select to avoid sending an error to a closed channel - // the ctx is always canceled before the errChan is closed - select { - case <-ctx.Done(): - return - case p.errChan <- e: - default: - log.Warn("Error channel is full in pulsar DML producer", - zap.Stringer("changefeed", p.id), zap.Error(e)) - } - } else if message.Callback != nil { - // success - message.Callback() - mq.IncPublishedDMLSuccess(topic, p.id.ID, message.GetSchema()) - } - }) - - mq.IncPublishedDMLCount(topic, p.id.ID, message.GetSchema()) - - return nil -} - -func (p *pulsarDMLProducer) Close() { - // We have to hold the lock to synchronize closing with writing.
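// The shutdown pattern used by Close below, in miniature: senders hold
// the read lock and check `closed` before writing, while Close takes the
// write lock and flips the flag, so a message is never written to a
// closed producer. A generic sketch of the idiom, not the sink code
// itself:
//
//	// sender side
//	p.closedMu.RLock()
//	defer p.closedMu.RUnlock()
//	if p.closed {
//		return errProducerClosed // hypothetical error value
//	}
//
//	// closer side
//	p.closedMu.Lock()
//	p.closed = true
//	p.closedMu.Unlock()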
- p.closedMu.Lock() - defer p.closedMu.Unlock() - // If the producer has already been closed, we should skip this close operation. - if p.closed { - // We need to guard against double closing the clients, - // which could lead to panic. - log.Warn("Pulsar DML producer already closed", - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID)) - return - } - close(p.failpointCh) - p.closed = true - - start := time.Now() - keys := p.producers.Keys() - for _, topic := range keys { - p.producers.Remove(topic) // callback func will be called - topicName, _ := topic.(string) - log.Info("Async client closed in pulsar DML producer", - zap.Duration("duration", time.Since(start)), - zap.String("namespace", p.id.Namespace), - zap.String("changefeed", p.id.ID), zap.String("topic", topicName)) - } - - p.client.Close() -} - -// newProducer creates a pulsar producer. -// Each topic is served by exactly one producer. -func newProducer( - pConfig *config.PulsarConfig, - client pulsar.Client, - topicName string, -) (pulsar.Producer, error) { - po := pulsar.ProducerOptions{ - Topic: topicName, - } - if pConfig.BatchingMaxMessages != nil { - po.BatchingMaxMessages = *pConfig.BatchingMaxMessages - } - if pConfig.BatchingMaxPublishDelay != nil { - po.BatchingMaxPublishDelay = pConfig.BatchingMaxPublishDelay.Duration() - } - if pConfig.CompressionType != nil { - po.CompressionType = pConfig.CompressionType.Value() - po.CompressionLevel = pulsar.Default - } - if pConfig.SendTimeout != nil { - po.SendTimeout = pConfig.SendTimeout.Duration() - } - - producer, err := client.CreateProducer(po) - if err != nil { - return nil, err - } - - log.Info("pulsar producer created successfully", zap.String("topic", topicName)) - - return producer, nil -} - -func (p *pulsarDMLProducer) getProducer(topic string) (pulsar.Producer, bool) { - target, ok := p.producers.Get(topic) - if ok { - producer, ok := target.(pulsar.Producer) - if ok { - return producer, true - } - } - return nil, false -} - -// GetProducerByTopic returns the producer for topicName; -// if none exists yet, it creates one and stores it in the LRU cache -// (see the producers field of pulsarDMLProducer for details). -func (p *pulsarDMLProducer) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) { - getProducer, ok := p.getProducer(topicName) - if ok && getProducer != nil { - return getProducer, nil - } - - if !ok { // create a new producer for the topicName - producer, err = newProducer(p.pConfig, p.client, topicName) - if err != nil { - return nil, err - } - p.producers.Add(topicName, producer) - } - - return producer, nil -} - -// wrapperSchemaAndTopic fills in the message's schema and table when they are missing -func wrapperSchemaAndTopic(m *common.Message) { - if m.Schema == nil { - if m.Protocol == config.ProtocolMaxwell { - mx := &maxwellMessage{} - err := json.Unmarshal(m.Value, mx) - if err != nil { - log.Error("unmarshal maxwell message failed", zap.Error(err)) - return - } - if len(mx.Database) > 0 { - m.Schema = &mx.Database - } - if len(mx.Table) > 0 { - m.Table = &mx.Table - } - } - if m.Protocol == config.ProtocolCanal { // the canal protocol puts multiple schemas in one topic - m.Schema = str2Pointer("multi_schema") - } - } -} - -// maxwellMessage is the message format of maxwell -type maxwellMessage struct { - Database string `json:"database"` - Table string `json:"table"` -} - -// str2Pointer returns the pointer of the string.
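// (A worked example for wrapperSchemaAndTopic above: given an
// illustrative maxwell payload such as {"database":"test","table":"t1"},
// the function sets m.Schema to "test" and m.Table to "t1";
// canal-encoded messages instead get the fixed schema "multi_schema",
// since that protocol multiplexes schemas in one topic.)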
-func str2Pointer(str string) *string { - return &str -} diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go deleted file mode 100644 index b29688d5bca..00000000000 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package mq - -import ( - "context" - "net/url" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" - "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" - "github.com/pingcap/tiflow/cdc/sink/util" - "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/sink/codec" - "github.com/pingcap/tiflow/pkg/sink/codec/builder" - "github.com/pingcap/tiflow/pkg/sink/kafka" - tiflowutil "github.com/pingcap/tiflow/pkg/util" - "go.uber.org/zap" -) - -// NewKafkaDMLSink will verify the config and create a KafkaSink. -func NewKafkaDMLSink( - ctx context.Context, - changefeedID model.ChangeFeedID, - sinkURI *url.URL, - replicaConfig *config.ReplicaConfig, - errCh chan error, - factoryCreator kafka.FactoryCreator, - producerCreator dmlproducer.Factory, -) (_ *dmlSink, err error) { - topic, err := util.GetTopic(sinkURI) - if err != nil { - return nil, errors.Trace(err) - } - - options := kafka.NewOptions() - if err := options.Apply(changefeedID, sinkURI, replicaConfig); err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } - - factory, err := factoryCreator(options, changefeedID) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err) - } - - adminClient, err := factory.AdminClient(ctx) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err) - } - // We must close adminClient when this function returns because of an error; - // otherwise the adminClient will never be closed and will lead to a goroutine leak.
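// The idiom relied on here, in miniature: the named return value `err`
// is visible to the deferred closure, so any failure after this point
// triggers the cleanup. A generic sketch, not the sink code itself:
//
//	func open() (_ *Sink, err error) {
//		c := acquire()
//		defer func() {
//			if err != nil {
//				c.Close() // only runs on the error path
//			}
//		}()
//		return build(c)
//	}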
- defer func() { - if err != nil && adminClient != nil { - adminClient.Close() - } - }() - - // adjust the option configuration before creating the kafka client - if err = kafka.AdjustOptions(ctx, adminClient, options, topic); err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err) - } - - protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol)) - if err != nil { - return nil, errors.Trace(err) - } - - topicManager, err := util.GetTopicManagerAndTryCreateTopic( - ctx, - changefeedID, - topic, - options.DeriveTopicConfig(), - adminClient, - ) - if err != nil { - return nil, errors.Trace(err) - } - - eventRouter, err := dispatcher.NewEventRouter(replicaConfig, topic, sinkURI.Scheme) - if err != nil { - return nil, errors.Trace(err) - } - - encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, options.MaxMessageBytes) - if err != nil { - return nil, errors.Trace(err) - } - - encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } - - failpointCh := make(chan error, 1) - asyncProducer, err := factory.AsyncProducer(ctx, failpointCh) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err) - } - - metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient) - dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh) - concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency) - encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID) - s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, - eventRouter, encoderGroup, protocol, errCh) - log.Info("DML sink producer created", - zap.String("namespace", changefeedID.Namespace), - zap.String("changefeedID", changefeedID.ID)) - - return s, nil -} diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go deleted file mode 100644 index 6a11fc512a6..00000000000 --- a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package mq - -import ( - "context" - "net/url" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" - "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" - "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" - "github.com/pingcap/tiflow/cdc/sink/util" - "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/sink/codec" - "github.com/pingcap/tiflow/pkg/sink/codec/builder" - pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar" - tiflowutil "github.com/pingcap/tiflow/pkg/util" - "go.uber.org/zap" -) - -// NewPulsarDMLSink will verify the config and create a PulsarSink. 
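// As with the DDL sink above, the factory parameters on the constructor
// below exist so tests can inject mock Pulsar clients and producers
// instead of real ones. A hypothetical wiring (the arguments simply echo
// the signature; no claim these exact test helpers exist):
//
//	s, err := NewPulsarDMLSink(ctx, changefeedID, sinkURI, replicaConfig,
//		errCh, pulsarTopicManagerCreator, clientCreator, producerCreator)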
-func NewPulsarDMLSink( - ctx context.Context, - changefeedID model.ChangeFeedID, - sinkURI *url.URL, - replicaConfig *config.ReplicaConfig, - errCh chan error, - pulsarTopicManagerCreator manager.PulsarTopicManager, - clientCreator pulsarConfig.FactoryCreator, - producerCreator dmlproducer.PulsarFactory, -) (_ *dmlSink, err error) { - log.Info("Starting pulsar DML producer ...", - zap.String("namespace", changefeedID.Namespace), - zap.String("changefeed", changefeedID.ID)) - - defaultTopic, err := util.GetTopic(sinkURI) - if err != nil { - return nil, errors.Trace(err) - } - - protocol, err := util.GetProtocol(tiflowutil.GetOrZero(replicaConfig.Sink.Protocol)) - if err != nil { - return nil, errors.Trace(err) - } - - pConfig, err := pulsarConfig.NewPulsarConfig(sinkURI, replicaConfig.Sink.PulsarConfig) - if err != nil { - return nil, errors.Trace(err) - } - - client, err := clientCreator(pConfig, changefeedID, replicaConfig.Sink) - if err != nil { - log.Error("DML sink producer client create fail", zap.Error(err)) - return nil, cerror.WrapError(cerror.ErrPulsarNewClient, err) - } - - failpointCh := make(chan error, 1) - log.Info("Try to create a DML sink producer", zap.String("changefeed", changefeedID.String())) - start := time.Now() - p, err := producerCreator(ctx, changefeedID, client, replicaConfig.Sink, errCh, failpointCh) - log.Info("DML sink producer created", - zap.String("changefeed", changefeedID.String()), - zap.Duration("duration", time.Since(start))) - if err != nil { - defer func() { - if p != nil { - p.Close() - } - }() - return nil, cerror.WrapError(cerror.ErrPulsarNewProducer, err) - } - - // The topicManager is not actually used in pulsar , it is only used to create dmlSink. - // TODO: Find a way to remove it in newDMLSink. - topicManager, err := pulsarTopicManagerCreator(pConfig, client) - if err != nil { - return nil, errors.Trace(err) - } - eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme) - if err != nil { - return nil, errors.Trace(err) - } - - encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, - config.DefaultMaxMessageBytes) - if err != nil { - return nil, errors.Trace(err) - } - - encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) - if err != nil { - return nil, cerror.WrapError(cerror.ErrPulsarInvalidConfig, err) - } - - concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency) - encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID) - - s := newDMLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderGroup, protocol, errCh) - - return s, nil -} diff --git a/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go b/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go index d02b309546f..2603965effc 100644 --- a/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go @@ -119,19 +119,7 @@ func NewKafkaDDLSink( return nil, errors.Trace(err) } -<<<<<<< HEAD:cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go s, err := newDDLSink(ctx, p, topicManager, eventRouter, encoderConfig) -======= - encoderBuilder, err := builder.NewRowEventEncoderBuilder(ctx, encoderConfig) - if err != nil { - return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) - } - - start := time.Now() - log.Info("Try to create a DDL sink producer", - zap.String("changefeed", changefeedID.String())) - syncProducer, err := factory.SyncProducer(ctx) ->>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log 
(#9691)):cdc/sink/ddlsink/mq/kafka_ddl_sink.go if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 04964bdc98a..10c28588929 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -18,15 +18,12 @@ import ( "net/url" "strings" -<<<<<<< HEAD -======= - "github.com/apache/pulsar-client-go/pulsar" - "github.com/aws/aws-sdk-go-v2/aws" ->>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) + "github.com/aws/aws-sdk-go/aws" "github.com/pingcap/errors" "github.com/pingcap/log" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -161,15 +158,12 @@ type KafkaConfig struct { // MaskSensitiveData masks sensitive data in SinkConfig func (s *SinkConfig) MaskSensitiveData() { - if s.SchemaRegistry != nil { - s.SchemaRegistry = aws.String(util.MaskSensitiveDataInURI(*s.SchemaRegistry)) + if s.SchemaRegistry != "" { + s.SchemaRegistry = util.MaskSensitiveDataInURI(s.SchemaRegistry) } if s.KafkaConfig != nil { s.KafkaConfig.MaskSensitiveData() } - if s.PulsarConfig != nil { - s.PulsarConfig.MaskSensitiveData() - } } // CSVConfig defines a series of configuration items for csv codec. @@ -291,8 +285,6 @@ type ColumnSelector struct { Columns []string `toml:"columns" json:"columns"` } -<<<<<<< HEAD -======= // CodecConfig represents a MQ codec configuration type CodecConfig struct { EnableTiDBExtension *bool `toml:"enable-tidb-extension" json:"enable-tidb-extension,omitempty"` @@ -302,264 +294,11 @@ type CodecConfig struct { AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"` } -// KafkaConfig represents a kafka sink configuration -type KafkaConfig struct { - PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` - ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` - KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` - MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` - Compression *string `toml:"compression" json:"compression,omitempty"` - KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` - AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` - DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` - WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` - ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` - RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` - SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` - SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` - SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` - SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` - SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` - SASLGssAPIKerberosConfigPath *string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` - SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` - SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` - SASLGssAPIPassword *string `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` - SASLGssAPIRealm 
*string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` - SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"` - SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` - SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` - SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` - SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` - SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` - SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` - EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` - CA *string `toml:"ca" json:"ca,omitempty"` - Cert *string `toml:"cert" json:"cert,omitempty"` - Key *string `toml:"key" json:"key,omitempty"` - InsecureSkipVerify *bool `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"` - CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` - LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` - GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"` -} - // MaskSensitiveData masks sensitive data in KafkaConfig func (k *KafkaConfig) MaskSensitiveData() { - k.SASLPassword = aws.String("********") - k.SASLGssAPIPassword = aws.String("********") k.SASLOAuthClientSecret = aws.String("********") - k.Key = aws.String("********") - if k.GlueSchemaRegistryConfig != nil { - k.GlueSchemaRegistryConfig.AccessKey = "********" - k.GlueSchemaRegistryConfig.Token = "********" - k.GlueSchemaRegistryConfig.SecretAccessKey = "********" - } -} - -// PulsarCompressionType is the compression type for pulsar -type PulsarCompressionType string - -// Value returns the pulsar compression type -func (p *PulsarCompressionType) Value() pulsar.CompressionType { - if p == nil { - return 0 - } - switch strings.ToLower(string(*p)) { - case "lz4": - return pulsar.LZ4 - case "zlib": - return pulsar.ZLib - case "zstd": - return pulsar.ZSTD - default: - return 0 // default is no compression - } -} - -// TimeMill is the time in milliseconds -type TimeMill int - -// Duration returns the time in seconds as a duration -func (t *TimeMill) Duration() time.Duration { - if t == nil { - return 0 - } - return time.Duration(*t) * time.Millisecond -} - -// NewTimeMill returns a new time in milliseconds -func NewTimeMill(x int) *TimeMill { - t := TimeMill(x) - return &t -} - -// TimeSec is the time in seconds -type TimeSec int - -// Duration returns the time in seconds as a duration -func (t *TimeSec) Duration() time.Duration { - if t == nil { - return 0 - } - return time.Duration(*t) * time.Second -} - -// NewTimeSec returns a new time in seconds -func NewTimeSec(x int) *TimeSec { - t := TimeSec(x) - return &t -} - -// OAuth2 is the configuration for OAuth2 -type OAuth2 struct { - // OAuth2IssuerURL the URL of the authorization server. - OAuth2IssuerURL string `toml:"oauth2-issuer-url" json:"oauth2-issuer-url,omitempty"` - // OAuth2Audience the URL of the resource server. - OAuth2Audience string `toml:"oauth2-audience" json:"oauth2-audience,omitempty"` - // OAuth2PrivateKey the private key used to sign the server. 
- OAuth2PrivateKey string `toml:"oauth2-private-key" json:"oauth2-private-key,omitempty"` - // OAuth2ClientID the client ID of the application. - OAuth2ClientID string `toml:"oauth2-client-id" json:"oauth2-client-id,omitempty"` - // OAuth2Scope scope - OAuth2Scope string `toml:"oauth2-scope" json:"oauth2-scope,omitempty"` -} - -func (o *OAuth2) validate() (err error) { - if o == nil { - return nil - } - if len(o.OAuth2IssuerURL) == 0 || len(o.OAuth2ClientID) == 0 || len(o.OAuth2PrivateKey) == 0 || - len(o.OAuth2Audience) == 0 { - return fmt.Errorf("issuer-url and audience and private-key and client-id not be empty") - } - return nil -} - -// PulsarConfig pulsar sink configuration -type PulsarConfig struct { - TLSKeyFilePath *string `toml:"tls-certificate-path" json:"tls-certificate-path,omitempty"` - TLSCertificateFile *string `toml:"tls-certificate-file" json:"tls-private-key-path,omitempty"` - TLSTrustCertsFilePath *string `toml:"tls-trust-certs-file-path" json:"tls-trust-certs-file-path,omitempty"` - - // PulsarProducerCacheSize is the size of the cache of pulsar producers - PulsarProducerCacheSize *int32 `toml:"pulsar-producer-cache-size" json:"pulsar-producer-cache-size,omitempty"` - - // PulsarVersion print the version of pulsar - PulsarVersion *string `toml:"pulsar-version" json:"pulsar-version,omitempty"` - - // pulsar client compression - CompressionType *PulsarCompressionType `toml:"compression-type" json:"compression-type,omitempty"` - - // AuthenticationToken the token for the Pulsar server - AuthenticationToken *string `toml:"authentication-token" json:"authentication-token,omitempty"` - - // ConnectionTimeout Timeout for the establishment of a TCP connection (default: 5 seconds) - ConnectionTimeout *TimeSec `toml:"connection-timeout" json:"connection-timeout,omitempty"` - - // Set the operation timeout (default: 30 seconds) - // Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the - // operation will be marked as failed - OperationTimeout *TimeSec `toml:"operation-timeout" json:"operation-timeout,omitempty"` - - // BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000) - BatchingMaxMessages *uint `toml:"batching-max-messages" json:"batching-max-messages,omitempty"` - - // BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms) - // if batch messages are enabled. If set to a non zero value, messages will be queued until this time - // interval or until - BatchingMaxPublishDelay *TimeMill `toml:"batching-max-publish-delay" json:"batching-max-publish-delay,omitempty"` - - // SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent. - // Send and SendAsync returns an error after timeout. 
- // default: 30s - SendTimeout *TimeSec `toml:"send-timeout" json:"send-timeout,omitempty"` - - // TokenFromFile Authentication from the file token, - // the path name of the file (the third priority authentication method) - TokenFromFile *string `toml:"token-from-file" json:"token-from-file,omitempty"` - - // BasicUserName Account name for pulsar basic authentication (the second priority authentication method) - BasicUserName *string `toml:"basic-user-name" json:"basic-user-name,omitempty"` - // BasicPassword with account - BasicPassword *string `toml:"basic-password" json:"basic-password,omitempty"` - - // AuthTLSCertificatePath create new pulsar authentication provider with specified TLS certificate and private key - AuthTLSCertificatePath *string `toml:"auth-tls-certificate-path" json:"auth-tls-certificate-path,omitempty"` - // AuthTLSPrivateKeyPath private key - AuthTLSPrivateKeyPath *string `toml:"auth-tls-private-key-path" json:"auth-tls-private-key-path,omitempty"` - - // Oauth2 include oauth2-issuer-url oauth2-audience oauth2-private-key oauth2-client-id - // and 'type' always use 'client_credentials' - OAuth2 *OAuth2 `toml:"oauth2" json:"oauth2,omitempty"` - - // BrokerURL is used to configure service brokerUrl for the Pulsar service. - // This parameter is a part of the `sink-uri`. Internal use only. - BrokerURL string `toml:"-" json:"-"` - // SinkURI is the parsed sinkURI. Internal use only. - SinkURI *url.URL `toml:"-" json:"-"` -} - -// MaskSensitiveData masks sensitive data in PulsarConfig -func (c *PulsarConfig) MaskSensitiveData() { - if c.AuthenticationToken != nil { - c.AuthenticationToken = aws.String("******") - } - if c.BasicPassword != nil { - c.BasicPassword = aws.String("******") - } - if c.OAuth2 != nil { - c.OAuth2.OAuth2PrivateKey = "******" - } -} - -// Check get broker url -func (c *PulsarConfig) validate() (err error) { - if c.OAuth2 != nil { - if err = c.OAuth2.validate(); err != nil { - return err - } - if c.TLSTrustCertsFilePath == nil { - return fmt.Errorf("oauth2 is not empty but tls-trust-certs-file-path is empty") - } - } - - return nil -} - -// GetDefaultTopicName get default topic name -func (c *PulsarConfig) GetDefaultTopicName() string { - topicName := c.SinkURI.Path - return topicName[1:] -} - -// MySQLConfig represents a MySQL sink configuration -type MySQLConfig struct { - WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` - MaxTxnRow *int `toml:"max-txn-row" json:"max-txn-row,omitempty"` - MaxMultiUpdateRowSize *int `toml:"max-multi-update-row-size" json:"max-multi-update-row-size,omitempty"` - MaxMultiUpdateRowCount *int `toml:"max-multi-update-row" json:"max-multi-update-row,omitempty"` - TiDBTxnMode *string `toml:"tidb-txn-mode" json:"tidb-txn-mode,omitempty"` - SSLCa *string `toml:"ssl-ca" json:"ssl-ca,omitempty"` - SSLCert *string `toml:"ssl-cert" json:"ssl-cert,omitempty"` - SSLKey *string `toml:"ssl-key" json:"ssl-key,omitempty"` - TimeZone *string `toml:"time-zone" json:"time-zone,omitempty"` - WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` - ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` - Timeout *string `toml:"timeout" json:"timeout,omitempty"` - EnableBatchDML *bool `toml:"enable-batch-dml" json:"enable-batch-dml,omitempty"` - EnableMultiStatement *bool `toml:"enable-multi-statement" json:"enable-multi-statement,omitempty"` - EnableCachePreparedStatement *bool `toml:"enable-cache-prepared-statement" json:"enable-cache-prepared-statement,omitempty"` -} - -// 
CloudStorageConfig represents a cloud storage sink configuration -type CloudStorageConfig struct { - WorkerCount *int `toml:"worker-count" json:"worker-count,omitempty"` - FlushInterval *string `toml:"flush-interval" json:"flush-interval,omitempty"` - FileSize *int `toml:"file-size" json:"file-size,omitempty"` - - OutputColumnID *bool `toml:"output-column-id" json:"output-column-id,omitempty"` } ->>>>>>> 6ea9a41117 (*(ticdc): do not print password in cdc log (#9691)) func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { if err := s.validateAndAdjustSinkURI(sinkURI); err != nil { return err From 4ad553b1250fd4570046b2117ba230a1b2eff77d Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Mon, 11 Sep 2023 16:17:53 +0800 Subject: [PATCH 3/4] add ut --- pkg/config/replica_config_test.go | 25 ++++++++++++++++++++++++ pkg/config/sink.go | 5 ++++- pkg/util/uri.go | 8 ++++---- pkg/util/uri_test.go | 32 +++++++++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index ca6fc07ac92..90319372f55 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go/aws" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" ) @@ -253,3 +254,27 @@ func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) { require.NoError(t, err) require.False(t, config.EnableOldValue) } + +func TestMaskSensitiveData(t *testing.T) { + config := ReplicaConfig{ + Sink: nil, + Consistent: nil, + } + config.MaskSensitiveData() + require.Nil(t, config.Sink) + require.Nil(t, config.Consistent) + config.Sink = &SinkConfig{} + config.Sink.KafkaConfig = &KafkaConfig{ + SASLOAuthTokenURL: aws.String("http://abc.com?password=bacd"), + SASLOAuthClientSecret: aws.String("bacd"), + } + config.Sink.SchemaRegistry = "http://abc.com?password=bacd" + config.Consistent = &ConsistentConfig{ + Storage: "http://abc.com?password=bacd", + } + config.MaskSensitiveData() + require.Equal(t, "http://abc.com?password=xxxxx", config.Sink.SchemaRegistry) + require.Equal(t, "http://abc.com?password=xxxxx", config.Consistent.Storage) + require.Equal(t, "http://abc.com?password=xxxxx", *config.Sink.KafkaConfig.SASLOAuthTokenURL) + require.Equal(t, "******", *config.Sink.KafkaConfig.SASLOAuthClientSecret) +} diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 10c28588929..a796f8372ac 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -296,7 +296,10 @@ type CodecConfig struct { // MaskSensitiveData masks sensitive data in KafkaConfig func (k *KafkaConfig) MaskSensitiveData() { - k.SASLOAuthClientSecret = aws.String("********") + k.SASLOAuthClientSecret = aws.String("******") + if k.SASLOAuthTokenURL != nil { + k.SASLOAuthTokenURL = aws.String(util.MaskSensitiveDataInURI(*k.SASLOAuthTokenURL)) + } } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { diff --git a/pkg/util/uri.go b/pkg/util/uri.go index d033dd4d303..da0fbb0d9b8 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -81,7 +81,7 @@ func MaskSinkURI(uri string) (string, error) { return uriParsed.Redacted(), nil } -var name = []string{ +var sensitiveQueryParameterNames = []string{ "password", "sasl-password", "access-key", @@ -97,13 +97,13 @@ var name = []string{ func MaskSensitiveDataInURI(uri string) string { uriParsed, err := url.Parse(uri) if err != nil { - log.Error("failed to parse sink URI", zap.Error(err)) + 
log.Error("failed to parse URI", zap.Error(err)) return "" } queries := uriParsed.Query() - for _, secretKey := range name { + for _, secretKey := range sensitiveQueryParameterNames { if queries.Has(secretKey) { - queries.Set(secretKey, "******") + queries.Set(secretKey, "xxxxx") } } uriParsed.RawQuery = queries.Encode() diff --git a/pkg/util/uri_test.go b/pkg/util/uri_test.go index 7cafebc9ef6..7e75921ab3b 100644 --- a/pkg/util/uri_test.go +++ b/pkg/util/uri_test.go @@ -88,3 +88,35 @@ func TestMaskSinkURI(t *testing.T) { require.Equal(t, tt.masked, maskedURI) } } + +func TestMaskSensitiveDataInURI(t *testing.T) { + tests := []struct { + uri string + masked string + }{ + { + "mysql://root:123456@127.0.0.1:3306/?time-zone=c", + "mysql://root:xxxxx@127.0.0.1:3306/?time-zone=c", + }, + { + "", + "", + }, + { + "abc", + "abc", + }, + } + for _, q := range sensitiveQueryParameterNames { + tests = append(tests, struct { + uri string + masked string + }{"kafka://127.0.0.1:9093/cdc?" + q + "=verysecure", + "kafka://127.0.0.1:9093/cdc?" + q + "=xxxxx"}) + } + + for _, tt := range tests { + maskedURI := MaskSensitiveDataInURI(tt.uri) + require.Equal(t, tt.masked, maskedURI) + } +} From cc28fb3fffd9401ef5f607cb75f6d6a9348e3274 Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Mon, 11 Sep 2023 18:28:04 +0800 Subject: [PATCH 4/4] add ut --- cdc/api/v2/changefeed.go | 1 - pkg/util/uri_test.go | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 265cbde9faf..b163b0ac0c8 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -117,7 +117,6 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { CertAllowedCN: cfg.CertAllowedCN, } o, err := h.capture.GetOwner() - // cannot create changefeed if there are running lightning/restore tasks if err != nil { needRemoveGCSafePoint = true diff --git a/pkg/util/uri_test.go b/pkg/util/uri_test.go index 7e75921ab3b..97e16b511f0 100644 --- a/pkg/util/uri_test.go +++ b/pkg/util/uri_test.go @@ -111,8 +111,10 @@ func TestMaskSensitiveDataInURI(t *testing.T) { tests = append(tests, struct { uri string masked string - }{"kafka://127.0.0.1:9093/cdc?" + q + "=verysecure", - "kafka://127.0.0.1:9093/cdc?" + q + "=xxxxx"}) + }{ + "kafka://127.0.0.1:9093/cdc?" + q + "=verysecure", + "kafka://127.0.0.1:9093/cdc?" + q + "=xxxxx", + }) } for _, tt := range tests {