Skip to content

Commit

Permalink
Merge branch 'master' into redact-redis-pass
Browse files Browse the repository at this point in the history
  • Loading branch information
letaoj authored Feb 11, 2025
2 parents 685b2b9 + 5632a4b commit c0ecac0
Show file tree
Hide file tree
Showing 19 changed files with 402 additions and 108 deletions.
272 changes: 206 additions & 66 deletions BUILD.bazel

Large diffs are not rendered by default.

11 changes: 4 additions & 7 deletions python/ray/dashboard/modules/reporter/reporter_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import ray
import ray._private.prometheus_exporter as prometheus_exporter
import ray._private.services
import ray._private.utils
import ray.dashboard.modules.reporter.reporter_consts as reporter_consts
import ray.dashboard.utils as dashboard_utils
from ray._private import utils
Expand Down Expand Up @@ -350,9 +349,7 @@ def __init__(self, dashboard_agent):
# psutil does not give a meaningful logical cpu count when in a K8s pod, or
# in a container in general.
# Use ray._private.utils for this instead.
logical_cpu_count = ray._private.utils.get_num_cpus(
override_docker_cpu_warning=True
)
logical_cpu_count = utils.get_num_cpus(override_docker_cpu_warning=True)
# (Override the docker warning to avoid dashboard log spam.)

# The dashboard expects a physical CPU count as well.
Expand Down Expand Up @@ -580,8 +577,8 @@ def _get_network_stats():

