From b31e886b7b1cc1e224d5879230d84485d638012b Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Mon, 10 Feb 2020 16:57:02 -0800 Subject: [PATCH 1/6] Prototype exporter completed --- .../prometheus/prometheus_export_server.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py diff --git a/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py b/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py new file mode 100644 index 000000000000..045438236aa8 --- /dev/null +++ b/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py @@ -0,0 +1,36 @@ +from prometheus_client import start_http_server, Gauge +import random +import time +import json +import redis +import ray +from ray.dashboard.closed_source.ingest_server import NODE_INFO_CHANNEL, RAY_INFO_CHANNEL + +if __name__ == '__main__': + redis_client = redis.StrictRedis(host="127.0.0.1", port=6379) + p = redis_client.pubsub(ignore_subscribe_messages=True) + p.psubscribe(RAY_INFO_CHANNEL) + p.psubscribe(NODE_INFO_CHANNEL) + g = Gauge("mamam", "CPU usage of pid", labelnames=("pid", "host", "ip")) + # Start up the server to expose the metrics. + start_http_server(8000) + # Generate some requests. + for x in p.listen(): + data = x["data"] + channel = ray.utils.decode(x["channel"]) + data = json.loads(ray.utils.decode(data)) + # print(channel) + # print(data) + + if channel == NODE_INFO_CHANNEL: + clients = data['clients'] + for client in clients: + print(client) + host = client['hostname'] + ip = client['ip'] + workers = client['workers'] + + for worker in workers: + pid = worker['pid'] + cpu_usage = worker['cpu_percent'] + g.labels(pid=pid, host=host, ip=ip).set(cpu_usage) From bb2da83d8323954bf33451af9714ab6fd40a052d Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 12 Feb 2020 11:23:31 -0800 Subject: [PATCH 2/6] Implemented end to end flow --- .../dashboard/closed_source/auth_server.py | 21 ++++++++++ .../closed_source/hosted_dashboard_main.py | 1 - .../prometheus/prometheus_export_server.py | 39 ++++++++++++----- python/ray/dashboard/dashboard.py | 25 +++++++---- python/ray/dashboard/dashboard_main.py | 11 +++-- .../hosted_dashboard/dashboard_client.py | 42 +++++++++++++++---- .../dashboard/hosted_dashboard/exporter.py | 3 +- python/ray/node.py | 11 ++--- python/ray/parameter.py | 3 ++ python/ray/services.py | 29 +++++++++++-- python/ray/worker.py | 3 ++ 11 files changed, 149 insertions(+), 39 deletions(-) create mode 100644 python/ray/dashboard/closed_source/auth_server.py diff --git a/python/ray/dashboard/closed_source/auth_server.py b/python/ray/dashboard/closed_source/auth_server.py new file mode 100644 index 000000000000..14b6a3794a3e --- /dev/null +++ b/python/ray/dashboard/closed_source/auth_server.py @@ -0,0 +1,21 @@ +from aiohttp import web + +async def handle(request): + print(request.match_info) + text = {'good': 'bad'} + return web.json_response(text) + +async def return_ingest_server_url(request): + result = { + 'ingestor_url': 'localhost:50051', + 'access_token': '1234' + } + return web.json_response(result) + +app = web.Application() +app.add_routes([web.get('/', handle), + web.get('/auth', return_ingest_server_url)]) + +if __name__ == '__main__': + host, port = '127.0.0.1', 8080 + web.run_app(app, host=host, port=port, shutdown_timeout=5.0) \ No newline at end of file diff --git a/python/ray/dashboard/closed_source/hosted_dashboard_main.py b/python/ray/dashboard/closed_source/hosted_dashboard_main.py index 216438168c70..7b5c6c1ba83d 100644 --- a/python/ray/dashboard/closed_source/hosted_dashboard_main.py +++ b/python/ray/dashboard/closed_source/hosted_dashboard_main.py @@ -111,7 +111,6 @@ args.redis_address, args.temp_dir, redis_password=args.redis_password, - hosted_dashboard_client=False, DashboardController=HostedDashboardController) dashboard.run() except Exception as e: diff --git a/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py b/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py index 045438236aa8..5d259a3129f6 100644 --- a/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py +++ b/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py @@ -1,31 +1,46 @@ -from prometheus_client import start_http_server, Gauge -import random -import time +import argparse import json +import logging +import random import redis +import time + +# TODO(sang): Add this module to requirements +from prometheus_client import start_http_server, Gauge + import ray -from ray.dashboard.closed_source.ingest_server import NODE_INFO_CHANNEL, RAY_INFO_CHANNEL + +from ray.dashboard.closed_source.ingest_server \ + import NODE_INFO_CHANNEL, RAY_INFO_CHANNEL + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + if __name__ == '__main__': redis_client = redis.StrictRedis(host="127.0.0.1", port=6379) p = redis_client.pubsub(ignore_subscribe_messages=True) p.psubscribe(RAY_INFO_CHANNEL) p.psubscribe(NODE_INFO_CHANNEL) - g = Gauge("mamam", "CPU usage of pid", labelnames=("pid", "host", "ip")) + metrics_cpu_usage = Gauge("ray_metrics_cpu_usage", "CPU usage of pid residing in a host.", labelnames=("pid", "host", "ip")) + metrics_mem_usage = Gauge("ray_metrics_mem_usage", "Memory usage of pid residing in a host.", labelnames=("pid", "host", "ip")) + # Start up the server to expose the metrics. - start_http_server(8000) - # Generate some requests. + host, port = '127.0.0.1', 8000 + logger.info('Server listening on port {} at address {}'.format(host, port)) + start_http_server(port) + + # Read data from Redis. for x in p.listen(): data = x["data"] channel = ray.utils.decode(x["channel"]) data = json.loads(ray.utils.decode(data)) - # print(channel) - # print(data) + print(channel) + print(data) if channel == NODE_INFO_CHANNEL: clients = data['clients'] for client in clients: - print(client) host = client['hostname'] ip = client['ip'] workers = client['workers'] @@ -33,4 +48,6 @@ for worker in workers: pid = worker['pid'] cpu_usage = worker['cpu_percent'] - g.labels(pid=pid, host=host, ip=ip).set(cpu_usage) + mem_usage = worker['memory_info']['rss'] + metrics_cpu_usage.labels(pid=pid, host=host, ip=ip).set(cpu_usage) + metrics_mem_usage.labels(pid=pid, host=host, ip=ip).set(mem_usage) diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index 4ace9e1773d7..21ea6ffb2ea8 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -96,6 +96,10 @@ def b64_decode(reply): return b64decode(reply).decode("utf-8") +def get_host_and_port(addr): + return addr.strip().split(':') + + class DashboardController(BaseDashboardController): """Perform data fetching and other actions required by HTTP endpoints.""" @@ -213,8 +217,7 @@ class Dashboard(object): temp_dir (str): The temporary directory used for log files and information for this Ray session. redis_passord(str): Redis password to access GCS - hosted_dashboard_client(bool): True if a server runs as a - hosted dashboard client mode. + hosted_dashboard_addr(str): The address users host their dashboard. update_frequency(float): Frequency where metrics are updated. DashboardController(DashboardController): DashboardController that defines the business logic of a Dashboard server. @@ -226,7 +229,7 @@ def __init__(self, redis_address, temp_dir, redis_password=None, - hosted_dashboard_client=False, + hosted_dashboard_addr=None, update_frequency=1.0, DashboardController=DashboardController): self.host = host @@ -240,10 +243,16 @@ def __init__(self, if Analysis is not None: self.tune_stats = TuneCollector(DEFAULT_RESULTS_DIR, 2.0) - self.hosted_dashboard_client = hosted_dashboard_client + self.hosted_dashboard_addr = hosted_dashboard_addr self.dashboard_client = None - if self.hosted_dashboard_client: - self.dashboard_client = DashboardClient(self.dashboard_controller) + + if self.hosted_dashboard_addr: + hosted_dashboard_host, hosted_dashboard_port = get_host_and_port( + self.hosted_dashboard_addr) + self.dashboard_client = DashboardClient( + hosted_dashboard_host, + hosted_dashboard_port, + self.dashboard_controller) # Setting the environment variable RAY_DASHBOARD_DEV=1 disables some # security checks in the dashboard server to ease development while @@ -381,7 +390,7 @@ async def errors(req) -> aiohttp.web.Response: result = self.dashboard_controller.get_errors(hostname, pid) return await json_response(result=result) - if not self.hosted_dashboard_client: + if self.hosted_dashboard_addr is None: # Hosted dashboard mode won't use local dashboard frontend. self.app.router.add_get("/", get_index) self.app.router.add_get("/favicon.ico", get_favicon) @@ -428,7 +437,7 @@ def log_dashboard_url(self): def run(self): self.log_dashboard_url() self.dashboard_controller.start_collecting_metrics() - if self.hosted_dashboard_client: + if self.hosted_dashboard_addr: self.dashboard_client.start_exporting_metrics() if Analysis is not None: self.tune_stats.start() diff --git a/python/ray/dashboard/dashboard_main.py b/python/ray/dashboard/dashboard_main.py index fb3f54c71eb7..9a269789b331 100644 --- a/python/ray/dashboard/dashboard_main.py +++ b/python/ray/dashboard/dashboard_main.py @@ -52,6 +52,13 @@ type=str, default=None, help="Specify the path of the temporary directory use by Ray process.") + parser.add_argument( + "--hosted-dashboard-addr", + required=False, + type=str, + default=None, + help="Specify the address where user dashboard will be hosted." + ) args = parser.parse_args() ray.utils.setup_logger(args.logging_level, args.logging_format) @@ -61,9 +68,7 @@ args.port, args.redis_address, args.temp_dir, - # TODO(sang): Make this value configurable - # through Ray API - hosted_dashboard_client=False, + hosted_dashboard_addr=args.hosted_dashboard_addr, redis_password=args.redis_password) dashboard.run() except Exception as e: diff --git a/python/ray/dashboard/hosted_dashboard/dashboard_client.py b/python/ray/dashboard/hosted_dashboard/dashboard_client.py index 1949656bdc48..25242183096e 100644 --- a/python/ray/dashboard/hosted_dashboard/dashboard_client.py +++ b/python/ray/dashboard/hosted_dashboard/dashboard_client.py @@ -1,8 +1,15 @@ -from ray.dashboard.hosted_dashboard.exporter import Exporter +import requests +from ray.dashboard.hosted_dashboard.exporter import Exporter class DashboardClient: - """Managing communication to hosted dashboard. + """Managing the communication to hosted dashboard. + + Args: + dashboard_controller (BaseDashboardController): Dashboard + controller that is used to export metrics. + hosted_dashboard_addr (str): The address users host their + dashboard. Attributes: ingestor_url(str): Address that metrics will be exported. @@ -10,12 +17,33 @@ class DashboardClient: metrics to the external services. """ - def __init__(self, dashboard_controller): - # TODO(sang): Remove hard coded ingestor url. - self.ingestor_url = "127.0.0.1:50051" - self.exporter = Exporter(self.ingestor_url, dashboard_controller) + def __init__(self, host, port, dashboard_controller): + self.auth_url = "http://{}:{}/auth".format(host, port) + self.timeout = 5.0 + + self.auth_info = self._connect() + self.exporter = Exporter( + self.auth_info.get("ingestor_url"), + self.auth_info.get("access_token"), + dashboard_controller) + + def _authorize(self): + resp = requests.get(self.auth_url, timeout=self.timeout) + status = resp.status_code + json_response = resp.json() + return status, json_response["ingestor_url"], json_response["access_token"] + + def _connect(self): + status, ingestor_url, access_token = self._authorize() + if status != 200: + raise ConnectionError("Failed to authorize to hosted dashbaord server.") + + auth_info = { + "ingestor_url": ingestor_url, + "access_token": access_token + } + return auth_info def start_exporting_metrics(self): """Run an exporter thread to export metrics""" - # TODO(sang): Add a health check. self.exporter.start() diff --git a/python/ray/dashboard/hosted_dashboard/exporter.py b/python/ray/dashboard/hosted_dashboard/exporter.py index 6492169cb1c8..f3302ab54b92 100644 --- a/python/ray/dashboard/hosted_dashboard/exporter.py +++ b/python/ray/dashboard/hosted_dashboard/exporter.py @@ -7,12 +7,12 @@ from ray.core.generated import dashboard_pb2 from ray.core.generated import dashboard_pb2_grpc - class Exporter(threading.Thread): """Thread that keeps running and export metrics""" def __init__(self, export_address, + access_token, dashboard_controller, update_frequency=1.0): self.dashboard_controller = dashboard_controller @@ -46,6 +46,7 @@ def export_profiling_info(self, data: dict): raise NotImplementedError("Not implemented yet.") def run(self): + # TODO(sang): Health check. # TODO(sang): Add error handling. while True: time.sleep(self.update_frequency) diff --git a/python/ray/node.py b/python/ray/node.py index fdeef3e45989..6bed68e3b574 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -472,13 +472,14 @@ def start_reporter(self): process_info ] - def start_dashboard(self, require_webui): + def start_dashboard(self, require_webui, hosted_dashboard_addr): """Start the dashboard. Args: require_webui (bool): If true, this will raise an exception if we fail to start the webui. Otherwise it will print a warning if we fail to start the webui. + hosted_dashboard_addr (str): The address users host their dashboard. """ stdout_file, stderr_file = self.new_log_files("dashboard", True) self._webui_url, process_info = ray.services.start_dashboard( @@ -486,6 +487,7 @@ def start_dashboard(self, require_webui): self._ray_params.webui_host, self.redis_address, self._temp_dir, + hosted_dashboard_addr, stdout_file=stdout_file, stderr_file=stderr_file, redis_password=self._ray_params.redis_password) @@ -614,10 +616,9 @@ def start_head_processes(self): self.start_monitor() self.start_raylet_monitor() - if self._ray_params.include_webui: - self.start_dashboard(require_webui=True) - elif self._ray_params.include_webui is None: - self.start_dashboard(require_webui=False) + self.start_dashboard( + require_webui=self._ray_params.include_webui, + hosted_dashboard_addr=self._ray_params.hosted_dashboard_addr) def start_ray_processes(self): """Start all of the processes on the node.""" diff --git a/python/ray/parameter.py b/python/ray/parameter.py index f4750e71dc2b..98deb267838d 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -80,6 +80,7 @@ class RayParams: java_worker_options (str): The command options for Java worker. load_code_from_local: Whether load code from local file or from GCS. use_pickle: Whether data objects should be serialized with cloudpickle. + hosted_dashboard_addr: The address where users host their dashboard. _internal_config (str): JSON configuration for overriding RayConfig defaults. For testing purposes ONLY. """ @@ -121,6 +122,7 @@ def __init__(self, java_worker_options=None, load_code_from_local=False, use_pickle=False, + hosted_dashboard_addr=None, _internal_config=None): self.object_id_seed = object_id_seed self.redis_address = redis_address @@ -156,6 +158,7 @@ def __init__(self, self.java_worker_options = java_worker_options self.load_code_from_local = load_code_from_local self.use_pickle = use_pickle + self.hosted_dashboard_addr = hosted_dashboard_addr self._internal_config = _internal_config self._check_usage() diff --git a/python/ray/services.py b/python/ray/services.py index 861e3228e2b0..350319d5257e 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1008,6 +1008,7 @@ def start_dashboard(require_webui, host, redis_address, temp_dir, + hosted_dashboard_addr, stdout_file=None, stderr_file=None, redis_password=None): @@ -1021,6 +1022,7 @@ def start_dashboard(require_webui, redis_address (str): The address of the Redis instance. temp_dir (str): The temporary directory used for log files and information for this Ray session. + hosted_dashboard_addr (str): The address users host their dashboard. stdout_file: A file handle opened for writing to redirect stdout to. If no redirection should happen, then this should be None. stderr_file: A file handle opened for writing to redirect stderr to. If @@ -1050,11 +1052,14 @@ def start_dashboard(require_webui, "--host={}".format(host), "--port={}".format(port), "--redis-address={}".format(redis_address), - "--temp-dir={}".format(temp_dir), + "--temp-dir={}".format(temp_dir) ] if redis_password: command += ["--redis-password", redis_password] + if hosted_dashboard_addr: + command += ["--hosted-dashboard-addr={}".format(hosted_dashboard_addr)] + webui_dependencies_present = True try: import aiohttp # noqa: F401 @@ -1075,12 +1080,30 @@ def start_dashboard(require_webui, ray_constants.PROCESS_TYPE_DASHBOARD, stdout_file=stdout_file, stderr_file=stderr_file) + + dashboard_url = "" - dashboard_url = "{}:{}".format( - host if host != "0.0.0.0" else get_node_ip_address(), port) + if not hosted_dashboard_addr: + dashboard_url = "{}:{}".format( + host if host != "0.0.0.0" else get_node_ip_address(), port) + else: + # TODO(sang): Find a way to read hosted dashboard url + dashboard_url = "localhost:8266" + logger.info("View the Ray dashboard at {}{}{}{}{}".format( colorama.Style.BRIGHT, colorama.Fore.GREEN, dashboard_url, colorama.Fore.RESET, colorama.Style.NORMAL)) + + if not hosted_dashboard_addr: + enable_hosted_dashboard_url = "{}/to_host".format(dashboard_url) + logger.info("To host your dashboard, go to {}{}{}{}{}" \ + .format( + colorama.Style.BRIGHT, + colorama.Fore.GREEN, + enable_hosted_dashboard_url, + colorama.Fore.RESET, + colorama.Style.NORMAL)) + return dashboard_url, process_info else: return None, None diff --git a/python/ray/worker.py b/python/ray/worker.py index c9fffa23bfd9..ed66763d5a9c 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -566,6 +566,7 @@ def init(address=None, temp_dir=None, load_code_from_local=False, use_pickle=ray.cloudpickle.FAST_CLOUDPICKLE_USED, + hosted_dashboard_addr=None, _internal_config=None): """Connect to an existing Ray cluster or start one and connect to it. @@ -658,6 +659,7 @@ def init(address=None, load_code_from_local: Whether code should be loaded from a local module or from the GCS. use_pickle: Whether data objects should be serialized with cloudpickle. + hosted_dashboard_addr: The address users host their dashboard. _internal_config (str): JSON configuration for overriding RayConfig defaults. For testing purposes ONLY. @@ -739,6 +741,7 @@ def init(address=None, temp_dir=temp_dir, load_code_from_local=load_code_from_local, use_pickle=use_pickle, + hosted_dashboard_addr=hosted_dashboard_addr, _internal_config=_internal_config, ) # Start the Ray processes. We set shutdown_at_exit=False because we From 2bfc766c3f1f95b48ca12d3fa2c1ed7e713592aa Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 12 Feb 2020 16:55:09 -0800 Subject: [PATCH 3/6] Clean up --- .../dashboard/closed_source/auth_server.py | 36 ++++++++++++++----- .../prometheus/prometheus_export_server.py | 6 ++-- python/ray/dashboard/dashboard.py | 9 ++++- .../dashboard_controller_interface.py | 10 +++++- .../hosted_dashboard/dashboard_client.py | 9 ++--- python/ray/services.py | 1 + 6 files changed, 51 insertions(+), 20 deletions(-) diff --git a/python/ray/dashboard/closed_source/auth_server.py b/python/ray/dashboard/closed_source/auth_server.py index 14b6a3794a3e..423f93428c4b 100644 --- a/python/ray/dashboard/closed_source/auth_server.py +++ b/python/ray/dashboard/closed_source/auth_server.py @@ -1,21 +1,39 @@ +import argparse + from aiohttp import web -async def handle(request): - print(request.match_info) - text = {'good': 'bad'} - return web.json_response(text) +async def health(request): + return web.json_response({ + 'Status': 'Healty' + }) async def return_ingest_server_url(request): + # TODO(sang): Prepare the proper authorization process. result = { 'ingestor_url': 'localhost:50051', 'access_token': '1234' } return web.json_response(result) -app = web.Application() -app.add_routes([web.get('/', handle), - web.get('/auth', return_ingest_server_url)]) if __name__ == '__main__': - host, port = '127.0.0.1', 8080 - web.run_app(app, host=host, port=port, shutdown_timeout=5.0) \ No newline at end of file + parser = argparse.ArgumentParser() + parser.add_argument( + "--host", + required=False, + type=str, + default='127.0.0.1', + help="The host to use for the GRPC server.") + parser.add_argument( + "--port", + required=False, + default=8080, + type=str, + help="The port to use for the GRPC server.") + args = parser.parse_args() + app = web.Application() + app.add_routes([ + web.get('/auth', return_ingest_server_url), + web.get('/health', health)] + ) + web.run_app(app, host=args.host, port=args.port, shutdown_timeout=5.0) diff --git a/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py b/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py index 5d259a3129f6..aa89b54541a0 100644 --- a/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py +++ b/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py @@ -16,7 +16,8 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) - +# TODO(sang) Refactor to have more scalable structure +# TODO(sang) Add args if __name__ == '__main__': redis_client = redis.StrictRedis(host="127.0.0.1", port=6379) p = redis_client.pubsub(ignore_subscribe_messages=True) @@ -26,6 +27,7 @@ metrics_mem_usage = Gauge("ray_metrics_mem_usage", "Memory usage of pid residing in a host.", labelnames=("pid", "host", "ip")) # Start up the server to expose the metrics. + # TODO(sang) Read from args host, port = '127.0.0.1', 8000 logger.info('Server listening on port {} at address {}'.format(host, port)) start_http_server(port) @@ -35,8 +37,6 @@ data = x["data"] channel = ray.utils.decode(x["channel"]) data = json.loads(ray.utils.decode(data)) - print(channel) - print(data) if channel == NODE_INFO_CHANNEL: clients = data['clients'] diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index 21ea6ffb2ea8..b06fdfe96344 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -32,9 +32,10 @@ from ray.core.generated import reporter_pb2_grpc from ray.core.generated import core_worker_pb2 from ray.core.generated import core_worker_pb2_grpc -from ray.dashboard.hosted_dashboard.dashboard_client import DashboardClient from ray.dashboard.dashboard_controller_interface \ import BaseDashboardController +from ray.dashboard.hosted_dashboard.dashboard_client import DashboardClient +from ray.dashboard.hosted_dashboard.exporter import Exporter try: from ray.tune.result import DEFAULT_RESULTS_DIR @@ -202,6 +203,12 @@ def start_collecting_metrics(self): self.node_stats.start() self.raylet_stats.start() + def start_exporting_metrics(self, auth_info): + exporter = Exporter(self.auth_info.get("ingestor_url"), + self.auth_info.get("access_token"), + dashboard_controller) + exporter.start() + class Dashboard(object): """A dashboard process for monitoring Ray nodes. diff --git a/python/ray/dashboard/dashboard_controller_interface.py b/python/ray/dashboard/dashboard_controller_interface.py index 5e57b25154b4..374d882ec8eb 100644 --- a/python/ray/dashboard/dashboard_controller_interface.py +++ b/python/ray/dashboard/dashboard_controller_interface.py @@ -2,10 +2,12 @@ class BaseDashboardController(ABC): - """Interface to get Ray cluster metrics and control actions + """Perform data fetching and other actions required by Dashboard Make sure you run start_collecting_metrics function before using get_[stats]_info methods. + + TODO(sang): Write descriptions and requirements for each interface """ @abstractmethod @@ -42,4 +44,10 @@ def get_errors(self, hostname, pid): @abstractmethod def start_collecting_metrics(self): + """Start threads/processes/actors to collect metrics + + NOTE: This interface should be called before using other + interface. + """ raise NotImplementedError("Please implement this method.") + diff --git a/python/ray/dashboard/hosted_dashboard/dashboard_client.py b/python/ray/dashboard/hosted_dashboard/dashboard_client.py index 25242183096e..3f74b8bccbb4 100644 --- a/python/ray/dashboard/hosted_dashboard/dashboard_client.py +++ b/python/ray/dashboard/hosted_dashboard/dashboard_client.py @@ -3,16 +3,13 @@ from ray.dashboard.hosted_dashboard.exporter import Exporter class DashboardClient: - """Managing the communication to hosted dashboard. + """Managing the authentication to external services. Args: - dashboard_controller (BaseDashboardController): Dashboard - controller that is used to export metrics. - hosted_dashboard_addr (str): The address users host their - dashboard. + host: Host address of service that are used to authenticate. + port: Port of the host that are used to authenticate. Attributes: - ingestor_url(str): Address that metrics will be exported. exporter(Exporter): Exporter thread that keeps exporting metrics to the external services. """ diff --git a/python/ray/services.py b/python/ray/services.py index 350319d5257e..e671d4624081 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1089,6 +1089,7 @@ def start_dashboard(require_webui, else: # TODO(sang): Find a way to read hosted dashboard url dashboard_url = "localhost:8266" + logger.info("Dashboard is running in a host {}".format(dashboard_url)) logger.info("View the Ray dashboard at {}{}{}{}{}".format( colorama.Style.BRIGHT, colorama.Fore.GREEN, dashboard_url, From b480609411a3a2a1bbeb1214890ad1cf5a473cbe Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 12 Feb 2020 16:57:03 -0800 Subject: [PATCH 4/6] Fix bug --- python/ray/dashboard/dashboard.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index b06fdfe96344..21ea6ffb2ea8 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -32,10 +32,9 @@ from ray.core.generated import reporter_pb2_grpc from ray.core.generated import core_worker_pb2 from ray.core.generated import core_worker_pb2_grpc +from ray.dashboard.hosted_dashboard.dashboard_client import DashboardClient from ray.dashboard.dashboard_controller_interface \ import BaseDashboardController -from ray.dashboard.hosted_dashboard.dashboard_client import DashboardClient -from ray.dashboard.hosted_dashboard.exporter import Exporter try: from ray.tune.result import DEFAULT_RESULTS_DIR @@ -203,12 +202,6 @@ def start_collecting_metrics(self): self.node_stats.start() self.raylet_stats.start() - def start_exporting_metrics(self, auth_info): - exporter = Exporter(self.auth_info.get("ingestor_url"), - self.auth_info.get("access_token"), - dashboard_controller) - exporter.start() - class Dashboard(object): """A dashboard process for monitoring Ray nodes. From c99112e0c11ddc6a14c50618b5041712aed54c18 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 12 Feb 2020 17:01:40 -0800 Subject: [PATCH 5/6] Formatting. --- .../dashboard/closed_source/auth_server.py | 21 +++++----- .../prometheus/prometheus_export_server.py | 39 +++++++++++-------- python/ray/dashboard/dashboard.py | 35 ++++++++--------- .../dashboard_controller_interface.py | 7 ++-- python/ray/dashboard/dashboard_main.py | 4 +- .../hosted_dashboard/dashboard_client.py | 14 ++++--- .../dashboard/hosted_dashboard/exporter.py | 1 + python/ray/node.py | 4 +- python/ray/services.py | 28 ++++++------- python/ray/worker.py | 2 +- 10 files changed, 75 insertions(+), 80 deletions(-) diff --git a/python/ray/dashboard/closed_source/auth_server.py b/python/ray/dashboard/closed_source/auth_server.py index 423f93428c4b..97d4968d1965 100644 --- a/python/ray/dashboard/closed_source/auth_server.py +++ b/python/ray/dashboard/closed_source/auth_server.py @@ -2,27 +2,24 @@ from aiohttp import web + async def health(request): - return web.json_response({ - 'Status': 'Healty' - }) + return web.json_response({"Status": "Healty"}) + async def return_ingest_server_url(request): # TODO(sang): Prepare the proper authorization process. - result = { - 'ingestor_url': 'localhost:50051', - 'access_token': '1234' - } + result = {"ingestor_url": "localhost:50051", "access_token": "1234"} return web.json_response(result) -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--host", required=False, type=str, - default='127.0.0.1', + default="127.0.0.1", help="The host to use for the GRPC server.") parser.add_argument( "--port", @@ -33,7 +30,7 @@ async def return_ingest_server_url(request): args = parser.parse_args() app = web.Application() app.add_routes([ - web.get('/auth', return_ingest_server_url), - web.get('/health', health)] - ) + web.get("/auth", return_ingest_server_url), + web.get("/health", health) + ]) web.run_app(app, host=args.host, port=args.port, shutdown_timeout=5.0) diff --git a/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py b/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py index aa89b54541a0..772dfe67c4e1 100644 --- a/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py +++ b/python/ray/dashboard/closed_source/prometheus/prometheus_export_server.py @@ -1,9 +1,6 @@ -import argparse import json import logging -import random import redis -import time # TODO(sang): Add this module to requirements from prometheus_client import start_http_server, Gauge @@ -18,18 +15,24 @@ # TODO(sang) Refactor to have more scalable structure # TODO(sang) Add args -if __name__ == '__main__': +if __name__ == "__main__": redis_client = redis.StrictRedis(host="127.0.0.1", port=6379) p = redis_client.pubsub(ignore_subscribe_messages=True) p.psubscribe(RAY_INFO_CHANNEL) p.psubscribe(NODE_INFO_CHANNEL) - metrics_cpu_usage = Gauge("ray_metrics_cpu_usage", "CPU usage of pid residing in a host.", labelnames=("pid", "host", "ip")) - metrics_mem_usage = Gauge("ray_metrics_mem_usage", "Memory usage of pid residing in a host.", labelnames=("pid", "host", "ip")) + metrics_cpu_usage = Gauge( + "ray_metrics_cpu_usage", + "CPU usage of pid residing in a host.", + labelnames=("pid", "host", "ip")) + metrics_mem_usage = Gauge( + "ray_metrics_mem_usage", + "Memory usage of pid residing in a host.", + labelnames=("pid", "host", "ip")) # Start up the server to expose the metrics. # TODO(sang) Read from args - host, port = '127.0.0.1', 8000 - logger.info('Server listening on port {} at address {}'.format(host, port)) + host, port = "127.0.0.1", 8000 + logger.info("Server listening on port {} at address {}".format(host, port)) start_http_server(port) # Read data from Redis. @@ -39,15 +42,17 @@ data = json.loads(ray.utils.decode(data)) if channel == NODE_INFO_CHANNEL: - clients = data['clients'] + clients = data["clients"] for client in clients: - host = client['hostname'] - ip = client['ip'] - workers = client['workers'] + host = client["hostname"] + ip = client["ip"] + workers = client["workers"] for worker in workers: - pid = worker['pid'] - cpu_usage = worker['cpu_percent'] - mem_usage = worker['memory_info']['rss'] - metrics_cpu_usage.labels(pid=pid, host=host, ip=ip).set(cpu_usage) - metrics_mem_usage.labels(pid=pid, host=host, ip=ip).set(mem_usage) + pid = worker["pid"] + cpu_usage = worker["cpu_percent"] + mem_usage = worker["memory_info"]["rss"] + metrics_cpu_usage.labels( + pid=pid, host=host, ip=ip).set(cpu_usage) + metrics_mem_usage.labels( + pid=pid, host=host, ip=ip).set(mem_usage) diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index 21ea6ffb2ea8..07ba6e456408 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -97,7 +97,7 @@ def b64_decode(reply): def get_host_and_port(addr): - return addr.strip().split(':') + return addr.strip().split(":") class DashboardController(BaseDashboardController): @@ -119,8 +119,8 @@ def _construct_raylet_info(self): # ready_tasks are used to render tasks that are not schedulable # due to resource limitations. # (e.g., Actor requires 2 GPUs but there is only 1 gpu available). - ready_tasks = sum( - (data.get("readyTasks", []) for data in D.values()), []) + ready_tasks = sum((data.get("readyTasks", []) for data in D.values()), + []) actor_tree = self.node_stats.get_actor_tree( workers_info_by_node, infeasible_tasks, ready_tasks) for address, data in D.items(): @@ -129,8 +129,8 @@ def _construct_raylet_info(self): for view_data in data["viewData"]: view_name = view_data["viewName"] if view_name in ("local_available_resource", - "local_total_resource", - "object_manager_stats"): + "local_total_resource", + "object_manager_stats"): measures_dicts[view_name] = measures_to_dict( view_data["measures"]) # process resources info @@ -154,9 +154,8 @@ def _construct_raylet_info(self): for stats_name in [ "used_object_store_memory", "num_local_objects" ]: - stats_value = measures_dicts[ - "object_manager_stats"].get( - prefix + stats_name, .0) + stats_value = measures_dicts["object_manager_stats"].get( + prefix + stats_name, .0) extra_info_strings.append("{}: {}".format( stats_name, stats_value)) data["extraInfo"] += ", ".join(extra_info_strings) @@ -167,8 +166,7 @@ def _construct_raylet_info(self): max_line_length = max(map(len, lines)) to_print = [] for line in lines: - to_print.append(line + - (max_line_length - len(line)) * " ") + to_print.append(line + (max_line_length - len(line)) * " ") data["extraInfo"] += "\n" + "\n".join(to_print) return {"nodes": D, "actors": actor_tree} @@ -249,10 +247,9 @@ def __init__(self, if self.hosted_dashboard_addr: hosted_dashboard_host, hosted_dashboard_port = get_host_and_port( self.hosted_dashboard_addr) - self.dashboard_client = DashboardClient( - hosted_dashboard_host, - hosted_dashboard_port, - self.dashboard_controller) + self.dashboard_client = DashboardClient(hosted_dashboard_host, + hosted_dashboard_port, + self.dashboard_controller) # Setting the environment variable RAY_DASHBOARD_DEV=1 disables some # security checks in the dashboard server to ease development while @@ -841,10 +838,12 @@ def __init__(self, logdir, reload_interval): try: logger.info("Create a directory at {}".format(self._logdir)) os.mkdir(self._logdir) - except: - FileNotFoundError("Log directory {} does not exist. " - "Please create the directory to use Tune " - "collector".format(self._logdir)) + except OSError as e: + logger.warning(e) + raise FileNotFoundError( + "Log directory {} does not exist. " + "Please create the directory to use Tune " + "collector".format(self._logdir)) super().__init__() diff --git a/python/ray/dashboard/dashboard_controller_interface.py b/python/ray/dashboard/dashboard_controller_interface.py index 374d882ec8eb..814853e7cb2f 100644 --- a/python/ray/dashboard/dashboard_controller_interface.py +++ b/python/ray/dashboard/dashboard_controller_interface.py @@ -2,7 +2,7 @@ class BaseDashboardController(ABC): - """Perform data fetching and other actions required by Dashboard + """Perform data fetching and other actions required by Dashboard Make sure you run start_collecting_metrics function before using get_[stats]_info methods. @@ -45,9 +45,8 @@ def get_errors(self, hostname, pid): @abstractmethod def start_collecting_metrics(self): """Start threads/processes/actors to collect metrics - - NOTE: This interface should be called before using other + + NOTE: This interface should be called before using other interface. """ raise NotImplementedError("Please implement this method.") - diff --git a/python/ray/dashboard/dashboard_main.py b/python/ray/dashboard/dashboard_main.py index 9a269789b331..2510d104e6b2 100644 --- a/python/ray/dashboard/dashboard_main.py +++ b/python/ray/dashboard/dashboard_main.py @@ -57,8 +57,7 @@ required=False, type=str, default=None, - help="Specify the address where user dashboard will be hosted." - ) + help="Specify the address where user dashboard will be hosted.") args = parser.parse_args() ray.utils.setup_logger(args.logging_level, args.logging_format) @@ -82,4 +81,3 @@ ray.utils.push_error_to_driver_through_redis( redis_client, ray_constants.DASHBOARD_DIED_ERROR, message) raise e - \ No newline at end of file diff --git a/python/ray/dashboard/hosted_dashboard/dashboard_client.py b/python/ray/dashboard/hosted_dashboard/dashboard_client.py index 3f74b8bccbb4..4751e5f10169 100644 --- a/python/ray/dashboard/hosted_dashboard/dashboard_client.py +++ b/python/ray/dashboard/hosted_dashboard/dashboard_client.py @@ -2,6 +2,7 @@ from ray.dashboard.hosted_dashboard.exporter import Exporter + class DashboardClient: """Managing the authentication to external services. @@ -20,20 +21,21 @@ def __init__(self, host, port, dashboard_controller): self.auth_info = self._connect() self.exporter = Exporter( - self.auth_info.get("ingestor_url"), - self.auth_info.get("access_token"), - dashboard_controller) - + self.auth_info.get("ingestor_url"), + self.auth_info.get("access_token"), dashboard_controller) + def _authorize(self): resp = requests.get(self.auth_url, timeout=self.timeout) status = resp.status_code json_response = resp.json() - return status, json_response["ingestor_url"], json_response["access_token"] + return status, json_response["ingestor_url"], json_response[ + "access_token"] def _connect(self): status, ingestor_url, access_token = self._authorize() if status != 200: - raise ConnectionError("Failed to authorize to hosted dashbaord server.") + raise ConnectionError( + "Failed to authorize to hosted dashbaord server.") auth_info = { "ingestor_url": ingestor_url, diff --git a/python/ray/dashboard/hosted_dashboard/exporter.py b/python/ray/dashboard/hosted_dashboard/exporter.py index f3302ab54b92..3589c406dbfe 100644 --- a/python/ray/dashboard/hosted_dashboard/exporter.py +++ b/python/ray/dashboard/hosted_dashboard/exporter.py @@ -7,6 +7,7 @@ from ray.core.generated import dashboard_pb2 from ray.core.generated import dashboard_pb2_grpc + class Exporter(threading.Thread): """Thread that keeps running and export metrics""" diff --git a/python/ray/node.py b/python/ray/node.py index 6bed68e3b574..bd0214b47670 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -479,7 +479,7 @@ def start_dashboard(self, require_webui, hosted_dashboard_addr): require_webui (bool): If true, this will raise an exception if we fail to start the webui. Otherwise it will print a warning if we fail to start the webui. - hosted_dashboard_addr (str): The address users host their dashboard. + hosted_dashboard_addr (str): The address users host dashboard. """ stdout_file, stderr_file = self.new_log_files("dashboard", True) self._webui_url, process_info = ray.services.start_dashboard( @@ -617,7 +617,7 @@ def start_head_processes(self): self.start_monitor() self.start_raylet_monitor() self.start_dashboard( - require_webui=self._ray_params.include_webui, + require_webui=self._ray_params.include_webui, hosted_dashboard_addr=self._ray_params.hosted_dashboard_addr) def start_ray_processes(self): diff --git a/python/ray/services.py b/python/ray/services.py index e671d4624081..c32b1aa661e5 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1046,12 +1046,8 @@ def start_dashboard(require_webui, os.path.dirname(os.path.abspath(__file__)), "dashboard/dashboard_main.py") command = [ - sys.executable, - "-u", - dashboard_filepath, - "--host={}".format(host), - "--port={}".format(port), - "--redis-address={}".format(redis_address), + sys.executable, "-u", dashboard_filepath, "--host={}".format(host), + "--port={}".format(port), "--redis-address={}".format(redis_address), "--temp-dir={}".format(temp_dir) ] if redis_password: @@ -1080,7 +1076,7 @@ def start_dashboard(require_webui, ray_constants.PROCESS_TYPE_DASHBOARD, stdout_file=stdout_file, stderr_file=stderr_file) - + dashboard_url = "" if not hosted_dashboard_addr: @@ -1089,21 +1085,19 @@ def start_dashboard(require_webui, else: # TODO(sang): Find a way to read hosted dashboard url dashboard_url = "localhost:8266" - logger.info("Dashboard is running in a host {}".format(dashboard_url)) - + logger.info( + "Dashboard is running in a host {}".format(dashboard_url)) + logger.info("View the Ray dashboard at {}{}{}{}{}".format( colorama.Style.BRIGHT, colorama.Fore.GREEN, dashboard_url, colorama.Fore.RESET, colorama.Style.NORMAL)) - + if not hosted_dashboard_addr: enable_hosted_dashboard_url = "{}/to_host".format(dashboard_url) - logger.info("To host your dashboard, go to {}{}{}{}{}" \ - .format( - colorama.Style.BRIGHT, - colorama.Fore.GREEN, - enable_hosted_dashboard_url, - colorama.Fore.RESET, - colorama.Style.NORMAL)) + logger.info("To host your dashboard, go to {}{}{}{}{}".format( + colorama.Style.BRIGHT, colorama.Fore.GREEN, + enable_hosted_dashboard_url, colorama.Fore.RESET, + colorama.Style.NORMAL)) return dashboard_url, process_info else: diff --git a/python/ray/worker.py b/python/ray/worker.py index ed66763d5a9c..40d84cd846a8 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -659,7 +659,7 @@ def init(address=None, load_code_from_local: Whether code should be loaded from a local module or from the GCS. use_pickle: Whether data objects should be serialized with cloudpickle. - hosted_dashboard_addr: The address users host their dashboard. + hosted_dashboard_addr: The address users host their dashboard. _internal_config (str): JSON configuration for overriding RayConfig defaults. For testing purposes ONLY. From 9c48eff1c36a28e49b7a851b97107eea731f7cbb Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Mon, 17 Feb 2020 15:53:46 -0800 Subject: [PATCH 6/6] Added a todo comment to implement frontend ui --- python/ray/services.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/services.py b/python/ray/services.py index c32b1aa661e5..2beb2b736fc1 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1093,6 +1093,7 @@ def start_dashboard(require_webui, colorama.Fore.RESET, colorama.Style.NORMAL)) if not hosted_dashboard_addr: + # TODO(simon): Implement frontend UI for this. enable_hosted_dashboard_url = "{}/to_host".format(dashboard_url) logger.info("To host your dashboard, go to {}{}{}{}{}".format( colorama.Style.BRIGHT, colorama.Fore.GREEN,