Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
126898: raft: persist Lead into the HardState r=nvanbenschoten a=arulajmani

Previously, leader information for a given term was stored as volatile state in raft. This change persists it durably by moving it to HardState. This is done in service of leader leases, where we want to ensure a follower doesn't violate its promise of support to the leader even after a restart.

Fixes cockroachdb#124421

Release note: None

126970: sql/sem/tree, backupccl, *: sanitize URLs during Format r=dt,rytaft a=michae2

Prior to this change, all SQL statements containing URLs would be formatted with the full URL in cleartext, including any secrets such as keys or passwords. These secrets would sometimes show up in the slow query log or sql audit log. This PR adds new functions `tree.(*FmtCtx).FormatURI`and `tree.(*FmtCtx).FormatURIs` which are designed to sanitize URLs during formatting.

See individual commits for details.

Fixes: CRDB-39710, TREQ-284

Epic: None

Release note: None

Co-authored-by: Arul Ajmani <arulajmani@gmail.com>
Co-authored-by: Michael Erickson <michae2@cockroachlabs.com>
  • Loading branch information
3 people committed Jul 17, 2024
3 parents d6b590c + e60feee + 390e5b3 commit 743b721
Show file tree
Hide file tree
Showing 79 changed files with 1,163 additions and 836 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/alter_backup_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,15 +232,15 @@ func emitAlteredSchedule(
) error {
to := make([]string, len(stmt.To))
for i, dest := range stmt.To {
to[i] = tree.AsStringWithFlags(dest, tree.FmtBareStrings)
to[i] = tree.AsStringWithFlags(dest, tree.FmtBareStrings|tree.FmtShowFullURIs)
}
kmsURIs := make([]string, len(stmt.Options.EncryptionKMSURI))
for i, kmsURI := range stmt.Options.EncryptionKMSURI {
kmsURIs[i] = tree.AsStringWithFlags(kmsURI, tree.FmtBareStrings)
kmsURIs[i] = tree.AsStringWithFlags(kmsURI, tree.FmtBareStrings|tree.FmtShowFullURIs)
}
incDests := make([]string, len(stmt.Options.IncrementalStorage))
for i, incDest := range stmt.Options.IncrementalStorage {
incDests[i] = tree.AsStringWithFlags(incDest, tree.FmtBareStrings)
incDests[i] = tree.AsStringWithFlags(incDest, tree.FmtBareStrings|tree.FmtShowFullURIs)
}
if err := emitSchedule(job, stmt, to, nil, /* incrementalFrom */
kmsURIs, incDests, resultsCh); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ func backupJobDescription(
}

ann := p.ExtendedEvalContext().Annotations
return tree.AsStringWithFQNames(b, ann), nil
return tree.AsStringWithFlags(
b, tree.FmtAlwaysQualifyNames|tree.FmtShowFullURIs, tree.FmtAnnotations(ann),
), nil
}

// annotatedBackupStatement is a tree.Backup, optionally
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backuppb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (m ScheduledBackupExecutionArgs) MarshalJSONPB(marshaller *jsonpb.Marshaler
backup.Options.EncryptionPassphrase = tree.NewDString("redacted")
}

m.BackupStatement = backup.String()
m.BackupStatement = tree.AsStringWithFlags(backup, tree.FmtShowFullURIs)
return json.Marshal(m)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func emitSchedule(
tree.NewDString(status),
nextRun,
tree.NewDString(sj.ScheduleExpr()),
tree.NewDString(tree.AsString(redactedBackupNode)),
tree.NewDString(tree.AsStringWithFlags(redactedBackupNode, tree.FmtShowFullURIs)),
}
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,9 @@ func restoreJobDescription(
}

ann := p.ExtendedEvalContext().Annotations
return tree.AsStringWithFQNames(r, ann), nil
return tree.AsStringWithFlags(
r, tree.FmtAlwaysQualifyNames|tree.FmtShowFullURIs, tree.FmtAnnotations(ann),
), nil
}

func restoreTypeCheck(
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ func changefeedJobDescription(
return "", err
}
sort.Slice(c.Options, func(i, j int) bool { return c.Options[i].Key < c.Options[j].Key })
return tree.AsString(c), nil
return tree.AsStringWithFlags(c, tree.FmtShowFullURIs), nil
}

func logSanitizedChangefeedDestination(ctx context.Context, destination string) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeedpb/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ func (m ScheduledChangefeedExecutionArgs) MarshalJSONPB(x *jsonpb.Marshaler) ([]
}
export.SinkURI = tree.NewDString(sinkURI)