@staticmethod
def _get_mem_usage():
total = ray._private.utils.get_system_memory()
used = ray._private.utils.get_used_memory()
total = utils.get_system_memory()
used = utils.get_used_memory()
available = total - used
percent = round(used / total, 3) * 100
return total, available, percent, used
Expand All @@ -597,7 +594,7 @@ def _get_disk_usage():
root = psutil.disk_partitions()[0].mountpoint
else:
root = os.sep
tmp = ray._private.utils.get_user_temp_dir()
tmp = utils.get_user_temp_dir()
return {
"/": psutil.disk_usage(root),
tmp: psutil.disk_usage(tmp),
Expand Down
4 changes: 3 additions & 1 deletion python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ cdef extern from * namespace "ray::gcs" nogil:
"""
#include <thread>
#include "ray/gcs/gcs_server/store_client_kv.h"
#include "ray/gcs/redis_client.h"
#include "ray/gcs/store_client/redis_store_client.h"
namespace ray {
namespace gcs {
Expand Down Expand Up @@ -135,7 +137,7 @@ cdef extern from * namespace "ray::gcs" nogil:
c_string* data)


cdef extern from * namespace "ray::gcs" nogil:
cdef extern from "ray/gcs/store_client/redis_store_client.h" namespace "ray::gcs" nogil:
c_bool RedisDelKeyPrefixSync(const c_string& host,
c_int32_t port,
const c_string& username,
Expand Down
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/gcs_function_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

#pragma once

#include "absl/container/flat_hash_map.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/constants.h"
#include "ray/gcs/gcs_server/gcs_kv_manager.h"

Expand Down
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/gcs_init_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include "absl/container/flat_hash_map.h"
#include "ray/common/asio/postable.h"
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
Expand Down
9 changes: 5 additions & 4 deletions src/ray/gcs/gcs_server/gcs_kv_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
// limitations under the License.

#pragma once

#include <memory>
#include <string>

#include "absl/container/btree_map.h"
#include "absl/synchronization/mutex.h"
#include "ray/gcs/redis_client.h"
#include "ray/gcs/store_client/redis_store_client.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/postable.h"
#include "ray/common/status.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"

namespace ray {
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <utility>

#include "ray/common/ray_config.h"
#include "ray/gcs/redis_client.h"

namespace ray {
namespace gcs {
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_redis_failure_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/periodical_runner.h"
#include "ray/gcs/redis_client.h"

namespace ray {

namespace gcs {
class RedisGcsClient;

// Forward declaration.
class RedisClient;

/// GcsRedisFailureDetector is responsible for monitoring redis and binding GCS server and
/// redis life cycle together. GCS client subscribes to redis messages and it cannot sense
Expand Down
34 changes: 24 additions & 10 deletions src/ray/gcs/gcs_server/gcs_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,31 @@
#include <stdexcept>

#include "absl/strings/match.h"
#include "ray/common/asio/periodical_runner.h"
#include "ray/common/id.h"
#include "ray/common/ray_config.h"
#include "ray/common/status.h"

namespace ray {
namespace gcs {

GcsTaskManager::GcsTaskManager(instrumented_io_context &io_service)
: io_service_(io_service),
stats_counter_(),
task_event_storage_(std::make_unique<GcsTaskManagerStorage>(
RayConfig::instance().task_events_max_num_task_in_gcs(),
stats_counter_,
std::make_unique<FinishedTaskActorTaskGcPolicy>())),
periodical_runner_(PeriodicalRunner::Create(io_service_)) {
periodical_runner_->RunFnPeriodically([this] { task_event_storage_->GcJobSummary(); },
5 * 1000,
"GcsTaskManager.GcJobSummary");
}

std::vector<rpc::TaskEvents> GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents()
const {
std::vector<rpc::TaskEvents> ret;
ret.reserve(gc_policy_->MaxPriority());
// From the higher priority to the lower priority list.
for (int i = gc_policy_->MaxPriority() - 1; i >= 0; --i) {
// Reverse iterate the list to get the latest task events.
Expand Down Expand Up @@ -69,6 +84,7 @@ std::vector<rpc::TaskEvents> GcsTaskManager::GcsTaskManagerStorage::GetTaskEvent
std::vector<rpc::TaskEvents> GcsTaskManager::GcsTaskManagerStorage::GetTaskEvents(
const absl::flat_hash_set<std::shared_ptr<TaskEventLocator>> &task_locators) const {
std::vector<rpc::TaskEvents> result;
result.reserve(task_locators.size());
for (const auto &task_attempt_loc : task_locators) {
// Copy the task event to the output.
result.push_back(task_attempt_loc->GetTaskEventsMutable());
Expand Down Expand Up @@ -686,11 +702,10 @@ void GcsTaskManager::OnWorkerDead(
const WorkerID &worker_id, const std::shared_ptr<rpc::WorkerTableData> &worker_data) {
RAY_LOG(DEBUG) << "Marking all running tasks of worker " << worker_id << " as failed.";

std::shared_ptr<boost::asio::deadline_timer> timer =
std::make_shared<boost::asio::deadline_timer>(
io_service_,
boost::posix_time::milliseconds(
RayConfig::instance().gcs_mark_task_failed_on_worker_dead_delay_ms()));
auto timer = std::make_shared<boost::asio::deadline_timer>(
io_service_,
boost::posix_time::milliseconds(
RayConfig::instance().gcs_mark_task_failed_on_worker_dead_delay_ms()));

timer->async_wait(
[this, timer, worker_id, worker_data](const boost::system::error_code &error) {
Expand All @@ -705,11 +720,10 @@ void GcsTaskManager::OnWorkerDead(
}

void GcsTaskManager::OnJobFinished(const JobID &job_id, int64_t job_finish_time_ms) {
std::shared_ptr<boost::asio::deadline_timer> timer =
std::make_shared<boost::asio::deadline_timer>(
io_service_,
boost::posix_time::milliseconds(
RayConfig::instance().gcs_mark_task_failed_on_job_done_delay_ms()));
auto timer = std::make_shared<boost::asio::deadline_timer>(
io_service_,
boost::posix_time::milliseconds(
RayConfig::instance().gcs_mark_task_failed_on_job_done_delay_ms()));

timer->async_wait([this, timer, job_id, job_finish_time_ms](
const boost::system::error_code &error) {
Expand Down
19 changes: 5 additions & 14 deletions src/ray/gcs/gcs_server/gcs_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"
#include "ray/common/asio/periodical_runner.h"
#include "ray/gcs/gcs_server/usage_stats_client.h"
#include "ray/gcs/pb_util.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "ray/util/counter_map.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {

// Forward declaration.
class PeriodicalRunner;

namespace gcs {

enum GcsTaskManagerCounter {
Expand Down Expand Up @@ -86,18 +88,7 @@ class FinishedTaskActorTaskGcPolicy : public TaskEventsGcPolicyInterface {
class GcsTaskManager : public rpc::TaskInfoHandler {
public:
/// Create a GcsTaskManager.
explicit GcsTaskManager(instrumented_io_context &io_service)
: io_service_(io_service),
stats_counter_(),
task_event_storage_(std::make_unique<GcsTaskManagerStorage>(
RayConfig::instance().task_events_max_num_task_in_gcs(),
stats_counter_,
std::make_unique<FinishedTaskActorTaskGcPolicy>())),
periodical_runner_(PeriodicalRunner::Create(io_service_)) {
periodical_runner_->RunFnPeriodically([this] { task_event_storage_->GcJobSummary(); },
5 * 1000,
"GcsTaskManager.GcJobSummary");
}
explicit GcsTaskManager(instrumented_io_context &io_service);

/// Handles a AddTaskEventData request.
///
Expand Down
4 changes: 4 additions & 0 deletions src/ray/gcs/gcs_server/store_client_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@
// limitations under the License.

#pragma once

#include <memory>
#include <optional>
#include <string>

#include "ray/common/asio/postable.h"
#include "ray/gcs/gcs_server/gcs_kv_manager.h"
#include "ray/gcs/store_client/store_client.h"

Expand Down
3 changes: 3 additions & 0 deletions src/ray/gcs/store_client/observable_store_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#pragma once

#include <memory>
#include <utility>

#include "ray/gcs/store_client/store_client.h"

namespace ray {
Expand Down
2 changes: 0 additions & 2 deletions src/ray/gcs/store_client/store_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/gcs/callback.h"
#include "ray/util/logging.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {

Expand Down
9 changes: 9 additions & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@ ray_cc_library(
],
)

ray_cc_library(
name = "scoped_env_setter",
hdrs = ["scoped_env_setter.h"],
srcs = ["scoped_env_setter.cc"],
deps = [
":util",
],
)

ray_cc_library(
name = "timestamp_utils",
hdrs = ["timestamp_utils.h"],
Expand Down
39 changes: 39 additions & 0 deletions src/ray/util/scoped_env_setter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/util/scoped_env_setter.h"

#include <cstdlib>

#include "ray/util/util.h"

namespace ray {

ScopedEnvSetter::ScopedEnvSetter(const char *env_name, const char *value)
: env_name_(env_name) {
const char *val = ::getenv(env_name);
if (val != nullptr) {
old_value_ = val;
}
setEnv(env_name, value);
}

ScopedEnvSetter::~ScopedEnvSetter() {
unsetEnv(env_name_.c_str());
if (old_value_.has_value()) {
setEnv(env_name_, *old_value_);
}
}

} // namespace ray
39 changes: 39 additions & 0 deletions src/ray/util/scoped_env_setter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Create a scoped environment variable, which is set env at construction and unset and
// recover to old value at destruction.

#pragma once

#include <cstring>
#include <optional>
#include <string>

namespace ray {

class ScopedEnvSetter {
public:
ScopedEnvSetter(const char *env_name, const char *value);
~ScopedEnvSetter();

ScopedEnvSetter(const ScopedEnvSetter &) = delete;
ScopedEnvSetter &operator=(const ScopedEnvSetter &) = delete;

private:
std::string env_name_;
std::optional<std::string> old_value_;
};

} // namespace ray
11 changes: 11 additions & 0 deletions src/ray/util/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,17 @@ ray_cc_test(
tags = ["team:core"],
)

ray_cc_test(
name = "scoped_env_setter_test",
srcs = ["scoped_env_setter_test.cc"],
deps = [
"//src/ray/util:scoped_env_setter",
"@com_google_googletest//:gtest_main",
],
size = "small",
tags = ["team:core"],
)

ray_cc_test(
name = "pipe_logger_test",
srcs = ["pipe_logger_test.cc"],
Expand Down
Loading

0 comments on commit c0ecac0

Please sign in to comment.