Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite scheduler workflows example to use DaprWorkflowClient #264

Merged
merged 3 commits into from
Feb 18, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 28 additions & 69 deletions scheduler-workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"syscall"
"time"

dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/go-sdk/service/http"
"github.com/dapr/go-sdk/workflow"
Expand All @@ -25,11 +24,8 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client, err := dapr.NewClient()
if err != nil {
log.Fatalf("Error getting dapr client: %v", err)
}
defer client.Close()
wfClient, err := workflow.NewClient()
defer wfClient.Close()

daprService := http.NewService(appPort)
go func() {
Expand Down Expand Up @@ -58,7 +54,13 @@ func main() {
}
log.Println("Workflow worker started")

go startLonghaulWorkflow(ctx, client)
// make sure to clean up any old workflows from previous runs
for i := 0; i < 100; i++ {
wfClient.TerminateWorkflow(ctx, fmt.Sprintf("longhaul-instance-%d", i))
wfClient.PurgeWorkflow(ctx, fmt.Sprintf("longhaul-instance-%d", i))
}

go startLonghaulWorkflow(ctx, wfClient)
waitForShutdown(daprService, worker, cancel)
}

Expand Down Expand Up @@ -100,60 +102,41 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) {

// startLonghaulWorkflow performs the following operations on a workflow:
// start, pause, resume, raise event, terminate, purge
func startLonghaulWorkflow(ctx context.Context, client dapr.Client) {
func startLonghaulWorkflow(ctx context.Context, client *workflow.Client) {
i := 0
for {
select {
case <-ctx.Done():
return
default:
log.Printf("Starting workflow iteration %d\n", i)
instanceID := fmt.Sprintf("longhaul-instance-%d", i)
workflowReq := &dapr.StartWorkflowRequest{
InstanceID: instanceID,
WorkflowComponent: "",
WorkflowName: "TestWorkflow",
Input: i,
SendRawInput: false,
}

startWfCtx, startWfCancel := context.WithTimeout(ctx, 5*time.Second)
respStart, err := client.StartWorkflowBeta1(startWfCtx, workflowReq)
startWfCancel()
instanceID := fmt.Sprintf("longhaul-instance-%d", i)
_, err := client.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID(instanceID), workflow.WithInput(i))
if err != nil {
log.Printf("Iteration %d: Failed to start workflow: %v\n", i, err)
continue
log.Fatalf("failed to start workflow: %v", err)
}
log.Printf("Workflow started with ID: '%s'\n", respStart.InstanceID)
log.Printf("Workflow started with ID: '%s'\n", instanceID)

pauseWfCtx, pauseWfCancel := context.WithTimeout(ctx, 5*time.Second)
err = client.PauseWorkflowBeta1(pauseWfCtx, &dapr.PauseWorkflowRequest{
InstanceID: instanceID,
WorkflowComponent: "",
})
err = client.SuspendWorkflow(pauseWfCtx, instanceID, "")
pauseWfCancel()
if err != nil {
log.Printf("Failed to pause workflow: %v\n", err)
}
log.Printf("Workflow '%s' paused\n", instanceID)

resumeWfCtx, resumeWfCancel := context.WithTimeout(ctx, 5*time.Second)
err = client.ResumeWorkflowBeta1(resumeWfCtx, &dapr.ResumeWorkflowRequest{
InstanceID: instanceID,
WorkflowComponent: "",
})
err = client.ResumeWorkflow(resumeWfCtx, instanceID, "")
resumeWfCancel()
if err != nil {
log.Printf("Failed to resume workflow: %v\n", err)
}
log.Printf("Workflow '%s' resumed\n", instanceID)
raiseEventWfCtx, raiseEventWfCancel := context.WithTimeout(ctx, 5*time.Second)

// Raise event to advance the workflow
err = client.RaiseEventWorkflowBeta1(raiseEventWfCtx, &dapr.RaiseEventWorkflowRequest{
InstanceID: instanceID,
WorkflowComponent: "",
EventName: "testEvent",
EventData: "testData",
})
raiseEventWfCtx, raiseEventWfCancel := context.WithTimeout(ctx, 5*time.Second)
err = client.RaiseEvent(raiseEventWfCtx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
raiseEventWfCancel()
if err != nil {
log.Printf("Failed to raise event: %v\n", err)
Expand All @@ -162,24 +145,23 @@ func startLonghaulWorkflow(ctx context.Context, client dapr.Client) {
log.Printf("[wfclient] stage: %d\n", stage.Load())

// Wait for workflow to complete
// Poll every 5 seconds to check the workflow status
waitForWorkflowCompletion(ctx, client, instanceID)
terminateWfCtx, terminateWfCancel := context.WithTimeout(ctx, 5*time.Second)
client.WaitForWorkflowCompletion(ctx, instanceID)

// Terminate and purge after completion
err = client.TerminateWorkflowBeta1(terminateWfCtx, &dapr.TerminateWorkflowRequest{
InstanceID: instanceID,
})
terminateWfCtx, terminateWfCancel := context.WithTimeout(ctx, 5*time.Second)
err = client.TerminateWorkflow(terminateWfCtx, instanceID)
terminateWfCancel()

if err != nil {
log.Printf("Failed to terminate workflow %s: %v\n", instanceID, err)
} else {
log.Printf("Workflow '%s' terminated\n", instanceID)
}

purgeWfCtx, purgeWfCancel := context.WithTimeout(ctx, 5*time.Second)
err = client.PurgeWorkflowBeta1(purgeWfCtx, &dapr.PurgeWorkflowRequest{
InstanceID: instanceID,
})
err = client.PurgeWorkflow(purgeWfCtx, instanceID)
purgeWfCancel()

if err != nil {
log.Printf("Failed to purge workflow %s: %v\n", instanceID, err)
} else {
Expand All @@ -194,29 +176,6 @@ func startLonghaulWorkflow(ctx context.Context, client dapr.Client) {
}
}

// waitForWorkflowCompletion polls every 5s to check the workflow status
func waitForWorkflowCompletion(ctx context.Context, client dapr.Client, instanceID string) {
for {
respGet, err := client.GetWorkflowBeta1(ctx, &dapr.GetWorkflowRequest{
InstanceID: instanceID,
})
if err != nil {
log.Printf("Error retrieving workflow status for %s: %v\n", instanceID, err)
continue
}

switch respGet.RuntimeStatus {
case workflow.StatusCompleted.String():
log.Printf("Workflow '%s' completed\n", instanceID)
return
case workflow.StatusFailed.String():
log.Printf("Workflow '%s' failed\n", instanceID)
return
}
time.Sleep(5 * time.Second)
}
}

// waitForShutdown keeps the app alive until an interrupt or termination signal is received
func waitForShutdown(daprService common.Service, worker *workflow.WorkflowWorker, cancelFunc context.CancelFunc) {
sigCh := make(chan os.Signal, 1)
Expand Down