diff --git a/scheduler-workflow/workflow.go b/scheduler-workflow/workflow.go index 23f22020..41751a1b 100644 --- a/scheduler-workflow/workflow.go +++ b/scheduler-workflow/workflow.go @@ -11,7 +11,6 @@ import ( "syscall" "time" - dapr "github.com/dapr/go-sdk/client" "github.com/dapr/go-sdk/service/common" "github.com/dapr/go-sdk/service/http" "github.com/dapr/go-sdk/workflow" @@ -25,11 +24,11 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client, err := dapr.NewClient() + wfClient, err := workflow.NewClient() if err != nil { log.Fatalf("Error getting dapr client: %v", err) } - defer client.Close() + defer wfClient.Close() daprService := http.NewService(appPort) go func() { @@ -58,7 +57,13 @@ func main() { } log.Println("Workflow worker started") - go startLonghaulWorkflow(ctx, client) + // make sure to clean up any old workflows from previous runs + for i := 0; i < 100; i++ { + wfClient.TerminateWorkflow(ctx, fmt.Sprintf("longhaul-instance-%d", i)) + wfClient.PurgeWorkflow(ctx, fmt.Sprintf("longhaul-instance-%d", i)) + } + + go startLonghaulWorkflow(ctx, wfClient) waitForShutdown(daprService, worker, cancel) } @@ -100,7 +105,7 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) { // startLonghaulWorkflow performs the following operations on a workflow: // start, pause, resume, raise event, terminate, purge -func startLonghaulWorkflow(ctx context.Context, client dapr.Client) { +func startLonghaulWorkflow(ctx context.Context, client *workflow.Client) { i := 0 for { select { @@ -108,52 +113,33 @@ func startLonghaulWorkflow(ctx context.Context, client dapr.Client) { return default: log.Printf("Starting workflow iteration %d\n", i) - instanceID := fmt.Sprintf("longhaul-instance-%d", i) - workflowReq := &dapr.StartWorkflowRequest{ - InstanceID: instanceID, - WorkflowComponent: "", - WorkflowName: "TestWorkflow", - Input: i, - SendRawInput: false, - } - startWfCtx, startWfCancel := context.WithTimeout(ctx, 5*time.Second) - respStart, err := client.StartWorkflowBeta1(startWfCtx, workflowReq) - startWfCancel() + instanceID := fmt.Sprintf("longhaul-instance-%d", i) + _, err := client.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID(instanceID), workflow.WithInput(i)) if err != nil { - log.Printf("Iteration %d: Failed to start workflow: %v\n", i, err) - continue + log.Fatalf("failed to start workflow: %v", err) } - log.Printf("Workflow started with ID: '%s'\n", respStart.InstanceID) + log.Printf("Workflow started with ID: '%s'\n", instanceID) pauseWfCtx, pauseWfCancel := context.WithTimeout(ctx, 5*time.Second) - err = client.PauseWorkflowBeta1(pauseWfCtx, &dapr.PauseWorkflowRequest{ - InstanceID: instanceID, - WorkflowComponent: "", - }) + err = client.SuspendWorkflow(pauseWfCtx, instanceID, "") pauseWfCancel() if err != nil { log.Printf("Failed to pause workflow: %v\n", err) } log.Printf("Workflow '%s' paused\n", instanceID) + resumeWfCtx, resumeWfCancel := context.WithTimeout(ctx, 5*time.Second) - err = client.ResumeWorkflowBeta1(resumeWfCtx, &dapr.ResumeWorkflowRequest{ - InstanceID: instanceID, - WorkflowComponent: "", - }) + err = client.ResumeWorkflow(resumeWfCtx, instanceID, "") resumeWfCancel() if err != nil { log.Printf("Failed to resume workflow: %v\n", err) } log.Printf("Workflow '%s' resumed\n", instanceID) - raiseEventWfCtx, raiseEventWfCancel := context.WithTimeout(ctx, 5*time.Second) + // Raise event to advance the workflow - err = client.RaiseEventWorkflowBeta1(raiseEventWfCtx, &dapr.RaiseEventWorkflowRequest{ - InstanceID: instanceID, - WorkflowComponent: "", - EventName: "testEvent", - EventData: "testData", - }) + raiseEventWfCtx, raiseEventWfCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.RaiseEvent(raiseEventWfCtx, instanceID, "testEvent", workflow.WithEventPayload("testData")) raiseEventWfCancel() if err != nil { log.Printf("Failed to raise event: %v\n", err) @@ -162,24 +148,23 @@ func startLonghaulWorkflow(ctx context.Context, client dapr.Client) { log.Printf("[wfclient] stage: %d\n", stage.Load()) // Wait for workflow to complete - // Poll every 5 seconds to check the workflow status - waitForWorkflowCompletion(ctx, client, instanceID) - terminateWfCtx, terminateWfCancel := context.WithTimeout(ctx, 5*time.Second) + client.WaitForWorkflowCompletion(ctx, instanceID) + // Terminate and purge after completion - err = client.TerminateWorkflowBeta1(terminateWfCtx, &dapr.TerminateWorkflowRequest{ - InstanceID: instanceID, - }) + terminateWfCtx, terminateWfCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.TerminateWorkflow(terminateWfCtx, instanceID) terminateWfCancel() + if err != nil { log.Printf("Failed to terminate workflow %s: %v\n", instanceID, err) } else { log.Printf("Workflow '%s' terminated\n", instanceID) } + purgeWfCtx, purgeWfCancel := context.WithTimeout(ctx, 5*time.Second) - err = client.PurgeWorkflowBeta1(purgeWfCtx, &dapr.PurgeWorkflowRequest{ - InstanceID: instanceID, - }) + err = client.PurgeWorkflow(purgeWfCtx, instanceID) purgeWfCancel() + if err != nil { log.Printf("Failed to purge workflow %s: %v\n", instanceID, err) } else { @@ -191,29 +176,7 @@ func startLonghaulWorkflow(ctx context.Context, client dapr.Client) { i = 0 } } - } -} - -// waitForWorkflowCompletion polls every 5s to check the workflow status -func waitForWorkflowCompletion(ctx context.Context, client dapr.Client, instanceID string) { - for { - respGet, err := client.GetWorkflowBeta1(ctx, &dapr.GetWorkflowRequest{ - InstanceID: instanceID, - }) - if err != nil { - log.Printf("Error retrieving workflow status for %s: %v\n", instanceID, err) - continue - } - - switch respGet.RuntimeStatus { - case workflow.StatusCompleted.String(): - log.Printf("Workflow '%s' completed\n", instanceID) - return - case workflow.StatusFailed.String(): - log.Printf("Workflow '%s' failed\n", instanceID) - return - } - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) } }