Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamic worker options for worker command. #4970

Merged
merged 46 commits into from
Jun 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
815c1da
Add fields for fbs
jovany-wang Jun 10, 2019
21ffb93
WIP
jovany-wang Jun 11, 2019
f1e2469
Fix complition errors
jovany-wang Jun 11, 2019
644df89
Add java part
jovany-wang Jun 12, 2019
a014120
FIx
jovany-wang Jun 12, 2019
5006871
Fix
jovany-wang Jun 12, 2019
04cabf8
Fix
jovany-wang Jun 12, 2019
a67d530
Fix lint
jovany-wang Jun 13, 2019
f3816f7
Refine API
jovany-wang Jun 14, 2019
28b1b05
address comments and add test
jovany-wang Jun 14, 2019
9564ce7
Fix
jovany-wang Jun 14, 2019
f85cbb9
Address comment.
jovany-wang Jun 14, 2019
afe1e33
Address comments.
jovany-wang Jun 15, 2019
569c236
Fix linting
jovany-wang Jun 15, 2019
3a5ee31
Refine
jovany-wang Jun 15, 2019
8f8326c
Fix lint
jovany-wang Jun 16, 2019
23fede1
WIP: address comment.
jovany-wang Jun 18, 2019
b6dba59
Fix java
jovany-wang Jun 18, 2019
43bfcf2
Fix py
jovany-wang Jun 18, 2019
e58c7e5
Refin
jovany-wang Jun 19, 2019
c165a38
Fix
jovany-wang Jun 19, 2019
fcd2ea7
Fix
jovany-wang Jun 19, 2019
54fa36c
Fix linting
jovany-wang Jun 20, 2019
34fb3b8
Fix lint
jovany-wang Jun 20, 2019
34ab434
Address comments
jovany-wang Jun 20, 2019
8948f9d
WIP
jovany-wang Jun 20, 2019
d5657a3
Fix
jovany-wang Jun 20, 2019
b8117b6
Fix
jovany-wang Jun 20, 2019
68b9002
minor refine
jovany-wang Jun 20, 2019
c096b2a
Fix lint
jovany-wang Jun 20, 2019
7755912
Fix raylet test.
jovany-wang Jun 20, 2019
3de7d6a
Fix lint
jovany-wang Jun 21, 2019
ce94f81
Update src/ray/raylet/worker_pool.h
jovany-wang Jun 21, 2019
c10777d
Update java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.…
jovany-wang Jun 21, 2019
01794c3
Address comments.
jovany-wang Jun 21, 2019
1b5ec6c
Address comments.
jovany-wang Jun 21, 2019
2010a41
Fix test.
jovany-wang Jun 21, 2019
28d7902
Update src/ray/raylet/worker_pool.h
jovany-wang Jun 21, 2019
ab178ca
Address comments.
jovany-wang Jun 21, 2019
5c30230
Address comments.
jovany-wang Jun 21, 2019
597d7c2
Fix
jovany-wang Jun 21, 2019
c65e12c
Fix lint
jovany-wang Jun 22, 2019
36429e5
Fix lint
jovany-wang Jun 22, 2019
79e6358
Fix
jovany-wang Jun 22, 2019
cc92527
Address comments.
jovany-wang Jun 22, 2019
313e1df
Fix linting
jovany-wang Jun 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ public class ActorCreationOptions extends BaseTaskOptions {

public final int maxReconstructions;

private ActorCreationOptions(Map<String, Double> resources, int maxReconstructions) {
public final String jvmOptions;

private ActorCreationOptions(Map<String, Double> resources,
int maxReconstructions,
String jvmOptions) {
super(resources);
this.maxReconstructions = maxReconstructions;
this.jvmOptions = jvmOptions;
}

/**
Expand All @@ -25,6 +30,7 @@ public static class Builder {

private Map<String, Double> resources = new HashMap<>();
private int maxReconstructions = NO_RECONSTRUCTION;
private String jvmOptions = "";

public Builder setResources(Map<String, Double> resources) {
this.resources = resources;
Expand All @@ -36,8 +42,13 @@ public Builder setMaxReconstructions(int maxReconstructions) {
return this;
}

/**
 * Sets the JVM options string that will be stored in the built
 * {@link ActorCreationOptions}. When never called, the builder keeps its
 * initial value, the empty string.
 *
 * @param jvmOptions the JVM options for the actor's worker process,
 *     e.g. {@code "-Dkey=value"}.
 * @return this builder, so that calls can be chained.
 */
public Builder setJvmOptions(String jvmOptions) {
this.jvmOptions = jvmOptions;
return this;
}

public ActorCreationOptions createActorCreationOptions() {
return new ActorCreationOptions(resources, maxReconstructions);
return new ActorCreationOptions(resources, maxReconstructions, jvmOptions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.ray.runtime.task.TaskLanguage;
import org.ray.runtime.task.TaskSpec;
import org.ray.runtime.util.IdUtil;
import org.ray.runtime.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -363,8 +364,13 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
}

int maxActorReconstruction = 0;
List<String> dynamicWorkerOptions = ImmutableList.of();
if (taskOptions instanceof ActorCreationOptions) {
maxActorReconstruction = ((ActorCreationOptions) taskOptions).maxReconstructions;
String jvmOptions = ((ActorCreationOptions) taskOptions).jvmOptions;
if (!StringUtil.isNullOrEmpty(jvmOptions)) {
dynamicWorkerOptions = ImmutableList.of(((ActorCreationOptions) taskOptions).jvmOptions);
}
}

TaskLanguage language;
Expand Down Expand Up @@ -393,7 +399,8 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
numReturns,
resources,
language,
functionDescriptor
functionDescriptor,
dynamicWorkerOptions
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,16 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
JavaFunctionDescriptor functionDescriptor = new JavaFunctionDescriptor(
info.functionDescriptor(0), info.functionDescriptor(1), info.functionDescriptor(2)
);

// Deserialize dynamic worker options.
List<String> dynamicWorkerOptions = new ArrayList<>();
for (int i = 0; i < info.dynamicWorkerOptionsLength(); ++i) {
dynamicWorkerOptions.add(info.dynamicWorkerOptions(i));
}

return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId,
maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles,
args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor);
args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor, dynamicWorkerOptions);
}

