Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][state] Task events backend - worker task event buffer implementation [1/n] #30867

Merged
merged 15 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,20 @@ cc_test(
tags = ["team:core"],
deps = [
":core_worker_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "task_event_buffer_test",
size = "small",
srcs = ["src/ray/core_worker/test/task_event_buffer_test.cc"],
copts = COPTS,
tags = ["team:core"],
deps = [
":core_worker_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)
Expand Down
14 changes: 14 additions & 0 deletions src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,20 @@ class MockErrorInfoAccessor : public ErrorInfoAccessor {
namespace ray {
namespace gcs {

class MockTaskInfoAccessor : public TaskInfoAccessor {
public:
MOCK_METHOD(Status,
AsyncAddTaskEventData,
(std::unique_ptr<rpc::TaskEventData> data_ptr, StatusCallback callback),
(override));
};

} // namespace gcs
} // namespace ray

namespace ray {
namespace gcs {

class MockStatsInfoAccessor : public StatsInfoAccessor {
public:
MOCK_METHOD(Status,
Expand Down
3 changes: 3 additions & 0 deletions src/mock/ray/gcs/gcs_client/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class MockGcsClient : public GcsClient {
mock_worker_accessor = new MockWorkerInfoAccessor();
mock_placement_group_accessor = new MockPlacementGroupInfoAccessor();
mock_internal_kv_accessor = new MockInternalKVAccessor();
mock_task_accessor = new MockTaskInfoAccessor();

GcsClient::job_accessor_.reset(mock_job_accessor);
GcsClient::actor_accessor_.reset(mock_actor_accessor);
Expand All @@ -55,6 +56,7 @@ class MockGcsClient : public GcsClient {
GcsClient::error_accessor_.reset(mock_error_accessor);
GcsClient::worker_accessor_.reset(mock_worker_accessor);
GcsClient::placement_group_accessor_.reset(mock_placement_group_accessor);
GcsClient::task_accessor_.reset(mock_task_accessor);
}
MockActorInfoAccessor *mock_actor_accessor;
MockJobInfoAccessor *mock_job_accessor;
Expand All @@ -65,6 +67,7 @@ class MockGcsClient : public GcsClient {
MockWorkerInfoAccessor *mock_worker_accessor;
MockPlacementGroupInfoAccessor *mock_placement_group_accessor;
MockInternalKVAccessor *mock_internal_kv_accessor;
MockTaskInfoAccessor *mock_task_accessor;
};

} // namespace gcs
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/task/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ static inline rpc::ObjectReference GetReferenceForActorDummyObject(
class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
public:
/// Construct an empty task specification. This should not be used directly.
TaskSpecification() {}
TaskSpecification() { ComputeResources(); }
rickyyx marked this conversation as resolved.
Show resolved Hide resolved

/// Construct from a protobuf message object.
/// The input message will be copied/moved into this object.
Expand Down
31 changes: 27 additions & 4 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,16 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
profiler_ = std::make_shared<worker::Profiler>(
worker_context_, options_.node_ip_address, io_service_, gcs_client_);

// Initialize the task state event buffer.
auto task_event_gcs_client = std::make_unique<gcs::GcsClient>(options_.gcs_options);
task_event_buffer_ =
std::make_unique<worker::TaskEventBufferImpl>(std::move(task_event_gcs_client));
if (RayConfig::instance().task_events_report_interval_ms() > 0) {
if (!task_event_buffer_->Start().ok()) {
RAY_CHECK(!task_event_buffer_->Enabled()) << "TaskEventBuffer should be disabled.";
}
}

core_worker_client_pool_ =
std::make_shared<rpc::CoreWorkerClientPool>(*client_call_manager_);

Expand Down Expand Up @@ -342,7 +352,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
}
},
push_error_callback,
RayConfig::instance().max_lineage_bytes()));
RayConfig::instance().max_lineage_bytes(),
*task_event_buffer_.get()));

// Create an entry for the driver task in the task table. This task is
// added immediately with status RUNNING. This allows us to push errors
Expand Down Expand Up @@ -597,6 +608,8 @@ void CoreWorker::Shutdown() {
options_.on_worker_shutdown(GetWorkerID());
}

task_event_buffer_->Stop();

if (gcs_client_) {
// We should disconnect gcs client first otherwise because it contains
// a blocking logic that can block the io service upon
Expand Down Expand Up @@ -633,6 +646,9 @@ void CoreWorker::Disconnect(
// Force stats export before exiting the worker.
RecordMetrics();

// Force task state events push before exiting the worker.
task_event_buffer_->FlushEvents(/* forced */ true);

opencensus::stats::StatsExporter::ExportNow();
if (connected_) {
RAY_LOG(INFO) << "Disconnecting to the raylet.";
Expand Down Expand Up @@ -1147,7 +1163,6 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids,
std::vector<std::shared_ptr<RayObject>> *results) {
ScopedTaskMetricSetter state(
worker_context_, task_counter_, rpc::TaskStatus::RUNNING_IN_RAY_GET);

results->resize(ids.size(), nullptr);

absl::flat_hash_set<ObjectID> plasma_object_ids;
Expand Down Expand Up @@ -2250,9 +2265,17 @@ Status CoreWorker::ExecuteTask(
std::string func_name = task_spec.FunctionDescriptor()->CallString();
if (!options_.is_local_mode) {
task_counter_.MovePendingToRunning(func_name, task_spec.IsRetry());
}

if (!options_.is_local_mode) {
// Make task event
if (task_event_buffer_->Enabled()) {
rpc::TaskEvents task_event;
task_event.set_task_id(task_spec.TaskId().Binary());
task_event.set_attempt_number(task_spec.AttemptNumber());
auto state_updates = task_event.mutable_state_updates();
state_updates->set_running_ts(absl::GetCurrentTimeNanos());
task_event_buffer_->AddTaskEvent(std::move(task_event));
}

worker_context_.SetCurrentTask(task_spec);
SetCurrentTaskId(task_spec.TaskId(), task_spec.AttemptNumber(), task_spec.GetName());
}
Expand Down
5 changes: 5 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "ray/core_worker/reference_count.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/store_provider/plasma_store_provider.h"
#include "ray/core_worker/task_event_buffer.h"
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/core_worker/transport/direct_task_transport.h"
#include "ray/gcs/gcs_client/gcs_client.h"
Expand Down Expand Up @@ -1501,6 +1502,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// the checking and increasing of backpressure pending calls counter
/// is not atomic, which may lead to under counting or over counting.
absl::Mutex actor_task_mutex_;

/// A shared pointer between various components that emitting task state events.
/// e.g. CoreWorker, TaskManager.
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer_ = nullptr;
};

} // namespace core
Expand Down
209 changes: 209 additions & 0 deletions src/ray/core_worker/task_event_buffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/core_worker/task_event_buffer.h"

namespace ray {
namespace core {

namespace worker {

TaskEventBufferImpl::TaskEventBufferImpl(std::unique_ptr<gcs::GcsClient> gcs_client)
: work_guard_(boost::asio::make_work_guard(io_service_)),
periodical_runner_(io_service_),
gcs_client_(std::move(gcs_client)) {}

Status TaskEventBufferImpl::Start(bool auto_flush) {
absl::MutexLock lock(&mutex_);
auto report_interval_ms = RayConfig::instance().task_events_report_interval_ms();
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
RAY_CHECK(report_interval_ms > 0)
<< "RAY_task_events_report_interval_ms should be > 0 to use TaskEventBuffer.";

buffer_.reserve(RayConfig::instance().task_events_max_num_task_events_in_buffer());

// Reporting to GCS, set up gcs client and and events flushing.
auto status = gcs_client_->Connect(io_service_);
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to connect to GCS, TaskEventBuffer will stop now. [status="
<< status.ToString() << "].";

enabled_ = false;
return status;
}

enabled_ = true;

io_thread_ = std::thread([this]() {
#ifndef _WIN32
// Block SIGINT and SIGTERM so they will be handled by the main thread.
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigaddset(&mask, SIGTERM);
pthread_sigmask(SIG_BLOCK, &mask, NULL);
#endif
SetThreadName("task_event_buffer.io");
io_service_.run();
RAY_LOG(INFO) << "Task event buffer io service stopped.";
});

if (!auto_flush) {
return Status::OK();
}

RAY_LOG(INFO) << "Reporting task events to GCS every " << report_interval_ms << "ms.";
periodical_runner_.RunFnPeriodically([this] { FlushEvents(/* forced */ false); },
report_interval_ms,
"CoreWorker.deadline_timer.flush_task_events");
return Status::OK();
}

void TaskEventBufferImpl::Stop() {
if (!enabled_) {
return;
}
RAY_LOG(INFO) << "Shutting down TaskEventBuffer.";

// Shutting down the io service to exit the io_thread. This should prevent
// any other callbacks to be run on the io thread.
io_service_.stop();
if (io_thread_.joinable()) {
RAY_LOG(DEBUG) << "Joining io thread from TaskEventBuffer";
io_thread_.join();
}

{
absl::MutexLock lock(&mutex_);
// It's now safe to disconnect the GCS client since it will not be used by any
// callbacks.
gcs_client_->Disconnect();
}
}

bool TaskEventBufferImpl::Enabled() const { return enabled_; }

void TaskEventBufferImpl::AddTaskEvent(rpc::TaskEvents task_events) {
if (!enabled_) {
return;
}
absl::MutexLock lock(&mutex_);

auto limit = RayConfig::instance().task_events_max_num_task_events_in_buffer();
if (limit > 0 && buffer_.size() >= static_cast<size_t>(limit)) {
// Too many task events, start overriding older ones.
buffer_[next_idx_to_overwrite_] = std::move(task_events);
next_idx_to_overwrite_ = (next_idx_to_overwrite_ + 1) % limit;
num_task_events_dropped_++;
return;
}
buffer_.push_back(std::move(task_events));
}

void TaskEventBufferImpl::FlushEvents(bool forced) {
if (!enabled_) {
return;
}
std::vector<rpc::TaskEvents> task_events;
size_t num_task_events_dropped = 0;
{
absl::MutexLock lock(&mutex_);

RAY_LOG_EVERY_MS(INFO, 15000)
<< "Pushed task state events to GCS. [total_bytes="
<< (1.0 * total_events_bytes_) / 1024 / 1024
<< "MiB][total_count=" << total_num_events_
<< "][total_task_events_dropped=" << num_task_events_dropped_
<< "][cur_buffer_size=" << buffer_.size() << "].";

// Skip if GCS hasn't finished processing the previous message.
if (grpc_in_progress_ && !forced) {
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
RAY_LOG_EVERY_N_OR_DEBUG(WARNING, 100)
<< "GCS hasn't replied to the previous flush events call (likely "
"overloaded). "
"Skipping reporting task state events and retry later."
<< "[cur_buffer_size=" << buffer_.size() << "].";
return;
}

if (buffer_.size() == 0) {
return;
}

task_events.reserve(
RayConfig::instance().task_events_max_num_task_events_in_buffer());
buffer_.swap(task_events);
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
next_idx_to_overwrite_ = 0;
num_task_events_dropped = num_task_events_dropped_;
num_task_events_dropped_ = 0;
}

// Merge multiple events from a single task attempt run into one task event.
absl::flat_hash_map<std::pair<std::string, int>, rpc::TaskEvents> task_events_map;
for (auto event : task_events) {
auto &task_events_itr =
task_events_map[std::make_pair(event.task_id(), event.attempt_number())];
task_events_itr.MergeFrom(event);
}

// Convert to rpc::TaskEventsData
auto data = std::make_unique<rpc::TaskEventData>();
data->set_num_task_events_dropped(num_task_events_dropped);
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
auto num_task_events = task_events_map.size();
for (auto itr : task_events_map) {
auto events_by_task = data->add_events_by_task();
events_by_task->Swap(&itr.second);
}

{
// Sending the protobuf to GCS.
absl::MutexLock lock(&mutex_);
// Some debug tracking.
total_num_events_ += num_task_events;
total_events_bytes_ += data->ByteSizeLong();

auto on_complete = [this, num_task_events](const Status &status) {
absl::MutexLock lock(&mutex_);
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to push " << num_task_events
<< " task state events to GCS. Data will be lost. [status="
<< status.ToString() << "]";
} else {
RAY_LOG(DEBUG) << "Push " << num_task_events << " task state events to GCS.";
}
grpc_in_progress_ = false;
};

// The flag should be unset when on_complete is invoked.
grpc_in_progress_ = true;
auto status =
gcs_client_->Tasks().AsyncAddTaskEventData(std::move(data), on_complete);
if (!status.ok()) {
// If we couldn't even send the data by invoking client side callbacks, there's
// something seriously wrong, and losing data in this case should not be too
// worse. So we will silently drop these task events.
RAY_LOG(WARNING)
<< "Failed to push task state events to GCS. Data will be lost. [status="
<< status.ToString() << "]";
grpc_in_progress_ = false;

// Fail to send, currently dropping events.
num_task_events_dropped_ += num_task_events;
rickyyx marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

} // namespace worker

} // namespace core
} // namespace ray
Loading