Skip to content

Commit

Permalink
Added docs, minor tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jul 20, 2015
1 parent 427dd38 commit 99e05fb
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 195 deletions.
128 changes: 42 additions & 86 deletions base/channels.jl
Original file line number Diff line number Diff line change
@@ -1,32 +1,23 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license

abstract ChannelData{T}

type ChannelDataSingle{T} <: ChannelData{T}
full::Bool # Need a separate flag here to handle the case where "nothing" is a valid value in "data"
data::Nullable{T}
end

type ChannelDataMultiple{T} <: ChannelData{T}
data::Array{T,1}
szp1::Int # current channel size plus one
sz_max::Int # maximum size of channel
take_pos::Int # read position
put_pos::Int # write position
end

abstract AbstractChannel{T}

@enum ChannelState C_OPEN C_CLOSED

type Channel{T, Store} <: AbstractChannel{T}
type Channel{T} <: AbstractChannel{T}
cid::Int
store::ChannelData{T}
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::ChannelState

Channel(store) = new(get_next_channel_id(), store, Condition(), Condition(), C_OPEN)
data::Array{T,1}
szp1::Int # current channel size plus one
sz_max::Int # maximum size of channel
take_pos::Int # read position
put_pos::Int # write position

Channel(elt, szp1, sz_max) = new(get_next_channel_id(), Condition(), Condition(), C_OPEN,
Array(T, szp1), szp1, sz_max, 1, 1)
end

let next_channel_id=1
Expand All @@ -39,13 +30,12 @@ let next_channel_id=1
end

Channel() = Channel(Any)
Channel(T::Type) = Channel{T, ChannelDataSingle{T}}(ChannelDataSingle{T}(false, Nullable{T}()))

Channel(T::Type) = Channel(T::Type, 1)
Channel(sz::Int) = Channel(Any, sz)
function Channel(T::Type, sz::Int)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
csz = sz > 32 ? 32 : sz
Channel{T, ChannelDataMultiple{T}}(ChannelDataMultiple{T}(Array(T, csz+1), csz+1, sz_max, 1, 1))
Channel{T}(T, csz+1, sz_max)
end

function close(c::Channel)
Expand Down Expand Up @@ -81,87 +71,57 @@ function done(c::Channel, state)
end
next(c::Channel, state) = (take!(c), nothing)

function put!{T}(c::Channel{T, ChannelDataMultiple{T}}, v::T)
function put!(c::Channel, v)
!isopen(c) && throw(InvalidStateException("Channel is closed.", C_CLOSED))
store::ChannelDataMultiple{T} = c.store
d = store.take_pos - store.put_pos
if (d == 1) || (d == -(store.szp1-1))
d = c.take_pos - c.put_pos
if (d == 1) || (d == -(c.szp1-1))
# grow the channel if possible
if (store.szp1 - 1) < store.sz_max
if ((store.szp1-1) * 2) > store.sz_max
store.szp1 = store.sz_max + 1
if (c.szp1 - 1) < c.sz_max
if ((c.szp1-1) * 2) > c.sz_max
c.szp1 = c.sz_max + 1
else
store.szp1 = ((store.szp1-1) * 2) + 1
c.szp1 = ((c.szp1-1) * 2) + 1
end
newdata = Array(eltype(c), store.szp1)
if store.put_pos > store.take_pos
copy!(newdata, 1, store.data, store.take_pos, (store.put_pos - store.take_pos))
store.put_pos = store.put_pos - store.take_pos + 1
newdata = Array(eltype(c), c.szp1)
if c.put_pos > c.take_pos
copy!(newdata, 1, c.data, c.take_pos, (c.put_pos - c.take_pos))
c.put_pos = c.put_pos - c.take_pos + 1
else
len_first_part = length(store.data) - store.take_pos + 1
copy!(newdata, 1, store.data, store.take_pos, len_first_part)
copy!(newdata, len_first_part+1, store.data, 1, store.put_pos-1)
store.put_pos = len_first_part + store.put_pos
len_first_part = length(c.data) - c.take_pos + 1
copy!(newdata, 1, c.data, c.take_pos, len_first_part)
copy!(newdata, len_first_part+1, c.data, 1, c.put_pos-1)
c.put_pos = len_first_part + c.put_pos
end
store.take_pos = 1
store.data = newdata
c.take_pos = 1
c.data = newdata
else
wait(c.cond_put)
end
end

store.data[store.put_pos] = v
store.put_pos = (store.put_pos == store.szp1 ? 1 : store.put_pos + 1)
c.data[c.put_pos] = v
c.put_pos = (c.put_pos == c.szp1 ? 1 : c.put_pos + 1)
notify(c.cond_take, nothing, true, false) # notify all, since some of the waiters may be on a "fetch" call.
v
end

function put!{T}(c::Channel{T, ChannelDataSingle{T}}, v::T)
!isopen(c) && throw(InvalidStateException("Channel is closed.", C_CLOSED))
store::ChannelDataSingle{T} = c.store
if store.full
wait(c.cond_put)
end
store.data = v
store.full = true
notify(c.cond_take, nothing, true, false)
v
end

function fetch(c::Channel)
wait(c)
fetch(c.store)
c.data[c.take_pos]
end

fetch(store::ChannelDataSingle) = get(store.data)
fetch(store::ChannelDataMultiple) = store.data[store.take_pos]

