
[core][state] Adjust worker side reporting with batches && add debugstring #31840

Merged · 14 commits · Jan 28, 2023
10 changes: 8 additions & 2 deletions src/ray/common/ray_config_def.h
@@ -455,9 +455,15 @@ RAY_CONFIG(int64_t, task_events_report_interval_ms, 1000)
 RAY_CONFIG(int64_t, task_events_max_num_task_in_gcs, 100000)
 
 /// Max number of task events stored in the buffer on workers. Any additional events
-/// will be dropped.
+/// will be dropped. This is set to a large value to avoid worker side data loss.
+/// For now, the average size of a task event is ~200 bytes, so 1M task events would
+/// incur ~200MiB of overhead.
+/// Setting the value to -1 allows for unlimited task events buffered on workers.
-RAY_CONFIG(int64_t, task_events_max_num_task_events_in_buffer, 10000)
+RAY_CONFIG(int64_t, task_events_max_num_task_events_in_buffer, 1 * 1000 * 1000)
+
+/// Max number of task events to be sent in a single message to GCS. This caps both
+/// the message size and the processing work on GCS.
+RAY_CONFIG(int64_t, task_events_send_batch_size, 10 * 1000)
 
 /// Max number of profile events allowed for a single task when sent to GCS.
 /// NOTE: this limit only applies to the profile events per task in a single
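For intuition on how these two knobs interact: the worker-side buffer is bounded, and once it fills up, new events overwrite the oldest entries while a drop counter is incremented. Below is a minimal, self-contained sketch of that overwrite policy — illustrative only, not Ray's implementation; the names (BoundedEventBuffer, Add) are made up for this example.

#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Sketch: a fixed-capacity buffer that overwrites its oldest entry once the
// cap is reached, counting each overwrite as a dropped event.
class BoundedEventBuffer {
 public:
  explicit BoundedEventBuffer(size_t capacity) : capacity_(capacity) {}

  void Add(std::string event) {
    if (events_.size() < capacity_) {
      events_.push_back(std::move(event));
      return;
    }
    // Buffer is full: overwrite the oldest slot and record the drop.
    events_[next_idx_to_overwrite_] = std::move(event);
    next_idx_to_overwrite_ = (next_idx_to_overwrite_ + 1) % capacity_;
    ++num_dropped_;
  }

  size_t num_dropped() const { return num_dropped_; }

 private:
  size_t capacity_;
  size_t next_idx_to_overwrite_ = 0;
  size_t num_dropped_ = 0;
  std::vector<std::string> events_;
};

At the defaults above, a full buffer holds 1M events at roughly 200 bytes each, i.e. about 200 MiB, which is the overhead the comment estimates; task_events_send_batch_size then drains that buffer 10k events at a time.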
5 changes: 4 additions & 1 deletion src/ray/core_worker/core_worker.cc
@@ -515,7 +515,10 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
     periodical_runner_.RunFnPeriodically(
         [this] {
           RAY_LOG(INFO) << "Event stats:\n\n"
-                        << io_service_.stats().StatsString() << "\n\n";
+                        << io_service_.stats().StatsString() << "\n\n"
+                        << "-----------------\n"
+                        << "Task Event stats:\n"
+                        << task_event_buffer_->DebugString() << "\n";
         },
         event_stats_print_interval_ms);
   }
143 changes: 94 additions & 49 deletions src/ray/core_worker/task_event_buffer.cc
@@ -119,20 +119,13 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
   if (!enabled_) {
     return;
   }
-  std::vector<rpc::TaskEvents> task_events;
   size_t num_status_task_events_dropped = 0;
   size_t num_profile_task_events_dropped = 0;
+  std::vector<rpc::TaskEvents> to_send;
 
   {
     absl::MutexLock lock(&mutex_);
 
-    RAY_LOG_EVERY_MS(INFO, 15000)
-        << "Pushed task state events to GCS. [total_bytes="
-        << (1.0 * total_events_bytes_) / 1024 / 1024
-        << "MiB][total_count=" << total_num_events_
-        << "][total_status_task_events_dropped=" << num_status_task_events_dropped_
-        << "][total_profile_task_events_dropped=" << num_profile_task_events_dropped_
-        << "][cur_buffer_size=" << buffer_.size() << "].";
-
     // Skip if GCS hasn't finished processing the previous message.
     if (grpc_in_progress_ && !forced) {
       RAY_LOG_EVERY_N_OR_DEBUG(WARNING, 100)
@@ -143,72 +136,85 @@ void TaskEventBufferImpl::FlushEvents(bool forced) {
       return;
     }
 
-    if (buffer_.size() == 0) {
+    // No data to send.
+    if (buffer_.empty() && send_buffer_.empty()) {
       return;
    }
 
-    task_events.reserve(
-        RayConfig::instance().task_events_max_num_task_events_in_buffer());
-    buffer_.swap(task_events);
-    next_idx_to_overwrite_ = 0;
-
-    num_profile_task_events_dropped = num_profile_task_events_dropped_;
-    num_profile_task_events_dropped_ = 0;
-
-    num_status_task_events_dropped = num_status_task_events_dropped_;
-    num_status_task_events_dropped_ = 0;
-  }
-
-  // Merge multiple events from a single task attempt run into one task event.
-  absl::flat_hash_map<std::pair<std::string, int>, rpc::TaskEvents> task_events_map;
-
-  size_t num_profile_event_to_send = 0;
-  size_t num_status_event_to_send = 0;
-  for (auto event : task_events) {
-    if (event.has_profile_events()) {
-      num_profile_event_to_send++;
-    } else {
-      num_status_event_to_send++;
-    }
-    auto &task_events_itr =
-        task_events_map[std::make_pair(event.task_id(), event.attempt_number())];
-    task_events_itr.MergeFrom(event);
-  }
+    // Have new data and send buffer empty, add new data to the send buffer.
+    if (send_buffer_.empty() && !buffer_.empty()) {
+      buffer_.swap(send_buffer_);
+      next_idx_to_overwrite_ = 0;
+    }
+    num_profile_task_events_dropped = num_profile_task_events_dropped_;
+    num_profile_task_events_dropped_ = 0;
+
+    num_status_task_events_dropped = num_status_task_events_dropped_;
+    num_status_task_events_dropped_ = 0;
+  }
+
+  // Take one single batch to send.
+  if (static_cast<int64_t>(send_buffer_.size()) >
+      RayConfig::instance().task_events_send_batch_size()) {
+    size_t batch_size = RayConfig::instance().task_events_send_batch_size();
+    auto move_start = std::prev(send_buffer_.end(), batch_size);
+    to_send.insert(to_send.end(),
+                   std::make_move_iterator(move_start),
+                   std::make_move_iterator(send_buffer_.end()));
+    send_buffer_.erase(move_start, send_buffer_.end());
+  } else {
+    // Move all, so just swap.
+    to_send.swap(send_buffer_);
+  }
+
+  if (to_send.empty()) {
+    return;
+  }
 
   // Convert to rpc::TaskEventsData
   auto data = std::make_unique<rpc::TaskEventData>();
   data->set_num_profile_task_events_dropped(num_profile_task_events_dropped);
   data->set_num_status_task_events_dropped(num_status_task_events_dropped);
 
-  auto num_task_events = task_events_map.size();
-  for (auto itr : task_events_map) {
-    auto events_by_task = data->add_events_by_task();
-    events_by_task->Swap(&itr.second);
+  size_t num_task_events = to_send.size();
+  size_t num_profile_event_to_send = 0;
+  size_t num_status_event_to_send = 0;
+  for (auto &task_event : to_send) {
+    auto events_by_task = data->add_events_by_task();
+    if (task_event.has_profile_events()) {
+      num_profile_event_to_send++;
+    } else {
+      num_status_event_to_send++;
+    }
+    events_by_task->Swap(&task_event);
   }
 
+  gcs::TaskInfoAccessor *task_accessor;
   {
     // Sending the protobuf to GCS.
     absl::MutexLock lock(&mutex_);
     // Some debug tracking.
     total_num_events_ += num_task_events;
     total_events_bytes_ += data->ByteSizeLong();
 
-    auto on_complete = [this, num_task_events](const Status &status) {
-      absl::MutexLock lock(&mutex_);
-      if (!status.ok()) {
-        RAY_LOG(WARNING) << "Failed to push " << num_task_events
-                         << " task state events to GCS. Data will be lost. [status="
-                         << status.ToString() << "]";
-      } else {
-        RAY_LOG(DEBUG) << "Push " << num_task_events << " task state events to GCS.";
-      }
-      grpc_in_progress_ = false;
-    };
-
     // The flag should be unset when on_complete is invoked.
     grpc_in_progress_ = true;
-    auto status =
-        gcs_client_->Tasks().AsyncAddTaskEventData(std::move(data), on_complete);
+    task_accessor = &gcs_client_->Tasks();
  }
 
+  auto on_complete = [this, num_task_events](const Status &status) {
+    absl::MutexLock lock(&mutex_);
+    if (!status.ok()) {
+      RAY_LOG(WARNING) << "Failed to push " << num_task_events
+                       << " task state events to GCS. Data will be lost. [status="
+                       << status.ToString() << "]";
+    }
+    grpc_in_progress_ = false;
+  };
+
+  auto status = task_accessor->AsyncAddTaskEventData(std::move(data), on_complete);
+  {
+    absl::MutexLock lock(&mutex_);
    if (!status.ok()) {
       // If we couldn't even send the data by invoking client side callbacks, there's
       // something seriously wrong, and losing data in this case should not be too
@@ -225,6 +231,45 @@
   }
 }
 
+const std::string TaskEventBufferImpl::DebugString() {
+  std::stringstream ss;
+
+  if (!Enabled()) {
+    ss << "Task Event Buffer is disabled.";
+    return ss.str();
+  }
+
+  bool grpc_in_progress;
+  size_t num_status_task_events_dropped, num_profile_task_events_dropped,
+      send_buffer_size, data_buffer_size;
+  uint64_t total_events_bytes, total_num_events;
+
+  {
+    absl::MutexLock lock(&mutex_);
+    grpc_in_progress = grpc_in_progress_;
+    num_status_task_events_dropped = num_status_task_events_dropped_;
+    num_profile_task_events_dropped = num_profile_task_events_dropped_;
+    total_events_bytes = total_events_bytes_;
+    total_num_events = total_num_events_;
+    send_buffer_size = send_buffer_.size();
+    data_buffer_size = buffer_.size();
+  }
+
+  ss << "\nIO Service Stats:\n";
+  ss << io_service_.stats().StatsString();
+  ss << "\nOther Stats:"
+     << "\n\tgrpc_in_progress:" << grpc_in_progress
+     << "\n\tcurrent number of task events in buffer: " << data_buffer_size
+     << "\n\tnumber of task events to be sent in batches: " << send_buffer_size
+     << "\n\ttotal bytes of task events sent: "
+     << 1.0 * total_events_bytes / 1024 / 1024 << " MiB"
+     << "\n\ttotal number of task events sent: " << total_num_events
+     << "\n\tnum status task events dropped: " << num_status_task_events_dropped
+     << "\n\tnum profile task events dropped: " << num_profile_task_events_dropped
+     << "\n";
+
+  return ss.str();
+}
+
 }  // namespace worker
 
 }  // namespace core
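The batching added above rests on a standard vector idiom: move the last batch_size elements into the outgoing vector with move iterators, then erase the moved-from tail. Here is a self-contained sketch of that splice, with plain ints standing in for rpc::TaskEvents:

#include <cstddef>
#include <iostream>
#include <iterator>
#include <vector>

int main() {
  std::vector<int> send_buffer = {1, 2, 3, 4, 5, 6, 7};
  const size_t batch_size = 3;

  std::vector<int> to_send;
  if (send_buffer.size() > batch_size) {
    // Move the last `batch_size` elements into `to_send`, then drop them.
    auto move_start = std::prev(send_buffer.end(), batch_size);
    to_send.insert(to_send.end(),
                   std::make_move_iterator(move_start),
                   std::make_move_iterator(send_buffer.end()));
    send_buffer.erase(move_start, send_buffer.end());
  } else {
    // Less than one full batch left: take everything in O(1).
    to_send.swap(send_buffer);
  }

  std::cout << "batch: " << to_send.size()            // 3
            << ", remaining: " << send_buffer.size()  // 4
            << "\n";
  return 0;
}

Taking the batch from the tail keeps the erase cheap (no shifting of the remaining elements), at the cost of sending the most recently buffered events first.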
9 changes: 9 additions & 0 deletions src/ray/core_worker/task_event_buffer.h
@@ -92,6 +92,9 @@ class TaskEventBuffer {
   ///
   /// The TaskEventBuffer will be disabled if Start() returns not ok.
   virtual bool Enabled() const = 0;
+
+  /// Return a string that describes the task event buffer stats.
+  virtual const std::string DebugString() = 0;
 };
 
 /// Implementation of TaskEventBuffer.
@@ -117,6 +120,8 @@ class TaskEventBufferImpl : public TaskEventBuffer {
 
   bool Enabled() const override;
 
+  const std::string DebugString() LOCKS_EXCLUDED(mutex_) override;
+
  private:
   /// Test only functions.
   std::vector<rpc::TaskEvents> GetAllTaskEvents() LOCKS_EXCLUDED(mutex_) {
@@ -181,13 +186,17 @@ class TaskEventBufferImpl : public TaskEventBuffer {
   /// process them quick enough.
   bool grpc_in_progress_ GUARDED_BY(mutex_) = false;
 
+  /// A buffer to store task events to be sent to GCS in batches.
+  std::vector<rpc::TaskEvents> send_buffer_ GUARDED_BY(mutex_);
+
   /// Debug stats: total number of bytes of task events sent so far to GCS.
   uint64_t total_events_bytes_ GUARDED_BY(mutex_) = 0;
 
   /// Debug stats: total number of task events sent so far to GCS.
   uint64_t total_num_events_ GUARDED_BY(mutex_) = 0;
 
   FRIEND_TEST(TaskEventBufferTestManualStart, TestGcsClientFail);
+  FRIEND_TEST(TaskEventBufferTestBatchSend, TestBatchedSend);
   FRIEND_TEST(TaskEventBufferTest, TestAddEvent);
   FRIEND_TEST(TaskEventBufferTest, TestFlushEvents);
   FRIEND_TEST(TaskEventBufferTest, TestFailedFlush);
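GUARDED_BY and LOCKS_EXCLUDED above are Clang thread-safety annotations (Ray's unprefixed spellings of the Abseil macros). A small illustrative class — assuming Abseil is available, with hypothetical names — showing both the annotations and the snapshot pattern DebugString uses: copy the guarded fields while holding the mutex, then format the string after releasing it so the lock is held only briefly:

#include <cstdint>
#include <sstream>
#include <string>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"

class Stats {
 public:
  void Record(uint64_t bytes) ABSL_LOCKS_EXCLUDED(mutex_) {
    absl::MutexLock lock(&mutex_);
    total_bytes_ += bytes;
    ++total_events_;
  }

  std::string DebugString() ABSL_LOCKS_EXCLUDED(mutex_) {
    uint64_t bytes, events;
    {
      // Snapshot the guarded fields under the lock...
      absl::MutexLock lock(&mutex_);
      bytes = total_bytes_;
      events = total_events_;
    }
    // ...and do the (comparatively slow) formatting without it.
    std::stringstream ss;
    ss << "total bytes: " << bytes << ", total events: " << events;
    return ss.str();
  }

 private:
  absl::Mutex mutex_;
  uint64_t total_bytes_ ABSL_GUARDED_BY(mutex_) = 0;
  uint64_t total_events_ ABSL_GUARDED_BY(mutex_) = 0;
};

With these annotations, clang -Wthread-safety flags any access to total_bytes_ or total_events_ made without holding mutex_.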
49 changes: 48 additions & 1 deletion src/ray/core_worker/test/task_event_buffer_test.cc
@@ -38,7 +38,8 @@ class TaskEventBufferTest : public ::testing::Test {
         R"(
 {
   "task_events_report_interval_ms": 1000,
-  "task_events_max_num_task_events_in_buffer": 100
+  "task_events_max_num_task_events_in_buffer": 100,
+  "task_events_send_batch_size": 100
 }
 )");
 
@@ -91,6 +92,20 @@ class TaskEventBufferTestManualStart : public TaskEventBufferTest {
   void SetUp() override {}
 };
 
+class TaskEventBufferTestBatchSend : public TaskEventBufferTest {
+ public:
+  TaskEventBufferTestBatchSend() : TaskEventBufferTest() {
+    RayConfig::instance().initialize(
+        R"(
+{
+  "task_events_report_interval_ms": 1000,
+  "task_events_max_num_task_events_in_buffer": 100,
+  "task_events_send_batch_size": 10
+}
+)");
+  }
+};
+
 TEST_F(TaskEventBufferTestManualStart, TestGcsClientFail) {
   ASSERT_NE(task_event_buffer_, nullptr);
 
@@ -270,6 +285,38 @@ TEST_F(TaskEventBufferTest, TestForcedFlush) {
   task_event_buffer_->FlushEvents(true);
 }
 
+TEST_F(TaskEventBufferTestBatchSend, TestBatchedSend) {
+  size_t num_events = 100;
+  size_t batch_size = 10;  // Sync with constructor.
+  // Adding some events
+  for (size_t i = 0; i < num_events; ++i) {
+    auto task_id = RandomTaskId();
+    task_event_buffer_->AddTaskEvent(GenStatusTaskEvents(task_id, 0));
+  }
+
+  auto task_gcs_accessor =
+      static_cast<ray::gcs::MockGcsClient *>(task_event_buffer_->GetGcsClient())
+          ->mock_task_accessor;
+
+  // With batch size = 10, there should be 10 flush calls
+  EXPECT_CALL(*task_gcs_accessor, AsyncAddTaskEventData)
+      .Times(10)
+      .WillRepeatedly([&](std::unique_ptr<rpc::TaskEventData> actual_data,
+                          ray::gcs::StatusCallback callback) {
+        callback(Status::OK());
+        return Status::OK();
+      });
+
+  for (int i = 0; i * batch_size < num_events; i++) {
+    task_event_buffer_->FlushEvents(false);
+    EXPECT_EQ(task_event_buffer_->send_buffer_.size(), num_events - (i + 1) * batch_size);
+  }
+
+  // After the last flush, there should be no more events in either buffer.
+  EXPECT_EQ(task_event_buffer_->send_buffer_.size(), 0);
+  EXPECT_EQ(task_event_buffer_->buffer_.size(), 0);
+}
+
 TEST_F(TaskEventBufferTest, TestBufferSizeLimit) {
   size_t num_limit = 100;  // Synced with test setup
   size_t num_profile = 50;
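One detail that makes TestBatchedSend work: the mocked AsyncAddTaskEventData completes each call inline by invoking the callback with Status::OK(), which clears grpc_in_progress_ so the next FlushEvents is not skipped. A generic gmock sketch of that complete-inline pattern — Sender, AsyncSend, and Callback are hypothetical stand-ins, not Ray's API:

#include <functional>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

using Callback = std::function<void(bool ok)>;

// Hypothetical async interface standing in for the GCS task accessor.
class Sender {
 public:
  virtual ~Sender() = default;
  virtual bool AsyncSend(int batch_id, Callback done) = 0;
};

class MockSender : public Sender {
 public:
  MOCK_METHOD(bool, AsyncSend, (int batch_id, Callback done), (override));
};

TEST(BatchSendSketch, CompletesEachCallInline) {
  MockSender sender;
  EXPECT_CALL(sender, AsyncSend)
      .Times(10)
      .WillRepeatedly([](int /*batch_id*/, Callback done) {
        done(/*ok=*/true);  // Complete immediately, like the mock above.
        return true;
      });

  for (int i = 0; i < 10; ++i) {
    sender.AsyncSend(i, [](bool ok) { EXPECT_TRUE(ok); });
  }
}

If the mock never invoked the callback, the in-flight flag would stay set and every subsequent non-forced flush would be skipped — the same reason the real test drives the callback with Status::OK().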