Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce VersionVector to detect the relationship between changes #800

Closed
wants to merge 11 commits into from
Prev Previous commit
Next Next commit
Remove duplicated version vector from pack (#1024)
Unify VersionVectors in ChangePack across three scenarios:

1. Pushing pack to server: represents document's current version vector
2. Pulling pack: represents minSyncedVersionVector for GC
3. Pulling pack(snapshot): represents snapshot's version vector at creation

This commit simplifies the codebase and ensures consistent version
vector handling throughout the document synchronization process.
  • Loading branch information
JOOHOJANG authored Oct 2, 2024
commit d267c11aa1e07ac1ea20867343b0e80fc16a30bf
20 changes: 7 additions & 13 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,19 @@ func FromChangePack(pbPack *api.ChangePack) (*change.Pack, error) {
return nil, err
}

minSyncedVersionVector, err := FromVersionVector(pbPack.MinSyncedVersionVector)
if err != nil {
return nil, err
}

minSyncedTicket, err := fromTimeTicket(pbPack.MinSyncedTicket)
if err != nil {
return nil, err
}

pack := &change.Pack{
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
Snapshot: pbPack.Snapshot,
IsRemoved: pbPack.IsRemoved,
VersionVector: versionVector,
MinSyncedVersionVector: minSyncedVersionVector,
MinSyncedTicket: minSyncedTicket,
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
Snapshot: pbPack.Snapshot,
IsRemoved: pbPack.IsRemoved,
VersionVector: versionVector,
MinSyncedTicket: minSyncedTicket,
}

return pack, nil
Expand Down
6 changes: 0 additions & 6 deletions api/docs/yorkie/v1/resources.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,6 @@ components:
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: min_synced_version_vector
type: object
snapshot:
additionalProperties: false
description: ""
Expand Down
6 changes: 0 additions & 6 deletions api/docs/yorkie/v1/yorkie.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -470,12 +470,6 @@ components:
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: min_synced_version_vector
type: object
snapshot:
additionalProperties: false
description: ""
Expand Down
1,646 changes: 816 additions & 830 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ message ChangePack {
repeated Change changes = 4;
bool is_removed = 6;
VersionVector version_vector = 7;
VersionVector min_synced_version_vector = 8;

TimeTicket min_synced_ticket = 5; // Deprecated
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use Protobuf deprecation option for min_synced_ticket

Consider using the Protobuf deprecated option to formally deprecate the min_synced_ticket field instead of a comment. This helps code generators provide appropriate warnings.

Update the field definition to:

-TimeTicket min_synced_ticket = 5; // Deprecated
+TimeTicket min_synced_ticket = 5 [deprecated = true];
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
TimeTicket min_synced_ticket = 5; // Deprecated
TimeTicket min_synced_ticket = 5 [deprecated = true];

}
Expand Down
5 changes: 0 additions & 5 deletions pkg/document/change/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ type Pack struct {
// 2. In response(Snapshot), it is the version vector of the snapshot of the document.
VersionVector time.VersionVector

// TODO(hackerwins): Consider to merge MinSyncedVersionVector with VersionVector.
// MinSyncedVersionVector is the minimum version vector taken by clients who
// attach the document.
MinSyncedVersionVector time.VersionVector

// IsRemoved is a flag that indicates whether the document is removed.
IsRemoved bool

Expand Down
12 changes: 7 additions & 5 deletions pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ func (d *Document) Update(
// ApplyChangePack applies the given change pack into this document.
func (d *Document) ApplyChangePack(pack *change.Pack) error {
// 01. Apply remote changes to both the cloneRoot and the document.
if len(pack.Snapshot) > 0 {
hasSnapshot := len(pack.Snapshot) > 0

if hasSnapshot {
d.cloneRoot = nil
d.clonePresences = nil
if err := d.doc.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
Expand Down Expand Up @@ -215,13 +217,13 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
d.doc.checkpoint = d.doc.checkpoint.Forward(pack.Checkpoint)

// 04. Do Garbage collection.
if !d.options.DisableGC {
d.GarbageCollect(pack.MinSyncedVersionVector)
if !d.options.DisableGC && !hasSnapshot {
d.GarbageCollect(pack.VersionVector)
}

// 05. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if pack.VersionVector != nil && !hasSnapshot {
actorIDs, err := pack.VersionVector.Keys()
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/document/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ func TestDocument(t *testing.T) {
docB := document.New("doc")
docB.SetActor(actorB)
assert.Equal(t, "{}", docB.VersionVector().Marshal())
// NOTE(JOOHOJANG): Normally, docB's Lamport timestamp should be included in pack.versionVector because pack is applied after docB is attached.
// However, since this is not the case in this test method, docB's Lamport timestamp is manually added to packA's versionVector.
// In actual use, since changePacks cannot be exchanged directly between clients without going through a server, the following handling was added.
packA.VersionVector.Set(docB.ActorID(), docB.VersionVector().VersionOf(docB.ActorID()))
assert.NoError(t, docB.ApplyChangePack(packA))
assert.Equal(t, "{000000000000000000000001:2,000000000000000000000002:3}", docB.VersionVector().Marshal())

Expand Down
12 changes: 7 additions & 5 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ func (d *InternalDocument) HasLocalChanges() bool {

// ApplyChangePack applies the given change pack into this document.
func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) error {
hasSnapshot := len(pack.Snapshot) > 0

// 01. Apply remote changes to both the cloneRoot and the document.
if len(pack.Snapshot) > 0 {
if hasSnapshot {
if err := d.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
return err
}
Expand All @@ -167,15 +169,15 @@ func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) er
// 03. Update the checkpoint.
d.checkpoint = d.checkpoint.Forward(pack.Checkpoint)

if !disableGC && pack.MinSyncedTicket != nil {
if _, err := d.GarbageCollect(pack.MinSyncedVersionVector); err != nil {
if !disableGC && pack.VersionVector != nil && !hasSnapshot {
if _, err := d.GarbageCollect(pack.VersionVector); err != nil {
return err
}
}

// 04. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if pack.VersionVector != nil && !hasSnapshot {
actorIDs, err := pack.VersionVector.Keys()
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ func PushPull(
if err != nil {
return nil, err
}
respPack.MinSyncedVersionVector = minSyncedVersionVector
if respPack.SnapshotLen() == 0 {
respPack.VersionVector = minSyncedVersionVector
}

// TODO(hackerwins): This is a previous implementation before the version
// vector was introduced. But it is necessary to support the previous
Expand Down Expand Up @@ -214,7 +216,7 @@ func PushPull(
ctx,
be,
docInfo,
minSyncedTicket,
minSyncedVersionVector,
); err != nil {
logging.From(ctx).Error(err)
}
Expand Down
14 changes: 0 additions & 14 deletions server/packs/serverpacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ type ServerPack struct {
// 2. In response(Snapshot), it is the version vector of the snapshot of the document.
VersionVector time.VersionVector

// TODO(hackerwins): Consider to merge MinSyncedVersionVector with VersionVector.
// MinSyncedVersionVector is the minimum version vector taken by clients who
// attach the document.
MinSyncedVersionVector time.VersionVector

// IsRemoved is a flag that indicates whether the document is removed.
IsRemoved bool

Expand Down Expand Up @@ -150,15 +145,6 @@ func (p *ServerPack) ToPBChangePack() (*api.ChangePack, error) {

pbPack.VersionVector = pbVersionVector

if p.MinSyncedVersionVector != nil {
pbMinSyncedVersionVector, err := converter.ToVersionVector(p.MinSyncedVersionVector)
if err != nil {
return nil, err
}

pbPack.MinSyncedVersionVector = pbMinSyncedVersionVector
}

return pbPack, nil
}

Expand Down
14 changes: 10 additions & 4 deletions server/packs/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func storeSnapshot(
ctx context.Context,
be *backend.Backend,
docInfo *database.DocInfo,
minSyncedTicket *time.Ticket,
minSyncedVersionVector time.VersionVector,
) error {
// 01. get the closest snapshot's metadata of this docInfo
docRefKey := docInfo.RefKey()
Expand Down Expand Up @@ -91,13 +91,19 @@ func storeSnapshot(
nil,
nil,
)
pack.MinSyncedTicket = minSyncedTicket

if err := doc.ApplyChangePack(pack, be.Config.SnapshotDisableGC); err != nil {
return err
}

// 04. save the snapshot of the docInfo
// 04. perform garbage collect to remove tombstones
if !be.Config.SnapshotDisableGC {
if _, err := doc.GarbageCollect(minSyncedVersionVector); err != nil {
return err
}
Comment on lines +102 to +103
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error messages by wrapping errors with context.

When returning errors, consider adding context to the error message to facilitate debugging.

Apply this diff:

-            return err
+            return fmt.Errorf("garbage collection failed: %w", err)

Ensure that the fmt package is imported.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return err
}
return fmt.Errorf("garbage collection failed: %w", err)
}

}

Comment on lines +99 to +105
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add logging after garbage collection for better observability.

Including a log statement after a successful garbage collection can aid in monitoring and debugging.

Apply this diff to add logging:

     }
+    logging.From(ctx).Infof(
+        "Garbage collection completed for document '%s'",
+        docInfo.Key,
+    )
     // 05. save the snapshot of the docInfo

Committable suggestion was skipped due to low confidence.

// 05. save the snapshot of the docInfo
if err := be.DB.CreateSnapshotInfo(
ctx,
docRefKey,
Expand All @@ -106,7 +112,7 @@ func storeSnapshot(
return err
}

// 05. delete changes before the smallest in `syncedseqs` to save storage.
// 06. delete changes before the smallest in `syncedseqs` to save storage.
if be.Config.SnapshotWithPurgingChanges {
if err := be.DB.PurgeStaleChanges(
ctx,
Expand Down
Loading