Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ReturnIds with NumReturns in TaskInfo to reduce the size #4854

Merged
merged 5 commits into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
actor.increaseTaskCounter(),
actor.getNewActorHandles().toArray(new UniqueId[0]),
ArgumentsBuilder.wrap(args, language == TaskLanguage.PYTHON),
returnIds,
numReturns,
resources,
language,
functionDescriptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
UniqueId actorId = UniqueId.fromByteBuffer(info.actorIdAsByteBuffer());
UniqueId actorHandleId = UniqueId.fromByteBuffer(info.actorHandleIdAsByteBuffer());
int actorCounter = info.actorCounter();
int numReturns = info.numReturns();

// Deserialize new actor handles
UniqueId[] newActorHandles = IdUtil.getUniqueIdsFromByteBuffer(
Expand All @@ -177,8 +178,6 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
args[i] = FunctionArg.passByValue(data);
}
}
// Deserialize return ids
ObjectId[] returnIds = IdUtil.getObjectIdsFromByteBuffer(info.returnsAsByteBuffer());

// Deserialize required resources;
Map<String, Double> resources = new HashMap<>();
Expand All @@ -193,7 +192,7 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
);
return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId,
maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles,
args, returnIds, resources, TaskLanguage.JAVA, functionDescriptor);
args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor);
}

private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
Expand All @@ -211,6 +210,7 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
final int actorIdOffset = fbb.createString(task.actorId.toByteBuffer());
final int actorHandleIdOffset = fbb.createString(task.actorHandleId.toByteBuffer());
final int actorCounter = task.actorCounter;
final int numReturnsOffset = task.numReturns;

// Serialize the new actor handles.
int newActorHandlesOffset
Expand All @@ -234,9 +234,6 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
}
int argsOffset = fbb.createVectorOfTables(argsOffsets);

// Serialize returns
int returnsOffset = fbb.createString(IdUtil.concatIds(task.returnIds));

// Serialize required resources
// The required_resources vector indicates the quantities of the different
// resources required by this task. The index in this vector corresponds to
Expand Down Expand Up @@ -292,7 +289,7 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
actorCounter,
newActorHandlesOffset,
argsOffset,
returnsOffset,
numReturnsOffset,
requiredResourcesOffset,
requiredPlacementResourcesOffset,
language,
Expand Down
16 changes: 12 additions & 4 deletions java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.ray.runtime.functionmanager.FunctionDescriptor;
import org.ray.runtime.functionmanager.JavaFunctionDescriptor;
import org.ray.runtime.functionmanager.PyFunctionDescriptor;
import org.ray.runtime.util.IdUtil;

