Skip to content

Commit

Permalink
Populate activitypause for activity heartbeat response (temporalio#7098)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Peopagate "ActivityPaused" to the workers via "heartbeat" response, in
both cases.

## Why?
<!-- Tell your future self why have you made these changes -->
To let long-running activity workers know that activity was paused.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
unit tests

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
No

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No
  • Loading branch information
ychebotarev authored and stephanos committed Jan 17, 2025
1 parent c8d91ee commit 928a14d
Show file tree
Hide file tree
Showing 6 changed files with 2,043 additions and 2,022 deletions.
4,044 changes: 2,027 additions & 2,017 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
go.temporal.io/api v1.43.2-0.20250114194029-61f369a5f511
go.temporal.io/api v1.43.2-0.20250116211459-192e5ed816b7
go.temporal.io/sdk v1.31.0
go.temporal.io/version v0.3.0
go.uber.org/automaxprocs v1.5.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.temporal.io/api v1.43.2-0.20250114194029-61f369a5f511 h1:4YhvSVBPmEkNFE5wupUSXUx0+HotlAYjXyWqWfUCgqY=
go.temporal.io/api v1.43.2-0.20250114194029-61f369a5f511/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.43.2-0.20250116211459-192e5ed816b7 h1:4lxYy+8G8UbjOCMMD2qa+QIdZZenP8MoylAPPmhx3CQ=
go.temporal.io/api v1.43.2-0.20250116211459-192e5ed816b7/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.31.0 h1:CLYiP0R5Sdj0gq8LyYKDDz4ccGOdJPR8wNGJU0JGwj8=
go.temporal.io/sdk v1.31.0/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ message RecordActivityTaskHeartbeatRequest {

message RecordActivityTaskHeartbeatResponse {
bool cancel_requested = 1;

bool activity_paused = 2;
}

message RespondActivityTaskCompletedRequest {
Expand Down
10 changes: 8 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,10 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(ctx context.Context, requ
return nil, err
}

return &workflowservice.RecordActivityTaskHeartbeatResponse{CancelRequested: resp.GetCancelRequested()}, nil
return &workflowservice.RecordActivityTaskHeartbeatResponse{
CancelRequested: resp.GetCancelRequested(),
ActivityPaused: resp.GetActivityPaused(),
}, nil
}

// RecordActivityTaskHeartbeatById is called by application worker while it is processing an ActivityTask. If worker fails
Expand Down Expand Up @@ -1297,7 +1300,10 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatById(ctx context.Context,
if err != nil {
return nil, err
}
return &workflowservice.RecordActivityTaskHeartbeatByIdResponse{CancelRequested: resp.GetCancelRequested()}, nil
return &workflowservice.RecordActivityTaskHeartbeatByIdResponse{
CancelRequested: resp.GetCancelRequested(),
ActivityPaused: resp.GetActivityPaused(),
}, nil
}

// RespondActivityTaskCompleted is called by application worker when it is done processing an ActivityTask. It will
Expand Down
3 changes: 3 additions & 0 deletions service/history/api/recordactivitytaskheartbeat/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func Invoke(
}

var cancelRequested bool
var activityPaused bool
err = api.GetAndUpdateWorkflowWithNew(
ctx,
token.Clock,
Expand Down Expand Up @@ -104,6 +105,7 @@ func Invoke(
}

cancelRequested = ai.CancelRequested
activityPaused = ai.Paused

// Save progress and last HB reported time.
mutableState.UpdateActivityProgress(ai, request)
Expand All @@ -123,5 +125,6 @@ func Invoke(

return &historyservice.RecordActivityTaskHeartbeatResponse{
CancelRequested: cancelRequested,
ActivityPaused: activityPaused,
}, nil
}

0 comments on commit 928a14d

Please sign in to comment.