
Zarr fragment writers #391

Merged (60 commits) on Aug 29, 2022
Conversation

@rabernat (Contributor) commented on Jul 27, 2022

This implements one of the last few key pieces of #376: writing dataset fragments to Zarr in parallel. Still lots TODO here, namely:

  • Better testing around chunks. We are no longer using locks for writing, so we have to take care to explicitly align writes with chunks (see the sketch just after this list). This PR does not implement rechunking.
  • Unit tests for IndexItems (and maybe move its core functionality to a different module)
  • Improve docstrings for transforms
  • Incorporate feedback on the API. Do we like the names of the functions, PTransforms, and their arguments? All of this is free to change.
  • Parametrize the 💩 out of test_end_to_end.py::test_xarray_zarr
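To make the chunk-alignment concern concrete, here is a minimal sketch (not part of this PR; the store path, variable name, and chunk sizes are made up) of why lock-free parallel writes require each fragment to land exactly on chunk boundaries of the target Zarr store:

```python
import numpy as np
import xarray as xr

# Hypothetical target: 100 time steps, chunked 10 steps per Zarr chunk.
ds = xr.Dataset({"temp": (("time",), np.zeros(100))}).chunk({"time": 10})
ds.to_zarr("target.zarr", compute=False)  # write metadata only, no data

# A fragment covering time steps 20-39 falls exactly on chunks 2 and 3 of the
# target, so it can be written concurrently with other fragments without any
# lock: no two writers ever touch the same chunk.
fragment = ds.isel(time=slice(20, 40)).load()
fragment.to_zarr("target.zarr", region={"time": slice(20, 40)})
```

A fragment that straddled a chunk boundary (say, time steps 15 to 24) would instead need a lock or a rechunking step, which is why rechunking is called out above as not yet implemented.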

I don't know why there are so many commits in this PR. Something about how I merged my branch with upstream. The actual diff is not that huge.

Comment on lines 32 to 44
```python
with pipeline as p:
    inputs = p | beam.Create(pattern.items())
    datasets = inputs | OpenWithXarray(file_type=pattern.file_type)
    # TODO determine this dynamically
    combine_dims = [DimKey("time", operation=CombineOp.CONCAT)]
    schemas = datasets | DatasetToSchema()
    schema = schemas | DetermineSchema(combine_dims=combine_dims)
    indexed_datasets = datasets | IndexItems(schema=schema)
    target = schema | PrepareZarrTarget(target_url=tmp_target_url)
    _ = indexed_datasets | StoreToZarr(target_store=target)

ds = xr.open_dataset(tmp_target_url, engine="zarr").load()
xr.testing.assert_equal(ds, daily_xarray_dataset)
```
@rabernat (Contributor, Author) commented on Jul 27, 2022
This block is really the highlight of this PR. It is the closest we have come yet to recreating the original monolithic XarrayZarrRecipe in modular Beam style.

But there is still a lot of room to refine this. Perhaps some of the intermediate steps can be merged into a single PTransform? I'm not sure the user needs to be passing schemas around.

I'm really interested in getting feedback on the API and flow here, @cisaacstern and @alxmrs.

Contributor commented:

I'll review this PR in more depth later today or tomorrow. It's exciting to see all this progress!

Comment on lines 33 to 41
```python
inputs = p | beam.Create(pattern.items())
datasets = inputs | OpenWithXarray(file_type=pattern.file_type)
# TODO determine this dynamically
combine_dims = [DimKey("time", operation=CombineOp.CONCAT)]
schemas = datasets | DatasetToSchema()
schema = schemas | DetermineSchema(combine_dims=combine_dims)
indexed_datasets = datasets | IndexItems(schema=schema)
target = schema | PrepareZarrTarget(target_url=tmp_target_url)
_ = indexed_datasets | StoreToZarr(target_store=target)
```
@alxmrs (Contributor) commented on Jul 27, 2022
Here's a translation (really, for myself) to more idiomatic Beam:

Suggested change (replacing the block quoted above with):

```python
combine_dims = [DimKey("time", operation=CombineOp.CONCAT)]
datasets = (
    p
    | beam.Create(pattern.items())
    | OpenWithXarray(file_type=pattern.file_type)
)
schema = (
    datasets
    | DatasetToSchema()
    | DetermineSchema(combine_dims=combine_dims)
)
target = schema | PrepareZarrTarget(target_url=tmp_target_url)
_ = (
    datasets
    | IndexItems(schema=schema)
    | StoreToZarr(target_store=target)
)
```

My high-level feedback is that this seems a bit too imperative. This is my bias, but I could see the pipeline fitting closer to the XArray Beam abstractions, something like:

```python
(
    p
    | beam.PatternToChunks(pattern)
    # other processing...
    | beam.ChunksToZarr()
)
```

Though, I could see how each of these steps would be composed out of the components you have here. Exposing the right level of granularity is hard.
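One way to expose that coarser granularity, sketched here as an assumption rather than anything in this PR, is a composite PTransform whose expand() wires up the schema, target, and store steps internally. The class name and constructor are hypothetical; the wrapped transforms are the ones used in the test above, assumed importable from the PR's transforms module:

```python
import apache_beam as beam

# Assumption: the PR's transforms live in pangeo_forge_recipes.transforms.
from pangeo_forge_recipes.transforms import (
    DatasetToSchema,
    DetermineSchema,
    IndexItems,
    PrepareZarrTarget,
    StoreToZarr,
)


class DatasetsToZarr(beam.PTransform):
    """Fold the schema/index/store steps into a single user-facing step."""

    def __init__(self, target_url, combine_dims):
        super().__init__()
        self.target_url = target_url
        self.combine_dims = combine_dims

    def expand(self, datasets):
        # Same wiring as the test above, just hidden from the user.
        schema = (
            datasets
            | DatasetToSchema()
            | DetermineSchema(combine_dims=self.combine_dims)
        )
        target = schema | PrepareZarrTarget(target_url=self.target_url)
        return (
            datasets
            | IndexItems(schema=schema)
            | StoreToZarr(target_store=target)
        )
```

The pipeline body would then read `p | beam.Create(pattern.items()) | OpenWithXarray(...) | DatasetsToZarr(...)`, much closer to the xarray-beam shape sketched above.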

Contributor commented:

From your comment below:

> Perhaps some of the intermediate steps can be merged into a single PTransform? I'm not sure the user needs to be passing schemas around.

I totally agree with this sentiment!

@vietnguyengit commented on Jul 29, 2022

Hi @rabernat, maybe my question is not directly related to this PR, but the keywords zarr and pangeo-forge brought me here. I am building a recipe to convert a large number of NetCDF files (around 100k+) to Zarr on S3 using pangeo-forge-recipes in a private environment (i.e. I won't use Pangeo Forge Cloud), and the recipe is built to run with Prefect 2.0. Will pangeo-forge-recipes and Prefect's DaskTaskRunner perform tasks such as xarray.Dataset.to_zarr() in parallel sensibly? Currently, Prefect's DaskTaskRunner won't pick up xarray function calls automatically; work needs to be wrapped in a function with the @task decorator to be added to the Dask graph, e.g. the snippet below. Perhaps I'm not advanced enough to figure out how to get DaskTaskRunner to pick up xarray functions and fill the CPU threads efficiently; I'm not sure whether that is even doable.

```python
@task
def xr_to_zarr(dataset):
    dataset.to_zarr(...)
```

But written this way, the whole xr_to_zarr block runs on a single thread. Without Prefect, calling dataset.to_zarr(...) directly lets Dask fill up the threads with tasks, something like this:

[screenshot: Dask dashboard showing tasks filling all worker threads]

I much appreciate your time reading this. Please advise. Thank you.
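For reference, a minimal sketch of the second behaviour described in the question above (plain xarray + Dask, no Prefect); the path, variable name, and sizes are made up:

```python
import numpy as np
import xarray as xr

# ~100 chunks, so to_zarr produces ~100 independent chunk-write tasks that the
# default Dask threaded scheduler spreads across all available threads.
ds = xr.Dataset(
    {"temp": (("time",), np.random.rand(1_000_000))}
).chunk({"time": 10_000})
ds.to_zarr("out.zarr", mode="w")
```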

@rabernat (Contributor, Author) commented:
Hi @vietnguyengit! Thanks for your interest in Pangeo Forge. We would love to help you, but please open a new issue with your question from above. As you said, your question is not related to this pull request.

@rabernat mentioned this pull request on Aug 7, 2022
@rabernat (Contributor, Author) commented:

@martindurant - there is a new fsspec serialization error showing up in the tests. The errors look like:

```
E   AttributeError: Can't pickle local object 'OpenFile.open.<locals>.close' [while running 'OpenWithXarray/Open with Xarray']
```

These errors did not get raised earlier (see green tests on #375) nor in my local environment. However, if I update to 2022.7.1, I can reproduce the error. Downgrading back to 2022.7.0 makes the error go away. So it must be a fairly recent change.

@martindurant (Contributor) commented:

I am looking into it. However, the following snippet is informative:

```python
import pickle, io

b = io.BytesIO(b"data")
s = io.TextIOWrapper(b)
pickle.dumps(s)  # TypeError
```

i.e., even simple file-like types are often not pickleable. I have been playing with weakrefs to augment OpenFile.open's logic, but I cannot get around something fundamental like the above without writing an explicitly pickleable wrapper class, such as LocalFileOpener currently is. That way we could, with a little effort, more rigorously guarantee that the output of OpenFile.open() (or the object yielded inside with) can be pickled.

Conversely, we could backtrack and declare that file-like objects should not, in general, be pickled and we don't guarantee that they can; this means only pickling outside contexts. Indeed, it is somewhat rare for context manager yielded objects to be pickleable, since they explicitly contain state and at least some closure.
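For illustration, here is a minimal sketch (an assumption about the general pattern, not fsspec's actual LocalFileOpener code) of what an "explicitly pickleable wrapper class" looks like: instead of pickling the live handle, pickle only the information needed to reopen it.

```python
class PicklableFileWrapper:
    """Wrap a local file so the wrapper, not the raw handle, gets pickled."""

    def __init__(self, path, mode="rb"):
        self.path = path
        self.mode = mode
        self._f = open(path, mode)

    def read(self, *args):
        return self._f.read(*args)

    def __getstate__(self):
        # Drop the unpicklable handle; keep only what is needed to reopen it.
        return {"path": self.path, "mode": self.mode, "pos": self._f.tell()}

    def __setstate__(self, state):
        self.path, self.mode = state["path"], state["mode"]
        self._f = open(self.path, self.mode)
        self._f.seek(state["pos"])
```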

@rabernat (Contributor, Author) commented:

I understand that pickling these objects is complicated (remember fsspec/filesystem_spec#579 (comment) 🙃 ). The problem here is API stability. I spent literally many full days trying to figure out a solution for serializing fsspec open file objects inside Xarray datasets that worked with beam. I found one. It's coded up and tested in Pangeo Forge. The fsspec 2022.7.1 release made some change that broke whatever fragile solution I had found, which triggered the tests to fail. (Unit testing ftw!) So there are two options:

  • The solution I found was actually unsupported behavior. It was an accident that it worked. I have to go back to the drawing board.
  • There was a regression in fsspec in the 2022.7.1 release which needs to be fixed in fsspec.

Do you have a sense of which of the two options above is more appropriate here?

@martindurant (Contributor) commented:

I'll have a look to see what changed, and whether it can easily be overcome. I'd say that indeed, this was not explicitly supported. I'll also look into my suggestion, above, of how it could be supported and see whether that's tractable or not.

@rabernat marked this pull request as ready for review on August 29, 2022, 16:49
@rabernat (Contributor, Author) commented:

Upstream dev passes thanks to fsspec/filesystem_spec#1024. Gonna move on.

@rabernat merged commit c5bc983 into pangeo-forge:beam-refactor on Aug 29, 2022
@cisaacstern (Member) commented:

🚀

@martindurant (Contributor) commented:

Sorry, I'm going to have to revert that PR. It breaks too many assumptions in other places :|

In short: an OpenFile should either

  • be used as a context manager, or
  • have .close() called on the OpenFile to ensure all contained file-like objects are cleaned up.

Since all AbstractBufferedFile descendants (s3fs, gcsfs, abfs) are pickleable, and now so are LocalFileOpeners, you should be OK to pickle inside the context, if you want, either the OpenFile or the file-like it generates.
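For readers following along, a small sketch of the two supported usage patterns described above (the local path is made up; fsspec.open, OpenFile.open, and OpenFile.close are the calls being discussed):

```python
import fsspec

of = fsspec.open("example.txt", mode="wt")

# Pattern 1: use the OpenFile as a context manager; cleanup is automatic.
with of as f:
    f.write("hello")

# Pattern 2: open explicitly, then call .close() on the OpenFile so that
# every file-like object it created gets cleaned up.
f = of.open()
f.write("hello again")
of.close()
```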
