Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Temporal reset API #98

Merged
merged 3 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,10 @@ Client APIs are hosted by iWF server for user workflow application to interact w
* Stop workflow: stop a workflow execution
* Signal workflow: send a signal to a workflow execution
* Search workflow: search for workflows using a query language like SQL with search attributes
* Get workflow: get basic information about a workflow like status
* Get workflow: get basic information about a workflow like status and results(if completed or waiting for completed)
* Get workflow data objects: get the dataObjects of a workflow execution
* Get workflow search attributes: get the search attributes of a workflow execution
* Reset workflow: reset a workflow to previous states
* Get workflow results: get the workflow completion results (with or without waiting for completion)

# Why iWF

Expand Down Expand Up @@ -260,7 +259,7 @@ cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0
- [x] Stop workflow API

### 1.1
- [x] Reset workflow API (Cadence only, TODO for Temporal)
- [x] Reset workflow API
- [x] Command type(s) for inter-state communications (e.g. internal channel)
- [x] AnyCommandCompleted Decider trigger type
- [x] More workflow start options: IdReusePolicy, cron schedule, retry
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func start(c *cli.Context) {
if err != nil {
log.Fatalf("Unable to connect to Temporal because of error %v", err)
}
unifiedClient = temporalapi.NewTemporalClient(temporalClient)
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Backend.Temporal.Namespace)

