Skip to content

Commit

Permalink
Separate socket locks into send & recv locks
Browse files Browse the repository at this point in the history
  • Loading branch information
halleysfifthinc committed Dec 19, 2024
1 parent 7d1030e commit 03e1056
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ JSON = "0.18,0.19,0.20,0.21,1"
MbedTLS = "0.5,0.6,0.7,1"
SoftGlobalScope = "1"
ZMQ = "1"
julia = "1.6"
julia = "1.10"
8 changes: 5 additions & 3 deletions src/init.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const heartbeat = Ref{Socket}()
const profile = Dict{String,Any}()
const read_stdout = Ref{Base.PipeEndpoint}()
const read_stderr = Ref{Base.PipeEndpoint}()
const socket_locks = Dict{Socket,ReentrantLock}()
const socket_locks_recv = Dict{Socket,ReentrantLock}()
const socket_locks_send = Dict{Socket,ReentrantLock}()

# needed for executing pkg commands on earlier Julia versions
@static if VERSION < v"1.11"
Expand Down Expand Up @@ -97,8 +98,9 @@ function init(args)

# associate a lock with each socket so that multi-part messages
# on a given socket don't get inter-mingled between tasks.
for s in (publish[], raw_input[], requests[], control[], heartbeat[])
socket_locks[s] = ReentrantLock()
for s in (publish[], raw_input[], requests[], control[])
socket_locks_send[s] = ReentrantLock()
socket_locks_recv[s] = ReentrantLock()
end

start_heartbeat(heartbeat[])
Expand Down
10 changes: 2 additions & 8 deletions src/msg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ function show(io::IO, msg::Msg)
end

function send_ipython(socket, m::Msg)
lock(socket_locks[socket])
try
@lock socket_locks_send[socket] begin
@vprintln("SENDING ", m)
for i in m.idents
send(socket, i, more=true)
Expand All @@ -57,14 +56,11 @@ function send_ipython(socket, m::Msg)
send(socket, parent_header, more=true)
send(socket, metadata, more=true)
send(socket, content)
finally
unlock(socket_locks[socket])
end
end

function recv_ipython(socket)
lock(socket_locks[socket])
try
@lock socket_locks_recv[socket] begin
idents = String[]
s = recv(socket, String)
@vprintln("got msg part $s")
Expand All @@ -85,8 +81,6 @@ function recv_ipython(socket)
m = Msg(idents, JSON.parse(header), JSON.parse(content), JSON.parse(parent_header), JSON.parse(metadata))
@vprintln("RECEIVED $m")
return m
finally
unlock(socket_locks[socket])
end
end

Expand Down

0 comments on commit 03e1056

Please sign in to comment.