Skip to content

Commit

Permalink
Move namespace replication to own package (#7112)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->

Moved code from `common/namespace` to `common/namespace/nsreplication`.
Next to the existing `common/namespace/nsregistry`.

## Why?
<!-- Tell your future self why have you made these changes -->

The goal is to remove all references from `common/namespace` to
`common/persistence`. One step towards using `common/dynamicconfig` from
`common/namespace`.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

No behavior changes here.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
stephanos authored Jan 17, 2025
1 parent e2daba8 commit c9a7813
Show file tree
Hide file tree
Showing 26 changed files with 115 additions and 113 deletions.
3 changes: 0 additions & 3 deletions common/namespace/attr_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func (d *AttrValidatorImpl) ValidateNamespaceConfig(config *persistencespb.Names
func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForLocalNamespace(
replicationConfig *persistencespb.NamespaceReplicationConfig,
) error {

activeCluster := replicationConfig.ActiveClusterName
clusters := replicationConfig.Clusters

Expand All @@ -90,7 +89,6 @@ func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForLocalNamespace(
func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForGlobalNamespace(
replicationConfig *persistencespb.NamespaceReplicationConfig,
) error {

activeCluster := replicationConfig.ActiveClusterName
clusters := replicationConfig.Clusters

Expand Down Expand Up @@ -120,7 +118,6 @@ func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForGlobalNamespace
func (d *AttrValidatorImpl) validateClusterName(
clusterName string,
) error {

if info, ok := d.clusterMetadata.GetAllClusterInfo()[clusterName]; !ok || !info.Enabled {
return serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid cluster name: %v", clusterName))
}
Expand Down
13 changes: 3 additions & 10 deletions common/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
"github.com/google/uuid"
enumspb "go.temporal.io/api/enums/v1"
namespacepb "go.temporal.io/api/namespace/v1"
replicationpb "go.temporal.io/api/replication/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/api/adminservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/namespace/nsreplication"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/util"
)
Expand Down Expand Up @@ -139,8 +139,8 @@ func FromAdminClientApiResponse(response *adminservice.GetNamespaceResponse) *Na
replicationConfig := &persistencespb.NamespaceReplicationConfig{
ActiveClusterName: response.GetReplicationConfig().GetActiveClusterName(),
State: response.GetReplicationConfig().GetState(),
Clusters: ConvertClusterReplicationConfigFromProto(response.GetReplicationConfig().GetClusters()),
FailoverHistory: convertFailoverHistoryToPersistenceProto(response.GetFailoverHistory()),
Clusters: nsreplication.ConvertClusterReplicationConfigFromProto(response.GetReplicationConfig().GetClusters()),
FailoverHistory: nsreplication.ConvertFailoverHistoryToPersistenceProto(response.GetFailoverHistory()),
}
return &Namespace{
info: info,
Expand Down Expand Up @@ -262,13 +262,6 @@ func (ns *Namespace) IsOnCluster(clusterName string) bool {
return false
}

// FailoverHistory returns the a copy of failover history for this namespace.
func (ns *Namespace) FailoverHistory() []*replicationpb.FailoverStatus {
return convertFailoverHistoryToReplicationProto(
ns.replicationConfig.GetFailoverHistory(),
)
}

// ConfigVersion return the namespace config version
func (ns *Namespace) ConfigVersion() int64 {
return ns.configVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dlq_message_handler_mock.go
//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dlq_message_handler_mock.go

package namespace
package nsreplication

import (
"context"
Expand All @@ -45,15 +45,15 @@ type (
}

dlqMessageHandlerImpl struct {
replicationHandler ReplicationTaskExecutor
replicationHandler TaskExecutor
namespaceReplicationQueue persistence.NamespaceReplicationQueue
logger log.Logger
}
)

// NewDLQMessageHandler returns a DLQTaskHandler instance
// NewDLQMessageHandler returns a DLQMessageHandler instance
func NewDLQMessageHandler(
replicationHandler ReplicationTaskExecutor,
replicationHandler TaskExecutor,
namespaceReplicationQueue persistence.NamespaceReplicationQueue,
logger log.Logger,
) DLQMessageHandler {
Expand All @@ -64,7 +64,7 @@ func NewDLQMessageHandler(
}
}

// ReadMessages reads namespace replication DLQ messages
// Read reads namespace replication DLQ messages
func (d *dlqMessageHandlerImpl) Read(
ctx context.Context,
lastMessageID int64,
Expand All @@ -86,12 +86,11 @@ func (d *dlqMessageHandlerImpl) Read(
)
}

// PurgeMessages purges namespace replication DLQ messages
// Purge purges namespace replication DLQ messages
func (d *dlqMessageHandlerImpl) Purge(
ctx context.Context,
lastMessageID int64,
) error {

ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel(ctx)
if err != nil {
return err
Expand All @@ -115,14 +114,13 @@ func (d *dlqMessageHandlerImpl) Purge(
return nil
}

// MergeMessages merges namespace replication DLQ messages
// Merge merges namespace replication DLQ messages
func (d *dlqMessageHandlerImpl) Merge(
ctx context.Context,
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]byte, error) {

ackLevel, err := d.namespaceReplicationQueue.GetDLQAckLevel(ctx)
if err != nil {
return nil, err
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package namespace
package nsreplication

import (
"context"
Expand All @@ -46,7 +46,7 @@ type (
*require.Assertions
controller *gomock.Controller

mockReplicationTaskExecutor *MockReplicationTaskExecutor
mockReplicationTaskExecutor *MockTaskExecutor
mockReplicationQueue *persistence.MockNamespaceReplicationQueue
dlqMessageHandler *dlqMessageHandlerImpl
}
Expand All @@ -69,7 +69,7 @@ func (s *dlqMessageHandlerSuite) SetupTest() {
s.controller = gomock.NewController(s.T())

logger := log.NewTestLogger()
s.mockReplicationTaskExecutor = NewMockReplicationTaskExecutor(s.controller)
s.mockReplicationTaskExecutor = NewMockTaskExecutor(s.controller)
s.mockReplicationQueue = persistence.NewMockNamespaceReplicationQueue(s.controller)

s.dlqMessageHandler = NewDLQMessageHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination replication_task_handler_mock.go
//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination replication_task_handler_mock.go

package namespace
package nsreplication

import (
"context"
Expand Down Expand Up @@ -66,34 +66,34 @@ var (
// NOTE: the counterpart of namespace replication transmission logic is in service/frontend package

type (
// ReplicationTaskExecutor is the interface for executing namespace replication tasks
ReplicationTaskExecutor interface {
// TaskExecutor is the interface for executing namespace replication tasks
TaskExecutor interface {
Execute(ctx context.Context, task *replicationspb.NamespaceTaskAttributes) error
}

namespaceReplicationTaskExecutorImpl struct {
taskExecutorImpl struct {
currentCluster string
metadataManager persistence.MetadataManager
logger log.Logger
}
)

// NewReplicationTaskExecutor creates a new instance of namespace replicator
func NewReplicationTaskExecutor(
// NewTaskExecutor creates a new instance of namespace replicator
func NewTaskExecutor(
currentCluster string,
metadataManagerV2 persistence.MetadataManager,
logger log.Logger,
) ReplicationTaskExecutor {
) TaskExecutor {

return &namespaceReplicationTaskExecutorImpl{
return &taskExecutorImpl{
currentCluster: currentCluster,
metadataManager: metadataManagerV2,
logger: logger,
}
}

// Execute handles receiving of the namespace replication task
func (h *namespaceReplicationTaskExecutorImpl) Execute(
func (h *taskExecutorImpl) Execute(
ctx context.Context,
task *replicationspb.NamespaceTaskAttributes,
) error {
Expand Down Expand Up @@ -123,7 +123,7 @@ func checkClusterIncludedInReplicationConfig(clusterName string, repCfg []*repli
return false
}

func (h *namespaceReplicationTaskExecutorImpl) shouldProcessTask(ctx context.Context, task *replicationspb.NamespaceTaskAttributes) (bool, error) {
func (h *taskExecutorImpl) shouldProcessTask(ctx context.Context, task *replicationspb.NamespaceTaskAttributes) (bool, error) {
resp, err := h.metadataManager.GetNamespace(ctx, &persistence.GetNamespaceRequest{
Name: task.Info.GetName(),
})
Expand All @@ -148,7 +148,7 @@ func (h *namespaceReplicationTaskExecutorImpl) shouldProcessTask(ctx context.Con
}

// handleNamespaceCreationReplicationTask handles the namespace creation replication task
func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceCreationReplicationTask(
func (h *taskExecutorImpl) handleNamespaceCreationReplicationTask(
ctx context.Context,
task *replicationspb.NamespaceTaskAttributes,
) error {
Expand Down Expand Up @@ -251,7 +251,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceCreationReplicatio
}

// handleNamespaceUpdateReplicationTask handles the namespace update replication task
func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationTask(
func (h *taskExecutorImpl) handleNamespaceUpdateReplicationTask(
ctx context.Context,
task *replicationspb.NamespaceTaskAttributes,
) error {
Expand Down Expand Up @@ -318,7 +318,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationT
request.Namespace.ReplicationConfig.ActiveClusterName = task.ReplicationConfig.GetActiveClusterName()
request.Namespace.FailoverVersion = task.GetFailoverVersion()
request.Namespace.FailoverNotificationVersion = notificationVersion
request.Namespace.ReplicationConfig.FailoverHistory = convertFailoverHistoryToPersistenceProto(task.GetFailoverHistory())
request.Namespace.ReplicationConfig.FailoverHistory = ConvertFailoverHistoryToPersistenceProto(task.GetFailoverHistory())
}

if !recordUpdated {
Expand All @@ -328,7 +328,7 @@ func (h *namespaceReplicationTaskExecutorImpl) handleNamespaceUpdateReplicationT
return h.metadataManager.UpdateNamespace(ctx, request)
}

func (h *namespaceReplicationTaskExecutorImpl) validateNamespaceReplicationTask(task *replicationspb.NamespaceTaskAttributes) error {
func (h *taskExecutorImpl) validateNamespaceReplicationTask(task *replicationspb.NamespaceTaskAttributes) error {
if task == nil {
return ErrEmptyNamespaceReplicationTask
}
Expand Down Expand Up @@ -356,7 +356,7 @@ func ConvertClusterReplicationConfigFromProto(
return output
}

func convertFailoverHistoryToPersistenceProto(failoverHistory []*replicationpb.FailoverStatus) []*persistencespb.FailoverStatus {
func ConvertFailoverHistoryToPersistenceProto(failoverHistory []*replicationpb.FailoverStatus) []*persistencespb.FailoverStatus {
var res []*persistencespb.FailoverStatus
for _, status := range failoverHistory {
res = append(res, &persistencespb.FailoverStatus{
Expand All @@ -367,7 +367,7 @@ func convertFailoverHistoryToPersistenceProto(failoverHistory []*replicationpb.F
return res
}

func (h *namespaceReplicationTaskExecutorImpl) validateNamespaceStatus(input enumspb.NamespaceState) error {
func (h *taskExecutorImpl) validateNamespaceStatus(input enumspb.NamespaceState) error {
switch input {
case enumspb.NAMESPACE_STATE_REGISTERED, enumspb.NAMESPACE_STATE_DEPRECATED:
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package namespace
package nsreplication

import (
"context"
Expand Down Expand Up @@ -52,7 +52,7 @@ type (
controller *gomock.Controller

mockMetadataMgr *persistence.MockMetadataManager
namespaceReplicator *namespaceReplicationTaskExecutorImpl
namespaceReplicator *taskExecutorImpl
}
)

Expand All @@ -72,11 +72,11 @@ func (s *namespaceReplicationTaskExecutorSuite) SetupTest() {
s.controller = gomock.NewController(s.T())
s.mockMetadataMgr = persistence.NewMockMetadataManager(s.controller)
logger := log.NewTestLogger()
s.namespaceReplicator = NewReplicationTaskExecutor(
s.namespaceReplicator = NewTaskExecutor(
"some random standby cluster name",
s.mockMetadataMgr,
logger,
).(*namespaceReplicationTaskExecutorImpl)
).(*taskExecutorImpl)
}

func (s *namespaceReplicationTaskExecutorSuite) TearDownTest() {
Expand Down Expand Up @@ -473,7 +473,7 @@ func (s *namespaceReplicationTaskExecutorSuite) TestExecute_UpdateNamespaceTask_
ReplicationConfig: &persistencespb.NamespaceReplicationConfig{
ActiveClusterName: updateTask.ReplicationConfig.ActiveClusterName,
Clusters: []string{updateClusterActive, updateClusterStandby},
FailoverHistory: convertFailoverHistoryToPersistenceProto(failoverHistory),
FailoverHistory: ConvertFailoverHistoryToPersistenceProto(failoverHistory),
},
ConfigVersion: updateConfigVersion,
FailoverNotificationVersion: updateFailoverVersion,
Expand Down
Loading

0 comments on commit c9a7813

Please sign in to comment.