Write forum post asking about how to convert Zarr to Parquet #89

Closed
Tracked by #90
lewfish opened this issue Aug 2, 2022 · 5 comments
lewfish (Contributor) commented Aug 2, 2022

No description provided.

vlulla (Contributor) commented Aug 17, 2022

In our attempts to convert the NWM retrospective Zarr data to Parquet, we've run into a snag. We have been unable to convert the Zarr store to Parquet using JupyterHub on a Dask cluster. We have run into the Jupyter kernel crashing, workers getting killed (in dask-worker), and cancelled errors (in dask-scheduler). These errors appear to come from different parts of the libraries/systems we are using.

The Jupyter kernel crashes are due to the kernel running out of memory! Initially we started with a small JupyterHub instance (8 GiB), which crashed the kernel for even very small selections/subsets (even a month would crash the kernel; we could only select 14 days' worth of data for the 122,256 feature_ids in the subset). Now that Justin has provisioned a larger instance (16 GiB) to run JupyterHub, we can make selections/subsets over a larger time horizon (3 years without any problems). However, the central problem remains (very aptly articulated by Justin): the statement ds.sel(time=slice('1990-01-01','1994-12-31')).to_pandas() runs in the Jupyter notebook kernel and not on the cluster. This defeats the whole purpose of using a distributed cluster in the first place!
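For concreteness, here is a minimal sketch of that pattern; the scheduler address, store path, and variable name ("streamflow") are illustrative placeholders rather than our actual configuration:

    import xarray as xr
    from dask.distributed import Client

    # Connect to the existing Dask cluster (address is a placeholder).
    client = Client("tcp://dask-scheduler:8786")

    # Opening the Zarr store is lazy; each variable is backed by a dask array.
    ds = xr.open_zarr("s3://example-bucket/nwm-retrospective-subset.zarr", consolidated=True)

    # The .sel() is still lazy, but .to_pandas() computes the result and pulls
    # the fully materialized DataFrame into the notebook kernel's memory:
    # the workers read the chunks, but the concatenated result lands here.
    wide = ds["streamflow"].sel(time=slice("1990-01-01", "1994-12-31")).to_pandas()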

I have also learned, after reading the documentation more carefully, that the ds.to_dask_dataframe() function transposes the data, creating a long table. We instead need a wide table (i.e., each feature_id is its own column), as recommended by participants at the ESIP conference (Terence can provide more context/details). We can get such a wide dataframe using the ds.to_pandas function, which does not alter the dimensions (also from carefully reading the documentation). However, both of these functions materialize a pandas dataframe in the Jupyter notebook kernel instead of generating it on the cluster, which I suspect caused the kernel crashes I experienced. Here are the source code definitions for the respective functions (a sketch comparing the two conversion paths follows the list):

  • ds.to_pandas()

    pd.DataFrame materialization happens on line 5877. You might have to trace the control flow of a few statements in ds.to_pandas to reach this line.

  • ds.to_dask_dataframe()

    I'm not sure what causes the issue here, but I suspect that the line (around line number 6111), series = dd.from_array(dask_array.reshape(-1), columns=[name]), is responsible for the reshape. This was brought up in the comments on Lewis's issue post.
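To make the long-vs-wide distinction concrete, here is a rough sketch of the two conversion paths, assuming a 2-D variable named "streamflow" with dims (time, feature_id); the variable name is an illustrative placeholder:

    # Long table: Dataset.to_dask_dataframe() flattens every dimension into
    # columns, giving one row per (time, feature_id) pair. It stays lazy and
    # runs on the cluster, but it is not the wide layout we want.
    long_ddf = ds[["streamflow"]].to_dask_dataframe()
    # columns: time, feature_id, streamflow

    # Wide table: DataArray.to_pandas() keeps the dimensions (time becomes the
    # index, feature_id the columns), but it materializes an in-memory pandas
    # DataFrame in the notebook kernel rather than on the workers.
    wide_df = ds["streamflow"].to_pandas()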

Workaround

I have been able to generate the wide Parquet files, by year, for our Zarr subset by running this code on an r5a.4xlarge EC2 instance. Lewis thinks this approach is infeasible, and I agree with him: our subset covers only 10 years and one HUC 2 boundary, which yielded about 80 GB, while the complete dataset covers 40 years and all HUC 2 boundaries, which is about 8 TB! Trying to process the full dataset on anything but a distributed cluster is going to be an exercise in frustration.
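For reference, the per-year loop was roughly along these lines (a sketch under the same assumptions as above, not the exact script; variable names, years, and output paths are illustrative):

    # Run on a single large EC2 instance (r5a.4xlarge): loop over years,
    # materialize one wide DataFrame per year, and write it to Parquet.
    for year in range(1990, 2000):
        da = ds["streamflow"].sel(time=slice(f"{year}-01-01", f"{year}-12-31"))
        wide = da.to_pandas()                    # time index, one column per feature_id
        wide.columns = wide.columns.astype(str)  # Parquet requires string column names
        wide.to_parquet(f"streamflow_wide_{year}.parquet")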

Areas to explore

  • Wes McKinney's blog post titled Apache Arrow and the "10 Things I Hate About pandas" has a good description of issues with pandas.

    The other pandas issue mentioned in this blog post that caused a lot of friction was dealing with categoricals. We cannot do the long-to-wide conversion (pivot_table) in Dask unless the column being transposed is a categorical. I was unable to pivot the dask dataframe, which eventually led me to use to_pandas. A sketch of the pivot is below.
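For illustration, a sketch of the kind of Dask pivot I was attempting (column and variable names are placeholders); dd.pivot_table only accepts a categorical columns argument whose categories are known:

    import dask.dataframe as dd

    long_ddf = ds[["streamflow"]].to_dask_dataframe()
    # dd.pivot_table requires the `columns` column to be categorical with
    # known categories, so categorize feature_id first.
    long_ddf = long_ddf.categorize(columns=["feature_id"])

    wide_ddf = dd.pivot_table(
        long_ddf, index="time", columns="feature_id",
        values="streamflow", aggfunc="mean",
    )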

lewfish (Contributor, Author) commented Aug 18, 2022

I would expect to_pandas and to_dataframe to use up memory on the notebook instance since Pandas dataframes are non-lazy, in-memory objects.

Now that we've exhausted a couple of workarounds, do you want to just ask the question on the xarray forum about how to convert from Zarr to Parquet? You could link to this issue, pydata/xarray#6811, and https://dask.discourse.group/t/workers-dont-have-promised-key-error-and-delayed-computation/936 to show what we've tried so far.

vlulla (Contributor) commented Aug 18, 2022

Good idea, Lewis! I had already asked this question on the xarray community discussion forum (pydata/xarray#6905), but there has been no response yet. I suspect the terseness of the question might have something to do with the lack of response. Anyway, what are the norms for editing already-posted forum questions? Is it alright to edit the original question with these additional links/context, or is it better to add them as a comment on that question? I'm leaning towards adding a comment.

lewfish (Contributor, Author) commented Aug 18, 2022

Ok, somehow I missed that you posted that. I would add another comment in the thread with the additional information.

vlulla (Contributor) commented Aug 18, 2022

Ok, done. I hope we get some discussion going!
