Skip to content

Commit

Permalink
Use callerType to determine history requests priority (#7084)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
- Use caller type to determine history requests priority

## Why?
<!-- Tell your future self why have you made these changes -->
- Right now the priority feature is only used for prioritizing operator
requests, but doesn't help with background requests increase case which
can cause user requests to be throttled as well.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
yycptt authored Jan 16, 2025
1 parent 4f2b2db commit 70883ea
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 93 deletions.
88 changes: 10 additions & 78 deletions service/history/configs/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,81 +36,14 @@ const (
)

var (
APIToPriority = map[string]int{
"/temporal.server.api.historyservice.v1.HistoryService/CloseShard": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetShard": 1,
"/temporal.server.api.historyservice.v1.HistoryService/DeleteWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/DescribeHistoryHost": 1,
"/temporal.server.api.historyservice.v1.HistoryService/DescribeMutableState": 1,
"/temporal.server.api.historyservice.v1.HistoryService/DescribeWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetDLQMessages": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetDLQReplicationMessages": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetMutableState": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetReplicationMessages": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ImportWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/IsActivityTaskValid": 1,
"/temporal.server.api.historyservice.v1.HistoryService/IsWorkflowTaskValid": 1,
"/temporal.server.api.historyservice.v1.HistoryService/MergeDLQMessages": 1,
"/temporal.server.api.historyservice.v1.HistoryService/PollMutableState": 1,
"/temporal.server.api.historyservice.v1.HistoryService/PurgeDLQMessages": 1,
"/temporal.server.api.historyservice.v1.HistoryService/QueryWorkflow": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ReapplyEvents": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RebuildMutableState": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RecordActivityTaskHeartbeat": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RecordActivityTaskStarted": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RecordChildExecutionCompleted": 1,
"/temporal.server.api.historyservice.v1.HistoryService/VerifyChildExecutionCompletionRecorded": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RecordWorkflowTaskStarted": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RefreshWorkflowTasks": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RemoveSignalMutableState": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RemoveTask": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ReplicateEventsV2": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ReplicateWorkflowState": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RequestCancelWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ResetStickyTaskQueue": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ResetWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RespondActivityTaskCanceled": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RespondActivityTaskCompleted": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RespondActivityTaskFailed": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RespondWorkflowTaskCompleted": 1,
"/temporal.server.api.historyservice.v1.HistoryService/RespondWorkflowTaskFailed": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ScheduleWorkflowTask": 1,
"/temporal.server.api.historyservice.v1.HistoryService/VerifyFirstWorkflowTaskScheduled": 1,
"/temporal.server.api.historyservice.v1.HistoryService/SignalWithStartWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/SignalWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/StartWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/SyncActivity": 1,
"/temporal.server.api.historyservice.v1.HistoryService/SyncShardStatus": 1,
"/temporal.server.api.historyservice.v1.HistoryService/TerminateWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GenerateLastHistoryReplicationTasks": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetReplicationStatus": 1,
"/temporal.server.api.historyservice.v1.HistoryService/DeleteWorkflowVisibilityRecord": 1,
"/temporal.server.api.historyservice.v1.HistoryService/UpdateWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/PollWorkflowExecutionUpdate": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ExecuteMultiOperation": 1,
"/temporal.server.api.historyservice.v1.HistoryService/StreamWorkflowReplicationMessages": 1,
"/temporal.server.api.historyservice.v1.HistoryService/SyncWorkflowState": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetWorkflowExecutionHistory": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetWorkflowExecutionHistoryReverse": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetWorkflowExecutionRawHistory": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetWorkflowExecutionRawHistoryV2": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ForceDeleteWorkflowExecution": 1,
"/temporal.server.api.historyservice.v1.HistoryService/GetDLQTasks": 1,
"/temporal.server.api.historyservice.v1.HistoryService/DeleteDLQTasks": 1,
"/temporal.server.api.historyservice.v1.HistoryService/AddTasks": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ListQueues": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ListTasks": 1,
"/temporal.server.api.historyservice.v1.HistoryService/CompleteNexusOperation": 1,
"/temporal.server.api.historyservice.v1.HistoryService/InvokeStateMachineMethod": 1,
"/temporal.server.api.historyservice.v1.HistoryService/DeepHealthCheck": 1,
"/temporal.server.api.historyservice.v1.HistoryService/UpdateActivityOptions": 1,
"/temporal.server.api.historyservice.v1.HistoryService/PauseActivity": 1,
"/temporal.server.api.historyservice.v1.HistoryService/UnpauseActivity": 1,
"/temporal.server.api.historyservice.v1.HistoryService/ResetActivity": 1,
"/temporal.server.api.historyservice.v1.HistoryService/UpdateWorkflowExecutionOptions": 1,
CallerTypeToPriority = map[string]int{
headers.CallerTypeOperator: OperatorPriority,
headers.CallerTypeAPI: 1,
headers.CallerTypeBackground: 2,
headers.CallerTypePreemptable: 3,
}

APIPrioritiesOrdered = []int{OperatorPriority, 1}
APIPrioritiesOrdered = []int{OperatorPriority, 1, 2, 3}
)

func NewPriorityRateLimiter(
Expand All @@ -126,13 +59,12 @@ func NewPriorityRateLimiter(
}
}
return quotas.NewPriorityRateLimiter(func(req quotas.Request) int {
if req.CallerType == headers.CallerTypeOperator {
return OperatorPriority
}
if priority, ok := APIToPriority[req.API]; ok {
if priority, ok := CallerTypeToPriority[req.CallerType]; ok {
return priority
}
return APIPrioritiesOrdered[len(APIPrioritiesOrdered)-1]
// unknown caller type, default to api to be consistent with existing behavior
// TODO: this happens for CompleteNexusOperation right now. Need to fix it.
return CallerTypeToPriority[headers.CallerTypeAPI]
}, rateLimiters)
}

Expand Down
17 changes: 2 additions & 15 deletions service/history/configs/quotas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,14 @@
package configs

import (
"reflect"
"slices"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/testing/temporalapi"
)

type (
Expand Down Expand Up @@ -63,8 +60,8 @@ func (s *quotasSuite) SetupTest() {
func (s *quotasSuite) TearDownTest() {
}

func (s *quotasSuite) TestAPIToPriorityMapping() {
for _, priority := range APIToPriority {
func (s *quotasSuite) TestCallerTypeToPriorityMapping() {
for _, priority := range CallerTypeToPriority {
index := slices.Index(APIPrioritiesOrdered, priority)
s.NotEqual(-1, index)
}
Expand All @@ -76,16 +73,6 @@ func (s *quotasSuite) TestAPIPrioritiesOrdered() {
}
}

func (s *quotasSuite) TestAPIs() {
var service historyservice.HistoryServiceServer
apiToPriority := make(map[string]int)
temporalapi.WalkExportedMethods(&service, func(m reflect.Method) {
fullName := "/temporal.server.api.historyservice.v1.HistoryService/" + m.Name
apiToPriority[fullName] = APIToPriority[fullName]
})
s.Equal(apiToPriority, APIToPriority)
}

func (s *quotasSuite) TestOperatorPrioritized() {
rateFn := func() float64 { return 5 }
operatorRPSRatioFn := func() float64 { return 0.2 }
Expand Down

0 comments on commit 70883ea

Please sign in to comment.