Skip to content

Commit

Permalink
Add dynamic worker options for worker command. (#4970)
Browse files Browse the repository at this point in the history
* Add fields for fbs

* WIP

* Fix complition errors

* Add java part

* FIx

* Fix

* Fix

* Fix lint

* Refine API

* address comments and add test

* Fix

* Address comment.

* Address comments.

* Fix linting

* Refine

* Fix lint

* WIP: address comment.

* Fix java

* Fix py

* Refin

* Fix

* Fix

* Fix linting

* Fix lint

* Address comments

* WIP

* Fix

* Fix

* minor refine

* Fix lint

* Fix raylet test.

* Fix lint

* Update src/ray/raylet/worker_pool.h

Co-Authored-By: Hao Chen <chenh1024@gmail.com>

* Update java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java

Co-Authored-By: Hao Chen <chenh1024@gmail.com>

* Address comments.

* Address comments.

* Fix test.

* Update src/ray/raylet/worker_pool.h

Co-Authored-By: Hao Chen <chenh1024@gmail.com>

* Address comments.

* Address comments.

* Fix

* Fix lint

* Fix lint

* Fix

* Address comments.

* Fix linting
  • Loading branch information
jovany-wang authored Jun 23, 2019
1 parent 2e342ef commit e33d0ea
Show file tree
Hide file tree
Showing 15 changed files with 290 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ public class ActorCreationOptions extends BaseTaskOptions {

public final int maxReconstructions;

private ActorCreationOptions(Map<String, Double> resources, int maxReconstructions) {
public final String jvmOptions;

private ActorCreationOptions(Map<String, Double> resources,
int maxReconstructions,
String jvmOptions) {
super(resources);
this.maxReconstructions = maxReconstructions;
this.jvmOptions = jvmOptions;
}

/**
Expand All @@ -25,6 +30,7 @@ public static class Builder {

private Map<String, Double> resources = new HashMap<>();
private int maxReconstructions = NO_RECONSTRUCTION;
private String jvmOptions = "";

public Builder setResources(Map<String, Double> resources) {
this.resources = resources;
Expand All @@ -36,8 +42,13 @@ public Builder setMaxReconstructions(int maxReconstructions) {
return this;
}

public Builder setJvmOptions(String jvmOptions) {
this.jvmOptions = jvmOptions;
return this;
}

public ActorCreationOptions createActorCreationOptions() {
return new ActorCreationOptions(resources, maxReconstructions);
return new ActorCreationOptions(resources, maxReconstructions, jvmOptions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.ray.runtime.task.TaskLanguage;
import org.ray.runtime.task.TaskSpec;
import org.ray.runtime.util.IdUtil;
import org.ray.runtime.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -363,8 +364,13 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
}

int maxActorReconstruction = 0;
List<String> dynamicWorkerOptions = ImmutableList.of();
if (taskOptions instanceof ActorCreationOptions) {
maxActorReconstruction = ((ActorCreationOptions) taskOptions).maxReconstructions;
String jvmOptions = ((ActorCreationOptions) taskOptions).jvmOptions;
if (!StringUtil.isNullOrEmpty(jvmOptions)) {
dynamicWorkerOptions = ImmutableList.of(((ActorCreationOptions) taskOptions).jvmOptions);
}
}

TaskLanguage language;
Expand Down Expand Up @@ -393,7 +399,8 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
numReturns,
resources,
language,
functionDescriptor
functionDescriptor,
dynamicWorkerOptions
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,16 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
JavaFunctionDescriptor functionDescriptor = new JavaFunctionDescriptor(
info.functionDescriptor(0), info.functionDescriptor(1), info.functionDescriptor(2)
);

// Deserialize dynamic worker options.
List<String> dynamicWorkerOptions = new ArrayList<>();
for (int i = 0; i < info.dynamicWorkerOptionsLength(); ++i) {
dynamicWorkerOptions.add(info.dynamicWorkerOptions(i));
}

return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId,
maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles,
args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor);
args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor, dynamicWorkerOptions);
}

private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
Expand Down Expand Up @@ -275,6 +282,12 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
functionDescriptorOffset = fbb.createVectorOfTables(functionDescriptorOffsets);
}

int [] dynamicWorkerOptionsOffsets = new int[task.dynamicWorkerOptions.size()];
for (int index = 0; index < task.dynamicWorkerOptions.size(); ++index) {
dynamicWorkerOptionsOffsets[index] = fbb.createString(task.dynamicWorkerOptions.get(index));
}
int dynamicWorkerOptionsOffset = fbb.createVectorOfTables(dynamicWorkerOptionsOffsets);

int root = TaskInfo.createTaskInfo(
fbb,
driverIdOffset,
Expand All @@ -293,7 +306,8 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
requiredResourcesOffset,
requiredPlacementResourcesOffset,
language,
functionDescriptorOffset);
functionDescriptorOffset,
dynamicWorkerOptionsOffset);
fbb.finish(root);
ByteBuffer buffer = fbb.dataBuffer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ private String buildWorkerCommandRaylet() {

cmd.addAll(rayConfig.jvmParameters);

// jvm options
cmd.add("RAY_WORKER_OPTION_0");

// Main class
cmd.add(WORKER_CLASS);
String command = Joiner.on(" ").join(cmd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class TaskSpec {
// Language of this task.
public final TaskLanguage language;

public final List<String> dynamicWorkerOptions;

// Descriptor of the remote function.
// Note, if task language is Java, the type is JavaFunctionDescriptor. If the task language
// is Python, the type is PyFunctionDescriptor.
Expand Down Expand Up @@ -93,7 +95,8 @@ public TaskSpec(
int numReturns,
Map<String, Double> resources,
TaskLanguage language,
FunctionDescriptor functionDescriptor) {
FunctionDescriptor functionDescriptor,
List<String> dynamicWorkerOptions) {
this.driverId = driverId;
this.taskId = taskId;
this.parentTaskId = parentTaskId;
Expand All @@ -106,6 +109,8 @@ public TaskSpec(
this.newActorHandles = newActorHandles;
this.args = args;
this.numReturns = numReturns;
this.dynamicWorkerOptions = dynamicWorkerOptions;

returnIds = new ObjectId[numReturns];
for (int i = 0; i < numReturns; ++i) {
returnIds[i] = IdUtil.computeReturnId(taskId, i + 1);
Expand Down Expand Up @@ -157,6 +162,7 @@ public String toString() {
", resources=" + resources +
", language=" + language +
", functionDescriptor=" + functionDescriptor +
", dynamicWorkerOptions=" + dynamicWorkerOptions +
", executionDependencies=" + executionDependencies +
'}';
}
Expand Down
31 changes: 31 additions & 0 deletions java/test/src/main/java/org/ray/api/test/WorkerJvmOptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.ray.api.test;

import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.annotation.RayRemote;
import org.ray.api.options.ActorCreationOptions;
import org.testng.Assert;
import org.testng.annotations.Test;

public class WorkerJvmOptionsTest extends BaseTest {

@RayRemote
public static class Echo {
String getOptions() {
return System.getProperty("test.suffix");
}
}

@Test
public void testJvmOptions() {
TestUtils.skipTestUnderSingleProcess();
ActorCreationOptions options = new ActorCreationOptions.Builder()
.setJvmOptions("-Dtest.suffix=suffix")
.createActorCreationOptions();
RayActor<Echo> actor = Ray.createActor(Echo::new, options);
RayObject<String> obj = Ray.call(Echo::getOptions, actor);
Assert.assertEquals(obj.get(), "suffix");
}
}
3 changes: 3 additions & 0 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,7 @@ def build_java_worker_command(
assert java_worker_options is not None

command = "java "

if redis_address is not None:
command += "-Dray.redis.address={} ".format(redis_address)

Expand All @@ -1253,6 +1254,8 @@ def build_java_worker_command(
# Put `java_worker_options` in the last, so it can overwrite the
# above options.
command += java_worker_options + " "

command += "RAY_WORKER_OPTION_0 "
command += "org.ray.runtime.runner.worker.DefaultWorker"

return command
Expand Down
2 changes: 2 additions & 0 deletions src/ray/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ constexpr char kObjectTablePrefix[] = "ObjectTable";
/// Prefix for the task table keys in redis.
constexpr char kTaskTablePrefix[] = "TaskTable";

constexpr char kWorkerDynamicOptionPlaceholderPrefix[] = "RAY_WORKER_OPTION_";

#endif // RAY_CONSTANTS_H_
5 changes: 5 additions & 0 deletions src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ table TaskInfo {
// For a Python function, it should be: [module_name, class_name, function_name]
// For a Java function, it should be: [class_name, method_name, type_descriptor]
function_descriptor: [string];
// The dynamic options used in the worker command when starting the worker process for
// an actor creation task. If the list isn't empty, the options will be used to replace
// the placeholder strings (`RAY_WORKER_OPTION_0`, `RAY_WORKER_OPTION_1`, etc) in the
// worker command.
dynamic_worker_options: [string];
}

table ResourcePair {
Expand Down
21 changes: 8 additions & 13 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
initial_config_(config),
local_available_resources_(config.resource_config),
worker_pool_(config.num_initial_workers, config.num_workers_per_process,
config.maximum_startup_concurrency, config.worker_commands),
config.maximum_startup_concurrency, gcs_client_,
config.worker_commands),
scheduling_policy_(local_queues_),
reconstruction_policy_(
io_service_,
Expand Down Expand Up @@ -1723,18 +1724,6 @@ bool NodeManager::AssignTask(const Task &task) {
std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec);
if (worker == nullptr) {
// There are no workers that can execute this task.
if (!spec.IsActorTask()) {
// There are no more non-actor workers available to execute this task.
// Start a new worker.
worker_pool_.StartWorkerProcess(spec.GetLanguage());
// Push an error message to the user if the worker pool tells us that it is
// getting too big.
const std::string warning_message = worker_pool_.WarningAboutSize();
if (warning_message != "") {
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
DriverID::Nil(), "worker_pool_large", warning_message, current_time_ms()));
}
}
// We couldn't assign this task, as no worker available.
return false;
}
Expand Down Expand Up @@ -2205,6 +2194,12 @@ void NodeManager::ForwardTask(
const auto &spec = task.GetTaskSpecification();
auto task_id = spec.TaskId();

if (worker_pool_.HasPendingWorkerForTask(spec.GetLanguage(), task_id)) {
// There is a worker being starting for this task,
// so we shouldn't forward this task to another node.
return;
}

// Get and serialize the task's unforwarded, uncommitted lineage.
Lineage uncommitted_lineage;
if (lineage_cache_.ContainsTask(task_id)) {
Expand Down
12 changes: 9 additions & 3 deletions src/ray/raylet/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ TaskSpecification::TaskSpecification(
const std::vector<std::shared_ptr<TaskArgument>> &task_arguments, int64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
const Language &language, const std::vector<std::string> &function_descriptor)
const Language &language, const std::vector<std::string> &function_descriptor,
const std::vector<std::string> &dynamic_worker_options)
: spec_() {
flatbuffers::FlatBufferBuilder fbb;

TaskID task_id = GenerateTaskId(driver_id, parent_task_id, parent_counter);

// Add argument object IDs.
std::vector<flatbuffers::Offset<Arg>> arguments;
for (auto &argument : task_arguments) {
Expand All @@ -101,7 +101,8 @@ TaskSpecification::TaskSpecification(
ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments), num_returns,
map_to_flatbuf(fbb, required_resources),
map_to_flatbuf(fbb, required_placement_resources), language,
string_vec_to_flatbuf(fbb, function_descriptor));
string_vec_to_flatbuf(fbb, function_descriptor),
string_vec_to_flatbuf(fbb, dynamic_worker_options));
fbb.Finish(spec);
AssignSpecification(fbb.GetBufferPointer(), fbb.GetSize());
}
Expand Down Expand Up @@ -258,6 +259,11 @@ std::vector<ActorHandleID> TaskSpecification::NewActorHandles() const {
return ids_from_flatbuf<ActorHandleID>(*message->new_actor_handles());
}

std::vector<std::string> TaskSpecification::DynamicWorkerOptions() const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
return string_vec_from_flatbuf(*message->dynamic_worker_options());
}

} // namespace raylet

} // namespace ray
6 changes: 5 additions & 1 deletion src/ray/raylet/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class TaskSpecification {
/// will default to be equal to the required_resources argument.
/// \param language The language of the worker that must execute the function.
/// \param function_descriptor The function descriptor.
/// \param dynamic_worker_options The dynamic options for starting an actor worker.
TaskSpecification(
const DriverID &driver_id, const TaskID &parent_task_id, int64_t parent_counter,
const ActorID &actor_creation_id, const ObjectID &actor_creation_dummy_object_id,
Expand All @@ -138,7 +139,8 @@ class TaskSpecification {
int64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
const Language &language, const std::vector<std::string> &function_descriptor);
const Language &language, const std::vector<std::string> &function_descriptor,
const std::vector<std::string> &dynamic_worker_options = {});

/// Deserialize a task specification from a string.
///
Expand Down Expand Up @@ -214,6 +216,8 @@ class TaskSpecification {
ObjectID ActorDummyObject() const;
std::vector<ActorHandleID> NewActorHandles() const;

std::vector<std::string> DynamicWorkerOptions() const;

private:
/// Assign the specification data from a pointer.
void AssignSpecification(const uint8_t *spec, size_t spec_size);
Expand Down
Loading

0 comments on commit e33d0ea

Please sign in to comment.