function take!{T, S}(c::Channel{T, S})
!isopen(c) && throw(InvalidStateException("Channel is closed.", C_CLOSED))
function take!(c::Channel)
!isopen(c) && !isready(c) && throw(InvalidStateException("Channel is closed.", C_CLOSED))
while !isready(c)
wait(c.cond_take)
end
store::S = c.store
v = take!(store)
v = c.data[c.take_pos]
c.take_pos = (c.take_pos == c.szp1 ? 1 : c.take_pos + 1)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
v
end

function take!{T}(store::ChannelDataSingle{T})
assert(store.full == true)
v=get(store.data, nothing) # Nothing is a valid value to store!
store.data=Nullable{T}()
store.full=false
v
end
function take!(store::ChannelDataMultiple)
v = store.data[store.take_pos]
store.take_pos = (store.take_pos == store.szp1 ? 1 : store.take_pos + 1)
v
end

isready{T}(c::Channel{T, ChannelDataSingle{T}}) = c.store.full
isready{T}(c::Channel{T, ChannelDataMultiple{T}}) = (store = c.store; store.take_pos == store.put_pos ? false : true)
isready(c::Channel) = (c.take_pos == c.put_pos ? false : true)

function wait(c::Channel)
while !isready(c)
Expand All @@ -178,18 +138,14 @@ end

eltype{T}(c::Channel{T}) = T

function length{T}(c::Channel{T, ChannelDataMultiple{T}})
store=c.store
if store.put_pos >= store.take_pos
return store.put_pos - store.take_pos
function length(c::Channel)
if c.put_pos >= c.take_pos
return c.put_pos - c.take_pos
else
return store.szp1 - store.take_pos + store.put_pos
return c.szp1 - c.take_pos + c.put_pos
end
end

length{T}(c::Channel{T, ChannelDataSingle{T}}) = c.store.full ? 1 : 0

size{T}(c::Channel{T, ChannelDataSingle{T}}) = 1
size{T}(c::Channel{T, ChannelDataMultiple{T}}) = c.store.sz_max
size(c::Channel) = c.sz_max

show(io::IO, c::Channel) = print(io, "$(typeof(c))(id: $(c.cid), size: $(size(c)), num_elements: $(length(c)))")
show(io::IO, c::Channel) = print(io, "$(typeof(c))(id:$(c.cid),sz_max:$(size(c)),sz_curr:$(length(c)))")
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ export
Rational,
Regex,
RegexMatch,
# RemoteRef,
RepString,
RevString,
RopeString,
Expand Down
2 changes: 2 additions & 0 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ Future(w::LocalProcess) = Future(w.id)
Future(w::Worker) = Future(w.id)
Future(pid::Integer) = Future{Any}(pid, myid(), next_ref_id(), Any)

show(io::IO, f::Future) = print(io, "Future($(f.where),$(f.whence),$(f.id))")

type ChannelRef{T} <: AbstractRemoteRef{T}
where::Int
whence::Int
Expand Down
84 changes: 43 additions & 41 deletions doc/manual/control-flow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -600,47 +600,49 @@ Built-in :exc:`Exception`\ s
:exc:`Exception`\ s are thrown when an unexpected condition has occurred. The
built-in :exc:`Exception`\ s listed below all interrupt the normal flow of control.

+---------------------------+
| :exc:`Exception` |
+===========================+
| :exc:`ArgumentError` |
+---------------------------+
| :exc:`BoundsError` |
+---------------------------+
| :exc:`DivideError` |
+---------------------------+
| :exc:`DomainError` |
+---------------------------+
| :exc:`EOFError` |
+---------------------------+
| :exc:`ErrorException` |
+---------------------------+
| :exc:`InexactError` |
+---------------------------+
| :exc:`InterruptException` |
+---------------------------+
| :exc:`KeyError` |
+---------------------------+
| :exc:`LoadError` |
+---------------------------+
| :exc:`OutOfMemoryError` |
+---------------------------+
| :exc:`ReadOnlyMemoryError`|
+---------------------------+
| :exc:`MethodError` |
+---------------------------+
| :exc:`OverflowError` |
+---------------------------+
| :exc:`ParseError` |
+---------------------------+
| :exc:`SystemError` |
+---------------------------+
| :exc:`TypeError` |
+---------------------------+
| :exc:`UndefRefError` |
+---------------------------+
| :exc:`UndefVarError` |
+---------------------------+
+------------------------------+
| :exc:`Exception` |
+==============================+
| :exc:`ArgumentError` |
+------------------------------+
| :exc:`BoundsError` |
+------------------------------+
| :exc:`DivideError` |
+------------------------------+
| :exc:`DomainError` |
+------------------------------+
| :exc:`EOFError` |
+------------------------------+
| :exc:`ErrorException` |
+------------------------------+
| :exc:`InexactError` |
+------------------------------+
| :exc:`InterruptException` |
+------------------------------+
| :exc:`InvalidStateException` |
+------------------------------+
| :exc:`KeyError` |
+------------------------------+
| :exc:`LoadError` |
+------------------------------+
| :exc:`OutOfMemoryError` |
+------------------------------+
| :exc:`ReadOnlyMemoryError` |
+------------------------------+
| :exc:`MethodError` |
+------------------------------+
| :exc:`OverflowError` |
+------------------------------+
| :exc:`ParseError` |
+------------------------------+
| :exc:`SystemError` |
+------------------------------+
| :exc:`TypeError` |
+------------------------------+
| :exc:`UndefRefError` |
+------------------------------+
| :exc:`UndefVarError` |
+------------------------------+


For example, the :func:`sqrt` function throws a :exc:`DomainError` if applied to a
Expand Down
Loading

0 comments on commit 99e05fb

Please sign in to comment.