Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*(ticdc): do not print password in cdc log (#9691) #9725

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,6 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

infoStr, err := info.Marshal()
if err != nil {
_ = c.Error(err)
return
}

o, err := h.capture.GetOwner()
if err != nil {
_ = c.Error(err)
Expand Down Expand Up @@ -338,7 +331,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr))
log.Info("Create changefeed successfully!",
zap.String("id", changefeedConfig.ID),
zap.String("changefeed", info.String()))
c.Status(http.StatusAccepted)
}

Expand Down
9 changes: 2 additions & 7 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,8 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CAPath: cfg.CAPath,
CertAllowedCN: cfg.CertAllowedCN,
}
infoStr, err := info.Marshal()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
o, err := h.capture.GetOwner()
// cannot create changefeed if there are running lightning/restore tasks
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
Expand All @@ -147,7 +142,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {

log.Info("Create changefeed successfully!",
zap.String("id", info.ID),
zap.String("changefeed", infoStr))
zap.String("changefeed", info.String()))
c.JSON(http.StatusOK, toAPIModel(info,
info.StartTs, info.StartTs,
nil, true))
Expand Down
17 changes: 11 additions & 6 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ type ChangeFeedID struct {
ID string
}

// String implements fmt.Stringer interface
func (c ChangeFeedID) String() string {
return c.Namespace + "/" + c.ID
}

// DefaultChangeFeedID returns `ChangeFeedID` with default namespace
func DefaultChangeFeedID(id string) ChangeFeedID {
return ChangeFeedID{
Expand Down Expand Up @@ -233,9 +238,9 @@ func (info *ChangeFeedInfo) String() (str string) {
return
}

clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI)
if err != nil {
log.Error("failed to marshal changefeed info", zap.Error(err))
clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI)
if clone.Config != nil {
clone.Config.MaskSensitiveData()
}

str, err = clone.Marshal()
Expand Down Expand Up @@ -476,11 +481,11 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() {
}

func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) {
oldRawQuery := uri.RawQuery
newRawQuery := newQuery.Encode()
maskedURI, _ := util.MaskSinkURI(uri.String())
log.Info("handle incompatible protocol from sink URI",
zap.String("oldUriQuery", oldRawQuery),
zap.String("fixedUriQuery", newQuery.Encode()))
zap.String("oldURI", maskedURI),
zap.String("newProtocol", newProtocol))

uri.RawQuery = newRawQuery
fixedSinkURI := uri.String()
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ LOOP2:
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.resolvedTs),
zap.Stringer("info", c.state.Info))
zap.String("info", c.state.Info.String()))

return nil
}
Expand Down Expand Up @@ -688,7 +688,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Any("status", c.state.Status),
zap.Stringer("info", c.state.Info),
zap.String("info", c.state.Info.String()),
zap.Bool("isRemoved", c.isRemoved))
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/avro/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (m *schemaManager) ClearRegistry(ctx context.Context, topicName string) err
)
req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil)
if err != nil {
log.Error("Could not construct request for clearRegistry", zap.String("uri", uri))
log.Error("Could not construct request for clearRegistry", zap.Error(err))
return cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
}
req.Header.Add(
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mysql/mysql_syncpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func newMySQLSyncPointStore(
return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
}

log.Info("Start mysql syncpoint sink")
log.Info("Start mysql syncpoint sink", zap.String("changefeed", id.String()))

return &mysqlSyncPointStore{
db: syncDB,
Expand Down Expand Up @@ -213,7 +213,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
var secondaryTs string
err = row.Scan(&secondaryTs)
if err != nil {
log.Info("sync table: get tidb_current_ts err")
log.Info("sync table: get tidb_current_ts err", zap.String("changefeed", id.String()))
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti
func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Error("open db failed", zap.String("dsn", dsn), zap.Error(err))
log.Error("open db failed", zap.Error(err))
return nil, errors.Trace(err)
}

Expand All @@ -879,7 +879,7 @@ func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err = db.PingContext(ctx); err != nil {
log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err))
log.Error("ping db failed", zap.Error(err))
return nil, errors.Trace(err)
}
log.Info("open db success", zap.String("dsn", dsn))
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/consistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
)

// ConsistentConfig represents replication consistency config for a changefeed.
Expand Down Expand Up @@ -56,3 +57,8 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
}
return redo.ValidateStorage(uri)
}

// MaskSensitiveData masks sensitive data in ConsistentConfig
func (c *ConsistentConfig) MaskSensitiveData() {
c.Storage = util.MaskSensitiveDataInURI(c.Storage)
}
10 changes: 10 additions & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,13 @@ func (c *ReplicaConfig) AdjustEnableOldValueAndVerifyForceReplicate(sinkURI *url

return nil
}

