From c6c0668b263cb6d5d7009d6dd450f8df8601391a Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 16 Dec 2022 12:09:53 -0800 Subject: [PATCH 1/3] Implement Temporal reset API --- README.md | 3 +- service/api/cadence/reset.go | 137 +---------------------------- service/api/temporal/client.go | 42 ++++++++- service/api/temporal/reset.go | 131 ++++++++++++++++++++++++++++ service/common/timeparser/time.go | 139 ++++++++++++++++++++++++++++++ 5 files changed, 314 insertions(+), 138 deletions(-) create mode 100644 service/api/temporal/reset.go create mode 100644 service/common/timeparser/time.go diff --git a/README.md b/README.md index 8dcfe833..cce715ff 100644 --- a/README.md +++ b/README.md @@ -144,11 +144,10 @@ Client APIs are hosted by iWF server for user workflow application to interact w * Stop workflow: stop a workflow execution * Signal workflow: send a signal to a workflow execution * Search workflow: search for workflows using a query language like SQL with search attributes -* Get workflow: get basic information about a workflow like status +* Get workflow: get basic information about a workflow like status and results(if completed or waiting for completed) * Get workflow data objects: get the dataObjects of a workflow execution * Get workflow search attributes: get the search attributes of a workflow execution * Reset workflow: reset a workflow to previous states -* Get workflow results: get the workflow completion results (with or without waiting for completion) # Why iWF diff --git a/service/api/cadence/reset.go b/service/api/cadence/reset.go index c22e0e86..03d1bc83 100644 --- a/service/api/cadence/reset.go +++ b/service/api/cadence/reset.go @@ -5,11 +5,9 @@ import ( "fmt" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/timeparser" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" "go.uber.org/cadence/.gen/go/shared" - "regexp" - "strconv" - "time" ) func getResetIDsByType( @@ -33,7 +31,7 @@ func getResetIDsByType( } case service.ResetTypeHistoryEventTime: var earliestTime int64 - earliestTime, err = parseTime(earliestHistoryTimeStr) + earliestTime, err = timeparser.ParseTime(earliestHistoryTimeStr) if err != nil { return } @@ -133,137 +131,6 @@ OuterLoop: return } -const ( - defaultDateTimeFormat = time.RFC3339 // used for converting UnixNano to string like 2018-02-15T16:16:36-08:00 - // regex expression for parsing time durations, shorter, longer notations and numeric value respectively - defaultDateTimeRangeShortRE = "^[1-9][0-9]*[smhdwMy]$" // eg. 1s, 20m, 300h etc. - defaultDateTimeRangeLongRE = "^[1-9][0-9]*(second|minute|hour|day|week|month|year)$" // eg. 1second, 20minute, 300hour etc. - defaultDateTimeRangeNum = "^[1-9][0-9]*" // eg. 1, 20, 300 etc. -) - -func parseTime(timeStr string) (int64, error) { - defaultValue := int64(0) - if len(timeStr) == 0 { - return defaultValue, nil - } - - // try to parse - parsedTime, err := time.Parse(defaultDateTimeFormat, timeStr) - if err == nil { - return parsedTime.UnixNano(), nil - } - - // treat as raw time - resultValue, err := strconv.ParseInt(timeStr, 10, 64) - if err == nil { - return resultValue, nil - } - - // treat as time range format - parsedTime, err = parseTimeRange(timeStr) - if err != nil { - return 0, fmt.Errorf("cannot parse time '%s', use UTC format '2006-01-02T15:04:05Z', "+ - "time range or raw UnixNano directly. See help for more details: %v", timeStr, err) - } - return parsedTime.UnixNano(), nil -} - -// parseTimeRange parses a given time duration string (in format X) and -// returns parsed timestamp given that duration in the past from current time. -// All valid values must contain a number followed by a time-duration, from the following list (long form/short form): -// - second/s -// - minute/m -// - hour/h -// - day/d -// - week/w -// - month/M -// - year/y -// For example, possible input values, and their result: -// - "3d" or "3day" --> three days --> time.Now().Add(-3 * 24 * time.Hour) -// - "2m" or "2minute" --> two minutes --> time.Now().Add(-2 * time.Minute) -// - "1w" or "1week" --> one week --> time.Now().Add(-7 * 24 * time.Hour) -// - "30s" or "30second" --> thirty seconds --> time.Now().Add(-30 * time.Second) -// Note: Duration strings are case-sensitive, and should be used as mentioned above only. -// Limitation: Value of numerical multiplier, X should be in b/w 0 - 1e6 (1 million), boundary values excluded i.e. -// 0 < X < 1e6. Also, the maximum time in the past can be 1 January 1970 00:00:00 UTC (epoch time), -// so giving "1000y" will result in epoch time. -func parseTimeRange(timeRange string) (time.Time, error) { - match, err := regexp.MatchString(defaultDateTimeRangeShortRE, timeRange) - if !match { // fallback on to check if it's of longer notation - _, err = regexp.MatchString(defaultDateTimeRangeLongRE, timeRange) - } - if err != nil { - return time.Time{}, err - } - - re, _ := regexp.Compile(defaultDateTimeRangeNum) - idx := re.FindStringSubmatchIndex(timeRange) - if idx == nil { - return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange) - } - - num, err := strconv.Atoi(timeRange[idx[0]:idx[1]]) - if err != nil { - return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange) - } - if num >= 1e6 { - return time.Time{}, fmt.Errorf("invalid time-duation multiplier %d, allowed range is 0 < multiplier < 1000000", num) - } - - dur, err := parseTimeDuration(timeRange[idx[1]:]) - if err != nil { - return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange) - } - - res := time.Now().Add(time.Duration(-num) * dur) // using server's local timezone - epochTime := time.Unix(0, 0) - if res.Before(epochTime) { - res = epochTime - } - return res, nil -} - -const ( - // time ranges - day = 24 * time.Hour - week = 7 * day - month = 30 * day - year = 365 * day -) - -// parseTimeDuration parses the given time duration in either short or long convention -// and returns the time.Duration -// Valid values (long notation/short notation): -// - second/s -// - minute/m -// - hour/h -// - day/d -// - week/w -// - month/M -// - year/y -// NOTE: the input "duration" is case-sensitive -func parseTimeDuration(duration string) (dur time.Duration, err error) { - switch duration { - case "s", "second": - dur = time.Second - case "m", "minute": - dur = time.Minute - case "h", "hour": - dur = time.Hour - case "d", "day": - dur = day - case "w", "week": - dur = week - case "M", "month": - dur = month - case "y", "year": - dur = year - default: - err = fmt.Errorf("unknown time duration %s", duration) - } - return -} - func composeErrorWithMessage(msg string, err error) error { err = fmt.Errorf("%v, %v", msg, err) return err diff --git a/service/api/temporal/client.go b/service/api/temporal/client.go index e9d107ca..18cabc76 100644 --- a/service/api/temporal/client.go +++ b/service/api/temporal/client.go @@ -3,6 +3,7 @@ package temporal import ( "context" "fmt" + "github.com/google/uuid" "github.com/indeedeng/iwf/gen/iwfidl" "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/api" @@ -200,5 +201,44 @@ func (t *temporalClient) GetWorkflowResult(ctx context.Context, valuePtr interfa } func (t *temporalClient) ResetWorkflow(ctx context.Context, request iwfidl.WorkflowResetRequest) (runId string, err error) { - return "", fmt.Errorf("not supported") + reqRunId := request.GetWorkflowRunId() + if reqRunId == "" { + // set default runId to current + resp, err := t.DescribeWorkflowExecution(ctx, request.GetWorkflowId(), "") + if err != nil { + return "", err + } + reqRunId = resp.RunId + } + + resetType := service.ResetType(request.GetResetType()) + resetBaseRunID, resetEventId, err := getResetEventIDByType(ctx, resetType, + "", request.GetWorkflowId(), reqRunId, + t.tClient.WorkflowService(), request.GetHistoryEventId(), request.GetHistoryEventTime()) + + if err != nil { + return "", err + } + + requestId := uuid.New().String() + resetReapplyType := enums.RESET_REAPPLY_TYPE_SIGNAL + if request.GetSkipSignalReapply() { + resetReapplyType = enums.RESET_REAPPLY_TYPE_NONE + } + + resp, err := t.tClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ + WorkflowExecution: &common.WorkflowExecution{ + WorkflowId: request.WorkflowId, + RunId: resetBaseRunID, + }, + Reason: request.GetReason(), + WorkflowTaskFinishEventId: resetEventId, + RequestId: requestId, + ResetReapplyType: resetReapplyType, + }) + + if err != nil { + return "", err + } + return resp.GetRunId(), nil } diff --git a/service/api/temporal/reset.go b/service/api/temporal/reset.go new file mode 100644 index 00000000..565c5686 --- /dev/null +++ b/service/api/temporal/reset.go @@ -0,0 +1,131 @@ +package temporal + +import ( + "context" + "fmt" + "github.com/indeedeng/iwf/service" + "github.com/indeedeng/iwf/service/common/timeparser" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/workflowservice/v1" +) + +func getResetEventIDByType(ctx context.Context, resetType service.ResetType, + namespace, wid, rid string, + frontendClient workflowservice.WorkflowServiceClient, + historyEventId int32, earliestHistoryTimeStr string, +) (resetBaseRunID string, workflowTaskFinishID int64, err error) { + // default to the same runID + resetBaseRunID = rid + + switch resetType { + case service.ResetTypeHistoryEventId: + workflowTaskFinishID = int64(historyEventId) + return + case service.ResetTypeHistoryEventTime: + var earliestTime int64 + earliestTime, err = timeparser.ParseTime(earliestHistoryTimeStr) + if err != nil { + return + } + workflowTaskFinishID, err = getEarliestDecisionEventID(ctx, namespace, wid, rid, earliestTime, frontendClient) + if err != nil { + return + } + case service.ResetTypeBeginning: + resetBaseRunID, workflowTaskFinishID, err = getFirstWorkflowTaskEventID(ctx, namespace, wid, rid, frontendClient) + if err != nil { + return + } + default: + panic("not supported resetType") + } + return +} + +func getFirstWorkflowTaskEventID(ctx context.Context, namespace, wid, rid string, frontendClient workflowservice.WorkflowServiceClient) (resetBaseRunID string, workflowTaskEventID int64, err error) { + resetBaseRunID = rid + req := &workflowservice.GetWorkflowExecutionHistoryRequest{ + Namespace: namespace, + Execution: &common.WorkflowExecution{ + WorkflowId: wid, + RunId: rid, + }, + MaximumPageSize: 1000, + NextPageToken: nil, + } + for { + var resp *workflowservice.GetWorkflowExecutionHistoryResponse + resp, err = frontendClient.GetWorkflowExecutionHistory(ctx, req) + if err != nil { + return + } + for _, e := range resp.GetHistory().GetEvents() { + if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + workflowTaskEventID = e.GetEventId() + return resetBaseRunID, workflowTaskEventID, nil + } + if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED { + if workflowTaskEventID == 0 { + workflowTaskEventID = e.GetEventId() + 1 + } + } + } + if len(resp.NextPageToken) != 0 { + req.NextPageToken = resp.NextPageToken + } else { + break + } + } + if workflowTaskEventID == 0 { + err = fmt.Errorf("unable to find any scheduled or completed task") + return + } + return +} +func getEarliestDecisionEventID( + ctx context.Context, + namespace string, wid string, + rid string, earliestTime int64, + frontendClient workflowservice.WorkflowServiceClient, +) (decisionFinishID int64, err error) { + req := &workflowservice.GetWorkflowExecutionHistoryRequest{ + Namespace: namespace, + Execution: &common.WorkflowExecution{ + WorkflowId: wid, + RunId: rid, + }, + MaximumPageSize: 1000, + NextPageToken: nil, + } + +OuterLoop: + for { + resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req) + if err != nil { + return 0, composeErrorWithMessage("GetWorkflowExecutionHistory failed", err) + } + for _, e := range resp.GetHistory().GetEvents() { + if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + if e.GetEventTime().Unix() >= earliestTime { + decisionFinishID = e.GetEventId() + break OuterLoop + } + } + } + if len(resp.NextPageToken) != 0 { + req.NextPageToken = resp.NextPageToken + } else { + break + } + } + if decisionFinishID == 0 { + return 0, composeErrorWithMessage("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID")) + } + return +} + +func composeErrorWithMessage(msg string, err error) error { + err = fmt.Errorf("%v, %v", msg, err) + return err +} diff --git a/service/common/timeparser/time.go b/service/common/timeparser/time.go new file mode 100644 index 00000000..733f6322 --- /dev/null +++ b/service/common/timeparser/time.go @@ -0,0 +1,139 @@ +package timeparser + +import ( + "fmt" + "regexp" + "strconv" + "time" +) + +const ( + defaultDateTimeFormat = time.RFC3339 // used for converting UnixNano to string like 2018-02-15T16:16:36-08:00 + // regex expression for parsing time durations, shorter, longer notations and numeric value respectively + defaultDateTimeRangeShortRE = "^[1-9][0-9]*[smhdwMy]$" // eg. 1s, 20m, 300h etc. + defaultDateTimeRangeLongRE = "^[1-9][0-9]*(second|minute|hour|day|week|month|year)$" // eg. 1second, 20minute, 300hour etc. + defaultDateTimeRangeNum = "^[1-9][0-9]*" // eg. 1, 20, 300 etc. +) + +func ParseTime(timeStr string) (int64, error) { + defaultValue := int64(0) + if len(timeStr) == 0 { + return defaultValue, nil + } + + // try to parse + parsedTime, err := time.Parse(defaultDateTimeFormat, timeStr) + if err == nil { + return parsedTime.UnixNano(), nil + } + + // treat as raw time + resultValue, err := strconv.ParseInt(timeStr, 10, 64) + if err == nil { + return resultValue, nil + } + + // treat as time range format + parsedTime, err = parseTimeRange(timeStr) + if err != nil { + return 0, fmt.Errorf("cannot parse time '%s', use UTC format '2006-01-02T15:04:05Z', "+ + "time range or raw UnixNano directly. See help for more details: %v", timeStr, err) + } + return parsedTime.UnixNano(), nil +} + +// parseTimeRange parses a given time duration string (in format X) and +// returns parsed timestamp given that duration in the past from current time. +// All valid values must contain a number followed by a time-duration, from the following list (long form/short form): +// - second/s +// - minute/m +// - hour/h +// - day/d +// - week/w +// - month/M +// - year/y +// For example, possible input values, and their result: +// - "3d" or "3day" --> three days --> time.Now().Add(-3 * 24 * time.Hour) +// - "2m" or "2minute" --> two minutes --> time.Now().Add(-2 * time.Minute) +// - "1w" or "1week" --> one week --> time.Now().Add(-7 * 24 * time.Hour) +// - "30s" or "30second" --> thirty seconds --> time.Now().Add(-30 * time.Second) +// Note: Duration strings are case-sensitive, and should be used as mentioned above only. +// Limitation: Value of numerical multiplier, X should be in b/w 0 - 1e6 (1 million), boundary values excluded i.e. +// 0 < X < 1e6. Also, the maximum time in the past can be 1 January 1970 00:00:00 UTC (epoch time), +// so giving "1000y" will result in epoch time. +func parseTimeRange(timeRange string) (time.Time, error) { + match, err := regexp.MatchString(defaultDateTimeRangeShortRE, timeRange) + if !match { // fallback on to check if it's of longer notation + _, err = regexp.MatchString(defaultDateTimeRangeLongRE, timeRange) + } + if err != nil { + return time.Time{}, err + } + + re, _ := regexp.Compile(defaultDateTimeRangeNum) + idx := re.FindStringSubmatchIndex(timeRange) + if idx == nil { + return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange) + } + + num, err := strconv.Atoi(timeRange[idx[0]:idx[1]]) + if err != nil { + return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange) + } + if num >= 1e6 { + return time.Time{}, fmt.Errorf("invalid time-duation multiplier %d, allowed range is 0 < multiplier < 1000000", num) + } + + dur, err := parseTimeDuration(timeRange[idx[1]:]) + if err != nil { + return time.Time{}, fmt.Errorf("cannot parse timeRange %s", timeRange) + } + + res := time.Now().Add(time.Duration(-num) * dur) // using server's local timezone + epochTime := time.Unix(0, 0) + if res.Before(epochTime) { + res = epochTime + } + return res, nil +} + +const ( + // time ranges + day = 24 * time.Hour + week = 7 * day + month = 30 * day + year = 365 * day +) + +// parseTimeDuration parses the given time duration in either short or long convention +// and returns the time.Duration +// Valid values (long notation/short notation): +// - second/s +// - minute/m +// - hour/h +// - day/d +// - week/w +// - month/M +// - year/y +// NOTE: the input "duration" is case-sensitive +func parseTimeDuration(duration string) (dur time.Duration, err error) { + switch duration { + case "s", "second": + dur = time.Second + case "m", "minute": + dur = time.Minute + case "h", "hour": + dur = time.Hour + case "d", "day": + dur = day + case "w", "week": + dur = week + case "M", "month": + dur = month + case "y", "year": + dur = year + default: + err = fmt.Errorf("unknown time duration %s", duration) + } + return +} From 73693b836d0c794d4f9f30346c2f2f11e4533ed9 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 16 Dec 2022 12:16:51 -0800 Subject: [PATCH 2/3] Done test --- README.md | 2 +- cmd/server/iwf/iwf.go | 2 +- integ/util.go | 8 ++++++-- script/http/local/home.http | 4 ++-- service/api/temporal/client.go | 11 +++++++---- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index cce715ff..144ecb04 100644 --- a/README.md +++ b/README.md @@ -259,7 +259,7 @@ cadence adm cl asa --search_attr_key IwfWorkflowType --search_attr_type 0 - [x] Stop workflow API ### 1.1 -- [x] Reset workflow API (Cadence only, TODO for Temporal) +- [x] Reset workflow API - [x] Command type(s) for inter-state communications (e.g. internal channel) - [x] AnyCommandCompleted Decider trigger type - [x] More workflow start options: IdReusePolicy, cron schedule, retry diff --git a/cmd/server/iwf/iwf.go b/cmd/server/iwf/iwf.go index 809f2d1b..3decfa94 100644 --- a/cmd/server/iwf/iwf.go +++ b/cmd/server/iwf/iwf.go @@ -99,7 +99,7 @@ func start(c *cli.Context) { if err != nil { log.Fatalf("Unable to connect to Temporal because of error %v", err) } - unifiedClient = temporalapi.NewTemporalClient(temporalClient) + unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Backend.Temporal.Namespace) for _, svcName := range services { go launchTemporalService(svcName, config, unifiedClient, temporalClient) diff --git a/integ/util.go b/integ/util.go index 160c409a..825ad5c7 100644 --- a/integ/util.go +++ b/integ/util.go @@ -15,8 +15,12 @@ import ( "net/http" ) +const testNamespace = "default" + func createTemporalClient() client.Client { - temporalClient, err := client.Dial(client.Options{}) + temporalClient, err := client.Dial(client.Options{ + Namespace: testNamespace, + }) if err != nil { log.Fatalf("unable to connect to Temporal %v", err) } @@ -43,7 +47,7 @@ func startWorkflowWorker(handler common.WorkflowHandler) (closeFunc func()) { func startIwfService(backendType service.BackendType) (closeFunc func()) { if backendType == service.BackendTypeTemporal { temporalClient := createTemporalClient() - iwfService := api.NewService(temporalapi.NewTemporalClient(temporalClient)) + iwfService := api.NewService(temporalapi.NewTemporalClient(temporalClient, testNamespace)) iwfServer := &http.Server{ Addr: ":" + testIwfServerPort, Handler: iwfService, diff --git a/script/http/local/home.http b/script/http/local/home.http index 84c4ac89..ef4f5b97 100644 --- a/script/http/local/home.http +++ b/script/http/local/home.http @@ -20,6 +20,6 @@ POST http://localhost:8801/api/v1/workflow/reset Content-Type: application/json { - "workflowId": "signal1665468993", - "resetType": "FIRST_DECISION_COMPLETED" + "workflowId": "timer1671219152", + "resetType": "BEGINNING" } \ No newline at end of file diff --git a/service/api/temporal/client.go b/service/api/temporal/client.go index 18cabc76..a1fcbbb8 100644 --- a/service/api/temporal/client.go +++ b/service/api/temporal/client.go @@ -17,12 +17,14 @@ import ( ) type temporalClient struct { - tClient client.Client + tClient client.Client + namespace string } -func NewTemporalClient(tClient client.Client) api.UnifiedClient { +func NewTemporalClient(tClient client.Client, namespace string) api.UnifiedClient { return &temporalClient{ - tClient: tClient, + tClient: tClient, + namespace: namespace, } } @@ -213,7 +215,7 @@ func (t *temporalClient) ResetWorkflow(ctx context.Context, request iwfidl.Workf resetType := service.ResetType(request.GetResetType()) resetBaseRunID, resetEventId, err := getResetEventIDByType(ctx, resetType, - "", request.GetWorkflowId(), reqRunId, + t.namespace, request.GetWorkflowId(), reqRunId, t.tClient.WorkflowService(), request.GetHistoryEventId(), request.GetHistoryEventTime()) if err != nil { @@ -227,6 +229,7 @@ func (t *temporalClient) ResetWorkflow(ctx context.Context, request iwfidl.Workf } resp, err := t.tClient.ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ + Namespace: t.namespace, WorkflowExecution: &common.WorkflowExecution{ WorkflowId: request.WorkflowId, RunId: resetBaseRunID, From 35714c5478cb6295cd5112be8a145ef8da7541f9 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 16 Dec 2022 12:25:41 -0800 Subject: [PATCH 3/3] done --- script/http/local/home.http | 7 +++++-- service/api/cadence/reset.go | 6 +++--- service/api/temporal/reset.go | 8 ++++---- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/script/http/local/home.http b/script/http/local/home.http index ef4f5b97..00962279 100644 --- a/script/http/local/home.http +++ b/script/http/local/home.http @@ -15,11 +15,14 @@ Content-Type: application/json "stateOptions": {} } -### test reset API +### test reset API NOTE: try different resetType: BEGINNING, HISTORY_EVENT_ID, HISTORY_EVENT_TIME POST http://localhost:8801/api/v1/workflow/reset Content-Type: application/json { "workflowId": "timer1671219152", - "resetType": "BEGINNING" + "workflowRunId": "fae51e69-7e41-4658-89aa-663463951dc7", + "resetType": "HISTORY_EVENT_TIME", + "historyEventId":15, + "historyEventTime": "150m" } \ No newline at end of file diff --git a/service/api/cadence/reset.go b/service/api/cadence/reset.go index 03d1bc83..35353bb1 100644 --- a/service/api/cadence/reset.go +++ b/service/api/cadence/reset.go @@ -30,12 +30,12 @@ func getResetIDsByType( return } case service.ResetTypeHistoryEventTime: - var earliestTime int64 - earliestTime, err = timeparser.ParseTime(earliestHistoryTimeStr) + var earliestTimeUnixNano int64 + earliestTimeUnixNano, err = timeparser.ParseTime(earliestHistoryTimeStr) if err != nil { return } - decisionFinishID, err = getEarliestDecisionID(ctx, domain, wid, rid, earliestTime, frontendClient) + decisionFinishID, err = getEarliestDecisionID(ctx, domain, wid, rid, earliestTimeUnixNano, frontendClient) if err != nil { return } diff --git a/service/api/temporal/reset.go b/service/api/temporal/reset.go index 565c5686..03cfa852 100644 --- a/service/api/temporal/reset.go +++ b/service/api/temporal/reset.go @@ -23,12 +23,12 @@ func getResetEventIDByType(ctx context.Context, resetType service.ResetType, workflowTaskFinishID = int64(historyEventId) return case service.ResetTypeHistoryEventTime: - var earliestTime int64 - earliestTime, err = timeparser.ParseTime(earliestHistoryTimeStr) + var earliestTimeUnixNano int64 + earliestTimeUnixNano, err = timeparser.ParseTime(earliestHistoryTimeStr) if err != nil { return } - workflowTaskFinishID, err = getEarliestDecisionEventID(ctx, namespace, wid, rid, earliestTime, frontendClient) + workflowTaskFinishID, err = getEarliestDecisionEventID(ctx, namespace, wid, rid, earliestTimeUnixNano, frontendClient) if err != nil { return } @@ -107,7 +107,7 @@ OuterLoop: } for _, e := range resp.GetHistory().GetEvents() { if e.GetEventType() == enums.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { - if e.GetEventTime().Unix() >= earliestTime { + if e.GetEventTime().UnixNano() >= earliestTime { decisionFinishID = e.GetEventId() break OuterLoop }