Skip to content

Commit

Permalink
Replace ReturnIds with NumReturns in TaskInfo to reduce the size (#4854)
Browse files Browse the repository at this point in the history
* Refine TaskInfo

* Fix

* Add a test to print task info size

* Lint

* Refine
  • Loading branch information
guoyuhong authored May 28, 2019
1 parent d7be5a5 commit fa0892f
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
actor.increaseTaskCounter(),
actor.getNewActorHandles().toArray(new UniqueId[0]),
ArgumentsBuilder.wrap(args, language == TaskLanguage.PYTHON),
returnIds,
numReturns,
resources,
language,
functionDescriptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
UniqueId actorId = UniqueId.fromByteBuffer(info.actorIdAsByteBuffer());
UniqueId actorHandleId = UniqueId.fromByteBuffer(info.actorHandleIdAsByteBuffer());
int actorCounter = info.actorCounter();
int numReturns = info.numReturns();

// Deserialize new actor handles
UniqueId[] newActorHandles = IdUtil.getUniqueIdsFromByteBuffer(
Expand All @@ -177,8 +178,6 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
args[i] = FunctionArg.passByValue(data);
}
}
// Deserialize return ids
ObjectId[] returnIds = IdUtil.getObjectIdsFromByteBuffer(info.returnsAsByteBuffer());

// Deserialize required resources.
Map<String, Double> resources = new HashMap<>();
Expand All @@ -193,7 +192,7 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
);
return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId,
maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles,
args, returnIds, resources, TaskLanguage.JAVA, functionDescriptor);
args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor);
}

private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
Expand All @@ -211,6 +210,7 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
final int actorIdOffset = fbb.createString(task.actorId.toByteBuffer());
final int actorHandleIdOffset = fbb.createString(task.actorHandleId.toByteBuffer());
final int actorCounter = task.actorCounter;
final int numReturnsOffset = task.numReturns;

// Serialize the new actor handles.
int newActorHandlesOffset
Expand All @@ -234,9 +234,6 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
}
int argsOffset = fbb.createVectorOfTables(argsOffsets);

// Serialize returns
int returnsOffset = fbb.createString(IdUtil.concatIds(task.returnIds));

// Serialize required resources
// The required_resources vector indicates the quantities of the different
// resources required by this task. The index in this vector corresponds to
Expand Down Expand Up @@ -292,7 +289,7 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
actorCounter,
newActorHandlesOffset,
argsOffset,
returnsOffset,
numReturnsOffset,
requiredResourcesOffset,
requiredPlacementResourcesOffset,
language,
Expand Down
16 changes: 12 additions & 4 deletions java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.ray.runtime.functionmanager.FunctionDescriptor;
import org.ray.runtime.functionmanager.JavaFunctionDescriptor;
import org.ray.runtime.functionmanager.PyFunctionDescriptor;
import org.ray.runtime.util.IdUtil;

