Skip to content

Commit

Permalink
Implement Temporal reset API (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Dec 16, 2022
1 parent ec3bfcc commit 01ca8ca
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 150 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,10 @@ Client APIs are hosted by iWF server for user workflow application to interact w
* Stop workflow: stop a workflow execution
* Signal workflow: send a signal to a workflow execution
* Search workflow: search for workflows using a query language like SQL with search attributes
* Get workflow: get basic information about a workflow like status
* Get workflow: get basic information about a workflow like status and results(if completed or waiting for completed)
* Get workflow data objects: get the dataObjects of a workflow execution
* Get workflow search attributes: get the search attributes of a workflow execution
* Reset workflow: reset a workflow to previous states
* Get workflow results: get the workflow completion results (with or without waiting for completion)

# Why iWF

Expand Down Expand Up @@ -260,7 +259,7 @@ cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0
- [x] Stop workflow API

### 1.1
- [x] Reset workflow API (Cadence only, TODO for Temporal)
- [x] Reset workflow API
- [x] Command type(s) for inter-state communications (e.g. internal channel)
- [x] AnyCommandCompleted Decider trigger type
- [x] More workflow start options: IdReusePolicy, cron schedule, retry
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func start(c *cli.Context) {
if err != nil {
log.Fatalf("Unable to connect to Temporal because of error %v", err)
}
unifiedClient = temporalapi.NewTemporalClient(temporalClient)
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Backend.Temporal.Namespace)

for _, svcName := range services {
go launchTemporalService(svcName, config, unifiedClient, temporalClient)
Expand Down
8 changes: 6 additions & 2 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ import (
"net/http"
)

const testNamespace = "default"

func createTemporalClient() client.Client {
temporalClient, err := client.Dial(client.Options{})
temporalClient, err := client.Dial(client.Options{
Namespace: testNamespace,
})
if err != nil {
log.Fatalf("unable to connect to Temporal %v", err)
}
Expand All @@ -43,7 +47,7 @@ func startWorkflowWorker(handler common.WorkflowHandler) (closeFunc func()) {
func startIwfService(backendType service.BackendType) (closeFunc func()) {
if backendType == service.BackendTypeTemporal {
temporalClient := createTemporalClient()
iwfService := api.NewService(temporalapi.NewTemporalClient(temporalClient))
iwfService := api.NewService(temporalapi.NewTemporalClient(temporalClient, testNamespace))
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand Down
9 changes: 6 additions & 3 deletions script/http/local/home.http
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ Content-Type: application/json
"stateOptions": {}
}

### test reset API
### test reset API NOTE: try different resetType: BEGINNING, HISTORY_EVENT_ID, HISTORY_EVENT_TIME
POST http://localhost:8801/api/v1/workflow/reset
Content-Type: application/json

{
"workflowId": "signal1665468993",
"resetType": "FIRST_DECISION_COMPLETED"
"workflowId": "timer1671219152",
"workflowRunId": "fae51e69-7e41-4658-89aa-663463951dc7",
"resetType": "HISTORY_EVENT_TIME",
"historyEventId":15,
"historyEventTime": "150m"
}
141 changes: 4 additions & 137 deletions service/api/cadence/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"fmt"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/timeparser"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
"regexp"
"strconv"
"time"
)

func getResetIDsByType(
Expand All @@ -32,12 +30,12 @@ func getResetIDsByType(
return
}
case service.ResetTypeHistoryEventTime:
var earliestTime int64
earliestTime, err = parseTime(earliestHistoryTimeStr)
var earliestTimeUnixNano int64
earliestTimeUnixNano, err = timeparser.ParseTime(earliestHistoryTimeStr)
if err != nil {
return
}
decisionFinishID, err = getEarliestDecisionID(ctx, domain, wid, rid, earliestTime, frontendClient)
decisionFinishID, err = getEarliestDecisionID(ctx, domain, wid, rid, earliestTimeUnixNano, frontendClient)
if err != nil {
return
}
Expand Down Expand Up @@ -133,137 +131,6 @@ OuterLoop:
return
}

const (
defaultDateTimeFormat = time.RFC3339 // used for converting UnixNano to string like 2018-02-15T16:16:36-08:00
// regex expression for parsing time durations, shorter, longer notations and numeric value respectively
defaultDateTimeRangeShortRE = "^[1-9][0-9]*[smhdwMy]$" // eg. 1s, 20m, 300h etc.
defaultDateTimeRangeLongRE = "^[1-9][0-9]*(second|minute|hour|day|week|month|year)$" // eg. 1second, 20minute, 300hour etc.
defaultDateTimeRangeNum = "^[1-9][0-9]*" // eg. 1, 20, 300 etc.
)

func parseTime(timeStr string) (int64, error) {
defaultValue := int64(0)
if len(timeStr) == 0 {
return defaultValue, nil
}

// try to parse
parsedTime, err := time.Parse(defaultDateTimeFormat, timeStr)
if err == nil {
return parsedTime.UnixNano(), nil
}

// treat as raw time
resultValue, err := strconv.ParseInt(timeStr, 10, 64)
if err == nil {
return resultValue, nil
}

// treat as time range format
parsedTime, err = parseTimeRange(timeStr)
if err != nil {
return 0, fmt.Errorf("cannot parse time '%s', use UTC format '2006-01-02T15:04:05Z', "+
"time range or raw UnixNano directly. See help for more details: %v", timeStr, err)
}
return parsedTime.UnixNano(), nil
}

// parseTimeRange parses a given time duration string (in format X<time-duration>) and
// returns parsed timestamp given that duration in the past from current time.
// All valid values must contain a number followed by a time-duration, from the following list (long form/short form):
// - second/s
// - minute/m
// - hour/h
// - day/d
// - week/w
// - month/M
// - year/y
// For example, possible input values, and their result:
// - "3d" or "3day" --> three days --> time.Now().Add(-3 * 24 * time.Hour)
// - "2m" or "2minute" --> two minutes --> time.Now().Add(-2 * time.Minute)
// - "1w" or "1week" --> one week --> time.Now().Add(-7 * 24 * time.Hour)
// - "30s" or "30second" --> thirty seconds --> time.Now().Add(-30 * time.Second)
// Note: Duration strings are case-sensitive, and should be used as mentioned above only.
// Limitation: Value of numerical multiplier, X should be in b/w 0 - 1e6 (1 million), boundary values excluded i.e.
// 0 < X < 1e6. Also, the maximum time in the past can be 1 January 1970 00:00:00 UTC (epoch time),
// so giving "1000y" will result in epoch time.
func parseTimeRange(timeRange string) (time.Time, error) {
match, err := regexp.MatchString(defaultDateTimeRangeShortRE, timeRange)
if !match { // fallback on to check if it's of longer notation
_, err = regexp.MatchString(defaultDateTimeRangeLongRE, timeRange)
}
if err != nil {
return time.Time{}, err
}

re, _ := regexp.Compile(defaultDateTimeRangeNum)
idx := re.FindStringSubmatchIndex(timeRange)
if idx == nil {
return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange)
}

num, err := strconv.Atoi(timeRange[idx[0]:idx[1]])
if err != nil {
return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange)
}
if num >= 1e6 {
return time.Time{}, fmt.Errorf("invalid time-duation multiplier %d, allowed range is 0 < multiplier < 1000000", num)
}

dur, err := parseTimeDuration(timeRange[idx[1]:])
if err != nil {
return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange)
}

res := time.Now().Add(time.Duration(-num) * dur) // using server's local timezone
epochTime := time.Unix(0, 0)
if res.Before(epochTime) {
res = epochTime
}
return res, nil
}

const (
// time ranges
day = 24 * time.Hour
week = 7 * day
month = 30 * day
year = 365 * day
)

// parseTimeDuration parses the given time duration in either short or long convention
// and returns the time.Duration
// Valid values (long notation/short notation):
// - second/s
// - minute/m
// - hour/h
// - day/d
// - week/w
// - month/M
// - year/y
// NOTE: the input "duration" is case-sensitive
func parseTimeDuration(duration string) (dur time.Duration, err error) {
switch duration {
case "s", "second":
dur = time.Second
case "m", "minute":
dur = time.Minute
case "h", "hour":
dur = time.Hour
case "d", "day":
dur = day
case "w", "week":
dur = week
case "M", "month":
dur = month
case "y", "year":
dur = year
default:
err = fmt.Errorf("unknown time duration %s", duration)
}
return
}

func composeErrorWithMessage(msg string, err error) error {
err = fmt.Errorf("%v, %v", msg, err)
return err
Expand Down
51 changes: 47 additions & 4 deletions service/api/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package temporal
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/api"
Expand All @@ -16,12 +17,14 @@ import (
)

type temporalClient struct {
tClient client.Client
tClient client.Client
namespace string
}

func NewTemporalClient(tClient client.Client) api.UnifiedClient {
func NewTemporalClient(tClient client.Client, namespace string) api.UnifiedClient {
return &temporalClient{
tClient: tClient,
tClient: tClient,
namespace: namespace,
}
}

Expand Down Expand Up @@ -200,5 +203,45 @@ func (t *temporalClient) GetWorkflowResult(ctx context.Context, valuePtr interfa
}

func (t *temporalClient) ResetWorkflow(ctx context.Context, request iwfidl.WorkflowResetRequest) (runId string, err error) {
return "", fmt.Errorf("not supported")
reqRunId := request.GetWorkflowRunId()
if reqRunId == "" {
// set default runId to current
resp, err := t.DescribeWorkflowExecution(ctx, request.GetWorkflowId(), "")
if err != nil {
return "", err
}
reqRunId = resp.RunId
}

resetType := service.ResetType(request.GetResetType())
resetBaseRunID, resetEventId, err := getResetEventIDByType(ctx, resetType,
t.namespace, request.GetWorkflowId(), reqRunId,
t.tClient.WorkflowService(), request.GetHistoryEventId(), request.GetHistoryEventTime())

if err != nil {
return "", err
}

requestId := uuid.New().String()
resetReapplyType := enums.RESET_REAPPLY_TYPE_SIGNAL
if request.GetSkipSignalReapply() {
resetReapplyType = enums.RESET_REAPPLY_TYPE_NONE
}

resp, err := t.tClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{
Namespace: t.namespace,
WorkflowExecution: &common.WorkflowExecution{
WorkflowId: request.WorkflowId,
RunId: resetBaseRunID,
},
Reason: request.GetReason(),
WorkflowTaskFinishEventId: resetEventId,
RequestId: requestId,
ResetReapplyType: resetReapplyType,
})

if err != nil {
return "", err
}
return resp.GetRunId(), nil
}
Loading

0 comments on commit 01ca8ca

Please sign in to comment.