Dask integration #1
Varying the chunk size might indeed be the only way to scale this to long sequences. The problem with that is that you might perform too much IO. An example: say I have a 100 fps movie and I need to do a heavy per-frame analysis. Chunking with (1, ...) is not an option here because there are too many frames, but I only need to perform the analysis on every 100th frame (the equivalent of `[::100]`). So either you construct a very big dict in memory or you perform far too much IO. Is there a way out of this? Most of the current PIMS readers allow this and only load the relevant frames from disk. There is even code that selects the most efficient reading function based on the queried slice (https://github.com/soft-matter/pims/blob/master/pims/base_frames.py#L310). This might be an extreme example, but it stands for a whole bunch of use cases in which a dataset is only partially processed.
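To make the scaling concern concrete, here is a minimal sketch of the one-task-per-frame construction being discussed; read_frame, the frame count, and the shapes are made up for illustration rather than taken from any PIMS reader:

```python
import dask
import dask.array as da
import numpy as np

N_FRAMES = 100_000
FRAME_SHAPE = (256, 256)

def read_frame(i):
    # Stand-in for a per-frame disk read in a real reader.
    return np.zeros(FRAME_SHAPE, dtype="uint8")

# One dask task per frame: the task graph alone holds ~100000 entries,
# and every subsequent operation adds roughly another 100000.
lazy_frames = [
    da.from_delayed(dask.delayed(read_frame)(i), shape=FRAME_SHAPE, dtype="uint8")
    for i in range(N_FRAMES)
]
movie = da.stack(lazy_frames)

# Even keeping only every 100th frame still requires building the full
# per-frame graph up front.
subset = movie[::100]
```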
I see what you mean. I don't immediately see a way to do efficient strided slicing with dask without building the large dict up front. Brainstorming possibilities to explore:
I'd be curious to know if @GenevieveBuckley, @jakirkham, or @jni see a clear way forward here.
One option is a two-step procedure. This is what I implemented in https://github.com/nens/dask-geomodeling, where we faced a similar issue. You stack lazy operations together, and only when you call .get_data(bbox=...) is the graph constructed and computed with dask. This approach works well for us, but we have to implement every array/dataframe operation ourselves, which is a pity because there is so much more available in dask.Array. An advanced optimization approach might also be feasible: you could initialize a single chunk of size (100000, 256, 256) and optimize the graph so that it reduces to only the slice(s) you are interested in.
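A very small sketch of that two-step pattern, with illustrative names rather than the actual dask-geomodeling API (the source is assumed to expose a get_frame(i) method):

```python
import dask

class LazyBlock:
    """Stack operations lazily; build a dask graph only when data is requested."""

    def __init__(self, source, func=None):
        self.source = source  # assumed to have a get_frame(i) method
        self.func = func      # optional per-frame operation

    def get_data(self, indices):
        # The graph is constructed here, only for the requested indices,
        # and handed to dask in one go.
        tasks = [dask.delayed(self.source.get_frame)(i) for i in indices]
        if self.func is not None:
            tasks = [dask.delayed(self.func)(t) for t in tasks]
        return dask.compute(*tasks)
```

With this shape, LazyBlock(reader, func).get_data(range(0, 100_000, 100)) only ever creates a couple of thousand tasks, at the cost of re-implementing each operation.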
I found a related discussion at dask: dask/dask#3514. The lazy dict-creation approach seems to require significant work on the dask core. For an optimization approach, we could make:

This would at least allow strided reads combined with a large subset of dask.Array operations.
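For reference, dask already ships a graph-culling optimization that drops tasks not needed for the requested keys; the pruning imagined above would effectively have to happen before the full per-frame graph is ever materialized. A toy illustration of the existing culling machinery:

```python
from operator import add
from dask.optimization import cull

# Only "c" is requested, so the "unused" branch is pruned from the graph.
graph = {
    "a": 1,
    "b": (add, "a", 10),
    "c": (add, "b", 100),
    "unused": (add, "a", -1),
}
culled, dependencies = cull(graph, keys=["c"])
assert "unused" not in culled
```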
Thanks, @danielballan for getting this started and @caspervdw for your focus on this exciting possibility! Sorry I haven't had much chance to contribute, especially because I have experience using dask for my particle-tracking workflow. It sounds like dask is currently not well-suited to processing large movies at all, which is also my experience. I know little of dask's internals, so I'm not sure how difficult @caspervdw's optimization rule ideas would be in practice.

One complementary way to make progress would be to make it easier for users to develop their own workarounds to this limitation. pims2 could be something that reliably plugs into generic parallel execution tools like concurrent.futures, ipyparallel, or dask itself. For example:

```python
raw_seq = pims2.open(...)
seq = pipeline_func_2(pipeline_func_1(raw_seq))[::100]

pool_exec = concurrent.futures.ProcessPoolExecutor(...)
# (or equivalent object from ipyparallel, dask, etc.)

future_frame_12 = pool_exec.submit(seq.get_frame, 12)  # obvious but inelegant

results = []
# Iterate through the sequence as an iterable of futures objects.
# If the user is not careful, results will quickly pile up in memory
# or on disk.
for fut in seq.read_futures(concurrent=pool_exec):
    results.append(analyze(fut.result()))

results = []
# Iterate through the sequence as actual frames, but with streaming.
# pims2 deals with the pool and yields frames as they become available.
# "buffer" limits the number of frames that can be processed concurrently,
# which limits memory use.
for im in seq.read(concurrent=pool_exec, buffer=50):
    results.append(analyze(im))
```

This would force pims2 to be as serializable and/or thread-safe as reasonably possible, which would hopefully make any eventual dask integration much less painful.

(In case you're curious, my own solution to particle tracking in dask has been to implement streaming, whereby each track-linking task in turn talks to the cluster scheduler and submits feature-identification tasks on several frames concurrently. This approach limits the number of frame-level tasks that can be on the graph at once. But it has the drawback of opening a fresh pims reader for each frame! That happens to be inexpensive for the movie format I use.)
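For what it's worth, the buffer-limited streaming read in the last loop could be prototyped with nothing more than concurrent.futures. The sketch below assumes a picklable seq object with __len__ and get_frame, which is not an existing pims2 API:

```python
import collections

def read_streaming(seq, pool, buffer=50):
    """Yield frames in order while keeping at most `buffer` reads in flight."""
    pending = collections.deque()
    for index in range(len(seq)):
        pending.append(pool.submit(seq.get_frame, index))
        if len(pending) >= buffer:
            yield pending.popleft().result()
    while pending:
        yield pending.popleft().result()
```

This is also where the serialization requirement bites: with a ProcessPoolExecutor, seq has to survive being pickled into the worker processes.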
Based on the discussion so far, it seems like what you want is lazy support for things like strided slicing (`[::100]`).

My naive suggestion would be to look into making a very simplistic NumPy-like array object that supports this behavior. Objects would track their start and stop indices along with their step size. Dask could still go about slicing them, assuming them to be NumPy-like arrays, though the underlying behavior would delay the reads as expected. Users or pims2 would need to be mindful of this when passing the object into dask.

Since Dask will also try to call other NumPy operations on this object, you may want to implement things like `__array__` and the usual shape/dtype/ndim attributes (and possibly the NEP-18 `__array_function__` protocol).

As serialization is relevant for any data transmission or spilling, the NumPy-like array would also need some kind of pickle support.

My hope is that these would be relatively lightweight/simple things to implement (and similar in some ways to how pims worked), but I could be missing context on how pims2 would/should work.
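A rough sketch of what such a minimal NumPy-like object might look like; every name here (the reader interface with get_frame, frame_shape, and dtype, and the LazyFrames class itself) is hypothetical rather than an existing pims or pims2 API:

```python
import numpy as np

class LazyFrames:
    """Tracks start/stop/step over a frame reader; reads only on coercion."""

    def __init__(self, reader, start=0, stop=None, step=1):
        self.reader = reader
        self.start = start
        self.stop = len(reader) if stop is None else stop
        self.step = step

    @property
    def shape(self):
        n_frames = max(0, (self.stop - self.start + self.step - 1) // self.step)
        return (n_frames,) + tuple(self.reader.frame_shape)

    @property
    def dtype(self):
        return self.reader.dtype

    @property
    def ndim(self):
        return len(self.shape)

    def __getitem__(self, key):
        # Slicing along the frame axis only updates start/stop/step;
        # no frames are read here. Positive steps only in this sketch.
        if isinstance(key, slice):
            start, stop, step = key.indices(self.shape[0])
            return LazyFrames(
                self.reader,
                start=self.start + start * self.step,
                stop=self.start + stop * self.step,
                step=self.step * step,
            )
        raise NotImplementedError("only 1-d slicing in this sketch")

    def __array__(self, dtype=None):
        # Reads happen only when NumPy (or dask) asks for actual data.
        frames = [self.reader.get_frame(i)
                  for i in range(self.start, self.stop, self.step)]
        out = np.stack(frames)
        return out if dtype is None else out.astype(dtype)
```

With a bit more indexing support, an object like this could be handed to dask.array.from_array, which mostly relies on shape, dtype, and numpy-style slicing.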
Yes. It sounds like we could do this by making slicerator the "very simplistic NumPy-like array object." It hasn't been revised to account for NEP-18 yet, but I think it easily could be.
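For reference, the NEP-18 hook itself is small; a generic sketch (not slicerator's actual code, and LazySequence is a made-up name) looks like:

```python
import numpy as np

HANDLED_FUNCTIONS = {}

def implements(np_function):
    """Register a LazySequence implementation of a NumPy function."""
    def decorator(func):
        HANDLED_FUNCTIONS[np_function] = func
        return func
    return decorator

class LazySequence:
    """Toy NEP-18 participant: np.* calls dispatch to registered handlers."""

    def __init__(self, frames):
        self.frames = list(frames)

    def __array_function__(self, func, types, args, kwargs):
        if func not in HANDLED_FUNCTIONS:
            return NotImplemented
        return HANDLED_FUNCTIONS[func](*args, **kwargs)

@implements(np.concatenate)
def _concatenate(sequences, axis=0):
    if axis != 0:
        raise NotImplementedError("this sketch only concatenates along the frame axis")
    # Stay lazy: joining sequences just chains the underlying frame lists.
    return LazySequence(f for seq in sequences for f in seq.frames)
```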
Yeah, that makes sense. One other thing I forgot to mention is that Dask has support for meta, a small placeholder array it uses to track what type of array the chunks hold.

1 seems pretty easy, though that may box us in a little if we want to use different arrays; it could be made overridable.

2 likely mostly falls out of what we have already described above. We may just want to make sure it corresponds to some small in-memory value under the hood: perhaps reading a single pixel from the image and packing that back into an array of the right type.

3 is the best of both worlds. It's a tiny amount more work than 1 while being more user friendly. It has all the benefits of 2, but is conceptually (and likely actually) simpler to implement.

Admittedly this is a minor point once everything else is in place. Going with 1 should get this off the ground with no additional work. I don't have a sense offhand of how much work 2 would be. 3 would be pretty easy to do and just as easy to use.
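Assuming the feature in question is dask's meta hook (a guess on my part, not confirmed in this thread), a minimal sketch of supplying a plain NumPy meta looks like this; the frames array is a stand-in for a lazy reader:

```python
import numpy as np
import dask.array as da

# Stand-in for a lazy frame source; a real one would read from disk.
frames = np.zeros((1000, 256, 256), dtype="uint8")

# A zero-sized array describing the chunk type and dtype without holding data.
meta = np.empty((0, 0, 0), dtype="uint8")

movie = da.from_array(frames, chunks=(100, 256, 256), meta=meta)
print(movie.dtype, movie.chunksize)  # uint8 (100, 256, 256)
```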
Just want to +1 that, from the perspective of a downstream user, having the NumPy-like array interface here would be very desirable. Exciting discussion!!
Short summary of an email discussion: it was proposed to replace our lazy FramesSequence with dask.

I actually tried to combine dask with PIMS a while ago. Back then I concluded that dask.Array is not suitable for image sequences. This has to do with the sheer number of chunks that you need to initialize on loading: dask keeps track of every chunk in a dict, so 100000 frames would give a dict of that size, and any operation would add another 100000 entries.

@danielballan proposed not giving each frame its own dask task, i.e. not using chunks of shape (1, ...). The internal dask array chunks should instead have shape (N, ...), so that each task in the graph (the dict kept by dask) represents multiple frames, keeping the total number of tasks in the graph manageable. Dask has some useful tooling where you can set the chunk size based on a desired bytes-per-chunk; the relevant docs say that anything from 10 MB to 1 GB is common. Each PIMS reader can choose a suitable default value for the chunk size and make it tunable in init.
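As a concrete illustration of the bytes-per-chunk tooling mentioned above (the shapes and the 64 MiB target are arbitrary choices for the example):

```python
import numpy as np
from dask.array.core import normalize_chunks

# Ask dask to pick the frames-per-chunk so each chunk is roughly 64 MiB,
# while keeping whole frames together along the last two axes.
chunks = normalize_chunks(
    ("auto", -1, -1),
    shape=(100_000, 256, 256),
    limit="64MiB",
    dtype=np.uint8,
)
print(len(chunks[0]))  # number of tasks along the frame axis, far fewer than 100000
```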