Skip to content

Commit

Permalink
Update docs for current work coordination
Browse files Browse the repository at this point in the history
Signed-off-by: Mikayla Thompson <thomika@amazon.com>
  • Loading branch information
mikaylathompson committed Jan 24, 2025
1 parent f7f2f63 commit d4d5c76
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions RFS/docs/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ Important CMS features in use:

An RFS Worker “acquires” a work item by either winning an atomic creation or an optimistic update on the CMS. When it does so, it sets a maximum duration for it to complete work on the item as a part of the create/update operation. Ideally, it will use the CMS’s clock to do this. The RFS Worker is assumed to have a lease on that work item until that duration expires. If the work is not completed in that time, the RFS Worker will create a successor work item that defines the remaining work and mark the current work item as finished.

As a specific example, an RFS Worker queries the CMS to find an Elasticsearch Shard to migrate to the target cluster. The CMS returns a record corresponding to a specific Elasticsearch Shard’s progress that either has not been started or has an expired work lease, and the RFS Worker performs an optimistic update of its timestamp field, setting it (hypothetically) for 5 hours from the current time (according to the CMS’s clock).
As a specific example, an RFS Worker queries the CMS to find a work item corresponding to a starting point in an Elasticsearch shard to migrate to the target cluster. The CMS returns a record corresponding to a specific Elasticsearch shard’s progress that either has not been started or has an expired work lease, and the RFS Worker performs an optimistic update of its timestamp field, setting it (hypothetically) for 5 hours from the current time (according to the CMS’s clock).

RFS Workers regularly polls the CMS to see if their current work item’s lease has expired (according to the CMS’s clock); if they find it has expired, they kill themselves and allow an outside system to spin up a replacement RFS Worker.
RFS Workers regularly polls the CMS to see if their current work item’s lease has expired (according to the CMS’s clock); if they find it has expired, they (in most cases) create a successor work item including a progress cursor of the current progress through the shard, mark the original work item as completed, and kill the worker process. If shard setup (downloading, unpacking, and beginning to read the shard) takes too little (<2.5%) or too much (>10%) of the lease time, the lease time is considered wrong-sized and is adjusted for the successor item.

The process of finding the optimal initial work lease duration will be data driven based on actual usage statistics. The CMS will contain the duration of each work item after a RFS operation finishes, which can be used to iteratively improve the initial “guess”. Each RFS Worker is responsible for setting the duration for work items it attempts to acquire a lease for.

Expand All @@ -88,19 +88,22 @@ An RFS Worker retains no more than a single work lease at a time. If the work i

When an RFS Worker acquires a work item, it increments the lease time exponent that will be used on the subsequent attempt. The RFS Worker increases its requested work lease duration based on this exponent.

When some work is completed and a successor item is created, the successor lease time exponent is increased / decreased to maintain a subsequent worker using up 90%-97.5% of the lease time sending docs versus setting up work (e.g. downloading/extracting shard)
When some work is completed and a successor item is created, the successor lease time exponent is increased / decreased to maintain a subsequent worker using up 90%-97.5% of the lease time sending docs versus setting up work (e.g. downloading/extracting shard).

The algorithm for backoff based on number of attempts and the maximum number of attempts to allow will both be data driven and expected to improve with experience.

### Don’t touch existing templates or indices
### Idempotent Successor Item Creation

While performing its work, RFS Workers will not modify any Templates or Index Settings on the target cluster. They instead assume that another, separate process has pre-configured those entries in order to correctly receive the documents being reindexed into the target cluster. This approach allows separation of concerns between the RFS Workers and the metadata migration process. It also provides space for users to customize/configure the target cluster differently than the source cluster.
A successor item is created when a worker does not complete the shard referenced in the work item. Because OpenSearch does not have a concept of transactions, the RFS Worker executes a series of actions against the CMS that allow it to remain in a safe state throughout, if the original worker dies or a new worker picks up the original work item.

