Do we want something like this? #1
Comments
Thanks for starting this! I wonder whether this API gives enough information though. For DataFrames at least, knowing that we're in a context that is already parallelized isn't enough to decide whether we should spawn new tasks or not. In many places, spawning tasks is cheap so we probably always want to do it (except for thread-safety, which is out of scope here IIUC). In other places, we need to know whether all CPUs are likely to be busy or not, which could be approximated by knowing the total number of parallel tasks. Shouldn't the API provide this information? It could also allow choosing a reasonable basesize. |
This is a very dangerous route that I will avoid. The number of active worker threads is a non-deterministic quantity, and packages shouldn't depend on it.

First, if package authors depend on it, I suspect that we will end up with an ecosystem that cannot compute deterministic results. Given that Julia users have a wide range of concurrent programming skills, I will keep advocating that packages should compute deterministic results by default for debuggability and reproducibility. (Of course, I'm not saying it's impossible to build a robust function that computes identical results given such non-determinism.) The proposed API is designed in such a way that package authors don't have to worry about this much.

Second, the number of active worker threads that the current worker thread happens to observe at the moment your API is called is not a very useful quantity. It is not hard to imagine a program where all worker threads are initially busy, but a few milliseconds later all the other concurrently started tasks end (and this happens consistently). Imagine that the invocation of your API takes 10 seconds. It's clear that deciding the number of tasks to use at the beginning of the API invocation is a bad idea.

Lastly, it is important that packages don't become the scheduler. Instead, packages should try to expose a parallel task structure that maximizes what the scheduler can do... in principle. However, we need to take a nuanced approach because the task overhead is relatively high. This is why it may sometimes make sense to simplify the "inner" parallel task structure. It is kind of like how some Julia programs pre-allocate and manually reuse memory to help the GC. As explained in Extended help of

All that said, I think it is reasonable to expose a "parallel region depth" as in

@contextvar DEPTH::Int = 0
within_parallel(f::F) where {F} = with_context(f, DEPTH => DEPTH[] + 1)
parallel_depth() = DEPTH[]
is_parallelized() = DEPTH[] != 0

If packages use a divide-and-conquer approach, you can guess that there are roughly
|
I didn't mean we should track the number of active threads, but rather than each call to

That said, I agree this is a very tricky issue. As soon as the overhead due to multithreading is significant, it's very hard to decide whether using multiple tasks is a good idea or not (we haven't been able to merge JuliaData/DataFrames.jl#2491 yet because we couldn't figure out reliable conditions under which multithreading is faster than single-threading). |
So, just to make it very concrete, it is possible to implement:

struct TaskSplitInfo
    ntasks::Union{Int,Nothing}
    parent::Union{TaskSplitInfo,Nothing}
end
# Iterating a TaskSplitInfo walks from the innermost region up through its parents.
Base.iterate(info::TaskSplitInfo) = (info, info)
function Base.iterate(::TaskSplitInfo, pre::TaskSplitInfo)
    parent = pre.parent
    parent === nothing ? nothing : (parent, parent)
end
# (... omitting other mandatory iterator interfaces ...)
@contextvar TASKSPLIT::Union{TaskSplitInfo,Nothing} = nothing
# APIs:
within_parallel(f::F, ntasks = nothing) where {F} =
    with_context(f, TASKSPLIT => TaskSplitInfo(ntasks, TASKSPLIT[]))
tasksplitinfo() = TASKSPLIT[]
# Depth is the length of the parent chain (0 outside any parallel region).
parallel_depth() = (info = TASKSPLIT[]; info === nothing ? 0 : count(Returns(true), info))
is_parallelized() = TASKSPLIT[] !== nothing

However, it has several issues:
Given these two issues, I designed a very minimalistic API as suggested in the OP. |
I think it is up to @jpsamaroo to say what is needed. |
So, based on what strategy Dagger thinks will be most efficient, Dagger might choose any amount of parallelism from 1 task up to infinity (in the (rare) event that all tasks are expected to immediately yield and otherwise be very cheap, we might just schedule all of them at once). Based on this knowledge, and an assumption that DataFrames (among other libraries) is looking to tune its task splitting based on what we tell it, we should be able to indicate how much parallelism should be exposed per Dagger-submitted task. For example, if we have 128 threads on a worker, and Dagger has 16 total tasks (each calling into some DataFrames function) that it can schedule concurrently on this worker, Dagger would like to inform DataFrames to use about 8 concurrent tasks per DataFrames top-level invocation. So Dagger can be pretty exact about how many tasks it wants to see spawned. If this package can provide an interface like that, I'd be happy to integrate it and start teaching the scheduler to "explore" what number of Dagger tasks is optimal for the workload (i.e. minimizes total runtime). Relatedly: can we have a mechanism for Dagger to query DataFrames and ask how much parallelism a given operation (potentially for a given set of inputs) could expose? This would let us bound our exploration to some max amount of parallelism so that we don't spend too much time exploring solutions which undersubscribe the system. This is, of course, not strictly necessary, but it might prove useful. |
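The 128-thread example above boils down to an integer division. A minimal sketch of that arithmetic, assuming the scheduler simply splits the worker's threads evenly among the tasks it runs concurrently; `tasks_per_invocation` is a hypothetical name and is not part of Dagger or ParallelismHints:

```julia
# Hypothetical helper (not a Dagger or ParallelismHints API): split the
# worker's threads evenly among the top-level tasks scheduled concurrently.
function tasks_per_invocation(nthreads::Integer, nconcurrent::Integer)
    nconcurrent > 0 || throw(ArgumentError("nconcurrent must be positive"))
    # Never suggest fewer than one task, even when the worker is oversubscribed.
    return max(1, nthreads ÷ nconcurrent)
end

# 128 threads shared by 16 concurrent top-level tasks
hint = tasks_per_invocation(128, 16)
```

An even split is only one possible policy; a scheduler that explores different task counts would presumably replace this function with something adaptive.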
Here the problem is Amdahl's law :). That is, the operations that can be parallelized are only a part of the execution schedule, and it is hard in general to say what portion of the load can be parallelized. The reason is that it depends on how expensive the user-supplied operation being parallelized is (which cannot be checked). In short, at least currently, we unfortunately cannot reliably provide such information. |
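For reference, Amdahl's law bounds the speedup by the fraction of the work that parallelizes. A small sketch of the standard formula; the function name `amdahl_speedup` is only illustrative, and the fraction `p` is exactly the quantity that is hard to know in advance here:

```julia
# Amdahl's law: speedup on n workers when a fraction p (0 ≤ p ≤ 1)
# of the serial runtime can be parallelized perfectly.
amdahl_speedup(p::Real, n::Integer) = 1 / ((1 - p) + p / n)

# If only half of the work parallelizes, 8 workers give less than 2x.
half_parallel = amdahl_speedup(0.5, 8)
```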
The second limitation is that DataFrames.jl, at least, currently supports only an all-or-nothing approach. That is, it uses standard Julia |
I think this mindset made sense when Dagger had an explicit DAG. However, it does not seem to be compatible with the fact that Dagger is now gearing towards a more "dynamic" task model (which is my understanding of the eager thunk) that is closer to Julia's task system, where the DAG is implicitly defined. In such a system, it is important to design the scheduler with the understanding that the workload of each task and the structure of the DAG are unknowable before execution. |
Querying DataFrames to know how many tasks could be spawned indeed sounds relatively difficult. Maybe for now we can focus on the first issue, i.e. having Dagger indicate to DataFrames (and others) the number of desired tasks? |
The scheduler still has full knowledge of all submitted tasks and their dependencies (the DAG is just more lazily elaborated). While we may not be able to apply an optimization to the "full" DAG at any one time, we might be able to do it for a portion of the DAG that's already submitted.
Agreed, that is the most important part. If it's going to be all-or-nothing parallelism for now, then that's fine, I'll just need to figure out how to communicate that to Dagger. |
An additional comment from the DataFrames.jl side: I believe that the all-or-nothing approach is enough, because we already make sure not to spawn very small tasks, since the cost of spawning might then be bigger than the benefit. If the operation is very cheap, we will not execute it in multithreaded mode anyway, even if we are allowed to. Therefore, the only benefit from passing the exact number of allowed threads would come if the scheduler could compute the cost of a given operation exactly. For example, it might know that tasks 1 and 2 both parallelize well and are followed by task 3, which can run only once both 1 and 2 have finished. If task 1 is 2x cheaper, it can get 2x fewer CPUs. It seems that such detailed information is not likely to be available to the scheduler, but maybe I am wrong here. |
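To illustrate the cost-proportional split in the example above, here is a rough sketch under the (probably unrealistic) assumption that the scheduler has relative cost estimates for each task. `proportional_threads` is a hypothetical helper, not an existing API:

```julia
# Hypothetical helper: give each task a share of `nthreads` proportional to
# its estimated cost. Assumes the cost estimates are available and positive.
function proportional_threads(costs::AbstractVector{<:Real}, nthreads::Integer)
    total = sum(costs)
    # Every task gets at least one thread; because of rounding, the shares
    # are not guaranteed to add up to exactly `nthreads`.
    return [max(1, round(Int, nthreads * c / total)) for c in costs]
end

# Task 1 is 2x cheaper than task 2, with 12 threads to share.
shares = proportional_threads([1, 2], 12)
```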
As I explained above, this has undesirable properties like unpredictability in computation results and performance. |
This package is a proof-of-concept (POC) implementation of a package that could support some of the use cases like JuliaData/DataFrames.jl#3030. The APIs can be found here:
ParallelismHints.is_parallelized() -> ans::Bool: https://github.com/tkf/ParallelismHints.jl/blob/main/src/docs/is_parallelized.md
ParallelismHints.within_parallel(f) -> y: https://github.com/tkf/ParallelismHints.jl/blob/main/src/docs/within_parallel.md

The rough idea is that libraries that spawn tasks wrap the "fork-join region" in within_parallel, and libraries that provide parallelizable algorithms check whether they should fall back to a serial (or less parallel) algorithm based on what is_parallelized() returns.

It may be better to integrate this into Julia itself (e.g., when we have JuliaLang/julia#35833), and I also have an idea to use JuliaLang/julia#39773 for providing this information more efficiently. However, until we get there, I think it's a reasonably simple package to have around and use right away.
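The wrap-and-check pattern described above can be sketched without the package. This is not the ParallelismHints implementation: it is a stand-in built on `Base.ScopedValues` (which assumes Julia 1.11 or later), and `IN_PARALLEL`, `lib_sum`, and `outer_map` are hypothetical names used only for illustration.

```julia
using Base.ScopedValues  # assumption: Julia 1.11 or later

# Stand-in for the package's context variable: true inside a fork-join region.
const IN_PARALLEL = ScopedValue(false)

# Illustrative re-implementations of the two proposed APIs.
within_parallel(f) = with(f, IN_PARALLEL => true)
is_parallelized() = IN_PARALLEL[]

# A "library" algorithm: it spawns tasks only when the caller has not
# already parallelized and the input is large enough to be worth it.
function lib_sum(xs; basesize = 1024)
    if is_parallelized() || length(xs) <= basesize
        return sum(xs)  # serial fallback
    end
    tasks = map(Iterators.partition(xs, basesize)) do chunk
        Threads.@spawn sum(chunk)
    end
    return sum(fetch, tasks)
end

# A "scheduler-like" caller: it wraps its own fork-join region so that
# anything called inside the spawned tasks sees is_parallelized() == true.
function outer_map(f, inputs)
    within_parallel() do
        tasks = [Threads.@spawn(f(x)) for x in inputs]
        map(fetch, tasks)
    end
end
```

Scoped values are inherited by tasks spawned inside `with`, which is what makes the hint visible to the inner library call. A call such as `outer_map(lib_sum, list_of_vectors)` would therefore run each `lib_sum` serially inside its own task.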
Do people want something like this package? Is the design/direction good (enough)?
Once we decide on the rough design idea, I think it's better to move this package to JuliaParallel organization.
cc @jpsamaroo @vchuravy @bkamins @nalimilan