/**
* Represents necessary information of a task for scheduling and executing.
Expand Down Expand Up @@ -50,7 +51,10 @@ public class TaskSpec {
// Task arguments.
public final FunctionArg[] args;

// return ids
// number of return objects.
public final int numReturns;
Copy link
Contributor

@jovany-wang jovany-wang May 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this member numReturns ? I think we can use returnIds.size() instead of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jovany-wang We would better to keep this to align with the gcs.fbs? returnIds is not in gcs.fbs now but it is called in many places, so I kept it.


// returns ids.
public final ObjectId[] returnIds;

// The task's resource demands.
Expand Down Expand Up @@ -86,7 +90,7 @@ public TaskSpec(
int actorCounter,
UniqueId[] newActorHandles,
FunctionArg[] args,
ObjectId[] returnIds,
int numReturns,
Map<String, Double> resources,
TaskLanguage language,
FunctionDescriptor functionDescriptor) {
Expand All @@ -101,7 +105,11 @@ public TaskSpec(
this.actorCounter = actorCounter;
this.newActorHandles = newActorHandles;
this.args = args;
this.returnIds = returnIds;
this.numReturns = numReturns;
returnIds = new ObjectId[numReturns];
for (int i = 0; i < numReturns; ++i) {
returnIds[i] = IdUtil.computeReturnId(taskId, i + 1);
}
this.resources = resources;
this.language = language;
if (language == TaskLanguage.JAVA) {
Expand Down Expand Up @@ -145,7 +153,7 @@ public String toString() {
", actorCounter=" + actorCounter +
", newActorHandles=" + Arrays.toString(newActorHandles) +
", args=" + Arrays.toString(args) +
", returnIds=" + Arrays.toString(returnIds) +
", numReturns=" + numReturns +
", resources=" + resources +
", language=" + language +
", functionDescriptor=" + functionDescriptor +
Expand Down
5 changes: 2 additions & 3 deletions src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ table TaskInfo {
new_actor_handles: string;
// Task arguments.
args: [Arg];
// Object IDs of return values. This is a long string that concatenate
// all of the return object IDs of this task.
returns: string;
// Number of return objects.
num_returns: int;
// The required_resources vector indicates the quantities of the different
// resources required by this task.
required_resources: [ResourcePair];
Expand Down
14 changes: 4 additions & 10 deletions src/ray/raylet/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,14 @@ TaskSpecification::TaskSpecification(
arguments.push_back(argument->ToFlatbuffer(fbb));
}

// Generate return ids.
std::vector<ray::ObjectID> returns;
for (int64_t i = 1; i < num_returns + 1; ++i) {
returns.push_back(ObjectID::for_task_return(task_id, i));
}

// Serialize the TaskSpecification.
auto spec = CreateTaskInfo(
fbb, to_flatbuf(fbb, driver_id), to_flatbuf(fbb, task_id),
to_flatbuf(fbb, parent_task_id), parent_counter, to_flatbuf(fbb, actor_creation_id),
to_flatbuf(fbb, actor_creation_dummy_object_id), max_actor_reconstructions,
to_flatbuf(fbb, actor_id), to_flatbuf(fbb, actor_handle_id), actor_counter,
ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments),
ids_to_flatbuf(fbb, returns), map_to_flatbuf(fbb, required_resources),
ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments), num_returns,
map_to_flatbuf(fbb, required_resources),
map_to_flatbuf(fbb, required_placement_resources), language,
string_vec_to_flatbuf(fbb, function_descriptor));
fbb.Finish(spec);
Expand Down Expand Up @@ -167,12 +161,12 @@ int64_t TaskSpecification::NumArgs() const {

int64_t TaskSpecification::NumReturns() const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
return (message->returns()->size() / kUniqueIDSize);
return message->num_returns();
}

ObjectID TaskSpecification::ReturnId(int64_t return_index) const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
return ids_from_flatbuf<ObjectID>(*message->returns())[return_index];
return ObjectID::for_task_return(TaskId(), return_index + 1);
}

bool TaskSpecification::ArgByRef(int64_t arg_index) const {
Expand Down
51 changes: 51 additions & 0 deletions src/ray/raylet/task_test.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "gtest/gtest.h"

#include "ray/common/common_protocol.h"
#include "ray/raylet/task_spec.h"

namespace ray {
Expand Down Expand Up @@ -47,6 +48,56 @@ TEST(IdPropertyTest, TestIdProperty) {
ASSERT_TRUE(ObjectID::nil().is_nil());
}

TEST(TaskSpecTest, TaskInfoSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test only print task spec size, without asserting anything?
Is it needed to check it in?

std::vector<ObjectID> references = {ObjectID::from_random(), ObjectID::from_random()};
auto arguments_1 = std::make_shared<TaskArgumentByReference>(references);
std::string one_arg("This is an value argument.");
auto arguments_2 = std::make_shared<TaskArgumentByValue>(
reinterpret_cast<const uint8_t *>(one_arg.c_str()), one_arg.size());
std::vector<std::shared_ptr<TaskArgument>> task_arguments({arguments_1, arguments_2});
auto task_id = TaskID::from_random();
{
flatbuffers::FlatBufferBuilder fbb;
std::vector<flatbuffers::Offset<Arg>> arguments;
for (auto &argument : task_arguments) {
arguments.push_back(argument->ToFlatbuffer(fbb));
}
// General task.
auto spec = CreateTaskInfo(
fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id),
to_flatbuf(fbb, TaskID::from_random()), 0, to_flatbuf(fbb, ActorID::nil()),
to_flatbuf(fbb, ObjectID::nil()), 0, to_flatbuf(fbb, ActorID::nil()),
to_flatbuf(fbb, ActorHandleID::nil()), 0,
ids_to_flatbuf(fbb, std::vector<ObjectID>()), fbb.CreateVector(arguments), 1,
map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}), Language::PYTHON,
string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"}));
fbb.Finish(spec);
RAY_LOG(ERROR) << "Ordinary task info size: " << fbb.GetSize();
}

{
flatbuffers::FlatBufferBuilder fbb;
std::vector<flatbuffers::Offset<Arg>> arguments;
for (auto &argument : task_arguments) {
arguments.push_back(argument->ToFlatbuffer(fbb));
}
// General task.
auto spec = CreateTaskInfo(
fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id),
to_flatbuf(fbb, TaskID::from_random()), 10,
to_flatbuf(fbb, ActorID::from_random()), to_flatbuf(fbb, ObjectID::from_random()),
10000000, to_flatbuf(fbb, ActorID::from_random()),
to_flatbuf(fbb, ActorHandleID::from_random()), 20,
ids_to_flatbuf(fbb, std::vector<ObjectID>(
{ObjectID::from_random(), ObjectID::from_random()})),
fbb.CreateVector(arguments), 2, map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}),
Language::PYTHON,
string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"}));
fbb.Finish(spec);
RAY_LOG(ERROR) << "Actor task info size: " << fbb.GetSize();
}
}

} // namespace raylet

} // namespace ray
Expand Down