[Data] Add read_clickhouse API to read ClickHouse Dataset #49060
Conversation
Signed-off-by: Connor Sanders <connor@elastiflow.com>
@alexeykudinkin This is the fresh PR for the ClickHouse datasource. The current state of the code here addresses your latest feedback:
Also, here's the stacked follow-up PR that adds support for filtering: jecsand838#2
offset += num_rows
return read_tasks

def _get_estimate(self, query_type: str) -> Optional[int]:
nit: I'd suggest restructuring it as follows:
- Create two methods, _get_estimate_size and _get_estimate_count
- Make both of them use a common _execute_query method (accepting the target query)
Made this change in my latest commit.
import pyarrow as pa

client = self._init_client()
query = f"SELECT * FROM ({self._query}) LIMIT {num_rows} OFFSET {offset}"
@jecsand838 does ClickHouse guarantee to always traverse the rows in the same order?
If that's not the case, we'd unfortunately have to revert to reading the whole dataset in 1 task (we were recently bitten by this in the SQL datasource).
@alexeykudinkin You have to explicitly define an ORDER BY clause to guarantee order. Let me know if that's acceptable (I can update the docs and make order_by a required parameter) or if we still need to revert.
Actually, there may be a way to get the behavior we want in ClickHouse. I'll look into it and get back.
So, what we can do is the following:
- If users want parallelism, we will require an order-by to be specified
- If they don't care or can't provide an order-by, the dataset will be read as 1 task
Does that make sense?
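The agreed fallback could be sketched as a small helper (function and parameter names are illustrative, not the merged API):

```python
from typing import List, Optional


def resolve_parallelism(
    requested_parallelism: int, order_by: Optional[List[str]]
) -> int:
    """Sketch of the agreed behavior: OFFSET-based pagination across
    multiple read tasks is only deterministic with an explicit ORDER BY,
    so without one we fall back to a single read task."""
    if requested_parallelism > 1 and not order_by:
        # No stable row order: reading in parallel could duplicate or
        # drop rows between pages, so use one task instead.
        return 1
    return requested_parallelism
```

Users who ask for parallelism without an ordering get a safe single-task read rather than silently inconsistent pages.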
@alexeykudinkin That makes sense. I went ahead and attempted those changes. Let me know what you think.
with mock.patch(
    "ray.data.block.BlockAccessor.for_block", return_value=mock_block_accessor
):
The problem with this approach is that it mocks the static method for all invocations, not just the one you intend to.
Instead, let's introduce a method producing the estimates you need (size, number of rows) and mock that here.
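A sketch of that testing approach, patching a dedicated estimate helper on one instance rather than the global `BlockAccessor.for_block` static method (class and method names here are stand-ins, not the real datasource):

```python
from unittest import mock


class ClickHouseDatasourceSketch:
    """Stand-in for the real datasource, for illustration only."""

    def _get_estimate_count(self):
        raise NotImplementedError("would query ClickHouse in the real class")

    def estimate_total_rows(self):
        return self._get_estimate_count()


def test_estimate_is_mockable():
    ds = ClickHouseDatasourceSketch()
    # Patching the helper on this one instance affects only this object,
    # unlike patching a static method shared by every block in the test.
    with mock.patch.object(ds, "_get_estimate_count", return_value=1_000):
        assert ds.estimate_total_rows() == 1_000
```

The scope of the patch is now exactly the object under test, so other blocks created during the same test keep their real behavior.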
Made this change in my latest commit.
…ble ClickHouse datasource parallelism, improved clickhouse_test mock Signed-off-by: Connor Sanders <connor@elastiflow.com>
Signed-off-by: Connor Sanders <connor@elastiflow.com>
Thank you very much for your contribution @jecsand838!
    return self._execute_query(self._estimates["count"])

def _get_estimate_size(self) -> Optional[int]:
    return self._execute_query(self._estimates["size"])
nit: Let's just use constants to avoid indirection
Suggested change:
- return self._execute_query(self._estimates["size"])
+ return self._execute_query(_SIZE_ESTIMATE_QUERY)
@alexeykudinkin I actually went down somewhat of a rabbit hole with this one. I went ahead and implemented a query template system using constants, which should be more aligned with what you want.
I also went ahead and pivoted from using LIMIT / OFFSET clauses to OFFSET / FETCH clauses in the queries. The impact on ClickHouse's CPU overhead was substantial.
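A hedged sketch of what such a constant-based template system could look like; the constant names and exact SQL below are illustrative, not necessarily what was merged:

```python
# Illustrative per-block query templates; the merged datasource may differ.
_LIMIT_OFFSET_TEMPLATE = "SELECT * FROM ({query}) LIMIT {limit} OFFSET {offset}"
_OFFSET_FETCH_TEMPLATE = (
    "SELECT * FROM ({query}) ORDER BY {order_by} "
    "OFFSET {offset} ROWS FETCH NEXT {fetch} ROWS ONLY"
)


def build_block_query(query: str, order_by: str, offset: int, fetch: int) -> str:
    """Render the OFFSET/FETCH form of a per-block query from a template
    constant, keeping all SQL text in one place."""
    return _OFFSET_FETCH_TEMPLATE.format(
        query=query, order_by=order_by, offset=offset, fetch=fetch
    )
```

Centralizing the SQL in module-level constants avoids the dictionary indirection and makes the LIMIT/OFFSET vs OFFSET/FETCH variants easy to compare side by side.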
To showcase this, here are two comparisons I ran locally using Docker:
Red is LIMIT / OFFSET
Yellow is OFFSET / FETCH
- Evaluation 1: [evalStats1 screenshot: ClickHouse CPU usage comparison]
- Evaluation 2: [evalStats2 screenshot: ClickHouse CPU usage comparison]
Overall, the Dataset execution was roughly 2% faster using OFFSET / FETCH as well.
Here's a screenshot of one of the OFFSET / FETCH executions as evidence that the approach is working:
[Screenshot 2024-12-10 at 11 31 05 PM: one of the OFFSET / FETCH executions]
Wow, great analysis @jecsand838!
Offset is known to be an operation with non-trivial overhead on the DB, and this tweak LGTM.
)
parallelism = 1
num_rows_per_block = num_rows_total // parallelism
num_blocks_with_extra_row = num_rows_total % parallelism
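The arithmetic in that fragment splits the total row count into near-equal blocks, with the remainder spread one row at a time across the first few blocks; a self-contained sketch (the function name is illustrative):

```python
from typing import List, Tuple


def plan_blocks(num_rows_total: int, parallelism: int) -> List[Tuple[int, int]]:
    """Return an (offset, num_rows) pair per block. The first
    num_rows_total % parallelism blocks each get one extra row, so every
    row is covered exactly once with no gaps or overlaps."""
    num_rows_per_block = num_rows_total // parallelism
    num_blocks_with_extra_row = num_rows_total % parallelism
    plan, offset = [], 0
    for i in range(parallelism):
        n = num_rows_per_block + (1 if i < num_blocks_with_extra_row else 0)
        plan.append((offset, n))
        offset += n  # next block starts where this one ends
    return plan
```

For example, 10 rows over 3 tasks yields block sizes 4, 3, 3 with contiguous offsets 0, 4, 7.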
Please add a comment elaborating what you're doing here
I added more comments to the code, both there and in other places.
Co-authored-by: Alexey Kudinkin <alexey.kudinkin@gmail.com> Signed-off-by: Connor Sanders <connor@elastiflow.com>
…H clauses in ClickHouse datasource, and improved comments Signed-off-by: Connor Sanders <connor@elastiflow.com>
…t#49060) Greetings from ElastiFlow! This PR introduces a new ClickHouseDatasource connector for Ray, which provides a convenient way to read data from ClickHouse into Ray Datasets. The ClickHouseDatasource is particularly useful for users who are working with large datasets stored in ClickHouse and want to leverage Ray's distributed computing capabilities for AI and ML use-cases. We found this functionality useful while evaluating ML technologies and wanted to contribute this back. Key Features and Benefits: 1. **Seamless Integration**: The ClickHouseDatasource allows for seamless integration of ClickHouse data into Ray workflows, enabling users to easily access their data and apply Ray's powerful parallel computation. 2. **Custom Query Support**: Users can specify custom columns, and orderings, allowing for flexible query generation directly from the Ray interface, which helps in reading only the necessary data, thereby improving performance. 3. **User-Friendly API**: The connector abstracts the complexity of setting up and querying ClickHouse, providing a simple API that allows users to focus on data analysis rather than data extraction. Tested locally with a ClickHouse table containing ~12m records. <img width="1340" alt="Screenshot 2024-11-20 at 3 52 42 AM" src="https://github.com/user-attachments/assets/2421e48a-7169-4a9e-bb4d-b6b96f7e502b"> PLEASE NOTE: This PR is a continuation of ray-project#48817, which was closed without merging. --------- Signed-off-by: Connor Sanders <connor@elastiflow.com> Co-authored-by: Alexey Kudinkin <alexey.kudinkin@gmail.com>
Why are these changes needed?
Greetings from ElastiFlow!
This PR introduces a new ClickHouseDatasource connector for Ray, which provides a convenient way to read data from ClickHouse into Ray Datasets. The ClickHouseDatasource is particularly useful for users who are working with large datasets stored in ClickHouse and want to leverage Ray's distributed computing capabilities for AI and ML use-cases. We found this functionality useful while evaluating ML technologies and wanted to contribute this back.
Key Features and Benefits:
1. **Seamless Integration**: The ClickHouseDatasource allows for seamless integration of ClickHouse data into Ray workflows, enabling users to easily access their data and apply Ray's powerful parallel computation.
2. **Custom Query Support**: Users can specify custom columns and orderings, allowing for flexible query generation directly from the Ray interface, which helps in reading only the necessary data, thereby improving performance.
3. **User-Friendly API**: The connector abstracts the complexity of setting up and querying ClickHouse, providing a simple API that allows users to focus on data analysis rather than data extraction.
Tested locally with a ClickHouse table containing ~12m records.
PLEASE NOTE: This PR is a continuation of #48817, which was closed without merging.
Related issue number
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If this PR adds a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.