Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel and RemoteChannel #8507

Closed
wants to merge 3 commits into from
Closed

Channel and RemoteChannel #8507

wants to merge 3 commits into from

Conversation

amitmurthy
Copy link
Contributor

Revisiting the idea to have support for "channels" in Base.

  • A Channel is used for inter-task communication. Implementation is in task.jl
  • Channels have a size, i.e., the maximum number of elements that can be put! without blocking. Defaults to 1.
  • They also have a type associated with it. Defaults to Any.
  • Channels would be a much cleaner interface for the recent ClusterManager interface change, where multiple tasks work in parallel totally asynchronously, unlike the produce/consume pattern which works best where both the producer and consumer work in lock-step.
  • put! blocks when a channel is full.
  • take! and wait block when it is empty.
  • isready denotes availability of data in the channel
  • RemoteRef has been renamed to RemoteChannel. Like local channels, they too have a size and type associated with it. Defaults to 1 and Any. Unlike a Channel, a RemoteChannel is accessible across processes.

TODO:

  • documentation
  • more tests
  • Change the backing object of a RemoteChannel to use a Channel

@StefanKarpinski
Copy link
Member

I like this a lot. I wonder if Channel should be an abstract type with LocalChannel and RemoteChannel as concrete subtypes. Alternatively, AbstractChannel with Channel and RemoteChannel as is done here.

@amitmurthy
Copy link
Contributor Author

My preference would be for the latter, i.e., AbstractChannel with Channel and RemoteChannel .

@JeffBezanson
Copy link
Member

👍

Shouldn't Channel have a type parameter?

@amitmurthy
Copy link
Contributor Author

Channel is not a parameterized type if that is what you are asking.

The constructors available are

Channel() = Channel(1)
Channel(sz::Int) = Channel(Any, sz)
Channel(T::Type) = Channel(T, 1)
Channel(T::Type, sz::Int)

The channel type defines the type of the backing vector, and put! fails if you add an element that cannot be converted to the channel type.

@JeffBezanson
Copy link
Member

Why not make T a type parameter? It seems obvious to me that it should be.

@amitmurthy
Copy link
Contributor Author

OK.

@shashi
Copy link
Contributor

shashi commented Sep 29, 2014

Nice! I have pretty much the same implmementation of Channels here: https://github.com/shashi/CSP.jl/blob/master/src/CSP.jl operators on it are send, recv, map, reduce, filter, merge and there's also select to watch multiple channels and act when one of them can recv. Channel{T} is a channel that can transmit values of type T. Default buffer size is 0.

Very similar to Channels in Golang or Clojure's core.async

My thinking was that Channels should be decoupled from Transports, i.e. instead of having a RemoteChannel type, it would be nice to just have Channels and be able to pipe channels in and out of different transports (e.g. a TCP transport or a ZMQ socket). A transport is just any object which supports send(::T) and recv(::T)

This is nice because then you get the power of map, reduce, filter etc without having to rewrite it for each kind of channel. Connecting things on different machines becomes as easy as it should be, I think.

@amitmurthy
Copy link
Contributor Author

RemoteChannels are for inter-process (worker) communication while Channels are for inter-task communication. The Channel implementation uses a circular buffer and grows on demand upto its maximum size.

RemoteChannels are in effect just a remote reference to a single, common Channel.

Serializing/deserializing just sends the reference coordinates (where, whence, id) between processes.

So, "pipe channels in and out of different transports" does not make sense, unless you mean piping references to the channel - the channel data should only ever be on one process.

In future, we could make it possible to seamlessly "send" regular Channels across processes by automatically creating references and sending RemoteChannel references across instead. So, any code using AbstractChannel will work in both circumstances.

@ViralBShah
Copy link
Member

Decoupling channels from transports as we go forward does seem like a good idea. One could then even use an MPI transport, in addition to tcp/zmq etc.

@amitmurthy
Copy link
Contributor Author

Why use a circular buffer when....

I assume that push! and shift! will result in a memmove sooner or later. For larger, active channels it could have a measurable cost.

N tasks (or processes) could each be waiting on the same channel object

c::Channel{Request}

for x in 1:N
  @schedule begin 
     while true
       serve(take!(c))
    end
  end
end

Each time a request is added to c, only one of the take! calls will return.

With threading, we would require appropriate locking in the implementation.

@shashi
Copy link
Contributor

shashi commented Sep 29, 2014

Oops, I deleted my comment by mistake. That makes sense! Thanks.

