Skip to content

Commit

Permalink
Introduce VersionVector to detect the relationship between changes
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Jul 16, 2024
1 parent b468f8b commit c78ffe7
Show file tree
Hide file tree
Showing 32 changed files with 2,633 additions and 1,988 deletions.
6 changes: 6 additions & 0 deletions admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,16 @@ func (c *Client) ListChangeSummaries(
return nil, err
}

vector, err := converter.FromVersionVector(snapshotMeta.Msg.VersionVector)
if err != nil {
return nil, err
}

newDoc, err := document.NewInternalDocumentFromSnapshot(
key,
seq,
snapshotMeta.Msg.Lamport,
vector,
snapshotMeta.Msg.Snapshot,
)

Expand Down
42 changes: 39 additions & 3 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,23 @@ func FromChangePack(pbPack *api.ChangePack) (*change.Pack, error) {
return nil, err
}

return &change.Pack{
pack := &change.Pack{
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
Snapshot: pbPack.Snapshot,
MinSyncedTicket: minSyncedTicket,
IsRemoved: pbPack.IsRemoved,
}, nil
}

if pbPack.Snapshot != nil {
pack.Snapshot = pbPack.Snapshot
pack.SnapshotVersionVector, err = FromVersionVector(pbPack.SnapshotVersionVector)
if err != nil {
return nil, err
}
}

return pack, nil
}

func fromCheckpoint(pbCheckpoint *api.Checkpoint) change.Checkpoint {
Expand Down Expand Up @@ -147,14 +156,41 @@ func fromChangeID(id *api.ChangeID) (change.ID, error) {
if err != nil {
return change.InitialID, err
}

vector, err := FromVersionVector(id.VersionVector)
if err != nil {
return change.InitialID, err
}

return change.NewID(
id.ClientSeq,
id.ServerSeq,
id.Lamport,
actorID,
vector,
), nil
}

// FromVersionVector converts the given Protobuf formats to model format.
func FromVersionVector(pbVersionVector *api.VersionVector) (time.VersionVector, error) {
versionVector := make(time.VersionVector)
// TODO(hackerwins): Old clients until v0.4.15 don't send VersionVector.
// Remove this check after all clients are updated to the v0.4.16 or later.
if pbVersionVector == nil {
return versionVector, nil
}

for id, lamport := range pbVersionVector.Vector {
actorID, err := time.ActorIDFromHex(id)
if err != nil {
return nil, err
}
versionVector.Set(actorID, lamport)
}

return versionVector, nil
}

// FromDocumentID converts the given Protobuf formats to model format.
func FromDocumentID(pbID string) (types.ID, error) {
id := types.ID(pbID)
Expand Down
39 changes: 33 additions & 6 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,35 @@ func ToCheckpoint(cp change.Checkpoint) *api.Checkpoint {
}

// ToChangeID converts the given model format to Protobuf format.
func ToChangeID(id change.ID) *api.ChangeID {
func ToChangeID(id change.ID) (*api.ChangeID, error) {
pbVersionVector, err := ToVersionVector(id.VersionVector())
if err != nil {
return nil, err
}
return &api.ChangeID{
ClientSeq: id.ClientSeq(),
ServerSeq: id.ServerSeq(),
Lamport: id.Lamport(),
ActorId: id.ActorID().Bytes(),
ClientSeq: id.ClientSeq(),
ServerSeq: id.ServerSeq(),
Lamport: id.Lamport(),
ActorId: id.ActorID().Bytes(),
VersionVector: pbVersionVector,
}, nil
}

// ToVersionVector converts the given model format to Protobuf format.
func ToVersionVector(vector time.VersionVector) (*api.VersionVector, error) {
pbVersionVector := make(map[string]int64)
for actor, clock := range vector {
id, err := time.ActorIDFromBytes(actor[:])
if err != nil {
return nil, err
}

pbVersionVector[id.String()] = clock
}

return &api.VersionVector{
Vector: pbVersionVector,
}, nil
}

// ToDocEventType converts the given model format to Protobuf format.
Expand Down Expand Up @@ -241,8 +263,13 @@ func ToChanges(changes []*change.Change) ([]*api.Change, error) {
return nil, err
}

pbChangeID, err := ToChangeID(c.ID())
if err != nil {
return nil, err
}

pbChanges = append(pbChanges, &api.Change{
Id: ToChangeID(c.ID()),
Id: pbChangeID,
Message: c.Message(),
Operations: pbOperations,
PresenceChange: ToPresenceChange(c.PresenceChange()),
Expand Down
Loading

0 comments on commit c78ffe7

Please sign in to comment.