Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core worker] add task submission & execution interface #4922

Merged
merged 18 commits into from
Jun 12, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ cc_library(
],
exclude = [
"src/ray/core_worker/*_test.cc",
"src/ray/core_worker/mock_worker.cc",
],
),
hdrs = glob([
Expand All @@ -127,7 +128,15 @@ cc_library(
],
)

# This test is run by src/ray/test/run_core_worker_tests.sh
cc_binary(
name = "mock_worker",
srcs = ["src/ray/core_worker/mock_worker.cc"],
copts = COPTS,
deps = [
":core_worker_lib",
],
)

cc_binary(
name = "core_worker_test",
srcs = ["src/ray/core_worker/core_worker_test.cc"],
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ namespace ray {
enum class WorkerType { WORKER, DRIVER };

/// Language of Ray tasks and workers.
enum class Language { PYTHON, JAVA };
enum class WorkerLanguage { PYTHON, JAVA };

/// Information about a remote function.
struct RayFunction {
/// Language of the remote function.
const Language language;
const WorkerLanguage language;
/// Function descriptor of the remote function.
const std::vector<std::string> function_descriptor;
};
Expand Down
3 changes: 1 addition & 2 deletions src/ray/core_worker/context.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

#include "context.h"
#include "ray/core_worker/context.h"

namespace ray {

Expand All @@ -23,7 +23,6 @@ struct WorkerThreadContext {
void SetCurrentTask(const raylet::TaskSpecification &spec) {
SetCurrentTask(spec.TaskId());
}

private:
/// The task ID for current task.
TaskID current_task_id;
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/context.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef RAY_CORE_WORKER_CONTEXT_H
#define RAY_CORE_WORKER_CONTEXT_H

#include "common.h"
#include "ray/core_worker/common.h"
#include "ray/raylet/task_spec.h"

namespace ray {
Expand Down
30 changes: 20 additions & 10 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
@@ -1,38 +1,48 @@
#include "core_worker.h"
#include "context.h"
#include "ray/core_worker/core_worker.h"
#include "ray/core_worker/context.h"

namespace ray {

CoreWorker::CoreWorker(const enum WorkerType worker_type, const enum Language language,
CoreWorker::CoreWorker(const enum WorkerType worker_type,
const enum WorkerLanguage language,
const std::string &store_socket, const std::string &raylet_socket,
DriverID driver_id)
: worker_type_(worker_type),
language_(language),
worker_context_(worker_type, driver_id),
store_socket_(store_socket),
raylet_socket_(raylet_socket),
is_initialized_(false),
task_interface_(*this),
object_interface_(*this),
task_execution_interface_(*this) {}
task_execution_interface_(*this) {
switch (language_) {
case ray::WorkerLanguage::JAVA:
task_language_ = ::Language::JAVA;
break;
case ray::WorkerLanguage::PYTHON:
task_language_ = ::Language::PYTHON;
break;
default:
RAY_LOG(FATAL) << "Unsupported worker language: " << static_cast<int>(language_);
break;
}
}

Status CoreWorker::Connect() {
// connect to plasma.
RAY_ARROW_RETURN_NOT_OK(store_client_.Connect(store_socket_));

// connect to raylet.
::Language lang = ::Language::PYTHON;
if (language_ == ray::Language::JAVA) {
lang = ::Language::JAVA;
}

// TODO: currently RayletClient would crash in its constructor if it cannot
// connect to Raylet after a number of retries, this needs to be changed
// so that the worker (java/python .etc) can retrieve and handle the error
// instead of crashing.
raylet_client_ = std::unique_ptr<RayletClient>(
new RayletClient(raylet_socket_, worker_context_.GetWorkerID(),
(worker_type_ == ray::WorkerType::WORKER),
worker_context_.GetCurrentDriverID(), lang));
worker_context_.GetCurrentDriverID(), task_language_));
is_initialized_ = true;
return Status::OK();
}

Expand Down
25 changes: 17 additions & 8 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#ifndef RAY_CORE_WORKER_CORE_WORKER_H
#define RAY_CORE_WORKER_CORE_WORKER_H

#include "common.h"
#include "context.h"
#include "object_interface.h"
#include "ray/common/buffer.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/object_interface.h"
#include "ray/core_worker/task_execution.h"
#include "ray/core_worker/task_interface.h"
#include "ray/raylet/raylet_client.h"
#include "task_execution.h"
#include "task_interface.h"

namespace ray {

Expand All @@ -20,7 +20,7 @@ class CoreWorker {
///
/// \param[in] worker_type Type of this worker.
/// \param[in] langauge Language of this worker.
CoreWorker(const WorkerType worker_type, const Language language,
CoreWorker(const WorkerType worker_type, const WorkerLanguage language,
const std::string &store_socket, const std::string &raylet_socket,
DriverID driver_id = DriverID::Nil());

Expand All @@ -31,7 +31,7 @@ class CoreWorker {
enum WorkerType WorkerType() const { return worker_type_; }

/// Language of this worker.
enum Language Language() const { return language_; }
enum WorkerLanguage Language() const { return language_; }

/// Return the `CoreWorkerTaskInterface` that contains the methods related to task
/// submisson.
Expand All @@ -50,7 +50,10 @@ class CoreWorker {
const enum WorkerType worker_type_;

/// Language of this worker.
const enum Language language_;
const enum WorkerLanguage language_;

/// Language of this worker as specified in flatbuf (used by task spec).
::Language task_language_;

/// Worker context per thread.
WorkerContext worker_context_;
Expand All @@ -64,9 +67,15 @@ class CoreWorker {
/// Plasma store client.
plasma::PlasmaClient store_client_;

/// Mutex to protect store_client_.
std::mutex store_client_mutex_;

/// Raylet client.
std::unique_ptr<RayletClient> raylet_client_;

/// Whether this worker has been initialized.
bool is_initialized_;

/// The `CoreWorkerTaskInterface` instance.
CoreWorkerTaskInterface task_interface_;

Expand Down
Loading