Edit: For the record, previous comment was:
What I meant was "piping data from channels to transports, and vice versa" I guess. A Transport will need to be more than just a type with send and recv, it would also have to have some way of mapping messages to channels, using some custom headers maybe.

I don't really understand what sending Channels across processes entails.

Why use a circular buffer when push!, shift! and length feel more natural and can cut down on a lot of bookkeeping?

I am confused about fetch and take!... Say I have a web server with N tasks (or eventually, say with RemoteChannels, N processes or threads) listening for requests on a Channel{Request} object

# a worker
c::Channel{Request}
@async while true
    serve(take!(c))
end

Now when a Request is put on the channel, all the N tasks get notified. They will all try to shift messages off the queue until there is nothing left on it, correct? Will this work safely if @async spawned a new thread instead of a task? It would be awesome if we get guaranteed request-stealing kind of load balancing for free here.

@amitmurthy
Copy link
Contributor Author

In order to have a single Channel type across tasks and processes, how about the following:

  • A single Channel type
  • Channel(...) returns a local Channel. The type is not exported and hence cannot be
    instantiated directy by user code.
  • channel(...) constructs a local channel and returns a ChannelRef{Local}
  • channel(pid, ...) constructs a local channel at pid and returns a ChannelRef{Remote}
  • ChannelRef is exported, it is just a handle to a channel.
  • ChannelRef{Local} is a wrapper around the underlying Channel object,
    and put!, take!, etc are all passthroughs
  • ChannelRef{Remote} is the usual (where, whence, id) tuple
  • Channel objects are not serializable.

Users will always instantiate channels with the channel() methods, which return
serializable ChannelRefs.

The act of sending a ChannelRef{Local} to a different worker will result in

  • Creation of a ChannelRef{Remote} object refering to the same Channel as referenced by the ChannelRef{Local}
  • Appropriate tracking datastructures in multi.jl being updated. Need to ensure that the same object is not added multiple times
  • The resulting ChannelRef{Remote} being serialized/deserialized to the remote process

@shashi
Copy link
Contributor

shashi commented Sep 29, 2014

This sounds good! I guess transports are a different story unrelated to this PR as they should be. 👍 And ChannelRef seems like the bookkeeping necessary along with a transport which can do send and recv. What are Local and Remote in ChannelRef{Remote} and ChannelRef{Local}? Did you mean something like ChannelRef{remote} where remote is a boolean type parameter?

Would it be really hard to get rid of fetch and only have take!? We can then probably have a FanoutChannel which specifically implements fetch. In general, it will be nice not to wake all listening tasks up just to immediately suspend them again in the N-worker web server example.

@amitmurthy
Copy link
Contributor Author

Yes, just a boolean type parameter defining the type of reference being held by ChannelRef.

As for fetch, in Base I think only darrays and shared arrays are using it.

We could also parametrize on size, and have a different implementation for channels of size 1. And implement fetch only for channels of size 1 where it does make sense to have it.

@JeffBezanson
Copy link
Member

I notice RemoteValue (now RemoteQ) contains nearly the same state as a Channel; perhaps it should wrap a Channel.

I don't think it's any better to have ChannelRef{Remote}. ChannelRef{Local}, and Channel than it is to have Channel, RemoteChannel, and AbstractChannel.

@amitmurthy
Copy link
Contributor Author

Yes. That is the third TODO mentioned in PR description above.

@jiahao jiahao force-pushed the master branch 2 times, most recently from 2ef98c5 to 0388647 Compare October 5, 2014 00:57
@amitmurthy
Copy link
Contributor Author

Have made changes as discussed above.

A few minor things :

  • Channel constructors can be any of, Channel(), Channel(T), Channel(sz) or Channel(T, sz) where T and sz default to Any and 1 respectively. OTOH, RemoteChannel uses keyword arguments for type and size : RemoteChannel(; T=Any, sz=1) and RemoteChannel(pid; T=Any, sz=1), because of the pid argument.

I was just wondering whether, in order to keep Channel and RemoteChannel constructors similar, should we make the pid a keyword argument instead. The flipside of this is that it would badly break any existing code using RemoteRef(pid)

  • There has been some confusion expressed on the mailing list w.r.t. shared arrays, where, because of the name SharedArray, it was assumed that the entire array is sent across the network when sending a shared array object over.

This is not the case, as only references to the shared array are sent over, but the name suggests otherwise.

