Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pool sharing with copy on write #56

Merged
merged 39 commits into from
Mar 1, 2021
Merged
Changes from 2 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
96e5d3a
add pool sharing to PooledArrays
bkamins Feb 20, 2021
dd77169
further code review
bkamins Feb 20, 2021
7932121
use Atomic{Int} and implement copyto!
bkamins Feb 20, 2021
8dbee32
fix copy
bkamins Feb 20, 2021
92850e5
fix typo
bkamins Feb 20, 2021
995b3cd
fix leftover code
bkamins Feb 20, 2021
4c407f3
add missing )
bkamins Feb 20, 2021
00af820
fix small issues
bkamins Feb 20, 2021
45a33a9
add missing method
bkamins Feb 20, 2021
da13879
another missing method
bkamins Feb 20, 2021
665bf9a
fix various lurking problems (and bugs) in old code
bkamins Feb 20, 2021
cb1308c
Apply suggestions from code review
bkamins Feb 21, 2021
be31412
apply comments from the review
bkamins Feb 21, 2021
b41e9a1
improve getindex
bkamins Feb 21, 2021
07e7b9c
add view
bkamins Feb 21, 2021
1419283
Apply suggestions from code review
bkamins Feb 21, 2021
a1df0a0
Merge remote-tracking branch 'origin/main' into bk/pool_sharing
bkamins Feb 21, 2021
0cb0021
add SubArray handling
bkamins Feb 21, 2021
be09d78
Apply suggestions from code review
bkamins Feb 23, 2021
7f5f7e2
Update src/PooledArrays.jl
bkamins Feb 23, 2021
61bcbdc
add PooledArrOrSub
bkamins Feb 24, 2021
8ec3854
start adding tests
bkamins Feb 24, 2021
cd20319
Apply suggestions from code review
bkamins Feb 24, 2021
ee6100b
Merge remote-tracking branch 'origin/bk/pool_sharing' into bk/pool_sh…
bkamins Feb 24, 2021
02b002b
merge methods (currently fails but I first need to understand if it i…
bkamins Feb 24, 2021
21b3354
enough to remove where
bkamins Feb 24, 2021
ec0f710
simplify definition
bkamins Feb 24, 2021
35430e0
update getindex
bkamins Feb 24, 2021
d3fb0a3
continue adding tests
bkamins Feb 24, 2021
b6e6c85
fix current tests
bkamins Feb 24, 2021
2af32ca
finalize tests
bkamins Feb 24, 2021
c0333af
hopefully final fixes
bkamins Feb 24, 2021
46d971c
add Julia 1.0.5 support
bkamins Feb 24, 2021
c55b8e9
Update src/PooledArrays.jl
bkamins Feb 25, 2021
03a3221
fixes after code review
bkamins Feb 27, 2021
0b8ba4d
Merge remote-tracking branch 'origin/bk/pool_sharing' into bk/pool_sh…
bkamins Feb 27, 2021
3a92892
Update src/PooledArrays.jl
bkamins Feb 27, 2021
49afb14
Merge remote-tracking branch 'origin/bk/pool_sharing' into bk/pool_sh…
bkamins Feb 27, 2021
3eb65e7
Update src/PooledArrays.jl
bkamins Feb 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 85 additions & 36 deletions src/PooledArrays.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import DataAPI

export PooledArray, PooledVector, PooledMatrix

# TODO:
# 1. review the whole code because the constructor signature changed
# 2. implement a compress! function that replaces refarray, invpool and pool in place, dropping unused levels

##############################################################################
##
## PooledArray type definition
Expand All @@ -24,22 +28,28 @@ function _invert(d::Dict{K,V}) where {K,V}
for (k, v) in d
d1[v] = k
end
d1
return d1
end

"""
    PooledArray{T, R<:Integer, N, RA} <: AbstractArray{T, N}

Array of `T` values stored as integer references into a pool of values.

# Fields
- `refs::RA`: array of references (of integer type `R`) into `pool`.
- `pool::Vector{T}`: the pooled values; `pool[ref]` is the value for reference `ref`.
- `invpool::Dict{T,R}`: inverse mapping from a value to its index in `pool`.
- `cow::Bool`: when `true`, `pool` and `invpool` are copied before a new value is
  pushed to them (see `unsafe_pool_push!`).
- `lock::Ref{Threads.ReentrantLock}`: lock guarding updates of `cow`.
"""
mutable struct PooledArray{T, R<:Integer, N, RA} <: AbstractArray{T, N}
    refs::RA
    pool::Vector{T}
    invpool::Dict{T,R}
    cow::Bool
    lock::Ref{Threads.ReentrantLock}

    function PooledArray(rs::RefArray{RA}, invpool::Dict{T, R}, pool::Vector{T}, cow::Bool,
                         lock::Ref{Threads.ReentrantLock}
                         ) where {T,R,N,RA<:AbstractArray{R, N}}
        # this is a quick but incomplete consistency check
        if length(pool) != length(invpool)
            throw(ArgumentError("inconsistent pool and invpool"))
        end
        # `extrema` throws on an empty collection, so only inspect refs when there are any
        if length(rs.a) > 0
            minref, maxref = extrema(rs.a)
            # refs mustn't overflow pool; 0 is accepted because `resize!` and `similar`
            # fill positions that have no value yet with `zero(R)`
            if minref < 0 || maxref > length(invpool)
                throw(ArgumentError("Reference array points beyond the end of the pool"))
            end
        end
        return new{T,R,N,RA}(rs.a, pool, invpool, cow, lock)
    end
end
const PooledVector{T,R} = PooledArray{T,R,1}
Expand All @@ -60,12 +70,17 @@ const PooledMatrix{T,R} = PooledArray{T,R,2}
##############################################################################

# Echo inner constructor as an outer constructor
function PooledArray(refs::RefArray{R}, invpool::Dict{T,R}, pool=_invert(invpool)) where {T,R}
nalimilan marked this conversation as resolved.
Show resolved Hide resolved
PooledArray{T,eltype(R),ndims(R),R}(refs, invpool, pool)
PooledArray(refs::RefArray{R}, invpool::Dict{T,R}, pool::Vector{T}, cow::Bool,
lock::Ref{Threads.ReentrantLock}) where {T,R} =
PooledArray{T,eltype(R),ndims(R),R}(refs, invpool, pool, cow, lock)
bkamins marked this conversation as resolved.
Show resolved Hide resolved

# Build a new PooledArray from `d`: the refs are copied, while `pool` and `invpool`
# are shared with `d`. Both arrays are flagged `cow` so that whichever one adds a pool
# value first copies the pool before modifying it.
function PooledArray(d::PooledArray{T,R}) where {T,R}
    # `d.lock` is a `Ref`; the lock itself has to be dereferenced before locking
    l = d.lock[]
    Threads.lock(l)
    try
        d.cow = true
    finally
        Threads.unlock(l)
    end
    # `d.refs` already holds the raw reference array (the constructor stores `rs.a`)
    return PooledArray(RefArray(copy(d.refs)), d.invpool, d.pool, true, d.lock)
end

PooledArray(d::PooledArray) = copy(d)
bkamins marked this conversation as resolved.
Show resolved Hide resolved

function _label(xs::AbstractArray,
::Type{T}=eltype(xs),
::Type{I}=DEFAULT_POOLED_REF_TYPE,
Expand Down Expand Up @@ -132,19 +147,19 @@ function PooledArray{T}(d::AbstractArray, r::Type{R}) where {T,R<:Integer}
end

# Assertions are needed since _label is not type stable
PooledArray(RefArray(refs::Vector{R}), invpool::Dict{T,R}, pool)
return PooledArray(RefArray(refs::Vector{R}), invpool::Dict{T,R}, pool, false,
Ref{Threads.ReentrantLock()})
end

# Build a PooledArray with element type `T` from `d`.
# `signed` selects a signed reference type and `compress` selects the 8-bit variant
# (`Int8`/`UInt8`) instead of the default reference type.
function PooledArray{T}(d::AbstractArray; signed::Bool=false, compress::Bool=false) where {T}
    R = signed ? (compress ? Int8 : DEFAULT_SIGNED_REF_TYPE) : (compress ? UInt8 : DEFAULT_POOLED_REF_TYPE)
    refs, invpool, pool = _label(d, T, R)
    # a freshly labelled array owns its pool: not copy-on-write, and it gets its own lock
    return PooledArray(RefArray(refs), invpool, pool, false,
                       Ref{Threads.ReentrantLock}(Threads.ReentrantLock()))
end

PooledArray(d::AbstractArray{T}, r::Type) where {T} = PooledArray{T}(d, r)
function PooledArray(d::AbstractArray{T}; signed::Bool=false, compress::Bool=false) where {T}
PooledArray(d::AbstractArray{T}; signed::Bool=false, compress::Bool=false) where {T} =
PooledArray{T}(d, signed=signed, compress=compress)
end

# Construct an empty PooledVector of a specific type
# (`Array(t, 0)` is pre-1.0 syntax; an empty vector must be built with `undef`)
PooledArray(t::Type) = PooledArray(Array{t}(undef, 0))
Expand All @@ -165,31 +180,39 @@ Base.size(pa::PooledArray) = size(pa.refs)
Base.length(pa::PooledArray) = length(pa.refs)
Base.lastindex(pa::PooledArray) = lastindex(pa.refs)

Base.copy(pa::PooledArray) = PooledArray(RefArray(copy(pa.refs)), copy(pa.invpool))
# TODO: Implement copy_to()
# Copy `pa`: the refs are copied, while `pool` and `invpool` are shared with `pa`.
# The source is flagged `cow` as well (as `reverse` and `getindex` do), otherwise
# pushing a new value to `pa` would silently modify the pool seen by the copy.
function Base.copy(pa::PooledArray)
    l = pa.lock[]
    Threads.lock(l)
    try
        pa.cow = true
    finally
        Threads.unlock(l)
    end
    return PooledArray(RefArray(copy(pa.refs)), pa.invpool, pa.pool, true, pa.lock)
end

# TODO: Implement copy! and copyto! taking into account when pool sharing should happen
# the idea is that if the target is PooledArray and it has an empty pool
# instead of creating the pool from scratch we can do pool sharing

function Base.resize!(pa::PooledArray{T,R,1}, n::Integer) where {T,R}
oldn = length(pa.refs)
resize!(pa.refs, n)
pa.refs[oldn+1:n] .= zero(R)
pa
return pa
end

Base.reverse(x::PooledArray) = PooledArray(RefArray(reverse(x.refs)), x.invpool)
# Return a reversed copy of `x`; the refs are reversed into a new array while
# `pool` and `invpool` are shared with `x`, so both arrays are flagged `cow`.
function Base.reverse(x::PooledArray)
    # `x.lock` is a `Ref`; dereference it to get the actual lock
    l = x.lock[]
    Threads.lock(l)
    try
        x.cow = true
    finally
        Threads.unlock(l)
    end
    return PooledArray(RefArray(reverse(x.refs)), x.invpool, x.pool, true, x.lock)
end

function Base.permute!!(x::PooledArray, p::AbstractVector{T}) where T<:Integer
Base.permute!!(x.refs, p)
x
return x
end

function Base.invpermute!!(x::PooledArray, p::AbstractVector{T}) where T<:Integer
Base.invpermute!!(x.refs, p)
x
return x
end

# Create a PooledArray with element type `S` and size `dims`, with an empty pool and all
# refs set to `zero(R)`. The new array owns its (empty) pool, so it is not copy-on-write
# and gets its own lock.
# NOTE(review): zero refs must be accepted by the inner constructor's range check — confirm.
Base.similar(pa::PooledArray{T,R}, S::Type, dims::Dims) where {T,R} =
    PooledArray(RefArray(zeros(R, dims)), Dict{S,R}(), Vector{S}(), false,
                Ref{Threads.ReentrantLock}(Threads.ReentrantLock()))

Base.findall(pdv::PooledVector{Bool}) = findall(convert(Vector{Bool}, pdv))

Expand Down Expand Up @@ -224,7 +247,8 @@ function Base.map(f, x::PooledArray{T,R}) where {T,R<:Integer}
newinvpool = Dict(zip(map(f, ks), vs))
refarray = copy(x.refs)
end
PooledArray(RefArray(refarray), newinvpool)
return PooledArray(RefArray(refarray), newinvpool, _invert(newinvpool), false,
Ref(Threads.ReentrantLock()))
end

##############################################################################
Expand Down Expand Up @@ -288,6 +312,8 @@ Base.sort(pa::PooledArray; kw...) = pa[sortperm(pa; kw...)]
##
##############################################################################

# TODO: fix conversions to correctly handle cow and lock

Base.convert(::Type{PooledArray{S,R1,N}}, pa::PooledArray{T,R2,N}) where {S,T,R1<:Integer,R2<:Integer,N} =
PooledArray(RefArray(convert(Array{R1,N}, pa.refs)), convert(Dict{S,R1}, pa.invpool))
Base.convert(::Type{PooledArray{S,R,N}}, pa::PooledArray{T,R,N}) where {S,T,R<:Integer,N} =
Expand Down Expand Up @@ -342,15 +368,27 @@ Base.@propagate_inbounds function Base.isassigned(pa::PooledArray, I::Int...)
end

# Vector case
# Index with any mix of reals and vectors: the result gets its own refs but shares
# `pool` and `invpool` with `A`.
Base.@propagate_inbounds function Base.getindex(A::PooledArray, I::Union{Real,AbstractVector}...)
    # `A.lock` is a `Ref`; dereference it to get the actual lock
    l = A.lock[]
    Threads.lock(l)
    try
        # flag `A` so its pool is copied before a new value is pushed to it
        A.cow = true
    finally
        Threads.unlock(l)
    end
    return PooledArray(RefArray(getindex(A.refs, I...)), A.invpool, A.pool, true, A.lock)
end

# Dispatch our implementation for these cases instead of Base
Base.@propagate_inbounds Base.getindex(A::PooledArray, I::AbstractVector) =
PooledArray(RefArray(getindex(A.refs, I)), copy(A.invpool))
Base.@propagate_inbounds Base.getindex(A::PooledArray, I::AbstractArray) =
PooledArray(RefArray(getindex(A.refs, I)), copy(A.invpool))
# Index with a single vector: the result gets its own refs but shares `pool` and
# `invpool` with `A`.
Base.@propagate_inbounds function Base.getindex(A::PooledArray, I::AbstractVector)
    # `A.lock` is a `Ref`; dereference it to get the actual lock
    l = A.lock[]
    Threads.lock(l)
    try
        # flag `A` so its pool is copied before a new value is pushed to it
        A.cow = true
    finally
        Threads.unlock(l)
    end
    return PooledArray(RefArray(getindex(A.refs, I)), A.invpool, A.pool, true, A.lock)
end

# Index with a single array: the result gets its own refs but shares `pool` and
# `invpool` with `A`.
Base.@propagate_inbounds function Base.getindex(A::PooledArray, I::AbstractArray)
    # `A.lock` is a `Ref`; dereference it to get the actual lock
    l = A.lock[]
    Threads.lock(l)
    try
        # flag `A` so its pool is copied before a new value is pushed to it
        A.cow = true
    finally
        Threads.unlock(l)
    end
    return PooledArray(RefArray(getindex(A.refs, I)), A.invpool, A.pool, true, A.lock)
end

##############################################################################
##
Expand All @@ -368,7 +406,7 @@ function getpoolidx(pa::PooledArray{T,R}, val::Any) where {T,R}
end

function unsafe_pool_push!(pa::PooledArray{T,R}, val) where {T,R}
_pool_idx = length(pa.pool)+1
_pool_idx = length(pa.pool) + 1
if _pool_idx > typemax(R)
throw(ErrorException(string(
"You're using a PooledArray with ref type $R, which can only hold $(Int(typemax(R))) values,\n",
Expand All @@ -377,6 +415,15 @@ function unsafe_pool_push!(pa::PooledArray{T,R}, val) where {T,R}
)))
end
pool_idx = convert(R, _pool_idx)
if pa.cow
l = pa.lock
Threads.lock(l)
pa.invpool = copy(pa.invpool)
pa.pool = copy(pa.pool)
pa.cow = false
bkamins marked this conversation as resolved.
Show resolved Hide resolved
pa.lock = Threads.ReentrantLock()
Threads.unlock(l)
end
pa.invpool[val] = pool_idx
push!(pa.pool, val)
pool_idx
Expand Down Expand Up @@ -420,22 +467,24 @@ Base.empty!(pv::PooledVector) = (empty!(pv.refs); pv)

Base.deleteat!(pv::PooledVector, inds) = (deleteat!(pv.refs, inds); pv)

# Copy all elements of `a` followed by all elements of `b` into the preallocated
# destination `c` (1-based linear positions) and return `c`.
function _vcat!(c, a, b)
    copyto!(c, 1, a, 1, length(a))
    # `copyto!` returns its destination, so this returns `c`
    return copyto!(c, length(a) + 1, b, 1, length(b))
end


function Base.vcat(a::PooledArray{<:Any, <:Integer, 1}, b::AbstractArray{<:Any, 1})
output = similar(b, promote_type(eltype(a), eltype(b)), length(b) + length(a))
_vcat!(output, a, b)
return _vcat!(output, a, b)
end

function Base.vcat(a::AbstractArray{<:Any, 1}, b::PooledArray{<:Any, <:Integer, 1})
output = similar(a, promote_type(eltype(a), eltype(b)), length(b) + length(a))
_vcat!(output, a, b)
return _vcat!(output, a, b)
end

# TODO: rethink if this cannot be made more efficient in some cases when we can just copy
# invpool and pool of the longer array instead of re-creating them

function Base.vcat(a::PooledArray{T, <:Integer, 1}, b::PooledArray{S, <:Integer, 1}) where {T, S}
ap = a.invpool
bp = b.invpool
Expand Down