*(ticdc): do not print password in cdc log #9691

Merged
merged 3 commits on Sep 11, 2023
11 changes: 3 additions & 8 deletions cdc/api/v1/api.go
@@ -308,13 +308,6 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

infoStr, err := info.Marshal()
if err != nil {
_ = c.Error(err)
return
}

upstreamInfo := &model.UpstreamInfo{
ID: up.ID,
PDEndpoints: strings.Join(up.PdEndpoints, ","),
@@ -331,7 +324,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr))
log.Info("Create changefeed successfully!",
zap.String("id", changefeedConfig.ID),
zap.String("changefeed", info.String()))
c.Status(http.StatusAccepted)
}

8 changes: 1 addition & 7 deletions cdc/api/v2/changefeed.go
@@ -135,12 +135,6 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CAPath: cfg.CAPath,
CertAllowedCN: cfg.CertAllowedCN,
}
infoStr, err := info.Marshal()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}

// cannot create changefeed if there are running lightning/restore tasks
tlsCfg, err := credential.ToTLSConfig()
@@ -176,7 +170,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {

log.Info("Create changefeed successfully!",
zap.String("id", info.ID),
zap.String("changefeed", infoStr))
zap.String("changefeed", info.String()))
c.JSON(http.StatusOK, toAPIModel(info,
info.StartTs, info.StartTs,
nil, true))
12 changes: 6 additions & 6 deletions cdc/model/changefeed.go
@@ -239,9 +239,9 @@ func (info *ChangeFeedInfo) String() (str string) {
return
}

clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI)
if err != nil {
log.Error("failed to marshal changefeed info", zap.Error(err))
clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI)
if clone.Config != nil {
clone.Config.MaskSensitiveData()
}

str, err = clone.Marshal()
@@ -540,11 +540,11 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() {
}

func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) {
oldRawQuery := uri.RawQuery
newRawQuery := newQuery.Encode()
maskedURI, _ := util.MaskSinkURI(uri.String())
log.Info("handle incompatible protocol from sink URI",
zap.String("oldUriQuery", oldRawQuery),
zap.String("fixedUriQuery", newQuery.Encode()))
zap.String("oldURI", maskedURI),
zap.String("newProtocol", newProtocol))

uri.RawQuery = newRawQuery
fixedSinkURI := uri.String()
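The changefeed.go hunk above means ChangeFeedInfo.String() now masks the sink URI and the nested config (on a clone) before marshalling. A minimal sketch of a check for that behaviour, assuming a hypothetical test file and an illustrative URI that are not part of this PR:

package model_test

import (
	"strings"
	"testing"

	"github.com/pingcap/tiflow/cdc/model"
)

// Hypothetical regression check: the stringified changefeed info must not
// contain the plaintext password from the sink URI.
func TestChangefeedInfoStringMasksPassword(t *testing.T) {
	info := &model.ChangeFeedInfo{
		SinkURI: "mysql://root:123456@127.0.0.1:3306/", // illustrative credentials
	}
	if str := info.String(); strings.Contains(str, "123456") {
		t.Fatal("password leaked into changefeed info output")
	}
}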
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
@@ -650,7 +650,7 @@ LOOP2:
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.resolvedTs),
zap.Stringer("info", c.state.Info))
zap.String("info", c.state.Info.String()))

return nil
}
@@ -726,7 +726,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Any("status", c.state.Status),
zap.Stringer("info", c.state.Info),
zap.String("info", c.state.Info.String()),
zap.Bool("isRemoved", c.isRemoved))
}

4 changes: 0 additions & 4 deletions cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go
@@ -17,7 +17,6 @@ import (
"context"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
@@ -35,9 +34,6 @@ type PulsarMockProducers struct {
func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string,
totalPartitionsNum int32, message *common.Message,
) error {
// call SyncSendMessage

log.Info("pulsarProducers SyncBroadcastMessage in")
return p.SyncSendMessage(ctx, topic, totalPartitionsNum, message)
}

3 changes: 2 additions & 1 deletion cdc/sink/ddlsink/mq/kafka_ddl_sink.go
@@ -104,7 +104,8 @@ func NewKafkaDDLSink(
}

start := time.Now()
log.Info("Try to create a DDL sink producer", zap.Any("options", options))
log.Info("Try to create a DDL sink producer",
zap.String("changefeed", changefeedID.String()))
syncProducer, err := factory.SyncProducer(ctx)
if err != nil {
return nil, errors.Trace(err)
3 changes: 2 additions & 1 deletion cdc/sink/ddlsink/mq/pulsar_ddl_sink.go
@@ -62,7 +62,8 @@ func NewPulsarDDLSink(
return nil, errors.Trace(err)
}

log.Info("Try to create a DDL sink producer", zap.Any("pulsarConfig", pConfig))
log.Info("Try to create a DDL sink producer",
zap.String("changefeed", changefeedID.String()))

// NewEventRouter
eventRouter, err := dispatcher.NewEventRouter(replicaConfig, defaultTopic, sinkURI.Scheme)
3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/mq/dmlproducer/pulsar_dml_producer.go
@@ -78,8 +78,7 @@ func NewPulsarDMLProducer(

var pulsarConfig *config.PulsarConfig
if sinkConfig.PulsarConfig == nil {
log.Error("new pulsar DML producer fail",
zap.Any("sink:pulsar config is empty", sinkConfig.PulsarConfig))
log.Error("new pulsar DML producer fail,sink:pulsar config is empty")
return nil, cerror.ErrPulsarInvalidConfig.
GenWithStackByArgs("pulsar config is empty")
}
3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
@@ -119,8 +119,7 @@ func NewKafkaDMLSink(
eventRouter, encoderGroup, protocol, errCh)
log.Info("DML sink producer created",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeedID", changefeedID.ID),
zap.Any("options", options))
zap.String("changefeedID", changefeedID.ID))

return s, nil
}
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/mq/pulsar_dml_sink.go
@@ -71,10 +71,11 @@ func NewPulsarDMLSink(
}

failpointCh := make(chan error, 1)
log.Info("Try to create a DML sink producer", zap.Any("pulsar", pConfig))
log.Info("Try to create a DML sink producer", zap.String("changefeed", changefeedID.String()))
start := time.Now()
p, err := producerCreator(ctx, changefeedID, client, replicaConfig.Sink, errCh, failpointCh)
log.Info("DML sink producer created",
zap.String("changefeed", changefeedID.String()),
zap.Duration("duration", time.Since(start)))
if err != nil {
defer func() {
4 changes: 2 additions & 2 deletions cdc/syncpointstore/mysql_syncpoint_store.go
@@ -77,7 +77,7 @@ func newMySQLSyncPointStore(
return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
}

log.Info("Start mysql syncpoint sink")
log.Info("Start mysql syncpoint sink", zap.String("changefeed", id.String()))

return &mysqlSyncPointStore{
db: syncDB,
@@ -146,7 +146,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
var secondaryTs string
err = row.Scan(&secondaryTs)
if err != nil {
log.Info("sync table: get tidb_current_ts err")
log.Info("sync table: get tidb_current_ts err", zap.String("changefeed", id.String()))
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err))
4 changes: 2 additions & 2 deletions cmd/kafka-consumer/main.go
@@ -946,7 +946,7 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti
func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Error("open db failed", zap.String("dsn", dsn), zap.Error(err))
log.Error("open db failed", zap.Error(err))
return nil, errors.Trace(err)
}

@@ -957,7 +957,7 @@ func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err = db.PingContext(ctx); err != nil {
log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err))
log.Error("ping db failed", zap.Error(err))
return nil, errors.Trace(err)
}
log.Info("open db success", zap.String("dsn", dsn))
6 changes: 6 additions & 0 deletions pkg/config/consistent.go
@@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
)

// ConsistentConfig represents replication consistency config for a changefeed.
@@ -56,3 +57,8 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
}
return redo.ValidateStorage(uri)
}

// MaskSensitiveData masks sensitive data in ConsistentConfig
func (c *ConsistentConfig) MaskSensitiveData() {
c.Storage = util.MaskSensitiveDataInURI(c.Storage)
}
10 changes: 10 additions & 0 deletions pkg/config/replica_config.go
@@ -285,3 +285,13 @@ func isSinkCompatibleWithSpanReplication(u *url.URL) bool {
return u != nil &&
(strings.Contains(u.Scheme, "kafka") || strings.Contains(u.Scheme, "blackhole"))
}

// MaskSensitiveData masks sensitive data in ReplicaConfig
func (c *ReplicaConfig) MaskSensitiveData() {
if c.Sink != nil {
c.Sink.MaskSensitiveData()
}
if c.Consistent != nil {
c.Consistent.MaskSensitiveData()
}
}
40 changes: 40 additions & 0 deletions pkg/config/sink.go
@@ -21,6 +21,7 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -160,6 +161,19 @@ type SinkConfig struct {
AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
}

// MaskSensitiveData masks sensitive data in SinkConfig
func (s *SinkConfig) MaskSensitiveData() {
if s.SchemaRegistry != nil {
s.SchemaRegistry = aws.String(util.MaskSensitiveDataInURI(*s.SchemaRegistry))
}
if s.KafkaConfig != nil {
s.KafkaConfig.MaskSensitiveData()
}
if s.PulsarConfig != nil {
s.PulsarConfig.MaskSensitiveData()
}
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields
@@ -328,6 +342,19 @@ type KafkaConfig struct {
GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"`
}

// MaskSensitiveData masks sensitive data in KafkaConfig
func (k *KafkaConfig) MaskSensitiveData() {
k.SASLPassword = aws.String("********")
k.SASLGssAPIPassword = aws.String("********")
k.SASLOAuthClientSecret = aws.String("********")
k.Key = aws.String("********")
if k.GlueSchemaRegistryConfig != nil {
k.GlueSchemaRegistryConfig.AccessKey = "********"
k.GlueSchemaRegistryConfig.Token = "********"
k.GlueSchemaRegistryConfig.SecretAccessKey = "********"
}
}

// PulsarCompressionType is the compression type for pulsar
type PulsarCompressionType string

@@ -472,6 +499,19 @@ type PulsarConfig struct {
u *url.URL `toml:"-" json:"-"`
}

// MaskSensitiveData masks sensitive data in PulsarConfig
func (c *PulsarConfig) MaskSensitiveData() {
if c.AuthenticationToken != nil {
c.AuthenticationToken = aws.String("******")
}
if c.BasicPassword != nil {
c.BasicPassword = aws.String("******")
}
if c.OAuth2 != nil {
c.OAuth2.OAuth2PrivateKey = "******"
}
}

// Check get broker url
func (c *PulsarConfig) validate() (err error) {
if c.OAuth2 != nil {
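Note that these MaskSensitiveData helpers overwrite fields in place, which is why ChangeFeedInfo.String() applies them to a clone. A minimal sketch of the behaviour, using illustrative values that are not taken from this PR:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	cfg := &config.SinkConfig{
		KafkaConfig: &config.KafkaConfig{
			SASLPassword: aws.String("secret"), // illustrative secret
		},
	}
	// Masking mutates the struct, so only a throwaway copy of the real
	// config should be passed to it before logging.
	cfg.MaskSensitiveData()
	fmt.Println(*cfg.KafkaConfig.SASLPassword) // prints ********
}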
2 changes: 1 addition & 1 deletion pkg/sink/codec/avro/confluent_schema_registry.go
@@ -368,7 +368,7 @@ func (m *confluentSchemaManager) ClearRegistry(ctx context.Context, schemaSubjec
uri := m.registryURL + "/subjects/" + url.QueryEscape(schemaSubject)
req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil)
if err != nil {
log.Error("Could not construct request for clearRegistry", zap.String("uri", uri))
log.Error("Could not construct request for clearRegistry", zap.Error(err))
return cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
}
req.Header.Add(
29 changes: 29 additions & 0 deletions pkg/util/uri.go
@@ -80,3 +80,32 @@ func MaskSinkURI(uri string) (string, error) {
}
return uriParsed.Redacted(), nil
}

var name = []string{
"password",
"sasl-password",
"access-key",
"secret-access-key",
"access_token",
"token",
"secret",
"passwd",
"pwd",
}

// MaskSensitiveDataInURI returns an uri that sensitive infos has been masked.
func MaskSensitiveDataInURI(uri string) string {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse sink URI", zap.Error(err))
return ""
}
queries := uriParsed.Query()
for _, secretKey := range name {
if queries.Has(secretKey) {
queries.Set(secretKey, "******")
}
}
uriParsed.RawQuery = queries.Encode()
return uriParsed.Redacted()
}
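For reference, a small sketch of how the new MaskSensitiveDataInURI helper behaves; the kafka URI below and the exact shape of the output are illustrative assumptions, not taken from this PR:

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/util"
)

func main() {
	// Illustrative URI: both the userinfo password and the sasl-password
	// query parameter carry secrets.
	uri := "kafka://user:secret@127.0.0.1:9092/topic?kafka-version=2.4.0&sasl-password=secret"
	// The userinfo password is redacted and the sasl-password query value is
	// replaced with a masked placeholder, so neither secret is printed.
	fmt.Println(util.MaskSensitiveDataInURI(uri))
}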