Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix continueAsNew with large snapshot(>4MB) #482

Merged
merged 13 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions .github/workflows/ci-cadence-integ-test-disable-sticky.yml

This file was deleted.

9 changes: 5 additions & 4 deletions docker-compose/init-ci-cadence.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ yes | cadence adm cl asa --search_attr_key IwfExecutingStateIds --search_attr_ty
yes | cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 1


# see https://github.com/indeedeng/iwf/blob/main/CONTRIBUTING.md#option-3-run-with-your-own-cadence-service
echo "now sleep for 60s so that all the search attributes can take effect"

sleep 70
echo "After registering, it may take up 60s because of this issue. for Cadence to load the new search attributes."
echo "If run the test too early, you may see error: \"IwfWorkflowType is not a valid search attribute key\""
echo "and the test would fail with: unknown decision DecisionType: Activity, ID: 0, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition"
sleep 65

echo "now register the domain to tell the tests that Cadence is ready"
cadence --do default domain register

tail -f /dev/null
18 changes: 17 additions & 1 deletion integ/any_command_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ func TestAnyCommandCloseWorkflowTemporal(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, nil)
smallWaitForFastTest()
}
}

func TestAnyCommandCloseWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
Expand All @@ -31,7 +39,15 @@ func TestAnyCommandCloseWorkflowCadence(t *testing.T) {
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(false))
}
}

func TestAnyCommandCloseWorkflowCadenceContinueAsNew(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfig(true))
smallWaitForFastTest()
}
}
Expand Down
16 changes: 8 additions & 8 deletions integ/any_command_combination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ func TestAnyCommandCombinationWorkflowTemporal(t *testing.T) {
}
}

func TestAnyCommandCombinationWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
func TestAnyCommandCombinationWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, nil)
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
smallWaitForFastTest()
}
}

func TestAnyCommandCombinationWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
func TestAnyCommandCombinationWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
// TODO not sure why using minimumContinueAsNewConfig(true) will fail
doTestAnyCommandCombinationWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfig(false))
doTestAnyCommandCloseWorkflow(t, service.BackendTypeCadence, nil)
smallWaitForFastTest()
}
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func doTestAnyCommandCombinationWorkflow(t *testing.T, backendType service.Backe
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: anycommandconbination.WorkflowType,
WorkflowTimeoutSeconds: 20,
WorkflowTimeoutSeconds: 40,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(anycommandconbination.State1),
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
Expand Down
2 changes: 1 addition & 1 deletion integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
},
QueryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: 1,
MaximumAttempts: 5,
MaximumAttempts: 10,
},
},
Interpreter: config.Interpreter{
Expand Down
6 changes: 3 additions & 3 deletions integ/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ func doTestCreateWithoutStartingState(t *testing.T, backendType service.BackendT
panicAtHttpError(err, httpResp)

// workflow shouldn't executed any state
var dump service.ContinueAsNewDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.ContinueAsNewDumpQueryType)
var dump service.DebugDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
}
assertions.Equal(service.StateExecutionCounterInfo{
StateIdStartedCount: make(map[string]int),
StateIdCurrentlyExecutingCount: make(map[string]int),
TotalCurrentlyExecutingCount: 0,
}, dump.StateExecutionCounterInfo)
}, dump.Snapshot.StateExecutionCounterInfo)

// invoke an RPC to trigger the state execution
reqRpc := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background())
Expand Down
119 changes: 119 additions & 0 deletions integ/large_data_attributes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package integ

import (
"context"
"fmt"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/integ/workflow/signal"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/stretchr/testify/assert"
"net/http"
"strconv"
"strings"
"testing"
"time"
)

func TestLargeDataAttributesTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestLargeQueryAttributes(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0())
smallWaitForFastTest()
}
}

func doTestLargeQueryAttributes(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
if !*temporalIntegTest {
t.Skip()
}
assertions := assert.New(t)

// start test workflow server
wfHandler := signal.NewHandler()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Wouldn't it be better to create a handler class for this specific test?

Copy link
Contributor Author

@longquanzheng longquanzheng Dec 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like using the existing is better. The test only need a workflow to do nothing, so that we can set some large DAs. The existing signal workflow is exactly like that. I would just copy the code if so.

closeFunc1 := startWorkflowWorker(wfHandler)
defer closeFunc1()

_, closeFunc2 := startIwfServiceWithClient(backendType)
defer closeFunc2()

wfId := signal.WorkflowType + strconv.Itoa(int(time.Now().UnixNano()))

// start a workflow
apiClient := iwfidl.NewAPIClient(&iwfidl.Configuration{
Servers: []iwfidl.ServerConfiguration{
{
URL: "http://localhost:" + testIwfServerPort,
},
},
})
req := apiClient.DefaultApi.ApiV1WorkflowStartPost(context.Background())
_, httpResp, err := req.WorkflowStartRequest(iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: signal.WorkflowType,
WorkflowTimeoutSeconds: 86400,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(signal.State1),
// this is necessary for large DAs
// otherwise the workflow task will fail when trying to execute a stateAPI with data attributes >4MB
StateOptions: &signal.StateOptionsForLargeDataAttributes,
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
WorkflowConfigOverride: config,
},
}).Execute()
panicAtHttpError(err, httpResp)

assertions.Equal(httpResp.StatusCode, http.StatusOK)

// Define the size of the string in bytes (1 MB = 1024 * 1024 bytes)
const size = 1024 * 1024

OneMbDataObject := iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString(strings.Repeat("a", size)),
}

