From 78198f9ea3433fadd5b1a183a23698e16425a8c7 Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Wed, 6 Sep 2023 14:48:04 +0800 Subject: [PATCH 1/2] do not print password in cdc log --- cdc/api/v1/api.go | 11 +++-------- cdc/api/v2/changefeed.go | 8 +------- cdc/model/changefeed.go | 6 +++--- cdc/owner/changefeed.go | 4 ++-- .../mq/ddlproducer/pulsar_ddl_mock_producer.go | 4 ---- cdc/sink/ddlsink/mq/kafka_ddl_sink.go | 3 ++- cdc/sink/ddlsink/mq/pulsar_ddl_sink.go | 3 ++- .../dmlsink/mq/dmlproducer/pulsar_dml_producer.go | 3 +-- cdc/sink/dmlsink/mq/kafka_dml_sink.go | 3 +-- cdc/sink/dmlsink/mq/pulsar_dml_sink.go | 3 ++- cdc/syncpointstore/mysql_syncpoint_store.go | 4 ++-- cmd/kafka-consumer/main.go | 4 ++-- pkg/sink/codec/avro/confluent_schema_registry.go | 2 +- 13 files changed, 22 insertions(+), 36 deletions(-) diff --git a/cdc/api/v1/api.go b/cdc/api/v1/api.go index 61e047a5e6f..690008cb023 100644 --- a/cdc/api/v1/api.go +++ b/cdc/api/v1/api.go @@ -308,13 +308,6 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { _ = c.Error(err) return } - - infoStr, err := info.Marshal() - if err != nil { - _ = c.Error(err) - return - } - upstreamInfo := &model.UpstreamInfo{ ID: up.ID, PDEndpoints: strings.Join(up.PdEndpoints, ","), @@ -331,7 +324,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) { return } - log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr)) + log.Info("Create changefeed successfully!", + zap.String("id", changefeedConfig.ID), + zap.String("changefeed", info.String())) c.Status(http.StatusAccepted) } diff --git a/cdc/api/v2/changefeed.go b/cdc/api/v2/changefeed.go index 76539060705..7a9468be7f2 100644 --- a/cdc/api/v2/changefeed.go +++ b/cdc/api/v2/changefeed.go @@ -135,12 +135,6 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { CAPath: cfg.CAPath, CertAllowedCN: cfg.CertAllowedCN, } - infoStr, err := info.Marshal() - if err != nil { - needRemoveGCSafePoint = true - _ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err)) - return - } // cannot create changefeed if there are running lightning/restore tasks tlsCfg, err := credential.ToTLSConfig() @@ -176,7 +170,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { log.Info("Create changefeed successfully!", zap.String("id", info.ID), - zap.String("changefeed", infoStr)) + zap.String("changefeed", info.String())) c.JSON(http.StatusOK, toAPIModel(info, info.StartTs, info.StartTs, nil, true)) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index b910a3894a9..7edc6e72f87 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -561,11 +561,11 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() { } func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) { - oldRawQuery := uri.RawQuery newRawQuery := newQuery.Encode() + maskedURI, _ := util.MaskSinkURI(uri.String()) log.Info("handle incompatible protocol from sink URI", - zap.String("oldUriQuery", oldRawQuery), - zap.String("fixedUriQuery", newQuery.Encode())) + zap.String("oldURI", maskedURI), + zap.String("newProtocol", newProtocol)) uri.RawQuery = newRawQuery fixedSinkURI := uri.String() diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 336c88a2ec5..c93d83baae2 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -650,7 +650,7 @@ LOOP2: zap.Uint64("changefeedEpoch", epoch), zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", c.resolvedTs), - zap.Stringer("info", c.state.Info)) + zap.String("info", c.state.Info.String())) return nil } @@ -726,7 +726,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) { zap.String("namespace", c.id.Namespace), zap.String("changefeed", c.id.ID), zap.Any("status", c.state.Status), - zap.Stringer("info", c.state.Info), + zap.String("info", c.state.Info.String()), zap.Bool("isRemoved", c.isRemoved)) } diff --git a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go index 4005834c416..561d5515a5c 100644 --- a/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go +++ b/cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go @@ -17,7 +17,6 @@ import ( "context" "github.com/apache/pulsar-client-go/pulsar" - "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -35,9 +34,6 @@ type PulsarMockProducers struct { func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string, totalPartitionsNum int32, message *common.Message, ) error { - // call SyncSendMessage - - log.Info("pulsarProducers SyncBroadcastMessage in") return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message) } diff --git a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go index d5e6fba688e..1bea0ee93a3 100644 --- a/cdc/sink/ddlsink/mq/kafka_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/kafka_ddl_sink.go @@ -104,7 +104,8 @@ func NewKafkaDDLSink( } start := time.Now() - log.Info("Try to create a DDL sink producer", zap.Any("options", options)) + log.Info("Try to create a DDL sink producer", + zap.String("changefeed", changefeedID.String())) syncProducer, err := factory.SyncProducer(ctx) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go index a19ccc754a9..6015ad46e02 100644 --- a/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go +++ b/cdc/sink/ddlsink/mq/pulsar_ddl_sink.go @@ -62,7 +62,8 @@ func NewPulsarDDLSink( return nil, errors.Trace(err) } - log.Info("Try to create a DDL sink producer", zap.Any("pulsarConfig", pConfig)) + log.Info("Try to create a DDL sink producer", + zap.String("changefeed", changefeedID.String())) // NewEventRouter eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme) diff --git a/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go b/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go index 6b11a13b89a..cb9c84f8c28 100644 --- a/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go +++ b/cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go @@ -78,8 +78,7 @@ func NewPulsarDMLProducer( var pulsarConfig *config.PulsarConfig if sinkConfig.PulsarConfig == nil { - log.Error("new pulsar DML producer fail", - zap.Any("sink:pulsar config is empty", sinkConfig.PulsarConfig)) + log.Error("new pulsar DML producer fail,sink:pulsar config is empty") return nil, cerror.ErrPulsarInvalidConfig. GenWithStackByArgs("pulsar config is empty") } diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index 6295226bccb..280f5664d33 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -140,8 +140,7 @@ func NewKafkaDMLSink( ) log.Info("DML sink producer created", zap.String("namespace", changefeedID.Namespace), - zap.String("changefeedID", changefeedID.ID), - zap.Any("options", options)) + zap.String("changefeedID", changefeedID.ID)) return s, nil } diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go index 019bede6c0a..e9853dda57d 100644 --- a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go +++ b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go @@ -71,10 +71,11 @@ func NewPulsarDMLSink( } failpointCh := make(chan error, 1) - log.Info("Try to create a DML sink producer", zap.Any("pulsar", pConfig)) + log.Info("Try to create a DML sink producer", zap.String("changefeed", changefeedID.String())) start := time.Now() p, err := producerCreator(ctx, changefeedID, client, replicaConfig.Sink, errCh, failpointCh) log.Info("DML sink producer created", + zap.String("changefeed", changefeedID.String()), zap.Duration("duration", time.Since(start))) if err != nil { defer func() { diff --git a/cdc/syncpointstore/mysql_syncpoint_store.go b/cdc/syncpointstore/mysql_syncpoint_store.go index 48b163558b7..ce4e88df234 100644 --- a/cdc/syncpointstore/mysql_syncpoint_store.go +++ b/cdc/syncpointstore/mysql_syncpoint_store.go @@ -77,7 +77,7 @@ func newMySQLSyncPointStore( return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } - log.Info("Start mysql syncpoint sink") + log.Info("Start mysql syncpoint sink", zap.String("changefeed", id.String())) return &mysqlSyncPointStore{ db: syncDB, @@ -146,7 +146,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context, var secondaryTs string err = row.Scan(&secondaryTs) if err != nil { - log.Info("sync table: get tidb_current_ts err") + log.Info("sync table: get tidb_current_ts err", zap.String("changefeed", id.String())) err2 := tx.Rollback() if err2 != nil { log.Error("failed to write syncpoint table", zap.Error(err)) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index f12dadccaf2..4627174a07a 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -946,7 +946,7 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti func openDB(ctx context.Context, dsn string) (*sql.DB, error) { db, err := sql.Open("mysql", dsn) if err != nil { - log.Error("open db failed", zap.String("dsn", dsn), zap.Error(err)) + log.Error("open db failed", zap.Error(err)) return nil, errors.Trace(err) } @@ -957,7 +957,7 @@ func openDB(ctx context.Context, dsn string) (*sql.DB, error) { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err = db.PingContext(ctx); err != nil { - log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err)) + log.Error("ping db failed", zap.Error(err)) return nil, errors.Trace(err) } log.Info("open db success", zap.String("dsn", dsn)) diff --git a/pkg/sink/codec/avro/confluent_schema_registry.go b/pkg/sink/codec/avro/confluent_schema_registry.go index ced003b271a..9f06fe64587 100644 --- a/pkg/sink/codec/avro/confluent_schema_registry.go +++ b/pkg/sink/codec/avro/confluent_schema_registry.go @@ -368,7 +368,7 @@ func (m *confluentSchemaManager) ClearRegistry(ctx context.Context, schemaSubjec uri := m.registryURL + "/subjects/" + url.QueryEscape(schemaSubject) req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil) if err != nil { - log.Error("Could not construct request for clearRegistry", zap.String("uri", uri)) + log.Error("Could not construct request for clearRegistry", zap.Error(err)) return cerror.WrapError(cerror.ErrAvroSchemaAPIError, err) } req.Header.Add( From 9971834ea1912349f87660b58421a3bd170fbd75 Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Thu, 7 Sep 2023 18:04:02 +0800 Subject: [PATCH 2/2] do not print password in cdc log --- cdc/model/changefeed.go | 6 +++--- pkg/config/consistent.go | 6 ++++++ pkg/config/replica_config.go | 10 +++++++++ pkg/config/sink.go | 40 ++++++++++++++++++++++++++++++++++++ pkg/util/uri.go | 29 ++++++++++++++++++++++++++ 5 files changed, 88 insertions(+), 3 deletions(-) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 7edc6e72f87..69e9fd1269f 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -240,9 +240,9 @@ func (info *ChangeFeedInfo) String() (str string) { return } - clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI) - if err != nil { - log.Error("failed to marshal changefeed info", zap.Error(err)) + clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI) + if clone.Config != nil { + clone.Config.MaskSensitiveData() } str, err = clone.Marshal() diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index 85baaf5673f..636edcf865f 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" + "github.com/pingcap/tiflow/pkg/util" ) // ConsistentConfig represents replication consistency config for a changefeed. @@ -56,3 +57,8 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { } return redo.ValidateStorage(uri) } + +// MaskSensitiveData masks sensitive data in ConsistentConfig +func (c *ConsistentConfig) MaskSensitiveData() { + c.Storage = util.MaskSensitiveDataInURI(c.Storage) +} diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 6ccfa25b36c..59b28cd9c3d 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -340,3 +340,13 @@ func (c *ReplicaConfig) adjustEnableOldValueAndVerifyForceReplicate(sinkURI *url return nil } + +// MaskSensitiveData masks sensitive data in ReplicaConfig +func (c *ReplicaConfig) MaskSensitiveData() { + if c.Sink != nil { + c.Sink.MaskSensitiveData() + } + if c.Consistent != nil { + c.Consistent.MaskSensitiveData() + } +} diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 2476f84edb0..fd10ad6bf42 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -20,6 +20,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/pingcap/errors" "github.com/pingcap/log" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -172,6 +173,19 @@ type SinkConfig struct { AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"` } +// MaskSensitiveData masks sensitive data in SinkConfig +func (s *SinkConfig) MaskSensitiveData() { + if s.SchemaRegistry != nil { + s.SchemaRegistry = aws.String(util.MaskSensitiveDataInURI(*s.SchemaRegistry)) + } + if s.KafkaConfig != nil { + s.KafkaConfig.MaskSensitiveData() + } + if s.PulsarConfig != nil { + s.PulsarConfig.MaskSensitiveData() + } +} + // CSVConfig defines a series of configuration items for csv codec. type CSVConfig struct { // delimiter between fields @@ -340,6 +354,19 @@ type KafkaConfig struct { GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"` } +// MaskSensitiveData masks sensitive data in KafkaConfig +func (k *KafkaConfig) MaskSensitiveData() { + k.SASLPassword = aws.String("********") + k.SASLGssAPIPassword = aws.String("********") + k.SASLOAuthClientSecret = aws.String("********") + k.Key = aws.String("********") + if k.GlueSchemaRegistryConfig != nil { + k.GlueSchemaRegistryConfig.AccessKey = "********" + k.GlueSchemaRegistryConfig.Token = "********" + k.GlueSchemaRegistryConfig.SecretAccessKey = "********" + } +} + // PulsarCompressionType is the compression type for pulsar type PulsarCompressionType string @@ -484,6 +511,19 @@ type PulsarConfig struct { u *url.URL `toml:"-" json:"-"` } +// MaskSensitiveData masks sensitive data in PulsarConfig +func (c *PulsarConfig) MaskSensitiveData() { + if c.AuthenticationToken != nil { + c.AuthenticationToken = aws.String("******") + } + if c.BasicPassword != nil { + c.BasicPassword = aws.String("******") + } + if c.OAuth2 != nil { + c.OAuth2.OAuth2PrivateKey = "******" + } +} + // Check get broker url func (c *PulsarConfig) validate() (err error) { if c.OAuth2 != nil { diff --git a/pkg/util/uri.go b/pkg/util/uri.go index dfe2b66f7b8..d033dd4d303 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -80,3 +80,32 @@ func MaskSinkURI(uri string) (string, error) { } return uriParsed.Redacted(), nil } + +var name = []string{ + "password", + "sasl-password", + "access-key", + "secret-access-key", + "access_token", + "token", + "secret", + "passwd", + "pwd", +} + +// MaskSensitiveDataInURI returns an uri that sensitive infos has been masked. +func MaskSensitiveDataInURI(uri string) string { + uriParsed, err := url.Parse(uri) + if err != nil { + log.Error("failed to parse sink URI", zap.Error(err)) + return "" + } + queries := uriParsed.Query() + for _, secretKey := range name { + if queries.Has(secretKey) { + queries.Set(secretKey, "******") + } + } + uriParsed.RawQuery = queries.Encode() + return uriParsed.Redacted() +}