Added minimal code example to Channels, fix #14312 #17674

Closed

Conversation

@kshyatt (Contributor) commented Jul 28, 2016

@kshyatt added the labels "docs" (This change adds or pertains to documentation) and "parallelism" (Parallel or distributed computation) on Jul 28, 2016
@@ -492,6 +492,27 @@ and size 10. The channel exists on worker ``pid``\ .
Methods ``put!``\ , ``take!``\ , ``fetch``\ , ``isready`` and ``wait`` on a ``RemoteChannel`` are proxied onto
the backing store on the remote process.

As an example, we can construct a ``RemoteChannel`` to a worker, ``put!``\ an index inside, then ``take!``\ it out::
Contributor:

The ``\`` should only be needed if there's punctuation; spaces should be fine.

Contributor Author:
You can see I don't know RST very well.

@ViralBShah (Member):

I wonder if we should separate the distributed and parallel computing sections of this page. There are many lower-level primitives that we have for general purpose distributed computing, whereas many users of parallel computing are perhaps content with pmap, @parallel and other similar patterns that we may introduce over time.

@ViralBShah (Member):

Maybe some of this belongs to the Networking and Streams section? Sorry for creating the noise here, but this PR triggered the thought.

@mweastwood (Contributor):

I don't think this example is ideal because you are showing that the master process can put! and take! from the channels. We'd like to see the workers taking what the master process put.

Maybe something like this:

addprocs(5)
channels = Dict(worker => RemoteChannel() for worker in workers())
@everywhere take_and_print!(c) = println(take!(c))
for worker in workers()
    put!(channels[worker], worker)
    remotecall_fetch(take_and_print!, worker, channels[worker])
end

Which should output:

    From worker 2:  2
    From worker 3:  3
    From worker 4:  4
    From worker 5:  5
    From worker 6:  6

@kshyatt (Contributor, Author) commented Jul 28, 2016

That's a great point, @mweastwood. Why don't we include both, so that people can generalize from the easy case (master puts and takes) to the harder one?
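
For reference, the "easy case" described above might look something like this (a minimal sketch, not taken from the PR; the worker id 2 assumes a fresh session with a single added worker):

    addprocs(1)             # one worker, which gets pid 2 in a fresh session

    rc = RemoteChannel(2)   # the backing Channel lives on worker 2
    put!(rc, 1)             # the master process puts an index in ...
    take!(rc)               # ... and takes it back out, returning 1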

@@ -492,6 +492,46 @@ and size 10. The channel exists on worker ``pid``\ .
Methods ``put!``\ , ``take!``\ , ``fetch``\ , ``isready`` and ``wait`` on a ``RemoteChannel`` are proxied onto
the backing store on the remote process.

As an example, we can construct a ``RemoteChannel`` to a worker, ``put!`` an index inside, then ``take!`` it out:

.. doctest::
Contributor:
I think the doctest format is a little pickier than this; most of them look for julia> on the inputs, then non-prefixed output like you'd get at the REPL.
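
For illustration, a doctest-formatted version of the example might look roughly like this (a sketch only; the exact prompts, output, and use of trailing semicolons to suppress output are assumptions, not text from the PR):

    .. doctest::

        julia> rc = RemoteChannel(2);   # backing Channel lives on worker 2

        julia> put!(rc, 1);             # master puts an index in

        julia> take!(rc)                # ... and takes it back out
        1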

@amitmurthy (Contributor):

Thanks for doing this (again!).

I was thinking that we could take the same pmap code example in the previous section and rewrite it once using Channels and again using RemoteChannels.

@amitmurthy (Contributor):

As written, all the remote channels refer to channels on the master only. The workers seem incidental.

An example implementation of pmap with Channels:

    function pmap(f, lst)
        np = nprocs()

        # Create a work queue which takes in a tuple of (id, function).
        work_q = Channel{Tuple}(np)
        results_q = Channel{Tuple}(length(lst))

        # Start a task to feed the work queue
        @async begin
            for (job_id, v) in enumerate(lst)
                put!(work_q, (job_id, ()->f(v...)))
            end
            close(work_q)
        end

        @sync begin
            for p in workers()
                # start as many feeder tasks as workers
                @async begin
                    # each task runs till the work_q is open.
                    while isopen(work_q)
                        job_id, job = take!(work_q)
                        job_result = remotecall_fetch(job, p)
                        put!(results_q, (job_id, job_result))
                    end
                end
            end
        end
        close(results_q)

        return map(x->x[2], sort(collect(results_q); lt=(x,y)->x[1]<y[1]))
    end

Would like it simplified further though. I find Alan's suggestion of keeping example code blocks to no more than a single page quite relevant for easier readability.

We could then rewrite this once again using RemoteChannels.

Alternatively, we could go with an example of a simple web service that distributes computation among workers. The example would refer to the HTTPServer.jl package and distribute incoming requests among workers.
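
A rough sketch of the RemoteChannel rewrite mentioned above might look like this (not code from the PR; the name pmap_rc and the :DONE sentinel protocol are illustrative choices):

    function pmap_rc(f, lst)
        np = nworkers()

        # Both queues live on the master but are reachable from every process.
        work_q    = RemoteChannel(()->Channel{Any}(length(lst) + np))
        results_q = RemoteChannel(()->Channel{Tuple}(length(lst)))

        # Fill the work queue with (id, argument) pairs, then one :DONE per worker.
        for (job_id, v) in enumerate(lst)
            put!(work_q, (job_id, v))
        end
        for _ in 1:np
            put!(work_q, :DONE)
        end

        # One feeder task per worker: take a job, run f remotely, report back.
        @sync begin
            for p in workers()
                @async while true
                    job = take!(work_q)
                    job == :DONE && break
                    job_id, v = job
                    put!(results_q, (job_id, remotecall_fetch(f, p, v)))
                end
            end
        end

        # Collect the results and restore the original submission order.
        results = [take!(results_q) for _ in 1:length(lst)]
        return map(x->x[2], sort(results; by=x->x[1]))
    end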

@amitmurthy (Contributor):

I took another shot at a simple example for Channels.
It computes the MD5 of files in a directory using an external program md5. Concurrency will be limited to the number of cores on the machine.


    function compute_md5(dir)
        work_q = Channel(Sys.CPU_CORES)
        results_q = Channel(Sys.CPU_CORES)

        # Create a feeder task which asynchronously adds file names to a work queue
        @schedule begin
            for (root, _, files) in walkdir(dir)
                for file in files
                    put!(work_q, joinpath(root,file))
                end
            end

            # close the queue once all filenames have been added. This causes
            # worker tasks iterating over the work_q to exit. 
            close(work_q)
        end

        # Create as many worker tasks as number of cores on the machine.
        # Close the results_q once all tasks have finished.
    # Execute the entire worker task set in a separate task itself

        @schedule begin                                         # Schedule a task which starts and waits for worker tasks
            @sync begin                                         # Wait for all worker tasks to finish
                for _ in 1:Sys.CPU_CORES                        # As many worker tasks as cores
                    @async begin                                # One worker task per core
                        for file in work_q                      # Process files from channel till it is closed.
                            strm, process = open(`md5 $file`)   # Launch external program
                            wait(process)                       # Wait for external program to complete
                            put!(results_q, readstring(strm))   # collect output and write out the result
                        end
                    end
                end
            end
            close(results_q)                                    # All worker tasks have finished. Close the results channel. 
        end

    # Print out the results as they are processed.
        for md5 in results_q
            println(md5)
        end
    end

Does this work better for a beginner reading the section on Channels? Note that this does not cover RemoteChannels.

@kshyatt (Contributor, Author) commented Aug 3, 2016

I might show the full example and then go through it line by line (might be easier for a newbie to read), explaining what the more complex blocks do. Then you can say "for more information about @async, see ...".

Might be nice also to show some sample output.

I think the example is great. RemoteChannel can wait, or perhaps a better choice is to have a Jupyter notebook showing that?

@vtjnash (Member) commented Aug 3, 2016

Delete the line "wait(process)" to remove a livelock failure

@amitmurthy (Contributor):

@vtjnash how can that trigger a livelock failure? Just to understand what is happening under the hood.

@vtjnash (Member) commented Aug 3, 2016

It puts upstream pressure on the writer to prevent it from exiting since there is no active reader to consume the data
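
Concretely, the worker-task body from the md5 example would then read something like this (a sketch of the suggested fix; with the wait(process) line removed, readstring is what drains the pipe and lets the external program run to completion):

    for file in work_q                         # process files until the channel is closed
        strm, process = open(`md5 $file`)      # launch the external program
        put!(results_q, readstring(strm))      # reading to EOF consumes the output
    end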

@amitmurthy (Contributor):

The same example as above, done with RemoteChannels. We should mention that it is applicable when 1) processes are distributed across nodes and 2) the directory is on a network file system.

addprocs()                   # Starts as many workers as cores
                             # For starting workers distributed across machines use
                             # addprocs([(h, :auto) for h in [host1, host2...]])
                             # This will launch workers on host1, host2....
                             # The :auto option results in as many workers as cores on each host.

function compute_md5(dir)
    work_q = RemoteChannel(()->Channel(nworkers()))
    results_q = RemoteChannel(()->Channel(nworkers()))

    # Function that reads file names from and writes md5 values to the remote channels
    # This will be run on each worker.
    md5_func = () -> begin
        while (true)
            file = take!(work_q)

            file == :DONE && break

            strm, process = open(`md5 $file`)           # Launch external program
            put!(results_q, readstring(strm))           # collect output and write out the result
        end
    end

    # Collect the file names up front so we know how many results to expect.
    # `dir` should be on a network file system if workers are distributed across nodes.
    filenames = []
    for (root, _, files) in walkdir(dir)
        append!(filenames, [joinpath(root, file) for file in files])
    end
    nfiles = length(filenames)

    # Create a feeder task which asynchronously adds the file names to the work queue
    @schedule begin
        for file in filenames
            put!(work_q, file)
        end

        # release all the remote tasks
        for _ in 1:nworkers()
            put!(work_q, :DONE)
        end
    end

    # Launch tasks on each worker to process requests.
    for p in workers()
        remotecall(md5_func, p)
    end

    # Print out the results as they are processed.
    while nfiles > 0
        println(take!(results_q))
        nfiles -= 1
    end
end

@amitmurthy (Contributor):

A better example for RemoteChannel would be something like an HTTPServer farming out individual requests to workers, or a pipeline of image processing tasks, but as these would involve external packages we should just stick with the contrived md5 example.

@amitmurthy (Contributor):

Also consider adding the following:


The above examples show how channels can be used for inter-task communication, and remote channels for inter-process communication. They are well suited for building work pipelines or distributing work across tasks and processes.

Note that for simpler scenarios, asyncmap, which distributes work across tasks, and pmap, which distributes work across nodes, work well.

For example, md5 calculations can be performed locally with

filenames = []
for (root, _, files) in walkdir("..")
    append!(filenames, [joinpath(root, f) for f in files])
end

asyncmap(f->readstring(open(`md5 $f`)[1]), filenames)

or distributed with

pmap(f->readstring(open(`md5 $f`)[1]), filenames)

@kshyatt (Contributor, Author) commented Dec 22, 2017

The examples we have now are great! This PR has been totally superseded.