// MaskSensitiveData masks sensitive data in ReplicaConfig
func (c *ReplicaConfig) MaskSensitiveData() {
if c.Sink != nil {
c.Sink.MaskSensitiveData()
}
if c.Consistent != nil {
c.Consistent.MaskSensitiveData()
}
}
25 changes: 25 additions & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -253,3 +254,27 @@ func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) {
require.NoError(t, err)
require.False(t, config.EnableOldValue)
}

func TestMaskSensitiveData(t *testing.T) {
config := ReplicaConfig{
Sink: nil,
Consistent: nil,
}
config.MaskSensitiveData()
require.Nil(t, config.Sink)
require.Nil(t, config.Consistent)
config.Sink = &SinkConfig{}
config.Sink.KafkaConfig = &KafkaConfig{
SASLOAuthTokenURL: aws.String("http://abc.com?password=bacd"),
SASLOAuthClientSecret: aws.String("bacd"),
}
config.Sink.SchemaRegistry = "http://abc.com?password=bacd"
config.Consistent = &ConsistentConfig{
Storage: "http://abc.com?password=bacd",
}
config.MaskSensitiveData()
require.Equal(t, "http://abc.com?password=xxxxx", config.Sink.SchemaRegistry)
require.Equal(t, "http://abc.com?password=xxxxx", config.Consistent.Storage)
require.Equal(t, "http://abc.com?password=xxxxx", *config.Sink.KafkaConfig.SASLOAuthTokenURL)
require.Equal(t, "******", *config.Sink.KafkaConfig.SASLOAuthClientSecret)
}
29 changes: 29 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"net/url"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -154,6 +156,16 @@ type KafkaConfig struct {
LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"`
}

// MaskSensitiveData masks sensitive data in SinkConfig
func (s *SinkConfig) MaskSensitiveData() {
if s.SchemaRegistry != "" {
s.SchemaRegistry = util.MaskSensitiveDataInURI(s.SchemaRegistry)
}
if s.KafkaConfig != nil {
s.KafkaConfig.MaskSensitiveData()
}
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields
Expand Down Expand Up @@ -273,6 +285,23 @@ type ColumnSelector struct {
Columns []string `toml:"columns" json:"columns"`
}

// CodecConfig represents a MQ codec configuration
type CodecConfig struct {
EnableTiDBExtension *bool `toml:"enable-tidb-extension" json:"enable-tidb-extension,omitempty"`
MaxBatchSize *int `toml:"max-batch-size" json:"max-batch-size,omitempty"`
AvroEnableWatermark *bool `toml:"avro-enable-watermark" json:"avro-enable-watermark"`
AvroDecimalHandlingMode *string `toml:"avro-decimal-handling-mode" json:"avro-decimal-handling-mode,omitempty"`
AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"`
}

// MaskSensitiveData masks sensitive data in KafkaConfig
func (k *KafkaConfig) MaskSensitiveData() {
k.SASLOAuthClientSecret = aws.String("******")
if k.SASLOAuthTokenURL != nil {
k.SASLOAuthTokenURL = aws.String(util.MaskSensitiveDataInURI(*k.SASLOAuthTokenURL))
}
}

func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
if err := s.validateAndAdjustSinkURI(sinkURI); err != nil {
return err
Expand Down
29 changes: 29 additions & 0 deletions pkg/util/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,32 @@ func MaskSinkURI(uri string) (string, error) {
}
return uriParsed.Redacted(), nil
}

var sensitiveQueryParameterNames = []string{
"password",
"sasl-password",
"access-key",
"secret-access-key",
"access_token",
"token",
"secret",
"passwd",
"pwd",
}

// MaskSensitiveDataInURI returns an uri that sensitive infos has been masked.
func MaskSensitiveDataInURI(uri string) string {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse URI", zap.Error(err))
return ""
}
queries := uriParsed.Query()
for _, secretKey := range sensitiveQueryParameterNames {
if queries.Has(secretKey) {
queries.Set(secretKey, "xxxxx")
}
}
uriParsed.RawQuery = queries.Encode()
return uriParsed.Redacted()
}
34 changes: 34 additions & 0 deletions pkg/util/uri_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,37 @@ func TestMaskSinkURI(t *testing.T) {
require.Equal(t, tt.masked, maskedURI)
}
}

func TestMaskSensitiveDataInURI(t *testing.T) {
tests := []struct {
uri string
masked string
}{
{
"mysql://root:123456@127.0.0.1:3306/?time-zone=c",
"mysql://root:xxxxx@127.0.0.1:3306/?time-zone=c",
},
{
"",
"",
},
{
"abc",
"abc",
},
}
for _, q := range sensitiveQueryParameterNames {
tests = append(tests, struct {
uri string
masked string
}{
"kafka://127.0.0.1:9093/cdc?" + q + "=verysecure",
"kafka://127.0.0.1:9093/cdc?" + q + "=xxxxx",
})
}

for _, tt := range tests {
maskedURI := MaskSensitiveDataInURI(tt.uri)
require.Equal(t, tt.masked, maskedURI)
}
}