// setting a large data object to test, especially continueAsNew
// because there is a 4MB limit for GRPC in temporal
setReq := apiClient.DefaultApi.ApiV1WorkflowDataobjectsSetPost(context.Background())
for i := 0; i < 5; i++ {

httpResp2, err := setReq.WorkflowSetDataObjectsRequest(iwfidl.WorkflowSetDataObjectsRequest{
WorkflowId: wfId,
Objects: []iwfidl.KeyValue{
{
Key: iwfidl.PtrString("large-data-object-" + strconv.Itoa(i)),
Value: &OneMbDataObject,
},
},
}).Execute()

panicAtHttpError(err, httpResp2)
}

// signal the workflow to complete
for i := 0; i < 4; i++ {
signalVal := iwfidl.EncodedObject{
Encoding: iwfidl.PtrString("json"),
Data: iwfidl.PtrString(fmt.Sprintf("test-data-%v", i)),
}

req2 := apiClient.DefaultApi.ApiV1WorkflowSignalPost(context.Background())
httpResp2, err := req2.WorkflowSignalRequest(iwfidl.WorkflowSignalRequest{
WorkflowId: wfId,
SignalChannelName: signal.SignalName,
SignalValue: &signalVal,
}).Execute()

panicAtHttpError(err, httpResp2)
}

// wait for the workflow
reqWait := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
_, httpResp, err = reqWait.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"time"
)

func TestSetQueryAttributes(t *testing.T) {
func TestSetDataAttributesTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
Expand Down Expand Up @@ -50,33 +50,34 @@ func TestSetQueryAttributes(t *testing.T) {

assertions.Equal(httpResp.StatusCode, http.StatusOK)

var signalVals []iwfidl.KeyValue
signalVals = append(signalVals, iwfidl.KeyValue{
Key: iwfidl.PtrString(persistence.TestDataObjectKey),
Value: &persistence.TestDataObjectVal1,
},
iwfidl.KeyValue{
smallDataObjects := []iwfidl.KeyValue{
{
Key: iwfidl.PtrString(persistence.TestDataObjectKey),
Value: &persistence.TestDataObjectVal1,
},
{
Key: iwfidl.PtrString(persistence.TestDataObjectKey2),
Value: &persistence.TestDataObjectVal2,
})
},
}

setReq := apiClient.DefaultApi.ApiV1WorkflowDataobjectsSetPost(context.Background())
httpResp2, err := setReq.WorkflowSetDataObjectsRequest(iwfidl.WorkflowSetDataObjectsRequest{
WorkflowId: wfId,
Objects: signalVals,
Objects: smallDataObjects,
}).Execute()

panicAtHttpError(err, httpResp2)

time.Sleep(time.Second)

getReq := apiClient.DefaultApi.ApiV1WorkflowDataobjectsGetPost(context.Background())
searchResult, httpRespGet, err := getReq.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
getResult, httpRespGet, err := getReq.WorkflowGetDataObjectsRequest(iwfidl.WorkflowGetDataObjectsRequest{
WorkflowId: wfId,
Keys: []string{
persistence.TestDataObjectKey, persistence.TestDataObjectKey2,
}}).Execute()
panicAtHttpError(err, httpRespGet)

assertions.ElementsMatch(signalVals, searchResult.Objects)
assertions.ElementsMatch(smallDataObjects, getResult.Objects)
}
25 changes: 11 additions & 14 deletions integ/signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
if config != nil {
expectedConfig = *config
}
assertions.Equal(service.DebugDumpResponse{
Config: expectedConfig,
}, debugDump)
assertions.Equal(expectedConfig, debugDump.Config)

// update the disable system SA
reqUpdateConfig := apiClient.DefaultApi.ApiV1WorkflowConfigUpdatePost(context.Background())
Expand All @@ -113,21 +111,22 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
}).Execute()
panicAtHttpError(err, httpResp)

if config != nil {
time.Sleep(2 * time.Second)
}
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
}
expectedConfig.DisableSystemSearchAttribute = iwfidl.PtrBool(true)
assertions.Equal(service.DebugDumpResponse{
Config: expectedConfig,
}, debugDump)
assertions.Equal(expectedConfig, debugDump.Config)

// update the pagination size
reqUpdateConfig = apiClient.DefaultApi.ApiV1WorkflowConfigUpdatePost(context.Background())
httpResp, err = reqUpdateConfig.WorkflowConfigUpdateRequest(iwfidl.WorkflowConfigUpdateRequest{
WorkflowId: wfId,
WorkflowConfig: iwfidl.WorkflowConfig{
ContinueAsNewPageSizeInBytes: iwfidl.PtrInt32(300),
ContinueAsNewPageSizeInBytes: iwfidl.PtrInt32(3000000),
},
}).Execute()
panicAtHttpError(err, httpResp)
Expand All @@ -136,10 +135,8 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
if err != nil {
panic(err)
}
expectedConfig.ContinueAsNewPageSizeInBytes = iwfidl.PtrInt32(300)
assertions.Equal(service.DebugDumpResponse{
Config: expectedConfig,
}, debugDump)
expectedConfig.ContinueAsNewPageSizeInBytes = iwfidl.PtrInt32(3000000)
assertions.Equal(expectedConfig, debugDump.Config)

// signal for testing unhandled signals
var unhandledSignalVals []*iwfidl.EncodedObject
Expand Down Expand Up @@ -225,12 +222,12 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
assertions.Equal(signalVals[i], data[fmt.Sprintf("signalValue%v", i)])
}

var dump service.ContinueAsNewDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.ContinueAsNewDumpQueryType)
var dump service.DebugDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
}
assertions.Equal(unhandledSignalVals, dump.SignalsReceived[signal.UnhandledSignalName])
assertions.Equal(unhandledSignalVals, dump.Snapshot.SignalsReceived[signal.UnhandledSignalName])
assertions.True(len(unhandledSignalVals) > 0)

if config == nil {
Expand Down
Loading
Loading