Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multithreading in groupreduce #2491

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ jobs:
${{ runner.os }}-
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
env:
JULIA_NUM_THREADS: 2
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
with:
Expand Down
9 changes: 9 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# DataFrames v1.0 Release Notes

## New functionalities

* `combine`, `select` and `transform` with `GroupedDataFrame` now accept
a `nthreads` argument which enables multithreading for some optimized
grouped reductions ([#2491](https://github.com/JuliaData/DataFrames.jl/pull/2491)).


# DataFrames v0.22 Release Notes

## Breaking changes
Expand Down
6 changes: 6 additions & 0 deletions docs/src/lib/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,9 @@ pairs
```@docs
isapprox
```

## Multithreading
```@docs
DataFrames.nthreads
DataFrames.nthreads!
```
2 changes: 1 addition & 1 deletion src/DataFrames.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module DataFrames
using Statistics, Printf, REPL
using Reexport, SortingAlgorithms, Compat, Unicode, PooledArrays, CategoricalArrays
@reexport using Missings, InvertedIndices
using Base.Sort, Base.Order, Base.Iterators
using Base.Sort, Base.Order, Base.Iterators, Base.Threads
using TableTraits, IteratorInterfaceExtensions
import LinearAlgebra: norm
using Markdown
Expand Down
58 changes: 48 additions & 10 deletions src/abstractdataframe/selection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,12 @@ end
"""
select!(df::DataFrame, args...; renamecols::Bool=true)
select!(args::Base.Callable, df::DataFrame; renamecols::Bool=true)
select!(gd::GroupedDataFrame{DataFrame}, args...; ungroup::Bool=true, renamecols::Bool=true)
select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true)
select!(gd::GroupedDataFrame{DataFrame}, args...;
ungroup::Bool=true, renamecols::Bool=true,
nthreads::Int=DataFrames.nthreads())
select!(f::Base.Callable, gd::GroupedDataFrame;
ungroup::Bool=true, renamecols::Bool=true,
nthreads::Int=DataFrames.nthreads())

Mutate `df` or `gd` in place to retain only columns or transformations specified by `args...` and
return it. The result is guaranteed to have the same number of rows as `df` or
Expand All @@ -595,6 +599,11 @@ $TRANSFORMATION_COMMON_RULES
column names should include the name of transformation functions or not.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
a different value. Passing a value higher than 1 currently has an effect only
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
will be replaced with that value.

See [`select`](@ref) for examples.
```
Expand All @@ -613,8 +622,12 @@ end
"""
transform!(df::DataFrame, args...; renamecols::Bool=true)
transform!(args::Callable, df::DataFrame; renamecols::Bool=true)
transform!(gd::GroupedDataFrame{DataFrame}, args...; ungroup::Bool=true, renamecols::Bool=true)
transform!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true)
transform!(gd::GroupedDataFrame{DataFrame}, args...;
ungroup::Bool=true, renamecols::Bool=true,
nthreads::Int=DataFrames.nthreads())
transform!(f::Base.Callable, gd::GroupedDataFrame;
ungroup::Bool=true, renamecols::Bool=true,
nthreads::Int=DataFrames.nthreads())

Mutate `df` or `gd` in place to add columns specified by `args...` and return it.
The result is guaranteed to have the same number of rows as `df`.
Expand All @@ -627,6 +640,11 @@ $TRANSFORMATION_COMMON_RULES
column names should include the name of transformation functions or not.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
a different value. Passing a value higher than 1 currently has an effect only
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
will be replaced with that value.

See [`select`](@ref) for examples.
"""
Expand All @@ -644,9 +662,10 @@ end
select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true)
select(args::Callable, df::DataFrame; renamecols::Bool=true)
select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true,
ungroup::Bool=true, renamecols::Bool=true)
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())
select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())

Create a new data frame that contains columns from `df` or `gd` specified by
`args` and return it. The result is guaranteed to have the same number of rows
Expand All @@ -664,6 +683,11 @@ $TRANSFORMATION_COMMON_RULES
data frame.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
a different value. Passing a value higher than 1 currently has an effect only
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
will be replaced with that value.

# Examples
```jldoctest
Expand Down Expand Up @@ -858,9 +882,11 @@ end
transform(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true)
transform(f::Callable, df::DataFrame; renamecols::Bool=true)
transform(gd::GroupedDataFrame, args...; copycols::Bool=true,
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())
transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())

Create a new data frame that contains columns from `df` or `gd` plus columns
specified by `args` and return it. The result is guaranteed to have the same
Expand All @@ -877,6 +903,11 @@ $TRANSFORMATION_COMMON_RULES
data frame.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
a different value. Passing a value higher than 1 currently has an effect only
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
will be replaced with that value.

Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false`
is needed to be able to return a different value for the grouping column:
Expand Down Expand Up @@ -924,9 +955,11 @@ end
combine(df::AbstractDataFrame, args...; renamecols::Bool=true)
combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true)
combine(gd::GroupedDataFrame, args...;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())
combine(f::Base.Callable, gd::GroupedDataFrame;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())

Create a new data frame that contains columns from `df` or `gd` specified by
`args` and return it. The result can have any number of rows that is determined
Expand All @@ -941,6 +974,11 @@ $TRANSFORMATION_COMMON_RULES
data frame.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
a different value. Passing a value higher than 1 currently has an effect only
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
will be replaced with that value.

# Examples
```jldoctest
Expand Down
137 changes: 104 additions & 33 deletions src/groupeddataframe/fastaggregates.jl
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,84 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T
end

function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame)
incol::AbstractVector, gd::GroupedDataFrame, nthreads::Int)
n = length(gd)
groups = gd.groups
if adjust !== nothing || checkempty
counts = zeros(Int, n)
end
groups = gd.groups
@inbounds for i in eachindex(incol, groups)
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if U is not Any
if eltype(res) === Any && !isassigned(res, gix)
res[gix] = f(x, gix)
else
res[gix] = op(res[gix], f(x, gix))
nt = min(nthreads, Threads.nthreads())
nalimilan marked this conversation as resolved.
Show resolved Hide resolved
if nt <= 1 || axes(incol) != axes(groups)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when axes might be different?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since groups is a Vector, that could happen only when incol doesn't use 1-based indexing. It's not completely clear whether we support it but it's probably safer the check that.

@inbounds for i in eachindex(incol, groups)
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if eltype is not Any
if eltype(res) === Any && !isassigned(res, gix)
res[gix] = f(x, gix)
else
res[gix] = op(res[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts[gix] += 1
end
end
end
else
res_vec = Vector{typeof(res)}(undef, nt)
# needs to be always allocated to fix type instability with @threads
counts_vec = Vector{Vector{Int}}(undef, nt)
res_vec[1] = res
if adjust !== nothing || checkempty
counts_vec[1] = counts
end
for i in 2:nt
res_vec[i] = copy(res)
if adjust !== nothing || checkempty
counts_vec[i] = zeros(Int, n)
end
end
Threads.@threads for tid in 1:nt
res′ = res_vec[tid]
if adjust !== nothing || checkempty
counts[gix] += 1
counts′ = counts_vec[tid]
end
start = 1 + ((tid - 1) * length(groups)) ÷ nt
stop = (tid * length(groups)) ÷ nt
@inbounds for i in start:stop
gix = groups[i]
x = incol[i]
if gix > 0 && (condf === nothing || condf(x))
# this check should be optimized out if eltype is not Any
if eltype(res′) === Any && !isassigned(res′, gix)
res′[gix] = f(x, gix)
else
res′[gix] = op(res′[gix], f(x, gix))
end
if adjust !== nothing || checkempty
counts′[gix] += 1
end
end
end
end
for i in 2:length(res_vec)
resi = res_vec[i]
@inbounds @simd for j in eachindex(res)
# this check should be optimized out if eltype is not Any
if eltype(res) === Any
if isassigned(resi, j) && isassigned(res, j)
res[j] = op(res[j], resi[j])
elseif isassigned(resi, j)
res[j] = resi[j]
end
else
res[j] = op(res[j], resi[j])
end
end
end
if adjust !== nothing || checkempty
for i in 2:length(counts_vec)
counts .+= counts_vec[i]
end
end
end
Expand Down Expand Up @@ -218,26 +278,31 @@ end

# function barrier works around type instability of groupreduce_init due to applicable
groupreduce(f, op, condf, adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame) =
incol::AbstractVector, gd::GroupedDataFrame,
nthreads::Int) =
groupreduce!(groupreduce_init(op, condf, adjust, incol, gd),
f, op, condf, adjust, checkempty, incol, gd)
f, op, condf, adjust, checkempty, incol, gd, nthreads)
# Avoids the overhead due to Missing when computing reduction
groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool,
incol::AbstractVector, gd::GroupedDataFrame) =
incol::AbstractVector, gd::GroupedDataFrame,
nthreads::Int) =
groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)),
f, op, condf, adjust, checkempty, incol, gd)
f, op, condf, adjust, checkempty, incol, gd, nthreads)

(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) =
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd)
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int=nthreads()) =
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads)

# this definition is missing in Julia 1.0 LTS and is required by aggregation for var
# TODO: remove this when we drop 1.0 support
if VERSION < v"1.1"
Base.zero(::Type{Missing}) = missing
end

function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame)
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd)
function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int=nthreads())
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false,
incol, gd, nthreads)
# !ismissing check is purely an optimization to avoid a copy later
if eltype(means) >: Missing && agg.condf !== !ismissing
T = Union{Missing, real(eltype(means))}
Expand All @@ -247,32 +312,38 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
res = zeros(T, length(gd))
return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf,
(x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1),
false, incol, gd)
false, incol, gd, nthreads)
end

function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame)
outcol = Aggregate(var, agg.condf)(incol, gd)
function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int=nthreads())
outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads)
if eltype(outcol) <: Union{Missing, Rational}
return sqrt.(outcol)
else
return map!(sqrt, outcol, outcol)
end
end

for f in (first, last)
function (agg::Aggregate{typeof(f)})(incol::AbstractVector, gd::GroupedDataFrame)
n = length(gd)
outcol = similar(incol, n)
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
if isconcretetype(eltype(outcol))
return outcol
else
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
for f in (:first, :last)
# Without using @eval the presence of a keyword argument triggers a Julia bug
@eval begin
function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int=nthreads())
n = length(gd)
outcol = similar(incol, n)
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
if isconcretetype(eltype(outcol))
return outcol
else
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
end
end
end
end

function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame)
function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame;
nthreads::Int=nthreads())
if getfield(gd, :idx) === nothing
lens = zeros(Int, length(gd))
@inbounds for gix in gd.groups
Expand Down
Loading