Skip to content

Commit

Permalink
refactored message processing [ci skip]
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jul 9, 2015
1 parent 70857c4 commit b5800db
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 165 deletions.
42 changes: 16 additions & 26 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,7 @@ type Channel{T, Many} <: AbstractChannel{T}
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot

function Channel(Ty::Type, szp1::Int, sz_max::Int)
if sz_max==1
store = ChannelDataSingle{Ty}(false, Nullable{T}())
else
store = ChannelDataMultiple{Ty}(Array(Ty, szp1), szp1, sz_max, 1, 1)
end
new(get_next_channel_id(), store, Condition(), Condition())
end
Channel(store) = new(get_next_channel_id(), store, Condition(), Condition())
end

let next_channel_id=1
Expand All @@ -47,17 +40,14 @@ let next_channel_id=1
end
end

Channel() = Channel(1)
Channel() = Channel(Any)
Channel(T::Type) = Channel{T, false}(ChannelDataSingle{T}(false, Nullable{T}()))

Channel(sz::Int) = Channel(Any, sz)
Channel(T::Type) = Channel(T, 1)
function Channel(T::Type, sz::Int)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
csz = sz > 32 ? 32 : sz
if sz_max == 1
Channel{T, false}(T, csz+1, sz_max)
else
Channel{T, true}(T, csz+1, sz_max)
end
Channel{T, true}(ChannelDataMultiple{T}(Array(T, csz+1), csz+1, sz_max, 1, 1))
end

const map_all_channels = Dict{Int, AbstractChannel}()
Expand All @@ -69,12 +59,12 @@ channel_at(pid::Int=myid(), T::Type=Any, sz::Int=1) = remotecall_fetch(pid, crea
close_channel(cid::Int) = (delete!(map_all_channels, cid); nothing)
close(addr::ChannelAddr) = remotecall_fetch(addr.pid, close_channel, addr.cid)

# TODO Implement
# put!(addr::ChannelAddr, v)
# take!(addr::ChannelAddr)
# fetch(addr::ChannelAddr)
# isready(addr::ChannelAddr)
# wait(addr::ChannelAddr)
# TODO faster versions of below
put!(addr::ChannelAddr, v) = remotecall_fetch(addr.pid, (cid, v) -> put!(map_all_channels[cid], v), addr.cid, v)
take!(addr::ChannelAddr) = remotecall_fetch(addr.pid, cid -> take!(map_all_channels[cid], v), addr.cid)
fetch(addr::ChannelAddr) = remotecall_fetch(addr.pid, cid -> fetch(map_all_channels[cid], v), addr.cid)
isready(addr::ChannelAddr) = remotecall_fetch(addr.pid, cid -> isready(map_all_channels[cid], v), addr.cid)
wait(addr::ChannelAddr) = remotecall_fetch(addr.pid, cid -> wait(map_all_channels[cid], v), addr.cid)


function put!{T}(c::Channel{T, true}, v::T)
Expand Down Expand Up @@ -131,20 +121,20 @@ end
fetch(store::ChannelDataSingle) = get(store.data)
fetch(store::ChannelDataMultiple) = store.data[store.take_pos]

function take!(c::Channel)
function take!{T}(c::Channel{T})
while !isready(c)
wait(c.cond_take)
end
v = take!(c.store)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
v
v::T
end

take!(store::ChannelDataSingle) = (v=get(store.data, nothing); store.data=nothing; store.full=false; v)
function take!(store::ChannelDataMultiple)
take!{T}(store::ChannelDataSingle{T}) = (v=get(store.data); store.data=Nullable{T}(); store.full=false; v)
function take!{T}(store::ChannelDataMultiple{T})
v = store.data[store.take_pos]
store.take_pos = (store.take_pos == store.szp1 ? 1 : store.take_pos + 1)
v
v::T
end

isready{T}(c::Channel{T, false}) = c.store.full
Expand Down
Loading

0 comments on commit b5800db

Please sign in to comment.