### Overwrite documents by ID
1. Determine the successor work item id. This is deterministic and just a combination of the shard name and progress cursor, which is the offset within the shard representing how far the worker was able to progress.
2. Update the `successorWorkItems` field of the original work item with the successor work item id, with a server-side check to ensure that the original work item is still owned by the current worker (validating the `leaseHolderId` matches the one supplied by the worker).
3. Create the new work item with ID that was previously determined, as long as it does not currently exist.
4. Mark the original work item as completed, again validating that the current worker still owns the lease for the work item.

While performing its work, if an RFS Worker is tasked to create an Elasticsearch Document on the target cluster, it will do so by using the same ID as on the source cluster, clobbering any existing Elasticsearch Document on the target cluster with that ID. The reasoning for this policy is as follows.
If this process fails at any point before step 4 is successful, another worker will acquire the lease for the original work item. However, before beginning work, the worker checks whether the `successorWorkItems` field was set. If so, it knows that a previous worker completed up to the point indicated by the successor work item id, and it picks up the work of steps 3 and 4 (creating the successor work item, if it doesn't exist, and marking the original as complete).

The pending work items are the remaining docs for each Elasticsearch Shard. The RFS Workers have a consistent view of the position of a document within the entire shard. If a lease is about to expire and a shard has not been fully migrated, the RFS Workers use the latest continuous migrated doc number to reduce the duplicate work a successor work item has.
If this process fails before the `successorWorkItems` field is updated, a new worker will acquire the lease, but it won't be clear to that worker that this work was already started, so it will begin from the progress cursor in the original work item. While this means that excess document processing and reindexing work is being done, it does not put the CMS in an inconsistent state.

## How the RFS Worker works

Expand Down Expand Up @@ -184,6 +187,18 @@ If no Entry is returned, we know that this sub-phase is complete and attempt to

The work lease for this sub-phase is on the Shard (ensuring every Elasticsearch Document in that Shard has been processed). We log/emit metrics to indicate how many Documents are successfully and unsuccessfully migrated but we don’t consider the Shard Work Entry to have failed if some (or even all) of the Documents in it are unsuccessfully migrated. We only retry the Shard Work Entry when an RFS Worker fails to process every Document within the lease window. These retries are relatively time consuming, but safe because we overwrite any partial work performed by a previous RFS Worker.

### Notes on Reindexing Behavior

#### Don’t touch existing templates or indices

While performing its work, RFS Workers will not modify any Templates or Index Settings on the target cluster. They instead assume that another, separate process has pre-configured those entries in order to correctly receive the documents being reindexed into the target cluster. This approach allows separation of concerns between the RFS Workers and the metadata migration process. It also provides space for users to customize/configure the target cluster differently than the source cluster.

#### Overwrite documents by ID

While performing its work, if an RFS Worker is tasked to create an Elasticsearch Document on the target cluster, it will do so by using the same ID as on the source cluster, clobbering any existing Elasticsearch Document on the target cluster with that ID. The reasoning for this policy is as follows.

The pending work items are the remaining docs for each Elasticsearch Shard. The RFS Workers have a consistent view of the position of a document within the entire shard. If a lease is about to expire and a shard has not been fully migrated, the RFS Workers use the latest continuous migrated doc number to reduce the duplicate work a successor work item has.

## Appendix: Assumptions

We start with the following high-level assumptions about the structure of the solution. Changes to these assumptions would likely have a substantial impact on the design.
Expand Down Expand Up @@ -221,12 +236,12 @@ FIELDS:
* leaseExpiry (timestamp): When the current work lease expires
* nextAcquisitionLeaseExponent (integer): Times the task has been attempted
SHARD WORK ENTRY RECORD
ID: <name of the index to be migrated>_<shard number>
SHARD COMPONENT WORK ENTRY RECORD
ID: <name of the index to be migrated>_<shard number>_<progress cursor>
FIELDS:
* indexName (string): The index name
* shardId (integer): The shard number
* status (string): NOT_STARTED, COMPLETED, FAILED
* leaseExpiry (timestamp): When the current work lease expires
* leaseHolderId (string): Unique ID of the RFS worker that currently holds the lease
* nextAcquisitionLeaseExponent (integer): Times the task has been attempted
* successorWorkItems (list of strings): Follow-up work item(s) created from this parent item
* completedAt (timestamp): When this work item was completed
```

0 comments on commit d4d5c76

Please sign in to comment.