for _, svcName := range services {
go launchTemporalService(svcName, config, unifiedClient, temporalClient)
Expand Down
8 changes: 6 additions & 2 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ import (
"net/http"
)

const testNamespace = "default"

func createTemporalClient() client.Client {
temporalClient, err := client.Dial(client.Options{})
temporalClient, err := client.Dial(client.Options{
Namespace: testNamespace,
})
if err != nil {
log.Fatalf("unable to connect to Temporal %v", err)
}
Expand All @@ -43,7 +47,7 @@ func startWorkflowWorker(handler common.WorkflowHandler) (closeFunc func()) {
func startIwfService(backendType service.BackendType) (closeFunc func()) {
if backendType == service.BackendTypeTemporal {
temporalClient := createTemporalClient()
iwfService := api.NewService(temporalapi.NewTemporalClient(temporalClient))
iwfService := api.NewService(temporalapi.NewTemporalClient(temporalClient, testNamespace))
iwfServer := &http.Server{
Addr: ":" + testIwfServerPort,
Handler: iwfService,
Expand Down
9 changes: 6 additions & 3 deletions script/http/local/home.http
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ Content-Type: application/json
"stateOptions": {}
}

### test reset API
### test reset API NOTE: try different resetType: BEGINNING, HISTORY_EVENT_ID, HISTORY_EVENT_TIME
POST http://localhost:8801/api/v1/workflow/reset
Content-Type: application/json

{
"workflowId": "signal1665468993",
"resetType": "FIRST_DECISION_COMPLETED"
"workflowId": "timer1671219152",
"workflowRunId": "fae51e69-7e41-4658-89aa-663463951dc7",
"resetType": "HISTORY_EVENT_TIME",
"historyEventId":15,
"historyEventTime": "150m"
}
141 changes: 4 additions & 137 deletions service/api/cadence/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"fmt"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/common/timeparser"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
"regexp"
"strconv"
"time"
)

func getResetIDsByType(
Expand All @@ -32,12 +30,12 @@ func getResetIDsByType(
return
}
case service.ResetTypeHistoryEventTime:
var earliestTime int64
earliestTime, err = parseTime(earliestHistoryTimeStr)
var earliestTimeUnixNano int64
earliestTimeUnixNano, err = timeparser.ParseTime(earliestHistoryTimeStr)
if err != nil {
return
}
decisionFinishID, err = getEarliestDecisionID(ctx, domain, wid, rid, earliestTime, frontendClient)
decisionFinishID, err = getEarliestDecisionID(ctx, domain, wid, rid, earliestTimeUnixNano, frontendClient)
if err != nil {
return
}
Expand Down Expand Up @@ -133,137 +131,6 @@ OuterLoop:
return
}

const (
defaultDateTimeFormat = time.RFC3339 // used for converting UnixNano to string like 2018-02-15T16:16:36-08:00
// regex expression for parsing time durations, shorter, longer notations and numeric value respectively
defaultDateTimeRangeShortRE = "^[1-9][0-9]*[smhdwMy]$" // eg. 1s, 20m, 300h etc.
defaultDateTimeRangeLongRE = "^[1-9][0-9]*(second|minute|hour|day|week|month|year)$" // eg. 1second, 20minute, 300hour etc.
defaultDateTimeRangeNum = "^[1-9][0-9]*" // eg. 1, 20, 300 etc.
)

func parseTime(timeStr string) (int64, error) {
defaultValue := int64(0)
if len(timeStr) == 0 {
return defaultValue, nil
}

// try to parse
parsedTime, err := time.Parse(defaultDateTimeFormat, timeStr)
if err == nil {
return parsedTime.UnixNano(), nil
}

// treat as raw time
resultValue, err := strconv.ParseInt(timeStr, 10, 64)
if err == nil {
return resultValue, nil
}

// treat as time range format
parsedTime, err = parseTimeRange(timeStr)
if err != nil {
return 0, fmt.Errorf("cannot parse time '%s', use UTC format '2006-01-02T15:04:05Z', "+
"time range or raw UnixNano directly. See help for more details: %v", timeStr, err)
}
return parsedTime.UnixNano(), nil
}

// parseTimeRange parses a given time duration string (in format X<time-duration>) and
// returns parsed timestamp given that duration in the past from current time.
// All valid values must contain a number followed by a time-duration, from the following list (long form/short form):
// - second/s
// - minute/m
// - hour/h
// - day/d
// - week/w
// - month/M
// - year/y
// For example, possible input values, and their result:
// - "3d" or "3day" --> three days --> time.Now().Add(-3 * 24 * time.Hour)
// - "2m" or "2minute" --> two minutes --> time.Now().Add(-2 * time.Minute)
// - "1w" or "1week" --> one week --> time.Now().Add(-7 * 24 * time.Hour)
// - "30s" or "30second" --> thirty seconds --> time.Now().Add(-30 * time.Second)
// Note: Duration strings are case-sensitive, and should be used as mentioned above only.
// Limitation: Value of numerical multiplier, X should be in b/w 0 - 1e6 (1 million), boundary values excluded i.e.
// 0 < X < 1e6. Also, the maximum time in the past can be 1 January 1970 00:00:00 UTC (epoch time),
// so giving "1000y" will result in epoch time.
func parseTimeRange(timeRange string) (time.Time, error) {
match, err := regexp.MatchString(defaultDateTimeRangeShortRE, timeRange)
if !match { // fallback on to check if it's of longer notation
_, err = regexp.MatchString(defaultDateTimeRangeLongRE, timeRange)
}
if err != nil {
return time.Time{}, err
}

re, _ := regexp.Compile(defaultDateTimeRangeNum)
idx := re.FindStringSubmatchIndex(timeRange)
if idx == nil {
return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange)
}

num, err := strconv.Atoi(timeRange[idx[0]:idx[1]])
if err != nil {
return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange)
}
if num >= 1e6 {
return time.Time{}, fmt.Errorf("invalid time-duation multiplier %d, allowed range is 0 < multiplier < 1000000", num)
}

dur, err := parseTimeDuration(timeRange[idx[1]:])
if err != nil {
return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange)
}

res := time.Now().Add(time.Duration(-num) * dur) // using server's local timezone
epochTime := time.Unix(0, 0)
if res.Before(epochTime) {
res = epochTime
}
return res, nil
}

const (
// time ranges
day = 24 * time.Hour
week = 7 * day
month = 30 * day
year = 365 * day
)

// parseTimeDuration parses the given time duration in either short or long convention
// and returns the time.Duration
// Valid values (long notation/short notation):
// - second/s
// - minute/m
// - hour/h
// - day/d
// - week/w
// - month/M
// - year/y
// NOTE: the input "duration" is case-sensitive
func parseTimeDuration(duration string) (dur time.Duration, err error) {
switch duration {
case "s", "second":
dur = time.Second
case "m", "minute":
dur = time.Minute
case "h", "hour":
dur = time.Hour
case "d", "day":
dur = day
case "w", "week":
dur = week
case "M", "month":
dur = month
case "y", "year":
dur = year
default:
err = fmt.Errorf("unknown time duration %s", duration)
}
return
}

func composeErrorWithMessage(msg string, err error) error {
err = fmt.Errorf("%v, %v", msg, err)
return err
Expand Down
51 changes: 47 additions & 4 deletions service/api/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package temporal
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/api"
Expand All @@ -16,12 +17,14 @@ import (
)

type temporalClient struct {
tClient client.Client
tClient client.Client
namespace string
}

func NewTemporalClient(tClient client.Client) api.UnifiedClient {
func NewTemporalClient(tClient client.Client, namespace string) api.UnifiedClient {
return &temporalClient{
tClient: tClient,
tClient: tClient,
namespace: namespace,
}
}

Expand Down Expand Up @@ -200,5 +203,45 @@ func (t *temporalClient) GetWorkflowResult(ctx context.Context, valuePtr interfa
}

func (t *temporalClient) ResetWorkflow(ctx context.Context, request iwfidl.WorkflowResetRequest) (runId string, err error) {
return "", fmt.Errorf("not supported")
reqRunId := request.GetWorkflowRunId()
if reqRunId == "" {
// set default runId to current
resp, err := t.DescribeWorkflowExecution(ctx, request.GetWorkflowId(), "")
if err != nil {
return "", err
}
reqRunId = resp.RunId
}

resetType := service.ResetType(request.GetResetType())
resetBaseRunID, resetEventId, err := getResetEventIDByType(ctx, resetType,
t.namespace, request.GetWorkflowId(), reqRunId,
t.tClient.WorkflowService(), request.GetHistoryEventId(), request.GetHistoryEventTime())

if err != nil {
return "", err
}

requestId := uuid.New().String()
resetReapplyType := enums.RESET_REAPPLY_TYPE_SIGNAL
if request.GetSkipSignalReapply() {
resetReapplyType = enums.RESET_REAPPLY_TYPE_NONE
}

resp, err := t.tClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{
Namespace: t.namespace,
WorkflowExecution: &common.WorkflowExecution{
WorkflowId: request.WorkflowId,
RunId: resetBaseRunID,
},
Reason: request.GetReason(),
WorkflowTaskFinishEventId: resetEventId,
RequestId: requestId,
ResetReapplyType: resetReapplyType,
})

if err != nil {
return "", err
}
return resp.GetRunId(), nil
}
Loading