In order to avoid a similar confusion here, I am wondering whether to change RemoteChannel to ChannelRef - it will make it clear that a remote channel is only a reference to a channel on a particular worker and not the channel itself.

  • Related to the point above, a remote channel just wraps a channel, so while we still have an AbstractChannel, a RemoteChannel should probably not be a subtype of AbstractChannel.

@amitmurthy amitmurthy changed the title WIP/RFC: Channel and RemoteChannel Channel and RemoteChannel Oct 8, 2014
@amitmurthy
Copy link
Contributor Author

Bump @JeffBezanson .

I will rebase and resolve conflicts if this is OK.

@ViralBShah
Copy link
Member

Bump. It would be good to merge to unblock follow-on work.

@ViralBShah
Copy link
Member

Bump again. @JeffBezanson @StefanKarpinski

Should we get this in and tweak it later?

@amitmurthy
Copy link
Contributor Author

Lets hold off for a few days. I am working on abstracting out the transport part from base/multi.jl and I need to provide a channel like interface there.

More like a PipeBuffer, but one that supports waiting and notifications, i.e., an AsyncStream object that is in-process and works off a buffer. Not an interface to an OS provided object.

That will bring up the issue of what the correct interface for a Channel should be - a RemoteRef one or an AsyncStream one.

Let me get that PR out and we can then continue this discussion.

@amitmurthy
Copy link
Contributor Author

#9046 introduces a new type BufferStream. Its intent and implementation is quite different from Channels.

This PR is ready for review.

@tkelman
Copy link
Contributor

tkelman commented Nov 26, 2014

This has trailing whitespace so fails travis, and errors during bootstrap on windows. dunno why that happened, seems better now

tonotify = []
# delete this worker from our RemoteChannel client sets
ids = {}
tonotify = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deprecated syntax

@amitmurthy
Copy link
Contributor Author

Fixed. Thanks.

@amitmurthy
Copy link
Contributor Author

Travis build errors seem unrelated. Win32 went through.

@ViralBShah
Copy link
Member

Isn't this now in a package? Can it be closed?

@amitmurthy
Copy link
Contributor Author

Similar functionality is available in https://github.com/JuliaParallel/MessageUtils.jl

However, I do feel that we should have channel functionality in Base itself. Channels are a very useful feature for inter-task communications where producers / consumers do not have to co-ordinate with each other.

And it changes RemoteRef from “shared queue of length 1” to "shared queue of an arbitrary length".

I can rework this PR against the current master. One change I would like is to retain the name RemoteRef as an alias for RemoteChannel(1), i.e. a RemoteChannel of length one. Too much of user code out there to unnecessarily annoy them.

@ViralBShah
Copy link
Member

Is this something we should be targeting for 0.4?

@StefanKarpinski
Copy link
Member

I think this is a good idea, but I worry about how multithreading is going to affect the API. Go has a channel abstraction that is only for talking between threads in the same process. This is similar, but for talking between tasks either on the same process or across processes (potentially across a network). Do we think this model will survive as is once we have threading?

@amitmurthy
Copy link
Contributor Author

Yes. Will update this PR.

@amitmurthy
Copy link
Contributor Author

Actually Channels are optimized for inter-task communication. While RemoteChannel is more like a handle to a Channel on a different process.

In Erlang, every task has its own message queue, and it is trivial to send a message to a remote task's message queue. Architectures that rely on messaging between lots of tasks will have a need for channels. Multi-threading will reduce the need for shared arrays, but for message driven architectures, where users do not want to deal with locking/unlocking shared resources, message queues are a better model.

A contrived example with tasks and threads - but actually quite a common pattern.

  • A Julia web app process has, say, 10 database I/O threads, each of which executes blocking SQL calls. The actual request to be executed is read from a common input channel.
  • Incoming HTTP requests, say upto a 1000 concurrent requests, all run under a single thread, but in 1000 different tasks.
  • When a request needs database I/O, the request, along with a response channel object are wrapped in a message and dumped onto the database channel, after which the task waits on the response channel.
  • The database I/O threads simply work off the queue and results are returned as messages into the response channel.

@amitmurthy
Copy link
Contributor Author

Message queue based communication between lots of very lightweight tasks - which are usually waiting for some event to happen - I/O, timer, etc is different from multi-threading for taking advantage of multiple cores.

Tasks blocking on libuv events are OK. We can have thousands of them in the same thread. Multi-threading will be used for either spreading computation across cores or b) as in the database example above, since, AFAIK, the standard model in ODBC is thread blocking calls.

@amitmurthy
Copy link
Contributor Author

Closed in favor of #12042

@amitmurthy amitmurthy closed this Jul 7, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants