MemPool.datastore memory utilization keeps increasing when using DTables with multiple processes #60
Comments
Do you have a reproducer for this one, just to help me debug it reliably? |
I am working on a better reproducer, but I believe the behavior I pointed out in JuliaParallel/Dagger.jl#445 (related to JuliaParallel/Dagger.jl#438) is essentially the same as what I am reporting here---all those ... |
@jpsamaroo Here's an MWE that shows the ever-growing memory utilization. It took me running ...
using Distributed
addprocs(5 - nprocs(); exeflags = "--heap-size-hint=3G")
@everywhere using DTables, DataFrames, CSV
@everywhere const DT = Ref{DTable}()
@everywhere mutable struct DTableCols
key_names
value_names
keys
values
end
function main()
remotecall_fetch(query, 2)
end
@everywhere function query()
dt1 = load_dt()
dt2 = add_value_col!(dt1)
dt3 = update_value_col!(dt2)
@info "" length(dt3)
dt4 = calc_value_cols(dt3)
dt5 = select(dt4, [6; 12; 103:113]...; copycols = false)
dt_agg = aggregate_dt(dt5)
return fetch(dt_agg)
end
@everywhere function load_dt()
isassigned(DT) && return DT[]
file = "file.csv"
GC.enable(false)
dt = DTable(x -> CSV.File(x), [file]; tabletype = DataFrame)
GC.enable(true)
DT[] = dt
return dt
end
@everywhere function add_value_col!(dt)
dt_cols = create_dt_cols(dt, 1:48, 49:102)
dt_cols.value_names = [dt_cols.value_names; "RAND"]
dt_cols.values = (dt_cols.values..., rand(length(dt_cols.values[1])))
return create_dt_from_cols(dt_cols; is_sorted = true)
end
@everywhere function create_dt_cols(dt, key_cols, value_cols)
df = fetch(dt)
key_names = names(df)[key_cols]
value_names = names(df)[value_cols]
keys = [df[!, i] for i in key_cols]
values = [df[!, i] for i in value_cols]
return DTableCols(key_names, value_names, keys, values)
end
@everywhere function create_dt_from_cols(dt_cols; is_sorted = false)
df = DataFrame(
(dt_cols.key_names .=> dt_cols.keys)...,
(dt_cols.value_names .=> dt_cols.values)...;
copycols = false,
)
is_sorted || sort!(df)
return DTable(df)
end
@everywhere function update_value_col!(dt)
dt_cols = create_dt_cols(dt, 1:48, 49:103)
dt_cols.values = (
dt_cols.values[1:10]...,
rand(length(dt_cols.values[1])),
dt_cols.values[12:end]...,
)
return create_dt_from_cols(dt_cols; is_sorted = true)
end
@everywhere function calc_value_cols(dt)
newvals = Vector{Float64}[]
for i = 1:10
v = calc_new_value(dt, i)
push!(newvals, v)
end
return append_value_cols(dt, newvals)
end
@everywhere function calc_new_value(dt, i)
dt_cols = create_dt_cols(dt, 1:48, 49:103)
return abs.(dt_cols.values[i])
end
@everywhere function append_value_cols(dt, newvals)
df = fetch(dt)
for (i, v) in enumerate(newvals)
setproperty!(df, "NEW$i", v)
end
return DTable(df)
end
@everywhere function aggregate_dt(dt)
key_names = [Symbol("6"), Symbol("12")]
gdt = groupby(fetch(dt), key_names)
gkeys = sort!(collect(keys(gdt)))
key_pairs = key_names .=> invert(gkeys)
value_names = [[Symbol("RAND")]; Symbol.("NEW", 1:10)]
sums = fetch(reduce(+, gdt; cols = value_names))
sorted = sortperm(invert(sums[key_names]))
value_pairs = map(value_names) do value
value => sums[Symbol(:result_, value)][sorted]
end
return DTable(DataFrame(key_pairs..., value_pairs...))
end
@everywhere invert(x) = [[x[j][i] for j = 1:length(x)] for i = 1:length(x[1])]
@everywhere function Base.reduce(f, df::DataFrames.AbstractDataFrame; cols)
NamedTuple(col => reduce(f, df[!, col]) for col in cols)
end
@everywhere function Base.reduce(f, gdt::DataFrames.GroupedDataFrame; cols)
gkeys = keys(gdt)
dims = keys(gkeys[1])
merge(
NamedTuple(dim => getproperty.(gkeys, dim) for dim in dims),
NamedTuple(
Symbol(:result_, col) => [reduce(f, gdt[k]; cols = [col])[col] for k in gkeys]
for col in cols
),
)
end |
One thing I remembered was that when I was benchmarking DTables.jl around release time, I had a really bad time running it in WSL 2. Let's keep this in mind when looking at this; Linux will behave differently for sure. I'll try to have a look at it this week. |
I just ran the exact same code as in #61 (which in turn is the same as the MWE above but with a call to ...):
julia> include("mwe.jl"); for i = 1:100 (i % 10 == 0 && @show(i)); main() end
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
i = 10
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
i = 20
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
i = 30
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
From worker 2: ┌ Info:
From worker 2: └ length(dt3) = 233930
ERROR: On worker 2:
AssertionError: Failed to migrate 183.839 MiB for ref 1624
Stacktrace:
[1] #105
@ ~/.julia/packages/MemPool/l9nLj/src/storage.jl:887
[2] with_lock
@ ~/.julia/packages/MemPool/l9nLj/src/lock.jl:80
[3] #sra_migrate!#103
@ ~/.julia/packages/MemPool/l9nLj/src/storage.jl:849
[4] sra_migrate!
@ ~/.julia/packages/MemPool/l9nLj/src/storage.jl:826 [inlined]
[5] write_to_device!
@ ~/.julia/packages/MemPool/l9nLj/src/storage.jl:817
[6] #poolset#160
@ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:386
[7] #tochunk#139
@ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:267
[8] tochunk (repeats 2 times)
@ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:259 [inlined]
[9] #DTable#1
@ ~/.julia/packages/DTables/BjdY2/src/table/dtable.jl:38
[10] DTable
@ ~/.julia/packages/DTables/BjdY2/src/table/dtable.jl:28
[11] #create_dt_from_cols#9
@ ~/tmp/mwe.jl:76
[12] create_dt_from_cols
@ ~/tmp/mwe.jl:68 [inlined]
[13] add_value_col!
@ ~/tmp/mwe.jl:53
[14] query
@ ~/tmp/mwe.jl:26
[15] #invokelatest#2
@ ./essentials.jl:819 [inlined]
[16] invokelatest
@ ./essentials.jl:816
[17] #110
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285
[18] run_work_thunk
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:70
[19] macro expansion
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/process_messages.jl:285 [inlined]
[20] #109
@ ./task.jl:514
Stacktrace:
[1] remotecall_fetch(::Function, ::Distributed.Worker; kwargs::Base.Pairs{Symbol, Union{}, Tuple{}, NamedTuple{(), Tuple{}}})
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:465
[2] remotecall_fetch(::Function, ::Distributed.Worker)
@ Distributed ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
[3] #remotecall_fetch#162
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[4] remotecall_fetch
@ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
[5] main
@ ~/tmp/mwe.jl:19 [inlined]
[6] top-level scope
@ ./REPL[1]:1
I wonder if this is essentially the same issue as the OP, where data is being kept around longer than it should be. Just in this case, instead of a process getting killed, MemPool errors because we end up exceeding the 20 GB of disk space I said MemPool could use. If it is the same issue, then WSL 2 memory management shouldn't have anything to do with this. |
I ran the MWE (with and without enabling disk caching) on Windows (not WSL 2).
So there definitely is a difference in behavior between WSL 2 and Windows. |
I did get it reproduced twice with the MemPool fix from the other issue:
julia> d = DTable((a=rand(Int, N1),), N1 ÷ 100)
ERROR: AssertionError: Failed to migrate 10.240 MiB for ref 5646
Stacktrace:
[1] (::MemPool.var"#105#113"{Bool, MemPool.SimpleRecencyAllocator, MemPool.RefState, Int64})()
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:920
[2] with_lock(f::MemPool.var"#105#113"{Bool, MemPool.SimpleRecencyAllocator, MemPool.RefState, Int64}, lock::MemPool.NonReentrantLock, cond::Bool)
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\lock.jl:80
[3] sra_migrate!(sra::MemPool.SimpleRecencyAllocator, state::MemPool.RefState, ref_id::Int64, to_mem::Missing; read::Bool, locked::Bool)
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:882
[4] sra_migrate!(sra::MemPool.SimpleRecencyAllocator, state::MemPool.RefState, ref_id::Int64, to_mem::Missing)
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:859 [inlined]
[5] write_to_device!(sra::MemPool.SimpleRecencyAllocator, state::MemPool.RefState, ref_id::Int64)
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:850
[6]
@ MemPool C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:386
[7] tochunk(x::@NamedTuple{a::Vector{Int64}}, proc::OSProc, scope::AnyScope; persist::Bool, cache::Bool, device::Nothing, kwargs::@Kwargs{})
@ Dagger C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:267
[8] tochunk
@ Dagger C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:259 [inlined]
[9] tochunk(x::@NamedTuple{a::Vector{Int64}})
@ Dagger C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:259
[10] DTable(table::@NamedTuple{a::Vector{Int64}}, chunksize::Int64; tabletype::Nothing, interpartition_merges::Bool)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:122
[11] DTable(table::@NamedTuple{a::Vector{Int64}}, chunksize::Int64)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:61
[12] top-level scope
@ REPL[56]:1
Some type information was truncated. Use `show(err)` to see complete types.
julia> map(x -> (r=x.a + 1,), d) |> fetch
ERROR: ThunkFailedException:
Root Exception Type: CapturedException
Root Exception:
AssertionError: Failed to migrate 10.240 MiB for ref 5051
Stacktrace:
[1] #105
@ C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:920
[2] with_lock
@ C:\Users\krynjupc\.julia\dev\MemPool\src\lock.jl:83
[3] #sra_migrate!#103
@ C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:882
[4] #120
@ C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:1001
[5] with_lock
@ C:\Users\krynjupc\.julia\dev\MemPool\src\lock.jl:80
[6] with_lock
@ C:\Users\krynjupc\.julia\dev\MemPool\src\lock.jl:78
[7] read_from_device
@ C:\Users\krynjupc\.julia\dev\MemPool\src\storage.jl:991 [inlined]
[8] _getlocal
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:433
[9] #174
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:425
[10] #invokelatest#2
@ .\essentials.jl:899
[11] invokelatest
@ .\essentials.jl:896
[12] #110
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\process_messages.jl:286
[13] run_work_thunk
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\process_messages.jl:70
[14] #109
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\process_messages.jl:286
Stacktrace:
[1] #remotecall_fetch#159
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\remotecall.jl:465
[2] remotecall_fetch
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\remotecall.jl:454
[3] remotecall_fetch
@ C:\Users\krynjupc\AppData\Local\Programs\Julia-1.11.0-DEV\share\julia\stdlib\v1.11\Distributed\src\remotecall.jl:492 [inlined]
[4] #173
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:424 [inlined]
[5] forwardkeyerror
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:409
[6] poolget
@ C:\Users\krynjupc\.julia\dev\MemPool\src\datastore.jl:423
[7] move
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:98
[8] move
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\chunks.jl:96
[9] #invokelatest#2
@ .\essentials.jl:899 [inlined]
[10] invokelatest
@ .\essentials.jl:896 [inlined]
[11] #154
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\sch\Sch.jl:1475
Stacktrace:
[1] wait
@ .\task.jl:354 [inlined]
[2] fetch
@ .\task.jl:374 [inlined]
[3] fetch_report
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\sch\util.jl:241
[4] do_task
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\sch\Sch.jl:1502
[5] #132
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\sch\Sch.jl:1243
Root Thunk: Thunk(id=2224, #38(Dagger.WeakChunk(1, 5051, WeakRef(Dagger.Chunk{@NamedTuple{a::Vector{Int64}}, DRef, OSProc, AnyScope}(@NamedTuple{a::Vector{Int64}},
UnitDomain(), DRef(1, 5051, 0x0000000000a3d738), OSProc(1), AnyScope(), false))), #129))
Inner Thunk: Thunk(id=2325, isnonempty(Thunk[2224](#38, Any[Dagger.WeakChunk(1, 5051, WeakRef(Dagger.Chunk{@NamedTuple{a::Vector{Int64}}, DRef, OSProc, AnyScope}(@NamedTuple{a::Vector{Int64}}, UnitDomain(), DRef(1, 5051, 0x0000000000a3d738), OSProc(1), AnyScope(), false))), var"#129#130"()])))
This Thunk: Thunk(id=2325, isnonempty(Thunk[2224](#38, Any[Dagger.WeakChunk(1, 5051, WeakRef(Dagger.Chunk{@NamedTuple{a::Vector{Int64}}, DRef, OSProc, AnyScope}(@NamedTuple{a::Vector{Int64}}, UnitDomain(), DRef(1, 5051, 0x0000000000a3d738), OSProc(1), AnyScope(), false))), var"#129#130"()])))
Stacktrace:
[1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
@ Dagger C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\eager_thunk.jl:16
[2] fetch
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\eager_thunk.jl:11 [inlined]
[3] #fetch#75
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\eager_thunk.jl:58 [inlined]
[4] fetch
@ C:\Users\krynjupc\.julia\packages\Dagger\M13n0\src\eager_thunk.jl:54 [inlined]
[5] #10
@ C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:233 [inlined]
[6] filter(f::DTables.var"#10#13"{Vector{Dagger.EagerThunk}}, a::Vector{Tuple{Int64, Union{Dagger.EagerThunk, Dagger.Chunk}}})
@ Base .\array.jl:2673
[7] trim!(d::DTable)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:233
[8] trim(d::DTable)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:242
[9] retrieve_partitions
@ C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:179 [inlined]
[10] fetch(d::DTable)
@ DTables C:\Users\krynjupc\.julia\packages\DTables\BjdY2\src\table\dtable.jl:167
[11] |>(x::DTable, f::typeof(fetch))
@ Base .\operators.jl:917
[12] top-level scope
@ REPL[57]:1
|
Reproducer, no files needed:
ENV["JULIA_MEMPOOL_EXPERIMENTAL_FANCY_ALLOCATOR"] = "true"
ENV["JULIA_MEMPOOL_EXPERIMENTAL_MEMORY_BOUND"] = string(2 * (2^30)) # 2GB
# ENV["JULIA_MEMPOOL_EXPERIMENTAL_DISK_CACHE"] = "C:\\Users\\krynjupc\\.mempool\\demo_session_$(rand(Int))"
using Distributed
@info(
"Execution environment details",
julia_version=VERSION,
n_workers=Distributed.nworkers(),
n_procs=Distributed.nprocs(),
n_threads=Threads.nthreads(),
)
function view_cache()
!isdir(ENV["JULIA_MEMPOOL_EXPERIMENTAL_DISK_CACHE"]) && return []
map(
x -> (basename(x), round(filesize(x) / 2^20, digits=2)),
readdir(ENV["JULIA_MEMPOOL_EXPERIMENTAL_DISK_CACHE"], join=true)
)
end
using DTables
DTables.enable_disk_caching!()
using MemPool
using Dagger
N1 = 2^27 # 1GB
d = DTable((a=rand(Int, N1),), N1 ÷ 100)
map(x -> (r=x.a + 1,), d) |> fetch
MemPool.GLOBAL_DEVICE[]
view_cache()
|
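For scale, a quick sanity check on the numbers in the reproducer above: `N1 = 2^27` `Int64` values occupy 2^27 × 8 bytes = 2^30 bytes = 1 GiB of raw data, against the 2 GiB `JULIA_MEMPOOL_EXPERIMENTAL_MEMORY_BOUND` set at the top of the script. Each chunk of `N1 ÷ 100` rows is 2^27 ÷ 100 × 8 ≈ 10.24 MiB, which matches the "Failed to migrate 10.240 MiB" size reported in the errors above.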
Can't reproduce with the fix JuliaData/MemPool.jl#74. Will cut a release soon. |
I just tested the new releases of DTables.jl/Dagger.jl/MemPool.jl using the reproducer I mentioned above. Without disk caching enabled:
With disk caching enabled:
So, it looks like the issue is not entirely resolved yet. |
@StevenWhitaker when this occurs, how much does |
@jpsamaroo I ran the following code to grab the info you requested. Let me know if you need any other info.
julia> include("mwe.jl"); for i = 1:200
totalmem = Base.format_bytes(Sys.total_physical_memory())
memusage = map(procs()) do id
remotecall_fetch(id) do
parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
end
end
totalmemusage = Base.format_bytes(sum(memusage))
worker_memusage = Base.format_bytes(memusage[2])
device_size = remotecall_fetch(2) do
DTables.Dagger.MemPool.GLOBAL_DEVICE[].device_size[]
end |> Base.format_bytes # Remove `device_size` when disk caching not enabled
@info "" i device_size worker_memusage totalmemusage totalmem
main()
end
Without disk caching enabled on WSL 2:
With disk caching enabled:
So, it looks like the device is running out of memory, but why? |
Can you try throwing in some |
I added |
Any updates yet on this front? |
@StevenWhitaker can you try out JuliaData/MemPool.jl#75 and see if it delays the OOM further or eliminates it? You'll probably want to fiddle with the |
@jpsamaroo I tried using the default value of `MemPool.MEM_RESERVED[]`.
Error message
The error looks different, but it probably is the same error (i.e., the OOM manager kills one of my Julia processes) because I notice one process is missing after the error. Oh, and now I see the ... Note that the error occurred sooner with 10 GB reserved. This is with disk caching disabled; I'm assuming JuliaData/MemPool.jl#75 does nothing about the disk caching errors I've also been experiencing. |
The error shown should be fixed by JuliaParallel/Dagger.jl#450 (comment); you need to be using Julia 1.11 with my Distributed PR at all times for reliable distributed computing. And yes, it doesn't yet handle disk caching, but I can add support for that (we'll nicely ask the LRU to swap more stuff to disk if it can). |
Oh, I thought JuliaLang/Distributed.jl#4 was only relevant if running code with multiple threads, but I'm running with only one thread. With disk caching enabled, as far as I can tell, my issue isn't running out of RAM; it's running out of allotted disk space. So the LRU needs to delete stuff that is no longer needed, but it seems like that isn't happening. |
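To make the eviction behavior being discussed concrete, here is a minimal, generic LRU-eviction sketch in Julia. It illustrates only the invariant (keep tracked bytes under a bound by dropping the least-recently-used entries); it is not MemPool's actual allocator, and the names `TinyLRU`, `touch!`, and `evict!` are made up for this example.
mutable struct TinyLRU
    bound::Int                # byte budget
    used::Int                 # bytes currently tracked
    order::Vector{Int}        # ref ids, least recently used first
    sizes::Dict{Int,Int}      # ref id => size in bytes
end
TinyLRU(bound) = TinyLRU(bound, 0, Int[], Dict{Int,Int}())

function touch!(lru::TinyLRU, id::Int, sz::Int)
    filter!(!=(id), lru.order)        # move id to the most-recent position
    push!(lru.order, id)
    lru.used += sz - get(lru.sizes, id, 0)
    lru.sizes[id] = sz
    evict!(lru)
end

function evict!(lru::TinyLRU)
    while lru.used > lru.bound && !isempty(lru.order)
        victim = popfirst!(lru.order)   # least recently used entry
        lru.used -= pop!(lru.sizes, victim)
        # A real allocator would write `victim` to disk or free it here; if no
        # entry can be evicted (everything is still referenced), the allocation
        # fails, which is what an error like "Failed to migrate ... for ref N"
        # surfaces.
    end
end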
My reproducer still fails. With JuliaLang/Distributed.jl#4 and JuliaData/MemPool.jl#75 and the most recent nightly build of Julia: `MemPool.MEM_RESERVED[]`: Default
`MemPool.MEM_RESERVED[]`: 2 GB
`MemPool.MEM_RESERVED[]`: 10 GB
I can try with a different Julia commit if you think the issue here is the particular commit I used. (And let me know if you have a commit in mind.) |
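For anyone following along, here is a minimal sketch of how the reserved-memory knob referenced above might be adjusted on every process. It assumes `MemPool.MEM_RESERVED` is a `Ref` holding a byte count; the name and the `[]` access come from the labels quoted above, not from documented API.
using Distributed
@everywhere using MemPool
# Assumption: MEM_RESERVED[] is the number of bytes MemPool keeps in reserve
# before treating the system as under memory pressure.
@everywhere MemPool.MEM_RESERVED[] = 2 * 2^30   # the "2 GB" setting reported above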
@jpsamaroo Happy Holidays! Any updates on this front? And to reiterate some questions I had:
Is my assumption wrong, i.e., do we need that Distributed.jl fix even with only one thread?
Any thoughts on this? |
I tried a couple of variations of my reproducer to collect a few more data points in case it helps with this issue:
I realized that I accidentally had been using multiple threads on the worker processes (because I didn't realize the ...
I then decided to remove the two ...
I then tried wrapping the call to ...
There are no memory or disk caching problems when I don't add any processes, regardless of whether or not multiple threads are used.
TL;DR: Everything I tried still resulted in failure, except removing processes altogether. |
Happy Holidays!
Probably yes, as it may still be a race with multiple async tasks. I haven't really experienced that, but it can probably still occur.
I still need to implement this in JuliaData/MemPool.jl#75 - I have a long TODO list from before the holidays, so I'm slowly working down it. Thanks for your patience 😄 W.r.t. the non-disk-caching OOMs, I've put together JuliaData/MemPool.jl#76, which together with JuliaData/MemPool.jl#75 and https://github.com/JuliaParallel/Dagger.jl/tree/jps/chained-dtors significantly reduces the amount of memory that Dagger keeps around, and also forces GC calls when we're running out of memory (tunable with the new ...). I'll keep at this, but thank you for the detailed updates and patience while I work through these issues! |
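As a rough sketch of the "force GC when memory runs low" idea mentioned above (a generic illustration only; the helper name and the 10% threshold are made up and this is not MemPool's implementation):
# Force a full collection when free system memory drops below `threshold`
# of total memory; returns true if a GC was forced.
function maybe_force_gc(; threshold = 0.10)
    if Sys.free_memory() < threshold * Sys.total_memory()
        GC.gc(true)   # full, blocking collection
        return true
    end
    return false
end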
Thanks for your work, and no worries about having a long TODO list! I just tried adding JuliaData/MemPool.jl#75 and https://github.com/JuliaParallel/Dagger.jl/tree/jps/chained-dtors to my environment, in conjunction with DTables v0.4.3, and DTables failed to precompile (segfault). I saw this on Julia 1.9.4 and 1.10.0. (But it precompiled fine on 1.11 (2024-01-11 nightly), even without your Distributed fix.) Any thoughts on why DTables would fail to precompile? |
Odd, can you provide a stacktrace of the segfault? |
In Julia 1.10.0, but it looked the same in Julia 1.9.4. This is the log when precompiling after updating packages (it's very long, so I had to truncate):
After updating
And here's the segfault that comes when calling ...:
Segfault
|
Yeah I'm dealing with these errors too now that I've opened the PR. I'll get those fixed and then let you know when you can try it. |
@jpsamaroo I just saw JuliaLang/julia#40626. Does this issue affect Dagger.jl at all, i.e., could it be the cause of what I'm seeing, where memory never seems to get freed? |
Huh, I hadn't thought of that, but now that you mention it, every Dagger task does rely on ... Also, the ... |
@jpsamaroo I've been trying several different things, so here's a report of my findings.
Comparison to MemPool.jl
`dtables.jl` (no significant changes to the original MWE I posted)
@everywhere using DTables, DataFrames, CSV
if @isdefined(USE_DISK_CACHING) && USE_DISK_CACHING
@info "disk caching enabled"
enable_disk_caching!(8, 20 * 2^10) # 8% max memory to lead to ~512 MiB per process to match custom code
else
@info "no disk caching"
end
@everywhere const DT = Ref{DTable}()
@everywhere mutable struct DTableCols
key_names
value_names
keys
values
end
function main()
remotecall_fetch(query, 2)
end
@everywhere function query()
dt1 = load_dt()
dt2 = add_value_col!(dt1)
dt3 = update_value_col!(dt2)
@info "" length(dt3)
dt4 = calc_value_cols(dt3)
dt5 = select(dt4, [6; 12; 103:113]...; copycols = false)
dt_agg = aggregate_dt(dt5)
return fetch(dt_agg)
end
@everywhere function load_dt()
isassigned(DT) && return DT[]
file = "file.csv"
dt = DTable(x -> CSV.File(x), [file]; tabletype = DataFrame)
DT[] = dt
return dt
end
@everywhere function add_value_col!(dt)
dt_cols = create_dt_cols(dt, 1:48, 49:102)
dt_cols.value_names = [dt_cols.value_names; "RAND"]
dt_cols.values = (dt_cols.values..., rand(length(dt_cols.values[1])))
return create_dt_from_cols(dt_cols; is_sorted = true)
end
@everywhere function create_dt_cols(dt, key_cols, value_cols)
df = fetch(dt)
key_names = names(df)[key_cols]
value_names = names(df)[value_cols]
keys = [df[!, i] for i in key_cols]
values = [df[!, i] for i in value_cols]
return DTableCols(key_names, value_names, keys, values)
end
@everywhere function create_dt_from_cols(dt_cols; is_sorted = false)
df = DataFrame(
(dt_cols.key_names .=> dt_cols.keys)...,
(dt_cols.value_names .=> dt_cols.values)...;
copycols = false,
)
is_sorted || sort!(df)
return DTable(df)
end
@everywhere function update_value_col!(dt)
dt_cols = create_dt_cols(dt, 1:48, 49:103)
dt_cols.values = (
dt_cols.values[1:10]...,
rand(length(dt_cols.values[1])),
dt_cols.values[12:end]...,
)
return create_dt_from_cols(dt_cols; is_sorted = true)
end
@everywhere function calc_value_cols(dt)
newvals = Vector{Float64}[]
for i = 1:10
v = calc_new_value(dt, i)
push!(newvals, v)
end
return append_value_cols(dt, newvals)
end
@everywhere function calc_new_value(dt, i)
dt_cols = create_dt_cols(dt, 1:48, 49:103)
return abs.(dt_cols.values[i])
end
@everywhere function append_value_cols(dt, newvals)
df = fetch(dt)
for (i, v) in enumerate(newvals)
setproperty!(df, "NEW$i", v)
end
return DTable(df)
end
@everywhere function aggregate_dt(dt)
key_names = [Symbol("6"), Symbol("12")]
gdt = groupby(fetch(dt), key_names)
gkeys = sort!(collect(keys(gdt)))
key_pairs = key_names .=> invert(gkeys)
value_names = [[Symbol("RAND")]; Symbol.("NEW", 1:10)]
sums = fetch(reduce(+, gdt; cols = value_names))
sorted = sortperm(invert(sums[key_names]))
value_pairs = map(value_names) do value
value => sums[Symbol(:result_, value)][sorted]
end
return DTable(DataFrame(key_pairs..., value_pairs...))
end
@everywhere invert(x) = [[x[j][i] for j = 1:length(x)] for i = 1:length(x[1])]
@everywhere function Base.reduce(f, df::DataFrames.AbstractDataFrame; cols)
NamedTuple(col => reduce(f, df[!, col]) for col in cols)
end
@everywhere function Base.reduce(f, gdt::DataFrames.GroupedDataFrame; cols)
gkeys = keys(gdt)
dims = keys(gkeys[1])
merge(
NamedTuple(dim => getproperty.(gkeys, dim) for dim in dims),
NamedTuple(
Symbol(:result_, col) => [reduce(f, gdt[k]; cols = [col])[col] for k in gkeys]
for col in cols
),
)
end
`custom.jl` (same as `dtables.jl` except for the custom `struct`)
@everywhere using MemPool, DataFrames, CSV
if @isdefined(USE_DISK_CACHING) && USE_DISK_CACHING
@info "disk caching enabled"
@everywhere let
total_mem = Sys.total_memory() ÷ 2
# mem_per_proc = Int(total_mem ÷ nprocs()) # This is too much memory for testing!
mem_per_proc = 512 * 2^20
config = MemPool.DiskCacheConfig(; toggle = true, membound = mem_per_proc, diskbound = 20 * 2^30)
MemPool.setup_global_device!(config)
end
else
@info "no disk caching"
end
@everywhere struct DCTable
ref::DRef
DCTable(df::DataFrame) = new(poolset(df))
end
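# `DCTable` stores only the `DRef` returned by `poolset`; the wrapped `DataFrame`
# lives in MemPool's datastore and is retrieved with `poolget` on demand, so this
# version exercises MemPool directly without the Dagger/DTables layers in between.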
# Call `copy` to be a fairer comparison to DTables.jl's `fetch`.
# Code is much faster without `copy`!
@everywhere Base.fetch(dt::DCTable) = copy(poolget(dt.ref))
@everywhere Base.length(dt::DCTable) = nrow(fetch(dt))
@everywhere function DataFrames.select(dt::DCTable, args...; kwargs...)
df = fetch(dt)
selected = select(df, args...; kwargs...)
DCTable(selected)
end
@everywhere const DT = Ref{DCTable}()
@everywhere mutable struct DCTableCols
key_names
value_names
keys
values
end
function main()
remotecall_fetch(query, 2)
end
@everywhere function query()
dt1 = load_dt()
dt2 = add_value_col!(dt1)
dt3 = update_value_col!(dt2)
@info "" length(dt3)
dt4 = calc_value_cols(dt3)
dt5 = select(dt4, [6; 12; 103:113]...; copycols = false)
dt_agg = aggregate_dt(dt5)
return fetch(dt_agg)
end
@everywhere function load_dt()
isassigned(DT) && return DT[]
file = "file.csv"
df = CSV.read(file, DataFrame)
dt = DCTable(df)
DT[] = dt
return dt
end
@everywhere function add_value_col!(dt)
dt_cols = create_dt_cols(dt, 1:48, 49:102)
dt_cols.value_names = [dt_cols.value_names; "RAND"]
dt_cols.values = (dt_cols.values..., rand(length(dt_cols.values[1])))
return create_dt_from_cols(dt_cols; is_sorted = true)
end
@everywhere function create_dt_cols(dt, key_cols, value_cols)
df = fetch(dt)
key_names = names(df)[key_cols]
value_names = names(df)[value_cols]
keys = [df[!, i] for i in key_cols]
values = [df[!, i] for i in value_cols]
return DCTableCols(key_names, value_names, keys, values)
end
@everywhere function create_dt_from_cols(dt_cols; is_sorted = false)
df = DataFrame(
(dt_cols.key_names .=> dt_cols.keys)...,
(dt_cols.value_names .=> dt_cols.values)...;
copycols = false,
)
is_sorted || sort!(df)
return DCTable(df)
end
@everywhere function update_value_col!(dt)
dt_cols = create_dt_cols(dt, 1:48, 49:103)
dt_cols.values = (
dt_cols.values[1:10]...,
rand(length(dt_cols.values[1])),
dt_cols.values[12:end]...,
)
return create_dt_from_cols(dt_cols; is_sorted = true)
end
@everywhere function calc_value_cols(dt)
newvals = Vector{Float64}[]
for i = 1:10
v = calc_new_value(dt, i)
push!(newvals, v)
end
return append_value_cols(dt, newvals)
end
@everywhere function calc_new_value(dt, i)
dt_cols = create_dt_cols(dt, 1:48, 49:103)
return abs.(dt_cols.values[i])
end
@everywhere function append_value_cols(dt, newvals)
df = fetch(dt)
for (i, v) in enumerate(newvals)
setproperty!(df, "NEW$i", v)
end
return DCTable(df)
end
@everywhere function aggregate_dt(dt)
key_names = [Symbol("6"), Symbol("12")]
gdt = groupby(fetch(dt), key_names)
gkeys = sort!(collect(keys(gdt)))
key_pairs = key_names .=> invert(gkeys)
value_names = [[Symbol("RAND")]; Symbol.("NEW", 1:10)]
sums = fetch(reduce(+, gdt; cols = value_names))
sorted = sortperm(invert(sums[key_names]))
value_pairs = map(value_names) do value
value => sums[Symbol(:result_, value)][sorted]
end
return DCTable(DataFrame(key_pairs..., value_pairs...))
end
@everywhere invert(x) = [[x[j][i] for j = 1:length(x)] for i = 1:length(x[1])]
@everywhere function Base.reduce(f, df::DataFrames.AbstractDataFrame; cols)
NamedTuple(col => reduce(f, df[!, col]) for col in cols)
end
@everywhere function Base.reduce(f, gdt::DataFrames.GroupedDataFrame; cols)
gkeys = keys(gdt)
dims = keys(gkeys[1])
merge(
NamedTuple(dim => getproperty.(gkeys, dim) for dim in dims),
NamedTuple(
Symbol(:result_, col) => [reduce(f, gdt[k]; cols = [col])[col] for k in gkeys]
for col in cols
),
)
end
Results: For each of the following I started Julia 1.9.4 with ...
julia> include("dtables.jl"); @time for i = 1:50
memusage = map(procs()) do id
remotecall_fetch(id) do
parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
end
end
totalmemusage = Base.format_bytes(sum(memusage))
worker_memusage = Base.format_bytes(memusage[2])
@info "$i" worker_memusage totalmemusage
main()
end
⋮
┌ Info: 50
│ worker_memusage = "24.741 GiB"
└ totalmemusage = "28.720 GiB"
129.672974 seconds (9.29 M allocations: 590.653 MiB, 0.12% gc time, 2.99% compilation time: 22% of which was recompilation)
julia> include("custom.jl"); @time for i = 1:50
memusage = map(procs()) do id
remotecall_fetch(id) do
parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
end
end
totalmemusage = Base.format_bytes(sum(memusage))
worker_memusage = Base.format_bytes(memusage[2])
@info "$i" worker_memusage totalmemusage
main()
end
⋮
┌ Info: 50
│ worker_memusage = "9.517 GiB"
└ totalmemusage = "11.166 GiB"
19.154547 seconds (882.60 k allocations: 61.428 MiB, 2.22% compilation time)
julia> USE_DISK_CACHING = true; include("dtables.jl"); @time for i = 1:50
memusage = map(procs()) do id
remotecall_fetch(id) do
parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
end
end
totalmemusage = Base.format_bytes(sum(memusage))
worker_memusage = Base.format_bytes(memusage[2])
device_size = remotecall_fetch(2) do
DTables.Dagger.MemPool.GLOBAL_DEVICE[].device_size[]
end |> Base.format_bytes
@info "$i" device_size worker_memusage totalmemusage
main()
end
⋮
┌ Info: 50
│ device_size = "10.812 GiB"
│ worker_memusage = "8.309 GiB"
└ totalmemusage = "12.879 GiB"
130.644326 seconds (9.75 M allocations: 621.888 MiB, 0.16% gc time, 3.08% compilation time: 21% of which was recompilation)
julia> USE_DISK_CACHING = true; include("custom.jl"); @time for i = 1:50
memusage = map(procs()) do id
remotecall_fetch(id) do
parse(Int, split(read(`ps -p $(getpid()) -o rss`, String), "\n")[end-1]) * 1000
end
end
totalmemusage = Base.format_bytes(sum(memusage))
worker_memusage = Base.format_bytes(memusage[2])
device_size = remotecall_fetch(2) do
MemPool.GLOBAL_DEVICE[].device_size[]
end |> Base.format_bytes
@info "$i" device_size worker_memusage totalmemusage
main()
end
⋮
┌ Info: 50
│ device_size = "8.436 GiB"
│ worker_memusage = "6.284 GiB"
└ totalmemusage = "7.925 GiB"
39.369563 seconds (921.19 k allocations: 63.999 MiB, 0.03% gc time, 1.27% compilation time)
Project
|
I'd like to try to reproduce this locally so I can figure out where Dagger is adding overwhelming overhead (which in general it should not, when working with sufficiently large files). Can you post your |
Here's a script that generates a .csv file that can reproduce the above behavior:
using CSV, DataFrames, InlineStrings, Random
const NROWS = 233930
const NCOLS = 102
const ELTYPE = [Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, InlineStrings.String1, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, Int64, InlineStrings.String15, Int64, Int64, Int64, Int64, InlineStrings.String1, Int64, Int64, Int64, Int64, Int64]
const NUM_UNIQUE = [12, 1, 1, 1, 1, 3, 12, 2, 4, 1, 12, 9, 13, 32, 1292, 13, 32, 493, 2, 3, 3, 3, 3, 2, 3, 2, 367, 462, 8, 369, 192, 28, 28, 193, 43, 243, 243, 48871, 4, 8, 10, 2, 3, 3, 5, 3, 3, 5]
function generate()
Random.seed!(0)
input = map(1:NCOLS) do i
name = string(i)
if i <= length(ELTYPE)
if ELTYPE[i] isa AbstractString
# Oops, this branch is never taken, but the resulting file still reproduces the issue.
col = string.(rand(1:NUM_UNIQUE[i]-1, NROWS))
col[1:12] .= " "
return name => ELTYPE[i].(col)
else
col = rand(1:NUM_UNIQUE[i], NROWS)
return name => col
end
else
col = rand(NROWS)
return name => col
end
end
df = DataFrame(input)
sort!(@view(df[13:end, :]), 1:length(ELTYPE))
CSV.write("file.csv", df)
return df
end
generate() |
I have the following setup:
julia --project -t1 --heap-size-hint=3G
addprocs(4; exeflags = "--heap-size-hint=3G")
The actual query includes loading a table from a .csv file into a `DTable` (with a `DataFrame` table type). Operations include `select`ing columns, `fetch`ing the table into a `DataFrame` for adding/removing rows/columns and other processing as needed, and re-wrapping the table in a `DTable` to later be processed further. At the end of processing, the result is returned as a `DataFrame`.
The .csv file contains a table with 233930 rows and 102 columns: 1 column of `InlineStrings.String15`, 2 columns of `InlineStrings.String1`, 45 columns of `Int64`, and 54 columns of `Float64`.
The issue: I noticed that if I keep running the same query repeatedly, the `MemPool.datastore` on worker 2 consumes more and more memory (as determined by ...). Eventually, the memory usage grows enough to cause my WSL 2 Linux OOM manager to kill worker 2, crashing my program.
Notably, I do not observe this growth in memory usage in the following scenarios:
- running with a single process (i.e., no call to `addprocs`), or
- using `DataFrame`s exclusively (i.e., not using DTables.jl at all).
I do observe this growth in memory usage in the following additional scenarios:
- using a `NamedTuple` as the table type for the `DTable`s, or
- running the query on worker 1 instead of worker 2. (In this case, the `MemPool.datastore` on worker 1, not worker 2, is what consumes more and more memory. However, I never ran into any issues with the OOM manager killing my processes.)
Please let me know if there is any other information that would help with finding the root cause of this issue.
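A small sketch of how the growth described above can be tracked from the REPL, assuming `MemPool.datastore` is an in-memory collection whose contents `Base.summarysize` can walk (the `datastore_bytes` helper name is made up for this example):
using Distributed
@everywhere using MemPool
# Approximate bytes held by MemPool's datastore on a given worker.
datastore_bytes(id) = remotecall_fetch(id) do
    Base.summarysize(MemPool.datastore)
end
Base.format_bytes(datastore_bytes(2))   # e.g., check worker 2 between queries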