private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
Expand Down Expand Up @@ -275,6 +282,12 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
functionDescriptorOffset = fbb.createVectorOfTables(functionDescriptorOffsets);
}

int [] dynamicWorkerOptionsOffsets = new int[task.dynamicWorkerOptions.size()];
for (int index = 0; index < task.dynamicWorkerOptions.size(); ++index) {
dynamicWorkerOptionsOffsets[index] = fbb.createString(task.dynamicWorkerOptions.get(index));
}
int dynamicWorkerOptionsOffset = fbb.createVectorOfTables(dynamicWorkerOptionsOffsets);

int root = TaskInfo.createTaskInfo(
fbb,
driverIdOffset,
Expand All @@ -293,7 +306,8 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
requiredResourcesOffset,
requiredPlacementResourcesOffset,
language,
functionDescriptorOffset);
functionDescriptorOffset,
dynamicWorkerOptionsOffset);
fbb.finish(root);
ByteBuffer buffer = fbb.dataBuffer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ private String buildWorkerCommandRaylet() {

cmd.addAll(rayConfig.jvmParameters);

// Placeholder that is replaced with the dynamic worker (JVM) options.
cmd.add("RAY_WORKER_OPTION_0");

// Main class
cmd.add(WORKER_CLASS);
String command = Joiner.on(" ").join(cmd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class TaskSpec {
// Language of this task.
public final TaskLanguage language;

public final List<String> dynamicWorkerOptions;

// Descriptor of the remote function.
// Note, if task language is Java, the type is JavaFunctionDescriptor. If the task language
// is Python, the type is PyFunctionDescriptor.
Expand Down Expand Up @@ -93,7 +95,8 @@ public TaskSpec(
int numReturns,
Map<String, Double> resources,
TaskLanguage language,
FunctionDescriptor functionDescriptor) {
FunctionDescriptor functionDescriptor,
List<String> dynamicWorkerOptions) {
this.driverId = driverId;
this.taskId = taskId;
this.parentTaskId = parentTaskId;
Expand All @@ -106,6 +109,8 @@ public TaskSpec(
this.newActorHandles = newActorHandles;
this.args = args;
this.numReturns = numReturns;
this.dynamicWorkerOptions = dynamicWorkerOptions;

returnIds = new ObjectId[numReturns];
for (int i = 0; i < numReturns; ++i) {
returnIds[i] = IdUtil.computeReturnId(taskId, i + 1);
Expand Down Expand Up @@ -157,6 +162,7 @@ public String toString() {
", resources=" + resources +
", language=" + language +
", functionDescriptor=" + functionDescriptor +
", dynamicWorkerOptions=" + dynamicWorkerOptions +
", executionDependencies=" + executionDependencies +
'}';
}
Expand Down
31 changes: 31 additions & 0 deletions java/test/src/main/java/org/ray/api/test/WorkerJvmOptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.ray.api.test;

import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.annotation.RayRemote;
import org.ray.api.options.ActorCreationOptions;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Checks that JVM options passed through {@link ActorCreationOptions} are
 * visible as system properties inside the actor.
 */
public class WorkerJvmOptionsTest extends BaseTest {

  /** Actor that reports the {@code test.suffix} system property it sees. */
  @RayRemote
  public static class Echo {
    String getOptions() {
      return System.getProperty("test.suffix");
    }
  }

  @Test
  public void testJvmOptions() {
    // NOTE(review): presumably skipped in single-process mode because the
    // option can only take effect in a separately launched worker JVM — confirm.
    TestUtils.skipTestUnderSingleProcess();

    // Build the creation options step by step instead of one fluent chain.
    ActorCreationOptions.Builder builder = new ActorCreationOptions.Builder();
    builder.setJvmOptions("-Dtest.suffix=suffix");
    ActorCreationOptions creationOptions = builder.createActorCreationOptions();

    RayActor<Echo> echoActor = Ray.createActor(Echo::new, creationOptions);
    RayObject<String> reportedSuffix = Ray.call(Echo::getOptions, echoActor);

    // The property set via "-Dtest.suffix=suffix" must be readable in the actor.
    Assert.assertEquals(reportedSuffix.get(), "suffix");
  }
}
3 changes: 3 additions & 0 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,7 @@ def build_java_worker_command(
assert java_worker_options is not None

command = "java "

if redis_address is not None:
command += "-Dray.redis.address={} ".format(redis_address)

Expand All @@ -1253,6 +1254,8 @@ def build_java_worker_command(
# Put `java_worker_options` last, so that it can override the
# options above.
command += java_worker_options + " "

command += "RAY_WORKER_OPTION_0 "
command += "org.ray.runtime.runner.worker.DefaultWorker"

return command
Expand Down
2 changes: 2 additions & 0 deletions src/ray/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ constexpr char kObjectTablePrefix[] = "ObjectTable";
/// Prefix for the task table keys in redis.
constexpr char kTaskTablePrefix[] = "TaskTable";

constexpr char kWorkerDynamicOptionPlaceholderPrefix[] = "RAY_WORKER_OPTION_";

#endif // RAY_CONSTANTS_H_
5 changes: 5 additions & 0 deletions src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ table TaskInfo {
// For a Python function, it should be: [module_name, class_name, function_name]
// For a Java function, it should be: [class_name, method_name, type_descriptor]
function_descriptor: [string];
// The dynamic options used in the worker command when starting the worker process for
// an actor creation task. If the list isn't empty, the options will be used to replace
// the placeholder strings (`RAY_WORKER_OPTION_0`, `RAY_WORKER_OPTION_1`, etc) in the
// worker command.
dynamic_worker_options: [string];
}

table ResourcePair {
Expand Down
21 changes: 8 additions & 13 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
initial_config_(config),
local_available_resources_(config.resource_config),
worker_pool_(config.num_initial_workers, config.num_workers_per_process,
config.maximum_startup_concurrency, config.worker_commands),
config.maximum_startup_concurrency, gcs_client_,
config.worker_commands),
scheduling_policy_(local_queues_),
reconstruction_policy_(
io_service_,
Expand Down Expand Up @@ -1723,18 +1724,6 @@ bool NodeManager::AssignTask(const Task &task) {
std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec);
if (worker == nullptr) {
// There are no workers that can execute this task.
if (!spec.IsActorTask()) {
// There are no more non-actor workers available to execute this task.
// Start a new worker.
worker_pool_.StartWorkerProcess(spec.GetLanguage());
// Push an error message to the user if the worker pool tells us that it is
// getting too big.
const std::string warning_message = worker_pool_.WarningAboutSize();
if (warning_message != "") {
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
DriverID::Nil(), "worker_pool_large", warning_message, current_time_ms()));
}
}
// We couldn't assign this task, as no worker available.
return false;
}
Expand Down Expand Up @@ -2205,6 +2194,12 @@ void NodeManager::ForwardTask(
const auto &spec = task.GetTaskSpecification();
auto task_id = spec.TaskId();

if (worker_pool_.HasPendingWorkerForTask(spec.GetLanguage(), task_id)) {
// A worker is being started for this task,
// so we shouldn't forward this task to another node.
return;
}

// Get and serialize the task's unforwarded, uncommitted lineage.
Lineage uncommitted_lineage;
if (lineage_cache_.ContainsTask(task_id)) {
Expand Down
12 changes: 9 additions & 3 deletions src/ray/raylet/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ TaskSpecification::TaskSpecification(
const std::vector<std::shared_ptr<TaskArgument>> &task_arguments, int64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
const Language &language, const std::vector<std::string> &function_descriptor)
const Language &language, const std::vector<std::string> &function_descriptor,
const std::vector<std::string> &dynamic_worker_options)
: spec_() {
flatbuffers::FlatBufferBuilder fbb;

TaskID task_id = GenerateTaskId(driver_id, parent_task_id, parent_counter);

// Add argument object IDs.
std::vector<flatbuffers::Offset<Arg>> arguments;
for (auto &argument : task_arguments) {
Expand All @@ -101,7 +101,8 @@ TaskSpecification::TaskSpecification(
ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments), num_returns,
map_to_flatbuf(fbb, required_resources),
map_to_flatbuf(fbb, required_placement_resources), language,
string_vec_to_flatbuf(fbb, function_descriptor));
string_vec_to_flatbuf(fbb, function_descriptor),
string_vec_to_flatbuf(fbb, dynamic_worker_options));
fbb.Finish(spec);
AssignSpecification(fbb.GetBufferPointer(), fbb.GetSize());
}
Expand Down Expand Up @@ -258,6 +259,11 @@ std::vector<ActorHandleID> TaskSpecification::NewActorHandles() const {
return ids_from_flatbuf<ActorHandleID>(*message->new_actor_handles());
}

/// Return the dynamic worker options stored in this task specification.
///
/// \return The strings held in the `dynamic_worker_options` field of the
/// serialized TaskInfo, or an empty vector if that field is absent.
std::vector<std::string> TaskSpecification::DynamicWorkerOptions() const {
  auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
  // FlatBuffers returns a null pointer for a vector field that was never
  // written (e.g. a spec produced by a writer that predates this field), so
  // guard the dereference instead of invoking undefined behavior.
  const auto *options = message->dynamic_worker_options();
  if (options == nullptr) {
    return {};
  }
  return string_vec_from_flatbuf(*options);
}

} // namespace raylet

} // namespace ray
6 changes: 5 additions & 1 deletion src/ray/raylet/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class TaskSpecification {
/// will default to be equal to the required_resources argument.
/// \param language The language of the worker that must execute the function.
/// \param function_descriptor The function descriptor.
/// \param dynamic_worker_options The dynamic options for starting an actor worker.
TaskSpecification(
const DriverID &driver_id, const TaskID &parent_task_id, int64_t parent_counter,
const ActorID &actor_creation_id, const ObjectID &actor_creation_dummy_object_id,
Expand All @@ -138,7 +139,8 @@ class TaskSpecification {
int64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
const Language &language, const std::vector<std::string> &function_descriptor);
const Language &language, const std::vector<std::string> &function_descriptor,
const std::vector<std::string> &dynamic_worker_options = {});

/// Deserialize a task specification from a string.
///
Expand Down Expand Up @@ -214,6 +216,8 @@ class TaskSpecification {
ObjectID ActorDummyObject() const;
std::vector<ActorHandleID> NewActorHandles() const;

std::vector<std::string> DynamicWorkerOptions() const;

private:
/// Assign the specification data from a pointer.
void AssignSpecification(const uint8_t *spec, size_t spec_size);
Expand Down
Loading