heartbeat: use leader's ACL token when failing heartbeat (#24241)
In #23838 we updated the `Node.Update` RPC handler we use for heartbeats to be
more strict about requiring node secrets. But when a node goes down, it's the
leader that sends the request to mark the node down via `Node.Update` (to
itself), and this request was missing the leader ACL needed to authenticate to
itself.

Add the leader ACL to the request and update the RPC handler test for
disconnected-clients to use ACLs, which would have detected this bug. Also add a
note to the `Authenticate` comment about how that authentication path requires
the leader ACL.

Fixes: #24231
Ref: https://hashicorp.atlassian.net/browse/NET-11384
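
As a rough sketch of the fix (the request type, RPC name, and error handling shown here are assumed from the surrounding heartbeat code rather than spelled out in this commit), the leader now attaches its own ACL token before calling the node-status handler on itself:

```go
// Sketch of nodeHeartbeater.invalidateHeartbeat after this change: the leader
// marks the node down by calling the status-update RPC on the local server,
// and the request now carries the leader's ACL token so it can authenticate.
req := structs.NodeUpdateStatusRequest{
    NodeID: id,
    Status: structs.NodeStatusDown,
    NodeEvent: structs.NewNodeEvent().
        SetSubsystem(structs.NodeEventSubsystemCluster).
        SetMessage(NodeHeartbeatEventMissed),
    WriteRequest: structs.WriteRequest{
        Region:    h.srv.config.Region,
        AuthToken: h.srv.getLeaderAcl(), // the missing piece before this fix
    },
}
var resp structs.NodeUpdateResponse
if err := h.srv.RPC("Node.UpdateStatus", &req, &resp); err != nil {
    // Illustrative error handling only.
    h.logger.Error("update node status failed", "error", err)
}
```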
tgross authored Oct 17, 2024
1 parent e440e1d commit 55fe05d
Showing 5 changed files with 46 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .changelog/24241.txt
@@ -0,0 +1,3 @@
```release-note:bug
heartbeat: Fixed a bug where failed nodes would not be marked down
```
4 changes: 4 additions & 0 deletions nomad/auth/auth.go
@@ -93,6 +93,10 @@ func NewAuthenticator(cfg *AuthenticatorConfig) *Authenticator {
// an ephemeral ACLToken makes the origin of the credential clear to RPC
// handlers, who may have different behavior for internal vs external origins.
//
+ // Note: when making a server-to-server RPC that authenticates with this method,
+ // the RPC *must* include the leader's ACL token. Use AuthenticateServerOnly for
+ // requests that don't have access to the leader's ACL token.
+ //
// Note: when called on the follower we'll be making stale queries, so it's
// possible if the follower is behind that the leader will get a different value
// if an ACL token or allocation's WI has just been created.
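To illustrate the note above, a schematic handler preamble; this is a hypothetical endpoint, not the actual Nomad handler, assuming the common pattern of calling Authenticate first and rejecting the request when it fails:

```go
// exampleNodeWrite is a hypothetical write RPC that authenticates with
// Authenticate. When another server calls it (including the leader calling
// itself, as in the heartbeat path), the request must carry the leader's ACL
// token or the check below rejects the write.
func (n *Node) exampleNodeWrite(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error {
    authErr := n.srv.Authenticate(n.ctx, args)
    if authErr != nil {
        return structs.ErrPermissionDenied
    }
    // ... apply the write ...
    return nil
}
```

A handler that never has a leader token available would instead use AuthenticateServerOnly, which authenticates the calling server directly rather than relying on an ACL token.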
3 changes: 2 additions & 1 deletion nomad/heartbeat.go
@@ -163,7 +163,8 @@ func (h *nodeHeartbeater) invalidateHeartbeat(id string) {
Status: structs.NodeStatusDown,
NodeEvent: structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).SetMessage(NodeHeartbeatEventMissed),
WriteRequest: structs.WriteRequest{
- Region: h.srv.config.Region,
+ Region:    h.srv.config.Region,
+ AuthToken: h.srv.getLeaderAcl(),
},
}

38 changes: 23 additions & 15 deletions nomad/node_endpoint_test.go
@@ -952,7 +952,7 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
// Setup server with tighter heartbeat so we don't have to wait so long
// for nodes to go down.
heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
- s, cleanupS := TestServer(t, func(c *Config) {
+ s, rootToken, cleanupS := TestACLServer(t, func(c *Config) {
c.MinHeartbeatTTL = heartbeatTTL
c.HeartbeatGrace = 2 * heartbeatTTL
})
@@ -1001,7 +1001,8 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
go heartbeat(heartbeatCtx)

// Wait for node to be ready.
- testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
+ testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
+     structs.NodeStatusReady, rootToken.SecretID)

// Register job with Disconnect.LostAfter
job := version.jobSpec(time.Hour)
@@ -1018,22 +1019,24 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
+ AuthToken: rootToken.SecretID,
},
}
var jobResp structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
must.NoError(t, err)

// Wait for alloc to be pending in the server.
- testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+ testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
structs.AllocClientStatusPending: 1,
- })
+ }, rootToken.SecretID)

// Get allocs that node should run.
allocsReq := &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
AuthToken: rootToken.SecretID,
},
}
var allocsResp structs.NodeAllocsResponse
@@ -1058,17 +1061,18 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
must.NoError(t, err)

// Wait for alloc to be running in the server.
- testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+ testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
- })
+ }, rootToken.SecretID)

// Stop heartbeat and wait for the client to be disconnected and the alloc
// to be unknown.
cancelHeartbeat()
- testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
- testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+ testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
+     structs.NodeStatusDisconnected, rootToken.SecretID)
+ testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
structs.AllocClientStatusUnknown: 1,
- })
+ }, rootToken.SecretID)

// Restart heartbeat to reconnect node.
heartbeatCtx, cancelHeartbeat = context.WithCancel(context.Background())
@@ -1081,15 +1085,17 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
// allocs status with the server so the scheduler have the necessary
// information to avoid unnecessary placements.
time.Sleep(3 * heartbeatTTL)
- testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)
+ testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
+     structs.NodeStatusInit, rootToken.SecretID)

// Get allocs that node should run.
// The node should only have one alloc assigned until it updates its allocs
// status with the server.
allocsReq = &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
AuthToken: rootToken.SecretID,
},
}
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
@@ -1104,10 +1110,12 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
// - client status is ready.
// - only 1 alloc and the alloc is running.
// - all evals are terminal, so cluster is in a stable state.
- testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
- testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
+ testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
+     structs.NodeStatusReady, rootToken.SecretID)
+ testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
- })
+ }, rootToken.SecretID)

testutil.WaitForResult(func() (bool, error) {
state := s.fsm.State()
ws := memdb.NewWatchSet()
18 changes: 14 additions & 4 deletions testutil/wait.go
@@ -193,17 +193,27 @@ func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) {
WaitForClientStatus(t, rpc, nodeID, region, structs.NodeStatusReady)
}

- // WaitForClientStatus blocks until the client is in the expected status.
- func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID string, region string, status string) {
+ // WaitForClientStatus blocks until the client is in the expected status
+ func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID, region, status string) {
t.Helper()
+ WaitForClientStatusWithToken(t, rpc, nodeID, region, status, "")
+ }

+ // WaitForClientStatusWithToken blocks until the client is in the expected
+ // status, for use with ACLs enabled
+ func WaitForClientStatusWithToken(t testing.TB, rpc rpcFn, nodeID, region, status, token string) {
+ t.Helper()

if region == "" {
region = "global"
}
WaitForResult(func() (bool, error) {
req := structs.NodeSpecificRequest{
- NodeID:       nodeID,
- QueryOptions: structs.QueryOptions{Region: region},
+ NodeID: nodeID,
+ QueryOptions: structs.QueryOptions{
+     Region:    region,
+     AuthToken: token,
+ },
}
var out structs.SingleNodeResponse

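A short usage sketch of the new helper, mirroring the test changes above; the server setup is taken from the updated test, while the node registration and heartbeat loop are elided and assumed:

```go
// Start a server with ACLs enabled; TestACLServer also returns the bootstrap
// (root) token, whose secret authenticates the *WithToken wait helpers.
heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
s, rootToken, cleanupS := TestACLServer(t, func(c *Config) {
    c.MinHeartbeatTTL = heartbeatTTL
    c.HeartbeatGrace = 2 * heartbeatTTL
})
defer cleanupS()

// ... register a node and start heartbeating (elided) ...

// With ACLs enforced, the wait helper needs a token that can read node state;
// the root token's secret works in tests.
testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
    structs.NodeStatusReady, rootToken.SecretID)
```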
