diff --git a/REQUIRE b/REQUIRE
index 2af02b73..6f90f45b 100644
--- a/REQUIRE
+++ b/REQUIRE
@@ -3,3 +3,4 @@ FactCheck
 StatsBase
 Distributions
 Compat
+MessageUtils
diff --git a/src/BlackBoxOptim.jl b/src/BlackBoxOptim.jl
index 0131ecf7..dced0fe8 100644
--- a/src/BlackBoxOptim.jl
+++ b/src/BlackBoxOptim.jl
@@ -4,6 +4,7 @@ using Distributions, StatsBase, Compat
 export Optimizer, AskTellOptimizer, SteppingOptimizer, PopulationOptimizer,
   bboptimize, bbsetup, compare_optimizers,
+  ParallelPopulationOptimizer,
   DiffEvoOpt, de_rand_1_bin, de_rand_1_bin_radiuslimited,
   adaptive_de_rand_1_bin, adaptive_de_rand_1_bin_radiuslimited,
@@ -169,6 +170,7 @@ include("resampling_memetic_search.jl")
 include("simultaneous_perturbation_stochastic_approximation.jl")
 include("generating_set_search.jl")
 include("direct_search_with_probabilistic_descent.jl")
+include("parallel_population_optimizer.jl")

 # Fitness
 # include("fitness/fitness_types.jl") FIXME merge it with fitness.jl
diff --git a/src/optimization_methods.jl b/src/optimization_methods.jl
index e9000abc..ce10fa39 100644
--- a/src/optimization_methods.jl
+++ b/src/optimization_methods.jl
@@ -14,6 +14,7 @@ ValidMethods = @compat Dict{Symbol,Union(Any,Function)}(
   :simultaneous_perturbation_stochastic_approximation => SimultaneousPerturbationSA2,
   :generating_set_search => GeneratingSetSearcher,
   :probabilistic_descent => direct_search_probabilistic_descent,
+  :parallel_population_optimizer => parallel_population_optimizer,
 )

 MethodNames = sort!(collect(keys(ValidMethods)))
diff --git a/src/parallel_population_optimizer.jl b/src/parallel_population_optimizer.jl
new file mode 100644
index 00000000..2cd4d628
--- /dev/null
+++ b/src/parallel_population_optimizer.jl
@@ -0,0 +1,305 @@
+using MessageUtils
+
+ParallelPopulationOptimizer_DefaultParameters = @compat Dict{Symbol,Any}(
+  :WorkerMethod => :de_rand_1_bin, # worker population optimization method
+  :NWorkers => 2,                  # number of workers
+  :MigrationSize => 1,             # number of "migrant" individuals to send to the master
+  :MigrationPeriod => 100,         # number of worker iterations between migrations
+  :ArchiveCapacity => 10           # ParallelPseudoEvaluator archive capacity
+)
+
+# metrics reported by a worker optimizer
+type WorkerMetrics
+  num_evals::Int           # number of function evaluations
+  num_steps::Int           # number of optimization steps
+  num_better::Int          # number of steps that improved the best fitness
+  num_migrated::Int        # number of migrants the worker received
+  num_better_migrated::Int # number of migrants accepted (because fitness improved)
+
+  WorkerMetrics() = new(0, 0, 0, 0, 0)
+end
+
+function Base.copy!(a::WorkerMetrics, b::WorkerMetrics)
+  a.num_evals = b.num_evals
+  a.num_steps = b.num_steps
+  a.num_better = b.num_better
+  a.num_migrated = b.num_migrated
+  a.num_better_migrated = b.num_better_migrated
+  return a
+end
+
+# fake evaluator for ParallelPopulationOptimizer:
+# it doesn't evaluate anything itself, but accumulates
+# the metrics reported by the worker evaluators
+type ParallelPseudoEvaluator{F, P<:OptimizationProblem} <: Evaluator{P}
+  problem::P
+  archive::TopListArchive{F}
+  workers_metrics::Vector{WorkerMetrics} # function evals per worker etc.
+  last_fitness::F
+end
+
+num_evals(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_evals, +, 0, ppe.workers_metrics)
+num_better(ppe::ParallelPseudoEvaluator) = mapreduce(x -> x.num_better, +, 0, ppe.workers_metrics)
+
+function ParallelPseudoEvaluator{P<:OptimizationProblem}(
+    problem::P, nworkers::Int;
+    archiveCapacity::Int = 10)
+  ParallelPseudoEvaluator{fitness_type(problem), P}(
+    problem,
+    TopListArchive(fitness_scheme(problem), numdims(problem), archiveCapacity),
+    WorkerMetrics[WorkerMetrics() for i in 1:nworkers],
+    nafitness(fitness_scheme(problem)))
+end
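+
+# Illustrative sketch (hypothetical values, not executed anywhere):
+# the aggregated counters are plain sums over the per-worker metrics, e.g.
+#
+#   ppe = ParallelPseudoEvaluator(problem, 2)
+#   ppe.workers_metrics[1].num_evals = 100
+#   ppe.workers_metrics[2].num_evals = 150
+#   num_evals(ppe) # == 250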
+
+# message with a candidate passed between the workers and the master
+immutable CandidateMessage{F}
+  worker::Int            # origin of the candidate
+  metrics::WorkerMetrics # current metrics of the origin worker
+  candi::Candidate{F}
+end
+
+# Parallel population optimizer.
+# Starts nworkers parallel population optimizers.
+# At regular intervals the workers send randomly chosen members of their
+# populations to the master, and the master redirects them to the other workers.
+type ParallelPopulationOptimizer{F, P<:OptimizationProblem} <: SteppingOptimizer
+  from_workers::MessageUtils.SyncObjRef{RemoteChannel}
+  to_workers::Vector{MessageUtils.SyncObjRef{RemoteChannel}}
+  evaluator::ParallelPseudoEvaluator{F, P}
+
+  ParallelPopulationOptimizer(from_workers::MessageUtils.SyncObjRef{RemoteChannel},
+                              to_workers::Vector{MessageUtils.SyncObjRef{RemoteChannel}},
+                              evaluator::ParallelPseudoEvaluator{F, P}) =
+    new(from_workers, to_workers, evaluator)
+end
+
+nworkers(ppopt::ParallelPopulationOptimizer) = length(ppopt.to_workers)
+
+# reads a worker's message, stores the worker metrics
+# and updates the best fitness in the archive
+function store!{F}(ppe::ParallelPseudoEvaluator{F}, msg::CandidateMessage{F})
+  copy!(ppe.workers_metrics[msg.worker], msg.metrics)
+  if !isnafitness(msg.candi.fitness, fitness_scheme(ppe)) # store only candidates with known fitness
+    add_candidate!(ppe.archive, msg.candi.fitness, msg.candi.params, num_evals(ppe))
+  end
+end
+
+# outer parallel population optimizer constructor that
+# also spawns the worker tasks
+function ParallelPopulationOptimizer{P<:OptimizationProblem}(
+    problem::P, optimizer_generator::Function, nworkers::Int,
+    migrationSize::Int = 1, migrationPeriod::Int = 100,
+    archiveCapacity::Int = 10)
+  F = fitness_type(problem)
+  info("Constructing parallel optimizer...")
+  from_workers = channel(T=CandidateMessage{F})
+  to_workers = MessageUtils.SyncObjRef{RemoteChannel}[channel(T=CandidateMessage{F}) for i in 1:nworkers]
+  workers_started = channel(T=Bool) # FIXME do we need to wait for the worker?
+  for i in 1:nworkers
+    info("Initializing worker #$i...")
+    pid = i+1 # assumes worker processes occupy pids 2..nworkers+1
+    @spawnat pid run_worker(i, workers_started, problem, optimizer_generator,
+                            from_workers, to_workers[i],
+                            migrationSize, migrationPeriod)
+  end
+  # wait until all the workers are started
+  # FIXME is it required?
+  nstarted = 0
+  while nstarted < nworkers
+    take!(workers_started)
+    nstarted += 1
+  end
+  info("All workers started")
+  ParallelPopulationOptimizer{F, P}(from_workers, to_workers,
+                                    ParallelPseudoEvaluator(problem, length(to_workers);
+                                                            archiveCapacity = archiveCapacity))
+end
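+
+# Direct-construction sketch (hypothetical usage; normally the optimizer is
+# created via bboptimize with method = :parallel_population_optimizer, see below):
+#
+#   opt = ParallelPopulationOptimizer(problem,
+#           p -> de_rand_1_bin(p, @compat Dict{Symbol,Any}()), # optimizer_generator
+#           2)  # nworkers; needs at least 2 spare worker processes, see addprocs()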
+
+function parallel_population_optimizer(problem::OptimizationProblem, parameters::Parameters)
+  param_dict = convert(ParamsDict, parameters) # FIXME convert to dict to avoid serialization problems of DictChain
+  params = chain(ParallelPopulationOptimizer_DefaultParameters, parameters)
+  worker_method = params[:WorkerMethod]
+  info("Using $worker_method as the worker method for parallel optimization")
+  optimizer_func = ValidMethods[worker_method]
+
+  ParallelPopulationOptimizer(problem, problem -> optimizer_func(problem, param_dict),
+                              params[:NWorkers], params[:MigrationSize], params[:MigrationPeriod],
+                              params[:ArchiveCapacity])
+end
+
+# redirects a candidate to another worker
+function redirect{F}(ppopt::ParallelPopulationOptimizer{F}, msg::CandidateMessage{F})
+  # sample a receiver among the other workers, skipping the origin
+  #println("redirecting from $(msg.worker)")
+  recv_ix = sample(1:(length(ppopt.to_workers)-1))
+  if recv_ix >= msg.worker # shift the index to exclude the origin worker
+    recv_ix += 1
+  end
+  msg.candi.op = NO_GEN_OP # reset operation and tag to avoid calling adjust!() out-of-context
+  msg.candi.tag = 0
+  put!(ppopt.to_workers[recv_ix], msg)
+  #println("redirecting done")
+end
+
+function step!(ppopt::ParallelPopulationOptimizer)
+  #println("main#: n_evals=$(num_evals(ppopt.evaluator))")
+  last_better = num_better(ppopt.evaluator)
+  candidate = take!(ppopt.from_workers)
+  #println("candidate=$candidate")
+  store!(ppopt.evaluator, candidate)
+  redirect(ppopt, candidate)
+  return num_better(ppopt.evaluator) - last_better
+end
+
+# finalizes the master: waits for the workers to shut down
+# and collects their best candidates
+function finalize!(ppopt::ParallelPopulationOptimizer, evaluator::ParallelPseudoEvaluator)
+  # send a special terminating message with worker id -12345
+  F = fitness_type(evaluator)
+  for to_worker in ppopt.to_workers
+    put!(to_worker, CandidateMessage{F}(-12345, WorkerMetrics(), Candidate{F}(Individual())))
+  end
+  # wait until all workers finish;
+  # the last candidates being sent are the best in each population
+  n_finished = 0
+  while n_finished < nworkers(ppopt)
+    msg = take!(ppopt.from_workers)
+    if msg.worker < 0 # HACK a negative worker id marks the finishing message
+      msg = CandidateMessage{F}(-msg.worker, msg.metrics, msg.candi) # restore the worker id
+      store!(evaluator, msg) # store the best candidate
+      n_finished += 1
+      info("Worker #$(msg.worker) finished")
+    end
+  end
+  info("Parallel optimizer finished. Metrics per worker: $(evaluator.workers_metrics)")
+end
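+
+# Shutdown handshake, illustrated (worker ids are hypothetical):
+#
+#   master -> worker 2: CandidateMessage(-12345, ...)             # terminate request
+#   worker 2 -> master: CandidateMessage(-2, metrics, best_candi) # finishing mark
+#   master: flips -2 back to 2, stores the best candidate, counts worker 2 as done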
+
+# wraps the worker's population optimizer
+# and communicates with the master
+type PopulationOptimizerWrapper{O<:PopulationOptimizer,E<:Evaluator}
+  id::Int # worker id
+  optimizer::O
+  evaluator::E
+  to_master::MessageUtils.SyncObjRef{RemoteChannel}   # outgoing candidates
+  from_master::MessageUtils.SyncObjRef{RemoteChannel} # incoming candidates
+  migrationSize::Int   # size of the migrating group
+  migrationPeriod::Int # number of iterations between migrations
+
+  metrics::WorkerMetrics # current metrics
+  is_stopping::Bool      # whether the worker is in stopping mode
+end
+
+# outer wrapper constructor that also
+# starts the "background" immigrant-receiving task
+function PopulationOptimizerWrapper{O<:PopulationOptimizer,E<:Evaluator}(
+    id::Int, optimizer::O, evaluator::E,
+    to_master::MessageUtils.SyncObjRef{RemoteChannel},
+    from_master::MessageUtils.SyncObjRef{RemoteChannel},
+    migrationSize = 1, migrationPeriod = 100)
+  wrapper = PopulationOptimizerWrapper{O,E}(id, optimizer, evaluator,
+                                            to_master, from_master,
+                                            migrationSize, migrationPeriod,
+                                            WorkerMetrics(), false)
+  # task that reads immigrants from the master
+  @schedule begin
+    while !wrapper.is_stopping
+      #println("receiving immigrants...")
+      recv_immigrants!(wrapper)
+      #println("immigrants task yield()")
+      yield() # return to normal worker processing
+    end
+  end
+
+  return wrapper
+end
+
+function send_emigrants(wrapper::PopulationOptimizerWrapper)
+  pop = population(wrapper.optimizer)
+  # prepare the group of emigrants
+  migrant_ixs = sample(1:popsize(pop), wrapper.migrationSize, replace=false)
+  for migrant_ix in migrant_ixs
+    migrant = acquire_candi(pop, migrant_ix)
+    # send them to the master
+    wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
+    put!(wrapper.to_master, CandidateMessage{fitness_type(pop)}(wrapper.id, wrapper.metrics, migrant))
+    # FIXME check that the reuse of the candidate does not affect
+    # the migrants while they wait to be sent
+    release_candi(pop, migrant)
+  end
+end
+
+# sends the best candidate to the master to acknowledge worker shutdown
+function finalize!(wrapper::PopulationOptimizerWrapper)
+  wrapper.is_stopping = true
+  # send the best candidate
+  pop = population(wrapper.optimizer)
+  best_candi = acquire_candi(pop)
+  copy!(best_candi.params, best_candidate(wrapper.evaluator.archive))
+  best_candi.fitness = best_fitness(wrapper.evaluator.archive)
+  best_candi.index = -1 # we don't know it
+  best_candi.tag = 0
+  # HACK: the negated worker id tells the master this worker is finishing
+  wrapper.metrics.num_evals = num_evals(wrapper.evaluator)
+  put!(wrapper.to_master, CandidateMessage{fitness_type(pop)}(-wrapper.id, wrapper.metrics, best_candi))
+  release_candi(pop, best_candi)
+end
+
+# receives migrants (called from the "background" task)
+function recv_immigrants!(wrapper::PopulationOptimizerWrapper)
+  pop = population(wrapper.optimizer)
+  msg = take!(wrapper.from_master)
+  if msg.worker == -12345 # special id sent by the master to indicate termination
+    finalize!(wrapper)
+    return
+  end
+
+  # let the immigrant compete with a random resident for its population slot
+  migrant_ix = sample(1:popsize(pop))
+  candidates = Vector{candidate_type(pop)}()
+  sizehint!(candidates, 2)
+  push!(candidates, acquire_candi(pop, migrant_ix))
+  push!(candidates, acquire_candi(pop, msg.candi))
+  candidates[end].index = migrant_ix # override the incoming index
+  rank_by_fitness!(wrapper.evaluator, candidates)
+  wrapper.metrics.num_migrated += 1
+  wrapper.metrics.num_better_migrated += tell!(wrapper.optimizer, candidates)
+end
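+
+# Acceptance logic, illustrated: the immigrant and the current occupant of
+# slot migrant_ix are ranked together and fed to tell!(), whose return value
+# counts the improvements, so num_better_migrated only grows when the
+# immigrant actually improves the slot it landed in.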
+
+# runs the wrapper (called in the "main" task)
+function run!(wrapper::PopulationOptimizerWrapper)
+  while !wrapper.is_stopping
+    wrapper.metrics.num_steps += 1
+    #println("$(wrapper.metrics.num_steps)-th iteration")
+    if wrapper.metrics.num_steps % wrapper.migrationPeriod == 0
+      #info("$(myid()): sending migrants")
+      send_emigrants(wrapper) # send migrants before processing this step
+      #println("run!() task yield()")
+      yield() # switch to the immigrant-receiving task
+    end
+    # normal ask/tell sequence
+    candidates = ask(wrapper.optimizer)
+    rank_by_fitness!(wrapper.evaluator, candidates)
+    wrapper.metrics.num_better += tell!(wrapper.optimizer, candidates)
+  end
+end
+
+# Function that the master process spawns at each worker process.
+# Creates and runs the worker wrapper.
+function run_worker(id::Int,
+                    workers_started::MessageUtils.SyncObjRef{RemoteChannel},
+                    problem::OptimizationProblem,
+                    optimizer_generator::Function,
+                    to_master::MessageUtils.SyncObjRef{RemoteChannel},
+                    from_master::MessageUtils.SyncObjRef{RemoteChannel},
+                    migrationSize, migrationPeriod)
+  info("Initializing parallel optimization worker #$id at process=$(myid())")
+  wrapper = PopulationOptimizerWrapper(id,
+                                       optimizer_generator(problem),
+                                       ProblemEvaluator(problem),
+                                       to_master, from_master,
+                                       migrationSize, migrationPeriod)
+  # the immigrant-receiving task was started by the wrapper constructor;
+  # signal the master that this worker is ready
+  put!(workers_started, true)
+  info("Starting worker #$id")
+  run!(wrapper)
+  info("Worker #$id stopped")
+end
diff --git a/test/runtests.jl b/test/runtests.jl
index 30d76582..575cd2c3 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -18,6 +18,7 @@ my_tests = [
   "test_differential_evolution.jl",
   "test_adaptive_differential_evolution.jl",
   "test_natural_evolution_strategies.jl",
+  "test_parallel_population_optimizer.jl",
   "test_toplevel_bboptimize.jl",
   "test_smoketest_bboptimize.jl",
diff --git a/test/test_parallel_population_optimizer.jl b/test/test_parallel_population_optimizer.jl
new file mode 100644
index 00000000..39a78363
--- /dev/null
+++ b/test/test_parallel_population_optimizer.jl
@@ -0,0 +1,20 @@
+facts("Parallel population optimizer") do
+
+if nprocs() < 4
+  addprocs(4-nprocs())
+end
+
+@everywhere using BlackBoxOptim, MessageUtils
+
+rosenbrock2d(x) = (1.0 - x[1])^2 + 100.0 * (x[2] - x[1]^2)^2
+
+b, f = bboptimize(rosenbrock2d; method = :parallel_population_optimizer,
+                  search_space = [(-5.0, 5.0), (-2.0, 2.0)], max_time = 100.0,
+                  parameters = @compat Dict{Symbol,Any}(
+                    :ShowTrace => true,
+                    :MigrationSize => 2,
+                    :MigrationPeriod => 100))
+@fact size(b) => (2,)
+@fact typeof(f) => Float64
+@fact f => less_than(100.0)
+
+end