-
Notifications
You must be signed in to change notification settings - Fork 265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: ConditionalPutCommitHandler
for concurrency on S3, faster commit
#3483
Conversation
ACTION NEEDED The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error please inspect the "PR Title Check" action. |
ConditionalPutCommitHandler
for concurrency on S3, faster commit
One follow up I found while working on this: #3487 |
services: | ||
minio: | ||
image: lazybit/minio | ||
ports: | ||
- 9000:9000 | ||
env: | ||
MINIO_ACCESS_KEY: ACCESSKEY | ||
MINIO_SECRET_KEY: SECRETKEY | ||
options: --name=minio --health-cmd "curl http://localhost:9000/minio/health/live" | ||
dynamodb-local: | ||
image: amazon/dynamodb-local | ||
ports: | ||
- 8000:8000 | ||
env: | ||
AWS_ACCESS_KEY_ID: ACCESSKEY | ||
AWS_SECRET_ACCESS_KEY: SECRETKEY |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replaced this with localstack
in docker-compose. That is now started in the run_integtest
action.
68d6474
to
82fbd9c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome work! Feels like the end of an era. Can we get rid of RenameCommitHandler
? Or do we keep it around for a bit more?
docker-compose.yml
Outdated
- MINIO_SECRET_KEY=SECRETKEY | ||
- SERVICES=s3,dynamodb,kms | ||
- DEBUG=1 | ||
- LS_LOG=trace |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How much logging does trace end up being? I could see it being useful but also maybe drowning test output.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops. I'm fine removing that.
"aws_access_key_id": "ACCESS_KEY", | ||
"aws_secret_access_key": "SECRET_KEY", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't you specify these as ACCESSKEY
and SECRETKEY
in the docker compose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh good catch. It turns out, localstack doesn't check these https://docs.localstack.cloud/references/credentials/#:~:text=The%20value%20of%20the%20secret%20access%20key%20are%20currently%20ignored%20by%20LocalStack.
That's why the unit tests are passing still.
let memory_store = ObjectStore::memory(); | ||
let dummy_path = "dummy"; | ||
manifest_writer(&memory_store, manifest, indices, &dummy_path.into()).await?; | ||
let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this just to minimize the time in the remote store call? Why not write directly into the remote store?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was just too lazy to refactor manifest_writer
to output a buffer that could be used using with put_opts()
. I will do that in a future PR.
if (buf.len() < 16 || !buf.ends_with(MAGIC)) && known_size.is_some() { | ||
return Box::pin(read_manifest(object_store, path, None)).await; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice robustness trick
rust/lance/src/io/commit.rs
Outdated
@@ -13,10 +13,7 @@ | |||
//! strategies. The default implementation for most object stores is | |||
//! [RenameCommitHandler], which writes the manifest to a temporary path, then |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change to ConditionalPutCommitHandler
?
rust/lance/src/io/commit/s3_test.rs
Outdated
.credentials_provider(credentials) | ||
.endpoint_url(CONFIG[2].1) | ||
.behavior_version(BehaviorVersion::latest()) | ||
.region(Region::new("us-east-1")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.region(Region::new("us-east-1")) | |
.region(Region::new(CONFIG[5].1)) |
tokio::task::spawn(async move { | ||
let config = aws_config().await; | ||
let client = S3Client::new(&config); | ||
Self::delete_bucket(client, &bucket_name).await; | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah...the infamous async drop 😄
|
||
// In case it wasn't deleted earlier | ||
Self::delete_table(client.clone(), name).await; | ||
tokio::time::sleep(std::time::Duration::from_millis(200)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment explaining this sleep?
let num_rows = dataset.count_rows(None).await.unwrap(); | ||
assert_eq!(num_rows, data.num_rows() * (concurrency + 1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe do a quick scan to make sure the data is all readable? Or perhaps even better, a call to dataset validate?
rust/lance/src/io/commit/s3_test.rs
Outdated
|
||
let bucket = S3Bucket::new("test-ddb-iops").await; | ||
let ddb_table = DynamoDBCommitTable::new("test-ddb-iops").await; | ||
// let uri = format!("s3://{}/", bucket.0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// let uri = format!("s3://{}/", bucket.0); |
It's public so I figured I'd keep it for now. But probably not used by anyone so can deprecate it. |
ConditionalPutCommitHandler
, reducing the number of IOPS to write a manifest from 3 (put, copy-if-not-exists, delete) to just 1.size
field to dynamodb, eliminating need forHEAD
IOP when opening a table. This means we can open a table with only 1 object store IOP when using dynamodb manifest store. Closes Add size field to dynamodDB store #2995