
[Doc] Make doc code snippet testable [8/n] #36963

Merged
merged 1 commit on Jun 29, 2023
7 changes: 4 additions & 3 deletions doc/source/ray-core/examples/plot_example-a3c.rst
@@ -81,13 +81,14 @@ Worker Code Walkthrough

We use a Ray Actor to simulate the environment.

.. code-block:: python
.. testcode::
:skipif: True

import numpy as np
import ray

@ray.remote
class Runner(object):
class Runner:
"""Actor object to start running simulation on workers.
Gradient computation is also executed on this object."""
def __init__(self, env_name, actor_id):
@@ -124,7 +125,7 @@ global model parameters. The main training script looks like the following.

.. TODO: this is untested code. literalinclude and test.

.. code-block:: python
.. testcode::

import numpy as np
import ray
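
The rest of that script is truncated in the hunk above. A rough, hedged sketch of the asynchronous driver loop it describes, where ``Runner`` is the actor class shown earlier and ``Policy``, its methods, and the ``info["id"]`` field are placeholders rather than the file's actual code:

.. testcode::
    :skipif: True

    import ray

    ray.init()

    # Launch several simulation actors (Runner is defined in the walkthrough
    # above; Policy is a stand-in for the global model and is not defined here).
    runners = [Runner.remote("PongDeterministic-v4", i) for i in range(4)]
    policy = Policy()
    params = policy.get_weights()

    # A3C-style loop: apply whichever worker's gradient arrives first, then
    # immediately hand that worker the updated weights.
    in_flight = [r.compute_gradient.remote(params) for r in runners]
    for _ in range(1000):
        done, in_flight = ray.wait(in_flight)
        gradient, info = ray.get(done)[0]
        policy.apply_gradients(gradient)
        params = policy.get_weights()
        in_flight.append(runners[info["id"]].compute_gradient.remote(params))
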
16 changes: 10 additions & 6 deletions doc/source/ray-core/examples/plot_example-lm.rst
@@ -64,7 +64,8 @@ We provide ``ray_train.py`` (`code <https://github.com/ray-project/ray/tree/mast

The two main components of ``ray_train.py`` are the ``RayDistributedActor`` class and the ``run_fault_tolerant_loop()`` function. ``RayDistributedActor`` sets the proper arguments for the different Ray actor processes, adds a checkpoint hook so each process can make use of newly available GPUs, and calls Fairseq's ``main``:

.. code-block:: python
.. testcode::
:skipif: True

import math
import copy
@@ -150,7 +151,8 @@ Two main components of ``ray_train.py`` are a ``RayDistributedActor`` class and

The function ``run_fault_tolerant_loop()`` provides fault tolerance by catching failures and restarting the computation:

.. code-block:: python
.. testcode::
:skipif: True

def run_fault_tolerant_loop():
"""Entrance function to the fairseq library, providing fault-tolerance."""
@@ -202,7 +204,8 @@ The function ``run_fault_tolerant_loop()`` provides fault-tolerance by catching

In ``ray_train.py``, we also define a set of helper functions. ``add_ray_args()`` adds Ray and fault-tolerant training related arguments to the argument parser:

.. code-block:: python
.. testcode::
:skipif: True

def add_ray_args(parser):
"""Add ray and fault-tolerance related parser arguments to the parser."""
@@ -226,8 +229,8 @@ In ``ray_train.py``, we also define a set of helper functions. ``add_ray_args()`

``set_num_resources()`` sets the distributed world size to the number of available resources. If GPUs are requested but none are currently available, the function waits until a GPU becomes available:

.. code-block:: python

.. testcode::
:skipif: True

def set_num_resources(args):
"""Get the number of resources and set the corresponding fields."""
@@ -245,7 +248,8 @@ In ``ray_train.py``, we also define a set of helper functions. ``add_ray_args()`

``set_batch_size()`` keeps the effective batch size roughly constant across different numbers of GPUs:

.. code-block:: python
.. testcode::
:skipif: True

def set_batch_size(args):
"""Fixes the total batch_size to be agnostic to the GPU count."""
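
The arithmetic behind that adjustment, as a minimal self-contained sketch (the helper name and the numbers are illustrative, not taken from ``ray_train.py``): with more GPUs contributing samples to each step, fewer gradient-accumulation steps are needed to reach the same total batch size.

.. testcode::

    import math

    def scaled_update_freq(total_batch_size, batch_per_gpu, num_gpus):
        # effective batch = batch_per_gpu * num_gpus * update_freq, so solve
        # for update_freq given the target total batch size.
        return max(1, math.ceil(total_batch_size / (batch_per_gpu * num_gpus)))

    # A target of 2048 samples per update with 32 samples per GPU:
    assert scaled_update_freq(2048, 32, 4) == 16   # 4 GPUs:  accumulate 16 steps
    assert scaled_update_freq(2048, 32, 16) == 4   # 16 GPUs: accumulate 4 steps
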
16 changes: 10 additions & 6 deletions doc/source/ray-core/examples/testing-tips.rst
@@ -25,7 +25,9 @@ Tip 2: Sharing the ray cluster across tests if possible

It is safest to start a new ray cluster for each test.

.. code-block:: python
.. testcode::

import unittest

class RayTest(unittest.TestCase):
def setUp(self):
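        # Hedged sketch of the truncated body: start a fresh, private cluster
        # for this test (assumes ``import ray`` at the top of the test file)...
        ray.init(num_cpus=4, num_gpus=0)

    def tearDown(self):
        # ...and shut it down afterwards so tests stay isolated from each other.
        ray.shutdown()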
@@ -44,7 +46,7 @@ Across 20 tests, this ends up being 90 seconds of added overhead.

Reusing a Ray cluster across tests can provide significant speedups to your test suite. This reduces the overhead to a constant, amortized quantity:

.. code-block:: python
.. testcode::

class RayClassTest(unittest.TestCase):
@classmethod
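    def setUpClass(cls):
        # Hedged sketch of the truncated body: start one shared cluster for the
        # whole test class (assumes ``import ray`` at the top of the test file).
        ray.init(num_cpus=4, num_gpus=0)

    @classmethod
    def tearDownClass(cls):
        # Shut the shared cluster down once, after the last test in the class.
        ray.shutdown()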
@@ -72,7 +74,7 @@ If writing an application for a cluster setting, you may want to mock a multi-no
On Windows, support for multi-node Ray clusters is currently experimental and untested.
If you run into issues please file a report at https://github.com/ray-project/ray/issues.

.. code-block:: python
.. testcode::

from ray.cluster_utils import Cluster
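    # Hedged sketch of the construction this hunk truncates: a head node with
    # 10 CPUs, consistent with the resource assertions further below.
    cluster = Cluster(
        initialize_head=True,
        head_node_args={"num_cpus": 10},
    )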

@@ -85,7 +87,9 @@

After starting a cluster, you can execute a typical ray script in the same process:

.. code-block:: python
.. testcode::

import ray

ray.init(address=cluster.address)

@@ -108,15 +112,15 @@ After starting a cluster, you can execute a typical ray script in the same proce

You can also add multiple nodes, each with different resource quantities:

.. code-block:: python
.. testcode::

mock_node = cluster.add_node(num_cpus=10)

assert ray.cluster_resources()["CPU"] == 20

You can also remove nodes, which is useful when testing failure-handling logic:

.. code-block:: python
.. testcode::

cluster.remove_node(mock_node)

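For instance, a minimal hedged sketch of such a failure-handling check; the task and assertion below are illustrative and not taken from the original file:

.. testcode::
    :skipif: True

    @ray.remote
    def ping():
        return "ok"

    # Work that fits on the remaining nodes keeps succeeding after the mock
    # node has been removed.
    assert ray.get(ping.remote()) == "ok"
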
3 changes: 2 additions & 1 deletion python/ray/_private/profiling.py
@@ -88,11 +88,12 @@ def profile(event_type, extra_data=None):

This function can be used as follows (either on the driver or within a task).

.. code-block:: python
.. testcode::
import ray._private.profiling as profiling

with profiling.profile("custom event", extra_data={'key': 'val'}):
# Do some computation here.
x = 1 * 2

Optionally, a dictionary can be passed as the "extra_data" argument, and
it can have keys "name" and "cname" if you want to override the default
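
For example, a hedged sketch of passing ``extra_data`` with those keys; the values are illustrative, and ``cname`` is assumed to take a Chrome tracing color name:

.. testcode::
    :skipif: True

    import ray._private.profiling as profiling

    with profiling.profile(
        "custom event",
        extra_data={"name": "my_event", "cname": "generic_work"},
    ):
        x = 1 * 2
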
101 changes: 0 additions & 101 deletions python/ray/_private/ray_logging.py
@@ -5,7 +5,6 @@
import re
import sys
import threading
from logging.handlers import RotatingFileHandler
import time
from typing import Callable, Dict, List, Set, Tuple, Any, Optional

@@ -97,106 +96,6 @@ def setup_component_logger(
"""


class StandardStreamInterceptor:
"""Used to intercept stdout and stderr.

Intercepted messages are handled by the given logger.

NOTE: The logger passed to this method should always have
logging.INFO severity level.

Example:

.. code-block:: python

from contextlib import redirect_stdout
logger = logging.getLogger("ray_logger")
hook = StandardStreamHook(logger)
with redirect_stdout(hook):
print("a") # stdout will be delegated to logger.

Args:
logger: Python logger that will receive messages streamed to
the standard out/err and delegate writes.
intercept_stdout: True if the class intercepts stdout. False
if stderr is intercepted.
"""

def __init__(self, logger, intercept_stdout=True):
self.logger = logger
assert (
len(self.logger.handlers) == 1
), "Only one handler is allowed for the interceptor logger."
self.intercept_stdout = intercept_stdout

def write(self, message):
"""Redirect the original message to the logger."""
self.logger.info(message)
return len(message)

def flush(self):
for handler in self.logger.handlers:
handler.flush()

def isatty(self):
# Return the standard out isatty. This is used by colorful.
fd = 1 if self.intercept_stdout else 2
return os.isatty(fd)

def close(self):
handler = self.logger.handlers[0]
handler.close()

def fileno(self):
handler = self.logger.handlers[0]
return handler.stream.fileno()


class StandardFdRedirectionRotatingFileHandler(RotatingFileHandler):
"""RotatingFileHandler that redirects stdout and stderr to the log file.

It is specifically used to default_worker.py.

The only difference from this handler vs original RotatingFileHandler is
that it actually duplicates the OS level fd using os.dup2.
"""

def __init__(
self,
filename,
mode="a",
maxBytes=0,
backupCount=0,
encoding=None,
delay=False,
is_for_stdout=True,
):
super().__init__(
filename,
mode=mode,
maxBytes=maxBytes,
backupCount=backupCount,
encoding=encoding,
delay=delay,
)
self.is_for_stdout = is_for_stdout
self.switch_os_fd()

def doRollover(self):
super().doRollover()
self.switch_os_fd()

def get_original_stream(self):
if self.is_for_stdout:
return sys.stdout
else:
return sys.stderr

def switch_os_fd(self):
# Old fd will automatically closed by dup2 when necessary.
os.dup2(self.stream.fileno(), self.get_original_stream().fileno())


def get_worker_log_file_name(worker_type, job_id=None):
if job_id is None:
job_id = os.environ.get("RAY_JOB_ID")
8 changes: 5 additions & 3 deletions python/ray/_private/worker.py
@@ -1137,22 +1137,24 @@ def init(
This will autodetect an existing Ray cluster or start a new Ray instance if
no existing cluster is found:

.. code-block:: python
.. testcode::

ray.init()

To explicitly connect to an existing local cluster, use this as follows. A
ConnectionError will be thrown if no existing local cluster is found.

.. code-block:: python
.. testcode::
:skipif: True

ray.init(address="auto")

To connect to an existing remote cluster, use this as follows (substituting
in the appropriate address). Note the addition of "ray://" at the beginning
of the address.

.. code-block:: python
.. testcode::
:skipif: True

ray.init(address="ray://123.45.67.89:10001")
