Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pool sharing with copy on write #56

Merged
merged 39 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
96e5d3a
add pool sharing to PooledArrays
bkamins Feb 20, 2021
dd77169
further code review
bkamins Feb 20, 2021
7932121
use Atomic{Int} and implement copyto!
bkamins Feb 20, 2021
8dbee32
fix copy
bkamins Feb 20, 2021
92850e5
fix typo
bkamins Feb 20, 2021
995b3cd
fix leftover code
bkamins Feb 20, 2021
4c407f3
add missing )
bkamins Feb 20, 2021
00af820
fix small issues
bkamins Feb 20, 2021
45a33a9
add missing method
bkamins Feb 20, 2021
da13879
another missing method
bkamins Feb 20, 2021
665bf9a
fix various lurking problems (and bugs) in old code
bkamins Feb 20, 2021
cb1308c
Apply suggestions from code review
bkamins Feb 21, 2021
be31412
apply comments from the review
bkamins Feb 21, 2021
b41e9a1
improve getindex
bkamins Feb 21, 2021
07e7b9c
add view
bkamins Feb 21, 2021
1419283
Apply suggestions from code review
bkamins Feb 21, 2021
a1df0a0
Merge remote-tracking branch 'origin/main' into bk/pool_sharing
bkamins Feb 21, 2021
0cb0021
add SubArray handling
bkamins Feb 21, 2021
be09d78
Apply suggestions from code review
bkamins Feb 23, 2021
7f5f7e2
Update src/PooledArrays.jl
bkamins Feb 23, 2021
61bcbdc
add PooledArrOrSub
bkamins Feb 24, 2021
8ec3854
start adding tests
bkamins Feb 24, 2021
cd20319
Apply suggestions from code review
bkamins Feb 24, 2021
ee6100b
Merge remote-tracking branch 'origin/bk/pool_sharing' into bk/pool_sh…
bkamins Feb 24, 2021
02b002b
merge methods (currently fails but I first need to understand if it i…
bkamins Feb 24, 2021
21b3354
enough to remove where
bkamins Feb 24, 2021
ec0f710
simplify definition
bkamins Feb 24, 2021
35430e0
update getindex
bkamins Feb 24, 2021
d3fb0a3
continue adding tests
bkamins Feb 24, 2021
b6e6c85
fix current tests
bkamins Feb 24, 2021
2af32ca
finalize tests
bkamins Feb 24, 2021
c0333af
hopefully final fixes
bkamins Feb 24, 2021
46d971c
add Julia 1.0.5 support
bkamins Feb 24, 2021
c55b8e9
Update src/PooledArrays.jl
bkamins Feb 25, 2021
03a3221
fixes after code review
bkamins Feb 27, 2021
0b8ba4d
Merge remote-tracking branch 'origin/bk/pool_sharing' into bk/pool_sh…
bkamins Feb 27, 2021
3a92892
Update src/PooledArrays.jl
bkamins Feb 27, 2021
49afb14
Merge remote-tracking branch 'origin/bk/pool_sharing' into bk/pool_sh…
bkamins Feb 27, 2021
3eb65e7
Update src/PooledArrays.jl
bkamins Feb 28, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ jobs:
${{ runner.os }}-
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
env:
JULIA_NUM_THREADS: 4
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
with:
Expand Down
172 changes: 129 additions & 43 deletions src/PooledArrays.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,32 @@ function _invert(d::Dict{K,V}) where {K,V}
for (k, v) in d
d1[v] = k
end
d1
return d1
end

mutable struct PooledArray{T, R<:Integer, N, RA} <: AbstractArray{T, N}
refs::RA
pool::Vector{T}
invpool::Dict{T,R}

function PooledArray(rs::RefArray{RA},
invpool::Dict{T, R},
pool=_invert(invpool)) where {T,R,N,RA<:AbstractArray{R, N}}
# refcount[] is 0 if only one PooledArray holds a reference to pool and invpool
refcount::Threads.Atomic{Int}

function PooledArray{T,R,N,RA}(rs::RefArray{RA}, invpool::Dict{T, R},
pool::Vector{T}=_invert(invpool),
refcount::Threads.Atomic{Int}=Threads.Atomic()) where {T,R,N,RA<:AbstractArray{R, N}}
bkamins marked this conversation as resolved.
Show resolved Hide resolved
# this is a quick but incomplete consistency check
if length(pool) != length(invpool)
throw(ArgumentError("inconsistent pool and invpool"))
end
# refs mustn't overflow pool
if length(rs.a) > 0 && maximum(rs.a) > length(invpool)
minref, maxref = extrema(rs.a)
# 0 indicates #undef
if length(rs.a) > 0 && (minref < 0 || maxref > length(invpool))
throw(ArgumentError("Reference array points beyond the end of the pool"))
end
new{T,R,N,RA}(rs.a,pool,invpool)
pa = new{T,R,N,RA}(rs.a, pool, invpool, refcount)
finalizer(x -> Threads.atomic_sub!(x.refcount, 1), pa)
return pa
end
end
const PooledVector{T,R} = PooledArray{T,R,1}
Expand All @@ -60,11 +70,14 @@ const PooledMatrix{T,R} = PooledArray{T,R,2}
##############################################################################

# Echo inner constructor as an outer constructor
function PooledArray(refs::RefArray{R}, invpool::Dict{T,R}, pool=_invert(invpool)) where {T,R}
nalimilan marked this conversation as resolved.
Show resolved Hide resolved
PooledArray{T,eltype(R),ndims(R),R}(refs, invpool, pool)
end
PooledArray(refs::RefArray{RA}, invpool::Dict{T,R}, pool::Vector{T}=_invert(invpool),
refcount::Threads.Atomic{Int}=Threads.Atomic()) where {T,R,RA<:AbstractArray{R}} =
PooledArray{T,R,ndims(RA),RA}(refs, invpool, pool, refcount)

PooledArray(d::PooledArray) = copy(d)
bkamins marked this conversation as resolved.
Show resolved Hide resolved
function PooledArray(d::PooledArray)
Threads.atomic_add!(d.refcount, 1)
return PooledArray(RefArray(copy(d.refs)), d.invpool, d.pool, d.refcount)
end

function _label(xs::AbstractArray,
::Type{T}=eltype(xs),
Expand Down Expand Up @@ -121,6 +134,14 @@ If `array` is not a `PooledArray` then the order of elements in `refpool` in the

Note that if you hold mutable objects in `PooledArray` it is not allowed to modify them
after they are stored in it.

In order to improve performance of `getindex` and `copyto!` operations `PooledArray`s
may share pools.
bkamins marked this conversation as resolved.
Show resolved Hide resolved

It is not safe to assign values that are not already present in a `PooledArray`'s pool
from one thread while either reading or writing to the same array from another thread
(even if pools are not shared). However, reading and writing from different threads is safe
if all values already exist in the pool.
bkamins marked this conversation as resolved.
Show resolved Hide resolved
"""
PooledArray

Expand All @@ -132,19 +153,18 @@ function PooledArray{T}(d::AbstractArray, r::Type{R}) where {T,R<:Integer}
end

# Assertions are needed since _label is not type stable
PooledArray(RefArray(refs::Vector{R}), invpool::Dict{T,R}, pool)
return PooledArray(RefArray(refs::Vector{R}), invpool::Dict{T,R}, pool)
end

function PooledArray{T}(d::AbstractArray; signed::Bool=false, compress::Bool=false) where {T}
R = signed ? (compress ? Int8 : DEFAULT_SIGNED_REF_TYPE) : (compress ? UInt8 : DEFAULT_POOLED_REF_TYPE)
refs, invpool, pool = _label(d, T, R)
PooledArray(RefArray(refs), invpool, pool)
return PooledArray(RefArray(refs), invpool, pool)
end

PooledArray(d::AbstractArray{T}, r::Type) where {T} = PooledArray{T}(d, r)
function PooledArray(d::AbstractArray{T}; signed::Bool=false, compress::Bool=false) where {T}
PooledArray(d::AbstractArray{T}; signed::Bool=false, compress::Bool=false) where {T} =
PooledArray{T}(d, signed=signed, compress=compress)
end

# Construct an empty PooledVector of a specific type
PooledArray(t::Type) = PooledArray(Array(t,0))
Expand All @@ -165,31 +185,73 @@ Base.size(pa::PooledArray) = size(pa.refs)
Base.length(pa::PooledArray) = length(pa.refs)
Base.lastindex(pa::PooledArray) = lastindex(pa.refs)

Base.copy(pa::PooledArray) = PooledArray(RefArray(copy(pa.refs)), copy(pa.invpool))
# TODO: Implement copy_to()
Base.copy(pa::PooledArray) = PooledArray(pa)

Base.copyto!(dest::PooledArray{T, R, N, RA},
src::PooledArray{T, R, N, RA}) where {T, R, N, RA} =
copyto!(dest, 1, src, 1, length(src))

function Base.copy!(dest::PooledArray{T, R, N, RA},
src::PooledArray{T, R, N, RA}) where {T, R, N, RA}
copy!(dest.refs, src.refs)
Threads.atomic_add!(src.refcount, 1)
dest.pool = src.pool
dest.invpool = src.invpool
dest.refcount = src.refcount
return dest
end

function Base.copyto!(dest::PooledArray{T, R, N, RA}, doffs::Union{Signed, Unsigned},
src::PooledArray{T, R, N, RA}, soffs::Union{Signed, Unsigned},
n::Union{Signed, Unsigned}) where {T, R, N, RA}
n == 0 && return dest
n > 0 || Base._throw_argerror()
if soffs < 1 || doffs < 1 || soffs+n-1 > length(src) || doffs+n-1 > length(dest)
bkamins marked this conversation as resolved.
Show resolved Hide resolved
throw(BoundsError())
end

if length(dest.pool) == 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a code comment explaining this if branch of code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a comment - it is useful if we want to use a pattern like:

newpa = similar(pa, newsize)
copyto!(newpa, start1, pa, start2, n)

The only case we do not resolve is line like
https://github.com/JuliaData/DataFrames.jl/pull/2622/files#diff-00fe5953fad7e5572a8a4ae57ab3724fd3a3895cae501c40bc60a9fed12cf60fR221

as here we have a problem that union with Missing is potentially introduced and we have to copy a pool anyway (because we add Missing to it). This kind of problem could probably be resolved by adding special handling of Missing in PolledArrays.jl like it is done in CategoricalArrays.jl, but I think it should be left for a separate PR.

The conclusion is that innerjoin will be always fully fast after this PR, but leftjoin, rightjoin and outerjoin will not be maximally fast because of this union-taking with Missing.

bkamins marked this conversation as resolved.
Show resolved Hide resolved
@assert length(dest.invpool) == 0
Threads.atomic_add!(src.refcount, 1)
dest.pool = src.pool
dest.invpool = src.invpool
Threads.atomic_sub!(dest.refcount, 1)
bkamins marked this conversation as resolved.
Show resolved Hide resolved
copyto!(dest.refs, doffs, src.refs, soffs, n)
elseif dest.pool === src.pool && dest.invpool === src.invpool
copyto!(dest.refs, doffs, src.refs, soffs, n)
else
@inbounds for i in 0:n-1
dest[dstart+i] = src[sstart+i]
end
end
return dest
end


function Base.resize!(pa::PooledArray{T,R,1}, n::Integer) where {T,R}
oldn = length(pa.refs)
resize!(pa.refs, n)
pa.refs[oldn+1:n] .= zero(R)
pa
return pa
end

Base.reverse(x::PooledArray) = PooledArray(RefArray(reverse(x.refs)), x.invpool)
function Base.reverse(x::PooledArray)
Threads.atomic_add!(x.refcount, 1)
PooledArray(RefArray(reverse(x.refs)), x.invpool, x.pool, x.refcount)
end

function Base.permute!!(x::PooledArray, p::AbstractVector{T}) where T<:Integer
Base.permute!!(x.refs, p)
x
return x
end

function Base.invpermute!!(x::PooledArray, p::AbstractVector{T}) where T<:Integer
Base.invpermute!!(x.refs, p)
x
return x
end

function Base.similar(pa::PooledArray{T,R}, S::Type, dims::Dims) where {T,R}
Base.similar(pa::PooledArray{T,R}, S::Type, dims::Dims) where {T,R} =
PooledArray(RefArray(zeros(R, dims)), Dict{S,R}())
end

Base.findall(pdv::PooledVector{Bool}) = findall(convert(Vector{Bool}, pdv))

Expand Down Expand Up @@ -224,7 +286,7 @@ function Base.map(f, x::PooledArray{T,R}) where {T,R<:Integer}
newinvpool = Dict(zip(map(f, ks), vs))
refarray = copy(x.refs)
end
PooledArray(RefArray(refarray), newinvpool)
return PooledArray(RefArray(refarray), newinvpool)
end

##############################################################################
Expand Down Expand Up @@ -288,10 +350,24 @@ Base.sort(pa::PooledArray; kw...) = pa[sortperm(pa; kw...)]
##
##############################################################################

Base.convert(::Type{PooledArray{S,R1,N}}, pa::PooledArray{T,R2,N}) where {S,T,R1<:Integer,R2<:Integer,N} =
PooledArray(RefArray(convert(Array{R1,N}, pa.refs)), convert(Dict{S,R1}, pa.invpool))
Base.convert(::Type{PooledArray{S,R,N}}, pa::PooledArray{T,R,N}) where {S,T,R<:Integer,N} =
PooledArray(RefArray(copy(pa.refs)), convert(Dict{S,R}, pa.invpool))
function Base.convert(::Type{PooledArray{S,R1,N}}, pa::PooledArray{T,R2,N}) where {S,T,R1<:Integer,R2<:Integer,N}
if S === R && R1 === R2
return pa
else
invpool_conv = convert(Dict{S,R1}, pa.invpool)
@assert invpool_conv !== pa.invpool
end

if R1 === R2
refs_conv = pa.refs
else
refs_conv = convert(Array{R1,N}, pa.refs)
@assert refs_conv !== pa.refs
end

return PooledArray(RefArray(refs_conv), invpool_conv)
end

Base.convert(::Type{PooledArray{T,R,N}}, pa::PooledArray{T,R,N}) where {T,R<:Integer,N} = pa
Base.convert(::Type{PooledArray{S,R1}}, pa::PooledArray{T,R2,N}) where {S,T,R1<:Integer,R2<:Integer,N} =
convert(PooledArray{S,R1,N}, pa)
Expand Down Expand Up @@ -341,16 +417,20 @@ Base.@propagate_inbounds function Base.isassigned(pa::PooledArray, I::Int...)
!iszero(pa.refs[I...])
end

# Vector case
Base.@propagate_inbounds function Base.getindex(A::PooledArray, I::Union{Real,AbstractVector}...)
PooledArray(RefArray(getindex(A.refs, I...)), copy(A.invpool))
end
# Other cases
Base.@propagate_inbounds function Base.getindex(A::PooledArray, I...)
new_refs = getindex(A.refs, I...)

# Dispatch our implementation for these cases instead of Base
Base.@propagate_inbounds Base.getindex(A::PooledArray, I::AbstractVector) =
PooledArray(RefArray(getindex(A.refs, I)), copy(A.invpool))
Base.@propagate_inbounds Base.getindex(A::PooledArray, I::AbstractArray) =
PooledArray(RefArray(getindex(A.refs, I)), copy(A.invpool))
if new_refs isa AbstractArray
Threads.atomic_add!(A.refcount, 1)
return PooledArray(RefArray(getindex(A.refs, I...)), A.invpool, A.pool, A.refcount)
else
@assert typeof(new_refs) === eltype(A.refs)
# scalar produced
iszero(new_refs) && throw(UndefRefError())
return @inbounds A.pool[new_refs]
end
end

##############################################################################
##
Expand All @@ -368,7 +448,8 @@ function getpoolidx(pa::PooledArray{T,R}, val::Any) where {T,R}
end

function unsafe_pool_push!(pa::PooledArray{T,R}, val) where {T,R}
_pool_idx = length(pa.pool)+1
# Warning - unsafe_pool_push! may not be used in any multithreaded context
_pool_idx = length(pa.pool) + 1
if _pool_idx > typemax(R)
throw(ErrorException(string(
"You're using a PooledArray with ref type $R, which can only hold $(Int(typemax(R))) values,\n",
Expand All @@ -377,6 +458,12 @@ function unsafe_pool_push!(pa::PooledArray{T,R}, val) where {T,R}
)))
end
pool_idx = convert(R, _pool_idx)
if pa.refcount[] > 0
bkamins marked this conversation as resolved.
Show resolved Hide resolved
pa.invpool = copy(pa.invpool)
pa.pool = copy(pa.pool)
Threads.atomic_sub!(pa.refcount, 1)
pa.refcount = Threads.Atomic()
end
pa.invpool[val] = pool_idx
push!(pa.pool, val)
pool_idx
Expand Down Expand Up @@ -420,20 +507,19 @@ Base.empty!(pv::PooledVector) = (empty!(pv.refs); pv)

Base.deleteat!(pv::PooledVector, inds) = (deleteat!(pv.refs, inds); pv)

function _vcat!(c,a,b)
function _vcat!(c, a, b)
copyto!(c, 1, a, 1, length(a))
copyto!(c, length(a)+1, b, 1, length(b))
return copyto!(c, length(a)+1, b, 1, length(b))
end


function Base.vcat(a::PooledArray{<:Any, <:Integer, 1}, b::AbstractArray{<:Any, 1})
output = similar(b, promote_type(eltype(a), eltype(b)), length(b) + length(a))
_vcat!(output, a, b)
return _vcat!(output, a, b)
end

function Base.vcat(a::AbstractArray{<:Any, 1}, b::PooledArray{<:Any, <:Integer, 1})
output = similar(a, promote_type(eltype(a), eltype(b)), length(b) + length(a))
_vcat!(output, a, b)
return _vcat!(output, a, b)
end

function Base.vcat(a::PooledArray{T, <:Integer, 1}, b::PooledArray{S, <:Integer, 1}) where {T, S}
Expand Down
8 changes: 7 additions & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ using Test
using PooledArrays
using DataAPI: refarray, refvalue, refpool, invrefpool

if Threads.nthreads() < 2
@warn("Running with only one thread: correctness of parallel operations is not tested")
else
@show Threads.nthreads()
end

@testset "PooledArrays" begin
a = rand(10)
b = rand(10,10)
Expand Down Expand Up @@ -88,7 +94,7 @@ using DataAPI: refarray, refvalue, refpool, invrefpool
end
@test refpool(s) == ["a", "b"]
@test invrefpool(s) == Dict("a" => 1, "b" => 2)

@testset "push!" begin
xs = PooledArray([10, 20, 30])
@test xs === push!(xs, -100)
Expand Down