Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Channels #12042

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions base/REPL.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import Base:
Display,
display,
writemime,
AnyDict
AnyDict,
Channel

import ..LineEdit:
CompletionProvider,
Expand All @@ -36,8 +37,8 @@ abstract AbstractREPL
answer_color(::AbstractREPL) = ""

type REPLBackend
repl_channel::RemoteRef
response_channel::RemoteRef
repl_channel::Channel
response_channel::Channel
in_eval::Bool
ans
backend_task::Task
Expand Down Expand Up @@ -75,7 +76,7 @@ function eval_user_input(ast::ANY, backend::REPLBackend)
end
end

function start_repl_backend(repl_channel::RemoteRef, response_channel::RemoteRef)
function start_repl_backend(repl_channel::Channel, response_channel::Channel)
backend = REPLBackend(repl_channel, response_channel, false, nothing)
backend.backend_task = @schedule begin
# include looks at this to determine the relative include path
Expand Down Expand Up @@ -154,13 +155,13 @@ end

# A reference to a backend
immutable REPLBackendRef
repl_channel::RemoteRef
response_channel::RemoteRef
repl_channel::Channel
response_channel::Channel
end

function run_repl(repl::AbstractREPL)
repl_channel = RemoteRef()
response_channel = RemoteRef()
repl_channel = Channel()
response_channel = Channel()
backend = start_repl_backend(repl_channel, response_channel)
run_frontend(repl, REPLBackendRef(repl_channel,response_channel))
backend
Expand Down Expand Up @@ -665,7 +666,7 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep
#
# Usage:
#
# repl_channel,response_channel = RemoteRef(),RemoteRef()
# repl_channel,response_channel = Channel(),Channel()
# start_repl_backend(repl_channel, response_channel)
# setup_interface(REPLDisplay(t),repl_channel,response_channel)
#
Expand Down Expand Up @@ -894,8 +895,8 @@ input_color(r::StreamREPL) = r.input_color
function run_repl(stream::AsyncStream)
repl =
@async begin
repl_channel = RemoteRef()
response_channel = RemoteRef()
repl_channel = Channel()
response_channel = Channel()
start_repl_backend(repl_channel, response_channel)
StreamREPL_frontend(repl, repl_channel, response_channel)
end
Expand Down
138 changes: 138 additions & 0 deletions base/channels.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license

abstract AbstractChannel{T}

type Channel{T} <: AbstractChannel{T}
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol

data::Array{T,1}
szp1::Int # current channel size plus one
sz_max::Int # maximum size of channel
take_pos::Int # read position
put_pos::Int # write position

function Channel(sz)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
szp1 = sz > 32 ? 33 : sz+1
new(Condition(), Condition(), :open,
Array(T, szp1), szp1, sz_max, 1, 1)
end
end

const DEF_CHANNEL_SZ=32

Channel() = Channel(DEF_CHANNEL_SZ)
Channel(sz::Int) = Channel{Any}(sz)

closed_exception() = InvalidStateException("Channel is closed.", :closed)
function close(c::Channel)
c.state = :closed
notify_error(c::Channel, closed_exception())
c
end
isopen(c::Channel) = (c.state == :open)

type InvalidStateException <: Exception
msg::AbstractString
state
end
InvalidStateException() = InvalidStateException("")
InvalidStateException(msg) = InvalidStateException(msg, 0)

function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
d = c.take_pos - c.put_pos
if (d == 1) || (d == -(c.szp1-1))
# grow the channel if possible
if (c.szp1 - 1) < c.sz_max
if ((c.szp1-1) * 2) > c.sz_max
c.szp1 = c.sz_max + 1
else
c.szp1 = ((c.szp1-1) * 2) + 1
end
newdata = Array(eltype(c), c.szp1)
if c.put_pos > c.take_pos
copy!(newdata, 1, c.data, c.take_pos, (c.put_pos - c.take_pos))
c.put_pos = c.put_pos - c.take_pos + 1
else
len_first_part = length(c.data) - c.take_pos + 1
copy!(newdata, 1, c.data, c.take_pos, len_first_part)
copy!(newdata, len_first_part+1, c.data, 1, c.put_pos-1)
c.put_pos = len_first_part + c.put_pos
end
c.take_pos = 1
c.data = newdata
else
wait(c.cond_put)
check_open(c)
end
end

c.data[c.put_pos] = v
c.put_pos = (c.put_pos == c.szp1 ? 1 : c.put_pos + 1)
notify(c.cond_take, nothing, true, false) # notify all, since some of the waiters may be on a "fetch" call.
v
end

function fetch(c::Channel)
wait(c)
c.data[c.take_pos]
end

function take!(c::Channel)
!isopen(c) && !isready(c) && throw(closed_exception())
while !isready(c)
wait(c.cond_take)
end
v = c.data[c.take_pos]
c.take_pos = (c.take_pos == c.szp1 ? 1 : c.take_pos + 1)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
v
end

isready(c::Channel) = (c.take_pos == c.put_pos ? false : true)

function wait(c::Channel)
while !isready(c)
wait(c.cond_take)
end
nothing
end

function notify_error(c::Channel, err)
notify_error(c.cond_take, err)
notify_error(c.cond_put, err)
end

eltype{T}(c::Channel{T}) = T

function length(c::Channel)
if c.put_pos >= c.take_pos
return c.put_pos - c.take_pos
else
return c.szp1 - c.take_pos + c.put_pos
end
end

size(c::Channel) = c.sz_max

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(size(c)),sz_curr:$(length(c)))")

start{T}(c::Channel{T}) = Ref{Nullable{T}}(Nullable{T}())
function done(c::Channel, state::Ref)
try
# we are waiting either for more data or channel to be closed
state.x = take!(c)
return false
catch e
if isa(e, InvalidStateException) && e.state==:closed
return true
else
rethrow(e)
end
end
end
next{T}(c::Channel{T}, state) = (get(state.x), Ref{Nullable{T}}(Nullable{T}()))

3 changes: 3 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -692,3 +692,6 @@ function require_filename(name::AbstractString)
end
const reload = require
export reload

@deprecate RemoteRef() Future()
@deprecate RemoteRef(pid) Future(pid)
6 changes: 5 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export
BufferStream,
CartesianIndex,
CartesianRange,
Channel,
Cmd,
Colon,
Complex,
Expand Down Expand Up @@ -86,7 +87,7 @@ export
Rational,
Regex,
RegexMatch,
RemoteRef,
RegisteredRef,
RepString,
RevString,
RopeString,
Expand Down Expand Up @@ -157,6 +158,7 @@ export
DimensionMismatch,
EOFError,
ErrorException,
InvalidStateException,
KeyError,
LoadError,
MethodError,
Expand Down Expand Up @@ -1194,6 +1196,7 @@ export
addprocs,
ClusterManager,
fetch,
Future,
init_worker,
interrupt,
isready,
Expand All @@ -1205,6 +1208,7 @@ export
pmap,
procs,
put!,
register,
remotecall,
remotecall_fetch,
remotecall_wait,
Expand Down
Loading