
Switch to object_store crate (#2489) #2677

Merged (16 commits, Jul 4, 2022)
Conversation

@tustvold (Contributor) commented Jun 1, 2022

Which issue does this PR close?

Closes #2489

Rationale for this change

See ticket

What changes are included in this PR?

Switches DataFusion to using the object_store crate in place of the existing datafusion-data-access abstraction

Are there any user-facing changes?

Yes, this moves to using the object_store crate.

Does this PR break compatibility with Ballista?

Possibly

@tustvold added the "api change" (Changes the API exposed to users of the crate) label Jun 1, 2022
@github-actions bot added the "core" (Core DataFusion crate) and "datafusion" (Changes in the datafusion crate) labels Jun 1, 2022
@andygrove removed the "datafusion" (Changes in the datafusion crate) label Jun 3, 2022
@codecov-commenter commented Jun 6, 2022

Codecov Report

Merging #2677 (54dd6d2) into master (88b88d4) will decrease coverage by 0.10%.
The diff coverage is 89.03%.

@@            Coverage Diff             @@
##           master    #2677      +/-   ##
==========================================
- Coverage   85.26%   85.15%   -0.11%     
==========================================
  Files         275      275              
  Lines       48830    48846      +16     
==========================================
- Hits        41633    41597      -36     
- Misses       7197     7249      +52     
Impacted Files Coverage Δ
datafusion/core/src/catalog/schema.rs 84.48% <ø> (-0.52%) ⬇️
...afusion/core/src/physical_plan/file_format/avro.rs 0.00% <0.00%> (ø)
datafusion/core/src/physical_plan/mod.rs 88.00% <ø> (ø)
datafusion/core/src/datasource/file_format/avro.rs 61.53% <50.00%> (-8.03%) ⬇️
datafusion/core/src/datasource/file_format/json.rs 93.75% <64.28%> (-5.13%) ⬇️
datafusion/common/src/error.rs 80.00% <66.66%> (-2.28%) ⬇️
datafusion/core/src/datasource/listing/mod.rs 55.55% <66.66%> (+10.10%) ⬆️
...afusion/core/src/physical_plan/file_format/json.rs 91.06% <77.77%> (-2.13%) ⬇️
datafusion/core/src/datasource/file_format/csv.rs 98.91% <80.00%> (-1.09%) ⬇️
datafusion/core/tests/path_partition.rs 85.86% <80.55%> (-0.69%) ⬇️
... and 26 more


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@tustvold (Contributor, author) commented Jun 8, 2022

I think this is now ready for review. I've created #2711, which uses currently unreleased functionality in arrow-rs to do byte-range fetches to object storage.

This PR does represent a 10-20% performance regression in the parquet SQL benchmarks when operating on local files. This largely results from moving away from spawn_blocking, with the corresponding scheduler implications documented in apache/arrow-rs#1473; a sketch contrasting the two IO patterns follows the list below.

However, I am inclined to think this is fine for a couple of reasons:

  • The new scheduler, which is currently blocked by this PR, was specifically created to address this scheduling disparity
  • The difference becomes inconsequential for any non-trivial queries
  • The ongoing work by @Ted-Jiang will help to reduce the IO costs of parquet
  • I think this lays a solid foundation on which we can iterate
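
To make the IO difference concrete, here is a minimal sketch (assumed types and helper names, not DataFusion's actual reader code) contrasting the spawn_blocking pattern being removed with an async fetch through the object_store crate:

```rust
use std::sync::Arc;

use object_store::{path::Path, ObjectStore};

/// Old pattern: push the synchronous read onto tokio's blocking pool,
/// tying up a blocking thread for the duration of the IO.
async fn read_via_spawn_blocking(path: std::path::PathBuf) -> std::io::Result<Vec<u8>> {
    tokio::task::spawn_blocking(move || std::fs::read(path))
        .await
        .expect("blocking read task panicked")
}

/// New pattern: a single async fetch through the ObjectStore abstraction;
/// the worker thread stays free to run other tasks while the IO is pending.
async fn read_via_object_store(
    store: Arc<dyn ObjectStore>,
    location: &Path,
) -> object_store::Result<bytes::Bytes> {
    store.get(location).await?.bytes().await
}
```

The async version keeps tokio worker threads free while IO is pending, which is the scheduling property apache/arrow-rs#1473 discusses.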

@tustvold marked this pull request as ready for review June 8, 2022 16:25
);
}

#[cfg(target_os = "windows")]
@tustvold:

This test is removed as it no longer makes sense; paths are now normalized (see the sketch below).
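
A small illustration (assumed behavior of the object_store crate's Path type) of what that normalization means in practice:

```rust
use object_store::path::Path;

fn main() {
    // Leading delimiters and empty segments are stripped during parsing,
    // so equivalent paths converge on one canonical, OS-independent form.
    let path = Path::from("/foo//bar/baz");
    assert_eq!(path.as_ref(), "foo/bar/baz");
}
```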

let path = url.path().strip_prefix('/').unwrap();
replacements.push((path.to_string(), key.to_string()));
}
// Push URL representation of path
@tustvold:

Standardized paths 🎉

Err(_) => Ok(Box::pin(stream)),
}
let stream =
FileStream::new(&self.base_config, partition_index, context, opener)?;
@tustvold (Jun 8, 2022):

Parquet now uses the same FileStream interface as the other formats, which reduces code duplication (a sketch of the pattern follows).
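
A simplified sketch of the shared pattern (paraphrased; the real DataFusion trait and type names may differ): each format supplies an opener that asynchronously resolves to a record batch stream, and one FileStream implementation drives file iteration and limit handling for all formats.

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use futures::future::BoxFuture;
use futures::stream::BoxStream;

/// The stream of decoded batches produced for one file
type BatchStream = BoxStream<'static, Result<RecordBatch, ArrowError>>;

/// Implemented once per format (csv, json, avro, and now parquet)
trait FormatOpener: Send + Sync {
    /// Begin opening the file at `location`; the returned future resolves
    /// once the (possibly remote) open completes
    fn open(&self, location: &str) -> BoxFuture<'static, Result<BatchStream, ArrowError>>;
}
```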

@andygrove (Member) commented:

@tustvold I am planning on creating the 9.0.0 RC on Friday. Would we want to hold off merging this until after the 9.0.0 release?

@tustvold (author) commented Jun 8, 2022

Would we want to hold off merging this until after the 9.0.0 release

That isn't really my call to make, especially since IOx consumes via a git pin and not a release. However, I would say:

  • Without "Use ParquetRecordBatchStream" #2711, which depends on the next arrow-rs release, this may represent a regression for people consuming data from remote object storage, although it's a bit of an apples-and-oranges comparison (fetching whole files vs futures::block_on range requests), and it is unclear which is necessarily better
  • The sooner we make the switch the less painful it will be
  • I'm not sure in what capacity people are using the current object store interface

My personal preference would be for 9.0.0 to include the switch so we can start to bring the ecosystem along, but I'm not sure if the timings will work out for that and I don't feel especially strongly. @alamb probably has a view on this.

@alamb (Contributor) commented Jun 9, 2022

My personal preference would be for 9.0.0 to include the switch so we can start to bring the ecosystem along, but I'm not sure if the timings will work out for that and I don't feel especially strongly. @alamb probably has a view on this.

I also recommend waiting until after the 9.0.0 release. Rationale:

  1. The DataFusion releases are already a substantial effort, so anything we can do to reduce the potential for issues requiring a second release is good
  2. I think given the wide ranging implications of the PR up/down the ecosystem, we should have some additional reviewers on it prior to merging
  3. I believe @tustvold will be out next week so waiting for his return is probably the wisest course of action

@alamb (Contributor) left a review:

I like the code; thank you very much @tustvold. I love to see the unification plan coming together. Really nice work.

Prior to merging this PR, I recommend the following steps:

  1. Make sure we can get Ballista to compile
  2. Run some basic parquet based benchmarks (e.g. the tpch ones)
  3. Send a note to the dev@arrow.apache.org mailing list with a link to this PR (and also maybe on slack)
  4. Get some other opinions (e.g. @yjshen @timvw @matthewmturner @kyotoYaho and @thinkharderdev perhaps) from people who use the existing object store abstraction. There are at least three crates on datafusion-contrib that would seem to use it: https://github.com/datafusion-contrib?q=objectstore&type=all&language=&sort=

chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "8.0.0", features = ["parquet"] }
datafusion-data-access = { path = "../data-access", version = "8.0.0" }
A reviewer commented:

should we also perhaps remove the data-access directory as part of the same PR?

}))?;
};

let schema = match store.get(&object.location).await? {
A reviewer commented:

I like how this interface allows for specialized access of LocalFiles as well as streams 👍
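
A hedged sketch of the specialization being praised here, written against the 2022-era GetResult enum (the variant shapes are recalled from that version of the crate and may differ in later releases):

```rust
use futures::TryStreamExt;
use object_store::{path::Path, GetResult, ObjectStore};

async fn read_object(
    store: &dyn ObjectStore,
    location: &Path,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    match store.get(location).await? {
        // Local files hand back the underlying std::fs::File, allowing
        // direct (and cheap) reads without going through a byte stream
        GetResult::File(mut file, _path) => {
            use std::io::Read;
            let mut buf = Vec::new();
            file.read_to_end(&mut buf)?;
            Ok(buf)
        }
        // Remote stores yield an async stream of Bytes chunks instead
        GetResult::Stream(mut stream) => {
            let mut buf = Vec::new();
            while let Some(chunk) = stream.try_next().await? {
                buf.extend_from_slice(&chunk);
            }
            Ok(buf)
        }
    }
}
```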

@@ -580,7 +554,7 @@ mod tests {
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
assert_eq!(11, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());
assert_eq!(1, batches[0].num_rows());
A reviewer asked:

why is this different?

@tustvold:

Because we now use FileStream, which slices the returned batches based on the provided limit.
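
As an illustration, a hypothetical helper (not DataFusion's actual code) showing how slicing enforces a limit; for example, a limit of 1 would turn the 8-row batch above into a single row:

```rust
use arrow::record_batch::RecordBatch;

/// Returns the batch, trimmed so no more than `remaining` rows are emitted
fn apply_limit(batch: RecordBatch, remaining: &mut usize) -> Option<RecordBatch> {
    if *remaining == 0 {
        return None;
    }
    if batch.num_rows() <= *remaining {
        *remaining -= batch.num_rows();
        Some(batch)
    } else {
        // Slice the batch down to exactly the rows still needed
        let sliced = batch.slice(0, *remaining);
        *remaining = 0;
        Some(sliced)
    }
}
```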

enum FileStreamState {
Idle,
Open {
future: ReaderFuture,
A reviewer commented:

Perhaps we can add some docstrings -- especially for what future represents
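
For instance, the requested docstrings might look like this (the wording is my reading of the surrounding code, and ReaderFuture's exact shape is an assumption):

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use futures::future::BoxFuture;
use futures::stream::BoxStream;

/// Assumed shape: resolves to the batch stream of the file being opened
type ReaderFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>, ArrowError>>;

enum FileStreamState {
    /// No file is currently being opened or scanned
    Idle,
    /// A file open is in flight; `future` resolves to that file's batch
    /// stream once the (possibly remote) open completes
    Open { future: ReaderFuture },
}
```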


@matthewmturner (Contributor) commented:

This is great work - really excited to get this integrated. I hope to provide some comments / questions this weekend.

@thinkharderdev (Contributor) left a review:

Looks good to me. We had actually come to the same approach in our project: when fetching from an external object store, it turned out to be much more efficient to prefetch the entire file into memory than to try to do a lot of sequential range requests.

I wonder if there is more to gain (in a future iteration of course :)) by reading the metadata and then doing a buffered prefetch of only the projected columns and non-pruned row groups. If we can also crack the metadata caching then this should be a pure win.
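
A hedged sketch of that idea, using parquet crate metadata accessors (signatures from memory — treat them as assumptions) to compute the byte ranges worth prefetching; the pruning and projection inputs are assumed to come from the planner:

```rust
use std::ops::Range;

use parquet::file::metadata::ParquetMetaData;

/// Byte ranges covering only the projected columns of the surviving row groups
fn ranges_to_prefetch(
    metadata: &ParquetMetaData,
    projected_columns: &[usize],
    kept_row_groups: &[usize],
) -> Vec<Range<u64>> {
    let mut ranges = Vec::new();
    for &rg in kept_row_groups {
        let row_group = metadata.row_group(rg);
        for &col in projected_columns {
            // byte_range() yields (start offset, total compressed length)
            let (start, len) = row_group.column(col).byte_range();
            ranges.push(start..start + len);
        }
    }
    ranges
}
```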

@alamb (Contributor) commented Jun 10, 2022

I wonder if there is more to gain (in a future iteration of course :)) by reading the metadata and then doing a buffered prefetch of only the projected columns and non-pruned row groups. If we can also crack the metadata caching then this should be a pure win.

I think this is precisely what @tustvold is working towards -- I am not sure we have a unified vision writeup / ticket anywhere but we are working on one...

@tustvold (author) commented:

I think this is precisely what @tustvold is working towards

Indeed, #2711 adds buffered prefetch of projected columns and non-pruned row groups, using the functionality added in apache/arrow-rs#1803. Further, with the work of @Ted-Jiang on ColumnIndex support (apache/arrow-rs#1705), we may in the not too distant future support page-level pushdown 🎉

will be out next week so waiting for his return is probably the wisest course of action

I am out for the next week and a bit, and am not sure how much time I will have to work on this, but please do leave feedback and I'll get to it on my return 😄

@alamb (Contributor) commented Jun 29, 2022

@tustvold -- given the work for #2226, is the eventual plan to interleave IO and CPU decoding?

I wonder if we can find some workaround so that @Ted-Jiang and his team don't lose performance while we continue to make progress (e.g. could we fetch to a local file? or put in some hack for people who want to decode using a blocking IO thread or something)

@alamb closed this Jun 29, 2022
@alamb reopened this Jun 29, 2022
@tustvold (author) commented:

is the eventual plan to interleave IO and CPU decoding

Yes, once we properly support reading and writing the column index structures (apache/arrow-rs#1705) we will have sufficient information to interleave IO at the page level. Currently ParquetRecordBatchStream does not have information on where the pages are actually located, which means it cannot interleave IO at a granularity lower than the column chunk. That being said, we could potentially use a heuristic and only fetch the first 1MB or something; I'll have an experiment 🤔

Full disclosure: the column in question is somewhat degenerate; it is 106MB over 100x 1MB pages across two row groups. Another obvious way to improve the performance would be to reduce the size of the row groups.

@tustvold (author) commented:

So here is where we stand with regards to this PR:

Pros

Cons

  • Slightly higher memory usage for some queries, as it buffers encoded column chunks instead of reading pages on demand
  • Queries to local files with column chunks containing large numbers of pages may be slower

Conclusion

I therefore think that, on balance, this PR represents a step forward, with the only regression mitigated by using smaller row groups.

@alamb (Contributor) commented Jun 29, 2022

Given the tradeoffs articulated by @tustvold in #2677 (comment) I think we should merge this PR.

@Ted-Jiang what do you think?

cc @thinkharderdev @matthewmturner @andygrove @liukun4515 @yjshen @wjones127 @houqp -- any other thoughts / concerns before doing so? It will cause churn downstream, but we have known that ever since #2489 was proposed

@alamb closed this Jun 29, 2022
@alamb reopened this Jun 29, 2022
@matthewmturner (Contributor) commented:

+1 for me. I will add a note to the README of the s3 object store repo to let users know of the new crate.

@matthewmturner (Contributor) commented:

I apologize if I missed it (the GitHub UI is being buggy for me right now), but it might be worth adding to the docs examples of how to use this with different object_store features enabled. This could be done as a follow-on, though.
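
As a starting point, a hypothetical docs example of the kind suggested (the registration method name and signature reflect my recollection of the 2022-era API and may differ between DataFusion versions; with the crate's optional features enabled, an S3, GCS, or Azure store could be registered the same way):

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use object_store::local::LocalFileSystem;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Any ObjectStore implementation works here; URLs whose scheme matches
    // the registered one will resolve to this store.
    let store = Arc::new(LocalFileSystem::new());
    ctx.runtime_env().register_object_store("file", store);

    Ok(())
}
```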

@Ted-Jiang (Member) commented Jun 30, 2022

Finally got profiles (by switching the VM to fedora), and it certainly fits with my hypothesis above

[profiling screenshot: master on the left, this branch on the right]

On the left we have master, and on the right this branch. The CPU activity under parquet_query_s demarcates each benchmark iteration; within this, two row groups are being read. We can clearly see that with this PR there is a noticeable delay as it fetches the bytes into memory before starting to decode the data, whereas master interleaves the IO and decoding. There is a trade-off here: the approach on master is faster for this particular benchmark, but comes at the cost of stalling out worker threads on IO that could have been doing other work during decode.

There are some ways we could potentially improve this, e.g. interleaving IO at the page instead of column chunk, but this is unlikely to help with object storage and may actually perform worse. I'm not sure if this is something worth optimising, but would appreciate other people's thoughts

@tustvold Thanks a lot for your sharing 👍.
I am not clear about "whereas master interleaves the IO and decoding" -- I think master uses blocking IO, so decode must wait for IO; this patch uses interleaving with async functions to reduce the blocked IO.

And about "the delay as it fetches the bytes into memory": is it because the IO unit is large, even when using the async reader?

If I missed something, please tell me 😂

@Ted-Jiang (Member) commented:

I wonder if we can find some workaround so that @Ted-Jiang and his team don't lose performance while we continue to make progress (e.g. could we fetch to a local file? or put in some hack for people who want to decode using a blocking IO thread or something)

@alamb Thanks for your kind attention ❤️ I think this change is reasonable!
We can keep both async and non-async; we will test in our env.

@andygrove (Member) commented:

I haven't reviewed the changes here yet but I have no objection to this being merged if the community supports it.

@rdettai (Contributor) commented Jun 30, 2022

That being said, I'm not really sure I agree that the object store abstraction is all that core to DataFusion. It is just an IO abstraction used at the edges of plans

That's quite a lot of files that got modified for switching an "IO abstraction used at the edge of plans" 😄. I also believe that reading data in from files is crucial to an analytics query engine. Indeed, it isn't core in the sense that you can do things with your engine without it (reading in-memory or streaming data...), but it is still one of its main use cases and, more importantly, a critical performance bottleneck. And as always with optimization, you sometimes need to bend the separation of concerns a bit to reach your goal, which means that you will need to tweak the abstraction to get the performance you want (as you can see with topics like prefetch strategies...). And this can be made more complicated if we refer to an external repository that is not owned by us.

TL;DR: I would also be more comfortable with this change if we first integrated the object store abstraction into the repository.

@rdettai (Contributor) commented Jun 30, 2022

I would be interested in @wesm's point of view on this governance question. Just to recap the question:
-> we are about to replace the file system abstraction (that we call object store here: https://github.com/apache/arrow-datafusion/tree/master/datafusion/data-access) with an external one that is currently owned by InfluxData (https://github.com/influxdata/object_store_rs/blob/main/src/lib.rs). There are some concerns about whether this is a wise decision or not.

@tustvold (author) commented Jun 30, 2022

I am not clear about "whereas master interleaves the IO and decoding" -- I think master uses blocking IO, so decode must wait for IO; this patch uses interleaving with async functions to reduce the blocked IO.

Master interleaves IO at the page level, reading individual pages as required and blocking the calling thread as it does so. This branch instead performs async IO, fetching column chunks into memory without blocking threads. This is significantly better for object stores, but will perform "worse" for certain workloads accessing local files, where the approach on master may be faster, with the obvious drawback of blocking threads.
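
A hedged sketch of the two strategies being contrasted (the ranges are illustrative, and the blocking variant is a simplification of the master-style reader):

```rust
use std::ops::Range;

use object_store::{path::Path, ObjectStore};

/// This branch (simplified): one non-blocking range request fetches the
/// whole column chunk into memory before decoding begins.
async fn fetch_column_chunk(
    store: &dyn ObjectStore,
    location: &Path,
    chunk: Range<usize>,
) -> object_store::Result<bytes::Bytes> {
    store.get_range(location, chunk).await
}

/// Master (simplified): each page is read with a blocking seek + read on
/// the calling thread, interleaving IO with decode but stalling the thread.
fn read_page_blocking(
    file: &mut std::fs::File,
    page: Range<u64>,
) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    file.seek(SeekFrom::Start(page.start))?;
    let mut buf = vec![0; (page.end - page.start) as usize];
    file.read_exact(&mut buf)?;
    Ok(buf)
}
```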

if we first integrated the object store abstraction into the repository.

I would be fine waiting until the donation to arrow-rs goes through (influxdata/object_store_rs#41), but I had hoped that, given this intent had been clearly broadcast, we could just get this in rather than waiting the 3 or so weeks it will take for that process to complete. What do you think?

@wesm (Member) commented Jun 30, 2022

I think it's fine to switch without the code donation and not wait, but if you think that other DataFusion contributors will want to participate in the maintenance and governance of the object store crate, then doing the code donation sounds like a good idea to me.

@rdettai (Contributor) commented Jul 1, 2022

Great! I missed that the donation was in progress. Obviously no need to wait then 😉

@alamb (Contributor) left a review:

I plan to give this a final review and merge tomorrow unless anyone objects. Thank you all

@alamb (Contributor) commented Jul 2, 2022

I took the liberty of merging up from master to resolve some conflicts in Cargo.toml

Labels: "api change" (Changes the API exposed to users of the crate), "core" (Core DataFusion crate)

Successfully merging this pull request may close these issues:

  • Consider adopting IOx ObjectStore abstraction

9 participants