Skip to content

Commit

Permalink
Refactor: move all archival related scaffolding to ArchivalSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jan 17, 2025
1 parent 38903d3 commit a75923a
Show file tree
Hide file tree
Showing 14 changed files with 96 additions and 144 deletions.
139 changes: 71 additions & 68 deletions tests/archival_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
Expand All @@ -58,14 +57,12 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)

const (
retryLimit = 40
retryBackoffTime = 500 * time.Millisecond
)

type (
ArchivalSuite struct {
testcore.FunctionalTestSuite

archivalNamespace namespace.Name
archivalNamespaceID namespace.ID
}

archivalWorkflowInfo struct {
Expand All @@ -81,79 +78,95 @@ func TestArchivalSuite(t *testing.T) {
}

func (s *ArchivalSuite) SetupSuite() {
dynamicConfigOverrides := map[dynamicconfig.Key]any{
dynamicconfig.ArchivalProcessorArchiveDelay.Key(): time.Duration(0),
}
s.FunctionalTestSuite.SetupSuiteWithDefaultCluster(testcore.WithDynamicConfigOverrides(dynamicConfigOverrides))
dynamicConfigOverrides :=map[dynamicconfig.Key]any{
dynamicconfig.ArchivalProcessorArchiveDelay.Key(): time.Duration(0),
}s.FunctionalTestSuite.SetupSuiteWithDefaultCluster(testcore.WithDynamicConfigOverrides(dynamicConfigOverrides),
testcore.WithArchivalEnabled(),
)

var err error
s.archivalNamespace = namespace.Name(testcore.RandomizeStr("archival-enabled-namespace"))
s.archivalNamespaceID, err = s.RegisterNamespace(
s.archivalNamespace,
0, // Archive right away.
enumspb.ARCHIVAL_STATE_ENABLED,
s.GetTestCluster().ArchiverBase().HistoryURI(),
s.GetTestCluster().ArchiverBase().VisibilityURI(),
)
s.Require().NoError(err)
}

func (s *ArchivalSuite) TearDownSuite() {
s.Require().NoError(s.MarkNamespaceAsDeleted(s.archivalNamespace))
s.FunctionalTestBase.TearDownCluster()
}

func (s *ArchivalSuite) TestArchival_TimerQueueProcessor() {
s.True(s.GetTestCluster().ArchivalBase().Metadata().GetHistoryConfig().ClusterConfiguredForArchival())
s.True(s.GetTestCluster().ArchiverBase().Metadata().GetHistoryConfig().ClusterConfiguredForArchival())

workflowID := "archival-timer-queue-processor-workflow-id"
workflowType := "archival-timer-queue-processor-type"
taskQueue := "archival-timer-queue-processor-task-queue"
numActivities := 1
numRuns := 1
workflowInfo := s.startAndFinishWorkflow(workflowID, workflowType, taskQueue, s.ArchivalNamespace(), numActivities, numRuns)[0]
workflowInfo := s.startAndFinishWorkflow(workflowID, workflowType, taskQueue, s.archivalNamespace, numActivities, numRuns)[0]

s.True(s.isArchived(s.ArchivalNamespaceID(), workflowInfo.execution))
s.True(s.isHistoryDeleted(workflowInfo))
s.True(s.isMutableStateDeleted(s.ArchivalNamespaceID(), workflowInfo.execution))
s.workflowIsArchived(s.archivalNamespaceID, workflowInfo.execution)
s.historyIsDeleted(workflowInfo)
s.mutableStateIsDeleted(s.archivalNamespaceID, workflowInfo.execution)
}

func (s *ArchivalSuite) TestArchival_ContinueAsNew() {
s.True(s.GetTestCluster().ArchivalBase().Metadata().GetHistoryConfig().ClusterConfiguredForArchival())
s.True(s.GetTestCluster().ArchiverBase().Metadata().GetHistoryConfig().ClusterConfiguredForArchival())

workflowID := "archival-continueAsNew-workflow-id"
workflowType := "archival-continueAsNew-workflow-type"
taskQueue := "archival-continueAsNew-task-queue"
numActivities := 1
numRuns := 5
workflowInfos := s.startAndFinishWorkflow(workflowID, workflowType, taskQueue, s.ArchivalNamespace(), numActivities, numRuns)
workflowInfos := s.startAndFinishWorkflow(workflowID, workflowType, taskQueue, s.archivalNamespace, numActivities, numRuns)

for _, workflowInfo := range workflowInfos {
s.True(s.isArchived(s.ArchivalNamespaceID(), workflowInfo.execution))
s.True(s.isHistoryDeleted(workflowInfo))
s.True(s.isMutableStateDeleted(s.ArchivalNamespaceID(), workflowInfo.execution))
s.workflowIsArchived(s.archivalNamespaceID, workflowInfo.execution)
s.historyIsDeleted(workflowInfo)
s.mutableStateIsDeleted(s.archivalNamespaceID, workflowInfo.execution)
}
}

func (s *ArchivalSuite) TestArchival_ArchiverWorker() {
s.T().SkipNow() // flaky test, skip for now, will reimplement archival feature.
// s.T().SkipNow() // flaky test, skip for now, will reimplement archival feature.

s.True(s.GetTestCluster().ArchivalBase().Metadata().GetHistoryConfig().ClusterConfiguredForArchival())
s.True(s.GetTestCluster().ArchiverBase().Metadata().GetHistoryConfig().ClusterConfiguredForArchival())

workflowID := "archival-archiver-worker-workflow-id"
workflowType := "archival-archiver-worker-workflow-type"
taskQueue := "archival-archiver-worker-task-queue"
numActivities := 10
workflowInfo := s.startAndFinishWorkflow(workflowID, workflowType, taskQueue, s.ArchivalNamespace(), numActivities, 1)[0]
workflowInfo := s.startAndFinishWorkflow(workflowID, workflowType, taskQueue, s.archivalNamespace, numActivities, 1)[0]

s.True(s.isArchived(s.ArchivalNamespaceID(), workflowInfo.execution))
s.True(s.isHistoryDeleted(workflowInfo))
s.True(s.isMutableStateDeleted(s.ArchivalNamespaceID(), workflowInfo.execution))
s.workflowIsArchived(s.archivalNamespaceID, workflowInfo.execution)
s.historyIsDeleted(workflowInfo)
s.mutableStateIsDeleted(s.archivalNamespaceID, workflowInfo.execution)
}

func (s *ArchivalSuite) TestVisibilityArchival() {
s.True(s.GetTestCluster().ArchivalBase().Metadata().GetVisibilityConfig().ClusterConfiguredForArchival())
s.True(s.GetTestCluster().ArchiverBase().Metadata().GetVisibilityConfig().ClusterConfiguredForArchival())

workflowID := "archival-visibility-workflow-id"
workflowType := "archival-visibility-workflow-type"
taskQueue := "archival-visibility-task-queue"
numActivities := 3
numRuns := 5
startTime := time.Now().UnixNano()
s.startAndFinishWorkflow(workflowID, workflowType, taskQueue, s.ArchivalNamespace(), numActivities, numRuns)
s.startAndFinishWorkflow("some other workflowID", "some other workflow type", taskQueue, s.ArchivalNamespace(), numActivities, numRuns)
s.startAndFinishWorkflow(workflowID, workflowType, taskQueue, s.archivalNamespace, numActivities, numRuns)
s.startAndFinishWorkflow("some other workflowID", "some other workflow type", taskQueue, s.archivalNamespace, numActivities, numRuns)
endTime := time.Now().UnixNano()

var executions []*workflowpb.WorkflowExecutionInfo

for i := 0; i != retryLimit; i++ {
executions = []*workflowpb.WorkflowExecutionInfo{}
s.Eventually(func() bool {
request := &workflowservice.ListArchivedWorkflowExecutionsRequest{
Namespace: s.ArchivalNamespace().String(),
Namespace: s.archivalNamespace.String(),
PageSize: 2,
Query: fmt.Sprintf("CloseTime >= %v and CloseTime <= %v and WorkflowType = '%s'", startTime, endTime, workflowType),
}
Expand All @@ -165,10 +178,10 @@ func (s *ArchivalSuite) TestVisibilityArchival() {
request.NextPageToken = response.NextPageToken
}
if len(executions) == numRuns {
break
return true
}
time.Sleep(retryBackoffTime) //nolint:forbidigo
}
return false
}, 20*time.Second, 500*time.Millisecond)

for _, execution := range executions {
s.Equal(workflowID, execution.GetExecution().GetWorkflowId())
Expand All @@ -184,30 +197,27 @@ func (s *ArchivalSuite) TestVisibilityArchival() {
}
}

// isArchived returns true if both the workflow history and workflow visibility are archived.
func (s *ArchivalSuite) isArchived(namespaceID namespace.ID, execution *commonpb.WorkflowExecution) bool {
// workflowIsArchived asserts that both the workflow history and workflow visibility are archived.
func (s *ArchivalSuite) workflowIsArchived(namespaceID namespace.ID, execution *commonpb.WorkflowExecution) {
serviceName := string(primitives.HistoryService)
historyURI, err := archiver.NewURI(s.GetTestCluster().ArchivalBase().HistoryURI())
historyURI, err := archiver.NewURI(s.GetTestCluster().ArchiverBase().HistoryURI())
s.NoError(err)
historyArchiver, err := s.GetTestCluster().ArchivalBase().Provider().GetHistoryArchiver(
historyArchiver, err := s.GetTestCluster().ArchiverBase().Provider().GetHistoryArchiver(
historyURI.Scheme(),
serviceName,
)
s.NoError(err)

visibilityURI, err := archiver.NewURI(s.GetTestCluster().ArchivalBase().VisibilityURI())
visibilityURI, err := archiver.NewURI(s.GetTestCluster().ArchiverBase().VisibilityURI())
s.NoError(err)
visibilityArchiver, err := s.GetTestCluster().ArchivalBase().Provider().GetVisibilityArchiver(
visibilityArchiver, err := s.GetTestCluster().ArchiverBase().Provider().GetVisibilityArchiver(
visibilityURI.Scheme(),
serviceName,
)
s.NoError(err)

for i := 0; i < retryLimit; i++ {
s.Eventually(func() bool {
ctx := testcore.NewContext()
if i > 0 {
time.Sleep(retryBackoffTime) //nolint:forbidigo
}
var historyResponse *archiver.GetHistoryResponse
historyResponse, err = historyArchiver.Get(ctx, historyURI, &archiver.GetHistoryRequest{
NamespaceID: namespaceID.String(),
Expand All @@ -216,10 +226,10 @@ func (s *ArchivalSuite) isArchived(namespaceID namespace.ID, execution *commonpb
PageSize: 1,
})
if err != nil {
continue
return false
}
if len(historyResponse.HistoryBatches) == 0 {
continue
return false
}
var visibilityResponse *archiver.QueryVisibilityResponse
visibilityResponse, err = visibilityArchiver.Query(
Expand All @@ -237,28 +247,23 @@ func (s *ArchivalSuite) isArchived(namespaceID namespace.ID, execution *commonpb
searchattribute.NameTypeMap{},
)
if err != nil {
continue
return false
}
if len(visibilityResponse.Executions) > 0 {
return true
}
}
if err != nil {
fmt.Println("isArchived failed with error: ", err)
}
return false
return false
}, 20*time.Second, 500*time.Millisecond)
}

func (s *ArchivalSuite) isHistoryDeleted(
workflowInfo archivalWorkflowInfo,
) bool {
func (s *ArchivalSuite) historyIsDeleted(workflowInfo archivalWorkflowInfo) {
shardID := common.WorkflowIDToHistoryShard(
s.ArchivalNamespaceID().String(),
s.archivalNamespaceID.String(),
workflowInfo.execution.WorkflowId,
s.GetTestClusterConfig().HistoryConfig.NumHistoryShards,
)

for i := 0; i < retryLimit; i++ {
s.Eventually(func() bool {
_, err := s.GetTestCluster().TestBase().ExecutionManager.ReadHistoryBranch(
testcore.NewContext(),
&persistence.ReadHistoryBranchRequest{
Expand All @@ -273,14 +278,12 @@ func (s *ArchivalSuite) isHistoryDeleted(
if common.IsNotFoundError(err) {
return true
}

s.NoError(err)
time.Sleep(retryBackoffTime) //nolint:forbidigo
}
return false
return false
}, 20*time.Second, 500*time.Millisecond)
}

func (s *ArchivalSuite) isMutableStateDeleted(namespaceID namespace.ID, execution *commonpb.WorkflowExecution) bool {
func (s *ArchivalSuite) mutableStateIsDeleted(namespaceID namespace.ID, execution *commonpb.WorkflowExecution) {
shardID := common.WorkflowIDToHistoryShard(namespaceID.String(), execution.GetWorkflowId(),
s.GetTestClusterConfig().HistoryConfig.NumHistoryShards)
request := &persistence.GetWorkflowExecutionRequest{
Expand All @@ -290,14 +293,14 @@ func (s *ArchivalSuite) isMutableStateDeleted(namespaceID namespace.ID, executio
RunID: execution.RunId,
}

for i := 0; i < retryLimit; i++ {
s.Eventually(func() bool {
_, err := s.GetTestCluster().TestBase().ExecutionManager.GetWorkflowExecution(testcore.NewContext(), request)
if _, isNotFound := err.(*serviceerror.NotFound); isNotFound {
if common.IsNotFoundError(err) {
return true
}
time.Sleep(retryBackoffTime) //nolint:forbidigo
}
return false
s.NoError(err)
return false
}, 20*time.Second, 500*time.Millisecond)
}

func (s *ArchivalSuite) startAndFinishWorkflow(
Expand Down
Loading

0 comments on commit a75923a

Please sign in to comment.