/**
* Represents necessary information of a task for scheduling and executing.
Expand Down Expand Up @@ -50,7 +51,10 @@ public class TaskSpec {
// Task arguments.
public final FunctionArg[] args;

// return ids
// number of return objects.
public final int numReturns;

// IDs of the return objects, computed from the task ID and numReturns.
public final ObjectId[] returnIds;

// The task's resource demands.
Expand Down Expand Up @@ -86,7 +90,7 @@ public TaskSpec(
int actorCounter,
UniqueId[] newActorHandles,
FunctionArg[] args,
ObjectId[] returnIds,
int numReturns,
Map<String, Double> resources,
TaskLanguage language,
FunctionDescriptor functionDescriptor) {
Expand All @@ -101,7 +105,11 @@ public TaskSpec(
this.actorCounter = actorCounter;
this.newActorHandles = newActorHandles;
this.args = args;
this.returnIds = returnIds;
this.numReturns = numReturns;
returnIds = new ObjectId[numReturns];
for (int i = 0; i < numReturns; ++i) {
returnIds[i] = IdUtil.computeReturnId(taskId, i + 1);
}
this.resources = resources;
this.language = language;
if (language == TaskLanguage.JAVA) {
Expand Down Expand Up @@ -145,7 +153,7 @@ public String toString() {
", actorCounter=" + actorCounter +
", newActorHandles=" + Arrays.toString(newActorHandles) +
", args=" + Arrays.toString(args) +
", returnIds=" + Arrays.toString(returnIds) +
", numReturns=" + numReturns +
", resources=" + resources +
", language=" + language +
", functionDescriptor=" + functionDescriptor +
Expand Down
5 changes: 2 additions & 3 deletions src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ table TaskInfo {
new_actor_handles: string;
// Task arguments.
args: [Arg];
// Object IDs of return values. This is a long string that concatenate
// all of the return object IDs of this task.
returns: string;
// Number of return objects.
num_returns: int;
// The required_resources vector indicates the quantities of the different
// resources required by this task.
required_resources: [ResourcePair];
Expand Down
14 changes: 4 additions & 10 deletions src/ray/raylet/task_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,14 @@ TaskSpecification::TaskSpecification(
arguments.push_back(argument->ToFlatbuffer(fbb));
}

// Generate return ids.
std::vector<ray::ObjectID> returns;
for (int64_t i = 1; i < num_returns + 1; ++i) {
returns.push_back(ObjectID::for_task_return(task_id, i));
}

// Serialize the TaskSpecification.
auto spec = CreateTaskInfo(
fbb, to_flatbuf(fbb, driver_id), to_flatbuf(fbb, task_id),
to_flatbuf(fbb, parent_task_id), parent_counter, to_flatbuf(fbb, actor_creation_id),
to_flatbuf(fbb, actor_creation_dummy_object_id), max_actor_reconstructions,
to_flatbuf(fbb, actor_id), to_flatbuf(fbb, actor_handle_id), actor_counter,
ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments),
ids_to_flatbuf(fbb, returns), map_to_flatbuf(fbb, required_resources),
ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments), num_returns,
map_to_flatbuf(fbb, required_resources),
map_to_flatbuf(fbb, required_placement_resources), language,
string_vec_to_flatbuf(fbb, function_descriptor));
fbb.Finish(spec);
Expand Down Expand Up @@ -167,12 +161,12 @@ int64_t TaskSpecification::NumArgs() const {

// Returns the number of return objects of this task, read from the
// serialized TaskInfo held in spec_.
// NOTE(review): this is a diff view showing both sides of the change. The
// first return below appears to be the removed pre-commit line (the `returns`
// field is dropped from TaskInfo in gcs.fbs) and the second its replacement;
// confirm against the committed file.
int64_t TaskSpecification::NumReturns() const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
// Old form: derive the count from the byte length of the concatenated
// return IDs, each kUniqueIDSize bytes long.
return (message->returns()->size() / kUniqueIDSize);
// New form: read the count that is stored directly in TaskInfo.
return message->num_returns();
}

// Returns the ObjectID of the return value at position `return_index`.
// NOTE(review): this is a diff view showing both sides of the change. The
// first return below appears to be the removed pre-commit line and the second
// its replacement; confirm against the committed file.
ObjectID TaskSpecification::ReturnId(int64_t return_index) const {
// NOTE(review): once the old return is gone, `message` looks unused in this
// function — consider dropping it.
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
// Old form: look the ID up in the list stored inside the flatbuffer.
return ids_from_flatbuf<ObjectID>(*message->returns())[return_index];
// New form: recompute the ID from the task ID. for_task_return is called with
// return_index + 1, i.e. its index argument starts at 1 for return_index 0.
return ObjectID::for_task_return(TaskId(), return_index + 1);
}

bool TaskSpecification::ArgByRef(int64_t arg_index) const {
Expand Down
51 changes: 51 additions & 0 deletions src/ray/raylet/task_test.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "gtest/gtest.h"

#include "ray/common/common_protocol.h"
#include "ray/raylet/task_spec.h"

namespace ray {
Expand Down Expand Up @@ -47,6 +48,56 @@ TEST(IdPropertyTest, TestIdProperty) {
ASSERT_TRUE(ObjectID::nil().is_nil());
}

// Serializes two TaskInfo flatbuffers over the same pair of task arguments --
// one with nil actor fields and one with every actor field populated -- and
// logs the resulting buffer size of each. The test makes no assertions; its
// only output is the two log lines.
TEST(TaskSpecTest, TaskInfoSize) {
  // A by-reference argument carrying two random object IDs.
  std::vector<ObjectID> ref_ids = {ObjectID::from_random(), ObjectID::from_random()};
  auto by_reference = std::make_shared<TaskArgumentByReference>(ref_ids);

  // A by-value argument carrying the raw bytes of a short string.
  std::string payload("This is an value argument.");
  auto by_value = std::make_shared<TaskArgumentByValue>(
      reinterpret_cast<const uint8_t *>(payload.c_str()), payload.size());

  std::vector<std::shared_ptr<TaskArgument>> task_arguments{by_reference, by_value};
  auto task_id = TaskID::from_random();

  // Writes every task argument into `builder` and returns the offsets, in
  // argument order.
  const auto serialize_arguments = [&task_arguments](
                                       flatbuffers::FlatBufferBuilder &builder) {
    std::vector<flatbuffers::Offset<Arg>> offsets;
    for (const auto &task_argument : task_arguments) {
      offsets.push_back(task_argument->ToFlatbuffer(builder));
    }
    return offsets;
  };

  {
    // Ordinary task: nil actor IDs, no new actor handles, one return value.
    flatbuffers::FlatBufferBuilder fbb;
    const auto arguments = serialize_arguments(fbb);
    auto spec = CreateTaskInfo(
        fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id),
        to_flatbuf(fbb, TaskID::from_random()), 0, to_flatbuf(fbb, ActorID::nil()),
        to_flatbuf(fbb, ObjectID::nil()), 0, to_flatbuf(fbb, ActorID::nil()),
        to_flatbuf(fbb, ActorHandleID::nil()), 0,
        ids_to_flatbuf(fbb, std::vector<ObjectID>()), fbb.CreateVector(arguments), 1,
        map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}), Language::PYTHON,
        string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"}));
    fbb.Finish(spec);
    RAY_LOG(ERROR) << "Ordinary task info size: " << fbb.GetSize();
  }

  {
    // Actor task: random actor IDs, two new actor handles, two return values.
    flatbuffers::FlatBufferBuilder fbb;
    const auto arguments = serialize_arguments(fbb);
    auto spec = CreateTaskInfo(
        fbb, to_flatbuf(fbb, DriverID::from_random()), to_flatbuf(fbb, task_id),
        to_flatbuf(fbb, TaskID::from_random()), 10,
        to_flatbuf(fbb, ActorID::from_random()), to_flatbuf(fbb, ObjectID::from_random()),
        10000000, to_flatbuf(fbb, ActorID::from_random()),
        to_flatbuf(fbb, ActorHandleID::from_random()), 20,
        ids_to_flatbuf(fbb, std::vector<ObjectID>(
                                {ObjectID::from_random(), ObjectID::from_random()})),
        fbb.CreateVector(arguments), 2, map_to_flatbuf(fbb, {}), map_to_flatbuf(fbb, {}),
        Language::PYTHON,
        string_vec_to_flatbuf(fbb, {"PackageName", "ClassName", "FunctionName"}));
    fbb.Finish(spec);
    RAY_LOG(ERROR) << "Actor task info size: " << fbb.GetSize();
  }
}

} // namespace raylet

} // namespace ray
Expand Down

0 comments on commit fa0892f

Please sign in to comment.