m.ChangefeedStatement = export.String()
m.ChangefeedStatement = tree.AsStringWithFlags(export, tree.FmtShowFullURIs)
return json.Marshal(m)
}
14 changes: 7 additions & 7 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (e *externalConnectionFeedFactory) Feed(
}
createStmt.SinkURI = tree.NewStrVal(`external://` + randomExternalConnectionName)

return e.TestFeedFactory.Feed(createStmt.String(), args...)
return e.TestFeedFactory.Feed(tree.AsStringWithFlags(createStmt, tree.FmtShowPasswords), args...)
}

func setURI(
Expand Down Expand Up @@ -881,7 +881,7 @@ func (f *tableFeedFactory) Feed(
return nil, err
}

if err := f.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil {
if err := f.startFeedJob(c.jobFeed, tree.AsStringWithFlags(createStmt, tree.FmtShowPasswords), args...); err != nil {
return nil, err
}
return c, nil
Expand Down Expand Up @@ -1124,7 +1124,7 @@ func (f *cloudFeedFactory) Feed(
dir: feedDir,
isBare: createStmt.Select != nil && !explicitEnvelope,
}
if err := f.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil {
if err := f.startFeedJob(c.jobFeed, tree.AsStringWithFlags(createStmt, tree.FmtShowPasswords), args...); err != nil {
return nil, err
}
return c, nil
Expand Down Expand Up @@ -1822,7 +1822,7 @@ func (k *kafkaFeedFactory) Feed(create string, args ...interface{}) (cdctest.Tes
registry: registry,
}

if err := k.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil {
if err := k.startFeedJob(c.jobFeed, tree.AsStringWithFlags(createStmt, tree.FmtShowPasswords), args...); err != nil {
return nil, errors.CombineErrors(err, c.Close())
}
return c, nil
Expand Down Expand Up @@ -2052,7 +2052,7 @@ func (f *webhookFeedFactory) Feed(create string, args ...interface{}) (cdctest.T
isBare: createStmt.Select != nil && !explicitEnvelope,
mockSink: sinkDest,
}
if err := f.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil {
if err := f.startFeedJob(c.jobFeed, tree.AsStringWithFlags(createStmt, tree.FmtShowPasswords), args...); err != nil {
sinkDest.Close()
return nil, err
}
Expand Down Expand Up @@ -2448,7 +2448,7 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te
deprecatedClient: deprecatedClient,
}

if err := p.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil {
if err := p.startFeedJob(c.jobFeed, tree.AsStringWithFlags(createStmt, tree.FmtShowPasswords), args...); err != nil {
_ = mockServer.Close()
return nil, err
}
Expand Down Expand Up @@ -2723,7 +2723,7 @@ func (p *pulsarFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te
pulsarServer: mockServer,
}

if err := p.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil {
if err := p.startFeedJob(c.jobFeed, tree.AsStringWithFlags(createStmt, tree.FmtShowPasswords), args...); err != nil {
return nil, err
}
return c, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/telemetryccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/sem/tree",
"//pkg/sql/sqltestutils",
"//pkg/testutils",
"//pkg/testutils/serverutils",
Expand Down
18 changes: 12 additions & 6 deletions pkg/ccl/telemetryccl/telemetry_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -267,7 +268,9 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
query: fmt.Sprintf(`IMPORT INTO a CSV DATA ('%s')`, srv.URL),
sampleQueryEvent: expectedSampleQueryEvent{
eventType: "import",
stmt: fmt.Sprintf(`IMPORT INTO defaultdb.public.a CSV DATA ('%s')`, srv.URL),
stmt: fmt.Sprintf(
`IMPORT INTO defaultdb.public.a CSV DATA (%s)`, tree.PasswordSubstitution,
),
},
recoveryEvent: expectedRecoveryEvent{
numRows: 3,
Expand All @@ -279,7 +282,10 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
query: fmt.Sprintf(`IMPORT INTO a CSV DATA ('%s') WITH detached`, srv.URL),
sampleQueryEvent: expectedSampleQueryEvent{
eventType: "import",
stmt: fmt.Sprintf(`IMPORT INTO defaultdb.public.a CSV DATA ('%s') WITH OPTIONS (detached)`, srv.URL),
stmt: fmt.Sprintf(
`IMPORT INTO defaultdb.public.a CSV DATA (%s) WITH OPTIONS (detached)`,
tree.PasswordSubstitution,
),
},
recoveryEvent: expectedRecoveryEvent{
numRows: 3,
Expand All @@ -291,7 +297,7 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
query: fmt.Sprintf(`BACKUP DATABASE mydb INTO '%s'`, nodelocal.MakeLocalStorageURI("test1")),
sampleQueryEvent: expectedSampleQueryEvent{
eventType: "backup",
stmt: fmt.Sprintf(`BACKUP DATABASE mydb INTO '%s'`, nodelocal.MakeLocalStorageURI("test1")),
stmt: fmt.Sprintf(`BACKUP DATABASE mydb INTO %s`, tree.PasswordSubstitution),
},
recoveryEvent: expectedRecoveryEvent{
numRows: 3,
Expand All @@ -303,7 +309,7 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
query: fmt.Sprintf(`BACKUP DATABASE mydb INTO '%s' WITH detached`, nodelocal.MakeLocalStorageURI("test1")),
sampleQueryEvent: expectedSampleQueryEvent{
eventType: "backup",
stmt: fmt.Sprintf(`BACKUP DATABASE mydb INTO '%s' WITH OPTIONS (detached)`, nodelocal.MakeLocalStorageURI("test1")),
stmt: fmt.Sprintf(`BACKUP DATABASE mydb INTO %s WITH OPTIONS (detached)`, tree.PasswordSubstitution),
},
recoveryEvent: expectedRecoveryEvent{
numRows: 3,
Expand All @@ -315,7 +321,7 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
query: fmt.Sprintf(`RESTORE DATABASE mydb FROM LATEST IN '%s'`, nodelocal.MakeLocalStorageURI("test1")),
sampleQueryEvent: expectedSampleQueryEvent{
eventType: "restore",
stmt: fmt.Sprintf(`RESTORE DATABASE mydb FROM 'latest' IN '%s'`, nodelocal.MakeLocalStorageURI("test1")),
stmt: fmt.Sprintf(`RESTORE DATABASE mydb FROM 'latest' IN %s`, tree.PasswordSubstitution),
},
recoveryEvent: expectedRecoveryEvent{
numRows: 3,
Expand All @@ -327,7 +333,7 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
query: fmt.Sprintf(`RESTORE DATABASE mydb FROM LATEST IN '%s' WITH detached`, nodelocal.MakeLocalStorageURI("test1")),
sampleQueryEvent: expectedSampleQueryEvent{
eventType: "restore",
stmt: fmt.Sprintf(`RESTORE DATABASE mydb FROM 'latest' IN '%s' WITH OPTIONS (detached)`, nodelocal.MakeLocalStorageURI("test1")),
stmt: fmt.Sprintf(`RESTORE DATABASE mydb FROM 'latest' IN %s WITH OPTIONS (detached)`, tree.PasswordSubstitution),
},
recoveryEvent: expectedRecoveryEvent{
numRows: 3,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/externalconn/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (e *MutableExternalConnection) UnredactedConnectionStatement() string {
},
As: tree.NewDString(e.rec.ConnectionDetails.UnredactedURI()),
}
return tree.AsString(ecNode)
return tree.AsStringWithFlags(ecNode, tree.FmtShowFullURIs)
}

// RedactedConnectionURI implements the ExternalConnection interface and
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestBelowRaftProtosDontChange(t *testing.T) {
Term uint64
Vote raftpb.PeerID
Commit uint64
Lead raftpb.PeerID
}
// Conversion fails if new fields are added to `HardState`, in which case this method
// and the expected sums should be updated.
Expand All @@ -64,6 +65,7 @@ func TestBelowRaftProtosDontChange(t *testing.T) {
Term: n % 3,
Vote: raftpb.PeerID(n % 7),
Commit: n % 11,
Lead: raftpb.PeerID(n % 13),
}
},
func(r *rand.Rand) protoutil.Message {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
Term: m.Term,
Vote: m.Vote,
Commit: m.Commit,
Lead: m.Lead,
}
if !raft.IsEmptyHardState(hs) {
// NB: Note that without additional safeguards, it's incorrect to write
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/logstore/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,23 @@ func (sl StateLoader) SynthesizeHardState(
return errors.Newf("can't decrease HardState.Commit from %d to %d",
redact.Safe(oldHS.Commit), redact.Safe(newHS.Commit))
}

// TODO(arul): This function can be called with an empty OldHS. In all other
// cases, where a term is included, we should be able to assert that the term
// isn't regressing (i.e. oldHS.Term >= newHS.Term).

if oldHS.Term > newHS.Term {
// The existing HardState is allowed to be ahead of us, which is
// relevant in practice for the split trigger. We already checked above
// that we're not rewinding the acknowledged index, and we haven't
// updated votes yet.
newHS.Term = oldHS.Term
}
// If the existing HardState voted in this term, remember that.
// If the existing HardState voted in this term and knows who the leader is,
// remember that.
if oldHS.Term == newHS.Term {
newHS.Vote = oldHS.Vote
newHS.Lead = oldHS.Lead
}
err := sl.SetHardState(ctx, readWriter, newHS)
return errors.Wrapf(err, "writing HardState %+v", &newHS)
Expand Down
52 changes: 24 additions & 28 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}

var hasReady bool
var softState *raft.SoftState
var outboundMsgs []raftpb.Message
var msgStorageAppend, msgStorageApply raftpb.Message
r.mu.Lock()
Expand Down Expand Up @@ -836,7 +835,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

logRaftReady(ctx, syncRd)
asyncRd := makeAsyncReady(syncRd)
softState = asyncRd.SoftState
outboundMsgs, msgStorageAppend, msgStorageApply = splitLocalStorageMsgs(asyncRd.Messages)
}
// We unquiesce if we have a Ready (= there's work to do). We also have
Expand Down Expand Up @@ -876,24 +874,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, nil
}

refreshReason := noReason
if softState != nil && leaderID != roachpb.ReplicaID(softState.Lead) {
// Refresh pending commands if the Raft leader has changed. This is usually
// the first indication we have of a new leader on a restarted node.
//
// TODO(peter): Re-proposing commands when SoftState.Lead changes can lead
// to wasteful multiple-reproposals when we later see an empty Raft command
// indicating a newly elected leader or a conf change. Replay protection
// prevents any corruption, so the waste is only a performance issue.
if log.V(3) {
log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, softState.Lead)
}
if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
refreshReason = reasonNewLeader
}
leaderID = roachpb.ReplicaID(softState.Lead)
}

r.traceMessageSends(outboundMsgs, "sending messages")
r.sendRaftMessages(ctx, outboundMsgs, pausedFollowers, true /* willDeliverLocal */)

Expand Down Expand Up @@ -941,7 +921,28 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
}

refreshReason := noReason
if hasMsg(msgStorageAppend) {
// Leadership changes, if any, are communicated through MsgStorageAppends.
// Check if that's the case here.
if msgStorageAppend.Lead != raft.None && leaderID != roachpb.ReplicaID(msgStorageAppend.Lead) {
// Refresh pending commands if the Raft leader has changed. This is
// usually the first indication we have of a new leader on a restarted
// node.
//
// TODO(peter): Re-proposing commands when SoftState.Lead changes can lead
// to wasteful multiple-reproposals when we later see an empty Raft command
// indicating a newly elected leader or a conf change. Replay protection
// prevents any corruption, so the waste is only a performance issue.
if log.V(3) {
log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, msgStorageAppend.Lead)
}
if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
refreshReason = reasonNewLeader
}
leaderID = roachpb.ReplicaID(msgStorageAppend.Lead)
}

if msgStorageAppend.Snapshot != nil {
if inSnap.Desc == nil {
// If we didn't expect Raft to have a snapshot but it has one
Expand Down Expand Up @@ -969,6 +970,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
Term: msgStorageAppend.Term,
Vote: msgStorageAppend.Vote,
Commit: msgStorageAppend.Commit,
Lead: msgStorageAppend.Lead,
}
if len(msgStorageAppend.Entries) != 0 {
log.Fatalf(ctx, "found Entries in MsgStorageAppend with non-empty Snapshot")
Expand Down Expand Up @@ -1204,11 +1206,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// All fields in asyncReady are read-only.
// TODO(nvanbenschoten): move this into go.etcd.io/raft.
type asyncReady struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*raft.SoftState

// Messages specifies outbound messages to other peers and to local storage
// threads. These messages can be sent in any order.
//
Expand All @@ -1220,8 +1217,7 @@ type asyncReady struct {
// makeAsyncReady constructs an asyncReady from the provided Ready.
func makeAsyncReady(rd raft.Ready) asyncReady {
return asyncReady{
SoftState: rd.SoftState,
Messages: rd.Messages,
Messages: rd.Messages,
}
}

Expand Down Expand Up @@ -2023,7 +2019,7 @@ func (r *Replica) hasOutstandingSnapshotInFlightToStore(

// HasRaftLeader returns true if the raft group has a raft leader currently.
func HasRaftLeader(raftStatus *raft.Status) bool {
return raftStatus != nil && raftStatus.SoftState.Lead != 0
return raftStatus != nil && raftStatus.HardState.Lead != 0
}

// pendingCmdSlice sorts by increasing MaxLeaseIndex.
Expand Down
Loading

0 comments on commit 743b721

Please sign in to comment.