Skip to content

Commit

Permalink
add pure kwarg to map (#71)
Browse files Browse the repository at this point in the history
  • Loading branch information
bkamins authored Sep 1, 2021
1 parent 095ea81 commit f87b540
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "PooledArrays"
uuid = "2dfb63ee-cc39-5dd5-95bd-886bf059d720"
version = "1.2.1"
version = "1.3.0"

[deps]
DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
Expand Down
93 changes: 81 additions & 12 deletions src/PooledArrays.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ mutable struct PooledArray{T, R<:Integer, N, RA} <: AbstractArray{T, N}
function PooledArray{T,R,N,RA}(rs::RefArray{RA}, invpool::Dict{T, R},
pool::Vector{T}=_invert(invpool),
refcount::Threads.Atomic{Int}=Threads.Atomic{Int}(1)) where {T,R,N,RA<:AbstractArray{R, N}}
# we currently support only 1-based indexing for refs
# TODO: change to Base.require_one_based_indexing after we drop Julia 1.0 support
for ax in axes(rs.a)
if first(ax) != 1
throw(ArgumentError("offset arrays are not supported but got an array with index other than 1"))
end
end

# this is a quick but incomplete consistency check
if length(pool) != length(invpool)
throw(ArgumentError("inconsistent pool and invpool"))
Expand Down Expand Up @@ -76,7 +84,7 @@ const PooledArrOrSub = Union{SubArray{T, N, <:PooledArray{T, R}},
##############################################################################

# Echo inner constructor as an outer constructor
PooledArray(refs::RefArray{RA}, invpool::Dict{T,R}, pool::Vector{T}=_invert(invpool),
@inline PooledArray(refs::RefArray{RA}, invpool::Dict{T,R}, pool::Vector{T}=_invert(invpool),
refcount::Threads.Atomic{Int}=Threads.Atomic{Int}(1)) where {T,R,RA<:AbstractArray{R}} =
PooledArray{T,R,ndims(RA),RA}(refs, invpool, pool, refcount)

Expand All @@ -89,7 +97,7 @@ function _our_copy(x::SubArray{<:Any, 0})
return y
end

function PooledArray(d::PooledArrOrSub)
@inline function PooledArray(d::PooledArrOrSub)
Threads.atomic_add!(refcount(d), 1)
return PooledArray(RefArray(_our_copy(DataAPI.refarray(d))),
DataAPI.invrefpool(d), DataAPI.refpool(d), refcount(d))
Expand Down Expand Up @@ -131,6 +139,7 @@ _widen(::Type{UInt32}) = UInt64
_widen(::Type{Int8}) = Int16
_widen(::Type{Int16}) = Int32
_widen(::Type{Int32}) = Int64

# Constructor from array, invpool, and ref type

"""
Expand All @@ -139,7 +148,8 @@ _widen(::Type{Int32}) = Int64
Freshly allocate `PooledArray` using the given array as a source where each
element will be referenced as an integer of the given type.
If `reftype` is not specified, Boolean keyword arguments `signed` and `compress`
If `reftype` is not specified, then the `PooledArray` constructor is not type stable.
In this case, the Boolean keyword arguments `signed` and `compress`
determine the type of integer references. By default (`signed=false`), unsigned integers
are used, as they have a greater range.
However, the Arrow standard at https://arrow.apache.org/, as implemented in
Expand All @@ -162,7 +172,7 @@ if all values already exist in the pool.
"""
PooledArray

function PooledArray{T}(d::AbstractArray, r::Type{R}) where {T,R<:Integer}
@inline function PooledArray{T}(d::AbstractArray, r::Type{R}) where {T,R<:Integer}
refs, invpool, pool = _label(d, T, R)

if length(invpool) > typemax(R)
Expand All @@ -173,19 +183,19 @@ function PooledArray{T}(d::AbstractArray, r::Type{R}) where {T,R<:Integer}
return PooledArray(RefArray(refs::Vector{R}), invpool::Dict{T,R}, pool)
end

function PooledArray{T}(d::AbstractArray; signed::Bool=false, compress::Bool=false) where {T}
@inline function PooledArray{T}(d::AbstractArray; signed::Bool=false, compress::Bool=false) where {T}
    # Choose the integer reference type from the two Boolean flags:
    # `compress` selects the 8-bit type, otherwise the package default is used;
    # `signed` selects between the signed and unsigned variant.
    R = if signed
        compress ? Int8 : DEFAULT_SIGNED_REF_TYPE
    else
        compress ? UInt8 : DEFAULT_POOLED_REF_TYPE
    end
    labels, inverse, uniques = _label(d, T, R)
    return PooledArray(RefArray(labels), inverse, uniques)
end

PooledArray(d::AbstractArray{T}, r::Type) where {T} = PooledArray{T}(d, r)
PooledArray(d::AbstractArray{T}; signed::Bool=false, compress::Bool=false) where {T} =
@inline PooledArray(d::AbstractArray{T}, r::Type) where {T} = PooledArray{T}(d, r)
@inline PooledArray(d::AbstractArray{T}; signed::Bool=false, compress::Bool=false) where {T} =
PooledArray{T}(d, signed=signed, compress=compress)

# Construct an empty PooledVector of a specific type
PooledArray(t::Type) = PooledArray(Array(t,0))
PooledArray(t::Type, r::Type) = PooledArray(Array(t,0), r)
# `Array(t, 0)` is pre-1.0 syntax that has no method on Julia 1.x (it throws a
# `MethodError`); allocate the empty vector with `Array{t}(undef, 0)` instead.
@inline PooledArray(t::Type) = PooledArray(Array{t}(undef, 0))
@inline PooledArray(t::Type, r::Type) = PooledArray(Array{t}(undef, 0), r)

##############################################################################
##
Expand Down Expand Up @@ -304,7 +314,66 @@ Base.findall(pdv::PooledVector{Bool}) = findall(convert(Vector{Bool}, pdv))
##
##############################################################################

function Base.map(f, x::PooledArray{T,R}) where {T,R<:Integer}
"""
map(f, x::PooledArray; pure::Bool=false)
Transform `PooledArray` `x` by applying `f` to each element.
If `pure=true` then `f` is applied to each element of pool of `x`
exactly once (even if some elements in pool are not present in `x`).
This will typically be much faster when the proportion of unique values
in `x` is small.
If `pure=false`, the returned array will use the same reference type
as `x`, or `Int` if the number of unique values in the result is too large
to fit in that type.
"""
function Base.map(f, x::PooledArray{<:Any, R, N, RA}; pure::Bool=false)::Union{PooledArray{<:Any, R, N, RA},
PooledArray{<:Any, Int, N,
typeof(similar(x.refs, Int, ntuple(i -> 0, ndims(x.refs))))}} where {R, N, RA}
# The return-type annotation above allows exactly two shapes of result:
# refs of the input type `RA` (reference type `R`), or refs of the type that
# `similar(x.refs, Int, ...)` produces (reference type `Int`).

# pure=true: delegate to the pool-wise implementation.
pure && return _map_pure(f, x)
# Empty input: there is no first element to seed from, so build the result
# directly from the (empty) comprehension.
# NOTE(review): this goes through the default `PooledArray` constructor;
# confirm its reference type is compatible with the declared return type
# when `R` is not the default reference type.
length(x) == 0 && return PooledArray([f(v) for v in x])
# Seed the state with the first mapped value so that the element type of
# `pool` and the key type of `invpool` come from an actual result of `f`.
v1 = f(x[1])
invpool = Dict(v1 => one(eltype(x.refs)))
pool = [v1]
# Output labels are allocated with `similar`, i.e. based on `x.refs`.
labels = similar(x.refs)
labels[1] = 1
nlabels = 1
# Process the remaining elements, starting at index 2.
return _map_notpure(f, x, 2, invpool, pool, labels, nlabels)
end

# Element-wise kernel behind `map(f, x)` for `pure=false`.
#
# Applies `f` to `xs[start]`, ..., `xs[length(xs)]`, assigning each distinct
# result a label and recording it in `invpool` (value => label), `pool`
# (label => value) and `labels` (label per position). `nlabels` is the number
# of labels assigned so far.
#
# When a new value does not fit the current containers (label type `I` is
# exhausted, or the value is not a `T`), the containers are converted to wider
# types and the function calls itself for the remaining elements, so the
# method is re-dispatched on the new `T`/`I`.
#
# Returns a `PooledArray` built from the final `labels`, `invpool` and `pool`.
function _map_notpure(f, xs::PooledArray, start,
invpool::Dict{T,I}, pool::Vector{T},
labels::AbstractArray{I}, nlabels::Int) where {T, I<:Integer}
for i in start:length(xs)
vi = f(xs[i])
# zero(I) is used as the "not found" sentinel for the lookup.
lbl = get(invpool, vi, zero(I))
if lbl != zero(I)
# Value already seen: reuse its label.
labels[i] = lbl
else
if nlabels == typemax(I) || !(vi isa T)
# Widen the label type to Int only if labels are exhausted, and the
# value type only if `vi` does not fit `T`.
I2 = nlabels == typemax(I) ? Int : I
T2 = vi isa T ? T : Base.promote_typejoin(T, typeof(vi))
nlabels += 1
invpool2 = convert(Dict{T2, I2}, invpool)
invpool2[vi] = nlabels
pool2 = convert(Vector{T2}, pool)
push!(pool2, vi)
labels2 = convert(AbstractArray{I2}, labels)
labels2[i] = nlabels
# Element i is already recorded; continue from i + 1 with the
# widened containers.
return _map_notpure(f, xs, i + 1, invpool2, pool2,
labels2, nlabels)
end
# New value that fits the current containers: register it in place.
nlabels += 1
labels[i] = nlabels
invpool[vi] = nlabels
push!(pool, vi)
end
end
return PooledArray(RefArray(labels), invpool, pool)
end

function _map_pure(f, x::PooledArray)
ks = collect(keys(x.invpool))
vs = collect(values(x.invpool))
ks1 = map(f, ks)
Expand Down Expand Up @@ -601,14 +670,14 @@ _perm(o::F, z::V) where {F, V} = Base.Order.Perm{F, V}(o, z)

Base.Order.Perm(o::Base.Order.ForwardOrdering, y::PooledArray) = _perm(o, fast_sortable(y))

function Base.repeat(x::PooledArray, m::Integer...)
function Base.repeat(x::PooledArray, m::Integer...)
    # The result reuses `x.invpool`, `x.pool` and `x.refcount`, so the shared
    # counter is incremented before the new array is built.
    Threads.atomic_add!(x.refcount, 1)
    repeated_refs = RefArray(repeat(x.refs, m...))
    PooledArray(repeated_refs, x.invpool, x.pool, x.refcount)
end

function Base.repeat(x::PooledArray; inner = nothing, outer = nothing)
Threads.atomic_add!(x.refcount, 1)
PooledArray(RefArray(repeat(x.refs; inner = inner, outer = outer)),
PooledArray(RefArray(repeat(x.refs; inner = inner, outer = outer)),
x.invpool, x.pool, x.refcount)
end

Expand Down
69 changes: 69 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ end
@test PooledArrays.fast_sortable(v3) == PooledArray([1, 3, 2, 4])
@test isbitstype(eltype(PooledArrays.fast_sortable(v3)))
Base.Order.Perm(Base.Order.Forward, v3).data == PooledArray([1, 3, 2, 4])

for T in (Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64)
@inferred PooledArray([1, 2, 3], T)
end
for signed in (true, false), compress in (true, false)
@test_throws ErrorException @inferred PooledArray([1, 2, 3],
signed=signed,
compress=compress)
end
end

@testset "pool non-copying constructor and copy tests" begin
Expand Down Expand Up @@ -500,3 +509,63 @@ end
pa2 = repeat(pa1, inner = (2, 1))
@test pa2 == [1 2; 1 2; 3 4; 3 4]
end

# Tests for `map` on `PooledArray` with and without `pure=true`.
@testset "map pure tests" begin
# After the assignment `x` holds [1, 2, 1]; the checks below expect the
# pure result to still carry three pool entries.
x = PooledArray([1, 2, 3])
x[3] = 1
# pure=true: one pool entry per pool element of `x`.
y = map(-, x, pure=true)
@test refpool(y) == [-1, -2, -3]
@test y == [-1, -2, -1]

# default (pure=false): only values actually produced end up in the pool.
y = map(-, x)
@test refpool(y) == [-1, -2]
@test y == [-1, -2, -1]

# Returns a stateful closure that ignores its argument and yields
# -1, -2, -3, ... on successive calls.
function f()
i = Ref(0)
return x -> (i[] -= 1; i[])
end

# the order is strange as we iterate invpool which is a Dict
# and it depends on the version of Julia
y = map(f(), x, pure=true)
d = Dict(Set(1:3) .=> -1:-1:-3)
@test refpool(y) == [d[i] for i in 1:3]
@test y == [d[v] for v in x]

# pure=false calls the closure once per element, in order.
y = map(f(), x)
@test refpool(y) == [-1, -2, -3]
@test y == [-1, -2, -3]

# Element type Union{Missing, Int} and the reference type are preserved.
x = PooledArray([1, missing, 2])
y = map(identity, x)
@test isequal(y, [1, missing, 2])
@test typeof(y) === PooledVector{Union{Missing, Int}, UInt32, Vector{UInt32}}

x = PooledArray([1, missing, 2], signed=true, compress=true)
y = map(identity, x)
@test isequal(y, [1, missing, 2])
@test typeof(y) === PooledVector{Union{Missing, Int}, Int8, Vector{Int8}}

# 200 distinct results from an Int8-referenced input: the expected result
# uses Int references.
x = PooledArray(fill(1, 200), signed=true, compress=true)
y = map(f(), x)
@test y == -1:-1:-200
@test typeof(y) === PooledVector{Int, Int, Vector{Int}}

# Same scenario for a matrix input.
x = PooledArray(reshape(fill(1, 200), 2, :), signed=true, compress=true)
y = map(f(), x)
@test y == reshape(-1:-1:-200, 2, :)
@test typeof(y) === PooledMatrix{Int, Int, Matrix{Int}}

# Zero-dimensional input: a single result, Int8 references are kept.
x = PooledArray(fill("a"), signed=true, compress=true)
y = map(f(), x)
@test y == fill(-1)
@test typeof(y) === PooledArray{Int, Int8, 0, Array{Int8, 0}}

# Inference check, only run on Julia 1.6 and later; the type given to
# `@inferred` is the allowed alternative to the inferred return type.
@static if VERSION >= v"1.6"
for signed in (true, false), compress in (true, false), len in (1, 100, 1000)
x = PooledArray(fill(1, len), signed=signed, compress=compress)
@inferred PooledVector{Int, Int, Vector{Int}} map(identity, x)
end
end
end

2 comments on commit f87b540

@bkamins
Copy link
Member Author

@bkamins bkamins commented on f87b540 Sep 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/43931

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v1.3.0 -m "<description of version>" f87b5409e0f214cde8a2a8902f40e5eaf8e97e28
git push origin v1.3.0

Please sign in to comment.