Skip to content

Commit

Permalink
Move namespace DLQ and Replication to own packages
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Jan 17, 2025
1 parent 795bbbd commit 7aaf73a
Show file tree
Hide file tree
Showing 24 changed files with 140 additions and 133 deletions.
3 changes: 0 additions & 3 deletions common/namespace/attr_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func (d *AttrValidatorImpl) ValidateNamespaceConfig(config *persistencespb.Names
func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForLocalNamespace(
replicationConfig *persistencespb.NamespaceReplicationConfig,
) error {

activeCluster := replicationConfig.ActiveClusterName
clusters := replicationConfig.Clusters

Expand All @@ -90,7 +89,6 @@ func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForLocalNamespace(
func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForGlobalNamespace(
replicationConfig *persistencespb.NamespaceReplicationConfig,
) error {

activeCluster := replicationConfig.ActiveClusterName
clusters := replicationConfig.Clusters

Expand Down Expand Up @@ -120,7 +118,6 @@ func (d *AttrValidatorImpl) ValidateNamespaceReplicationConfigForGlobalNamespace
func (d *AttrValidatorImpl) validateClusterName(
clusterName string,
) error {

if info, ok := d.clusterMetadata.GetAllClusterInfo()[clusterName]; !ok || !info.Enabled {
return serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid cluster name: %v", clusterName))
}
Expand Down
13 changes: 3 additions & 10 deletions common/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
"github.com/google/uuid"
enumspb "go.temporal.io/api/enums/v1"
namespacepb "go.temporal.io/api/namespace/v1"
replicationpb "go.temporal.io/api/replication/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/api/adminservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/namespace/nsreplication"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/util"
)
Expand Down Expand Up @@ -139,8 +139,8 @@ func FromAdminClientApiResponse(response *adminservice.GetNamespaceResponse) *Na
replicationConfig := &persistencespb.NamespaceReplicationConfig{
ActiveClusterName: response.GetReplicationConfig().GetActiveClusterName(),
State: response.GetReplicationConfig().GetState(),
Clusters: ConvertClusterReplicationConfigFromProto(response.GetReplicationConfig().GetClusters()),
FailoverHistory: convertFailoverHistoryToPersistenceProto(response.GetFailoverHistory()),
Clusters: nsreplication.ConvertClusterReplicationConfigFromProto(response.GetReplicationConfig().GetClusters()),
FailoverHistory: nsreplication.ConvertFailoverHistoryToPersistenceProto(response.GetFailoverHistory()),
}
return &Namespace{
info: info,
Expand Down Expand Up @@ -262,13 +262,6 @@ func (ns *Namespace) IsOnCluster(clusterName string) bool {
return false
}

// FailoverHistory returns the a copy of failover history for this namespace.
func (ns *Namespace) FailoverHistory() []*replicationpb.FailoverStatus {
return convertFailoverHistoryToReplicationProto(
ns.replicationConfig.GetFailoverHistory(),
)
}

// ConfigVersion return the namespace config version
func (ns *Namespace) ConfigVersion() int64 {
return ns.configVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dlq_message_handler_mock.go
//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dlq_message_handler_mock.go

package namespace
package nsdlq

import (
"context"
Expand All @@ -33,39 +33,40 @@ import (
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace/nsreplication"
"go.temporal.io/server/common/persistence"
)

type (
// DLQMessageHandler is the interface handles namespace DLQ messages
DLQMessageHandler interface {
// MessageHandler is the interface handles namespace DLQ messages
MessageHandler interface {
Read(ctx context.Context, lastMessageID int64, pageSize int, pageToken []byte) ([]*replicationspb.ReplicationTask, []byte, error)
Purge(ctx context.Context, lastMessageID int64) error
Merge(ctx context.Context, lastMessageID int64, pageSize int, pageToken []byte) ([]byte, error)
}

dlqMessageHandlerImpl struct {
replicationHandler ReplicationTaskExecutor
messageHandlerImpl struct {
replicationHandler nsreplication.TaskExecutor
namespaceReplicationQueue persistence.NamespaceReplicationQueue
logger log.Logger
}
)

// NewDLQMessageHandler returns a DLQTaskHandler instance
func NewDLQMessageHandler(
replicationHandler ReplicationTaskExecutor,
// NewMessageHandler returns a MessageHandler instance
func NewMessageHandler(
replicationHandler nsreplication.TaskExecutor,
namespaceReplicationQueue persistence.NamespaceReplicationQueue,
logger log.Logger,
) DLQMessageHandler {
return &dlqMessageHandlerImpl{
) MessageHandler {
return &messageHandlerImpl{
replicationHandler: replicationHandler,
namespaceReplicationQueue: namespaceReplicationQueue,
logger: logger,
}
}

// ReadMessages reads namespace replication DLQ messages
func (d *dlqMessageHandlerImpl) Read(
func (d *messageHandlerImpl) Read(
ctx context.Context,
lastMessageID int64,
pageSize int,
Expand All @@ -87,7 +88,7 @@ func (d *dlqMessageHandlerImpl) Read(
}

// PurgeMessages purges namespace replication DLQ messages
func (d *dlqMessageHandlerImpl) Purge(
func (d *messageHandlerImpl) Purge(
ctx context.Context,
lastMessageID int64,
) error {
Expand Down Expand Up @@ -116,7 +117,7 @@ func (d *dlqMessageHandlerImpl) Purge(
}

// MergeMessages merges namespace replication DLQ messages
func (d *dlqMessageHandlerImpl) Merge(
func (d *messageHandlerImpl) Merge(
ctx context.Context,
lastMessageID int64,
pageSize int,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package namespace
package nsdlq

import (
"context"
Expand All @@ -35,6 +35,7 @@ import (
enumsspb "go.temporal.io/server/api/enums/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace/nsreplication"
"go.temporal.io/server/common/persistence"
"go.uber.org/mock/gomock"
)
Expand All @@ -46,9 +47,9 @@ type (
*require.Assertions
controller *gomock.Controller

mockReplicationTaskExecutor *MockReplicationTaskExecutor
mockReplicationTaskExecutor *nsreplication.MockTaskExecutor
mockReplicationQueue *persistence.MockNamespaceReplicationQueue
dlqMessageHandler *dlqMessageHandlerImpl
dlqMessageHandler *messageHandlerImpl
}
)

Expand All @@ -69,14 +70,14 @@ func (s *dlqMessageHandlerSuite) SetupTest() {
s.controller = gomock.NewController(s.T())

logger := log.NewTestLogger()
s.mockReplicationTaskExecutor = NewMockReplicationTaskExecutor(s.controller)
s.mockReplicationTaskExecutor = nsreplication.NewMockTaskExecutor(s.controller)
s.mockReplicationQueue = persistence.NewMockNamespaceReplicationQueue(s.controller)

s.dlqMessageHandler = NewDLQMessageHandler(
s.dlqMessageHandler = NewMessageHandler(
s.mockReplicationTaskExecutor,
s.mockReplicationQueue,
logger,
).(*dlqMessageHandlerImpl)
).(*messageHandlerImpl)
}

func (s *dlqMessageHandlerSuite) TearDownTest() {
Expand Down
Loading

0 comments on commit 7aaf73a

Please sign in to comment.