From 71598d169f3afdc4504a404002b24d9dd1ae40d4 Mon Sep 17 00:00:00 2001 From: Arik Mitschang Date: Thu, 25 Jan 2024 11:34:48 -0500 Subject: [PATCH 1/7] Add docstrings for new stream class, update sphinx docs --- docs/source/api.rst | 6 +++ docs/source/api_pipeline.rst | 9 +++- docs/source/api_streaming.rst | 10 ++++ docs/source/command_line.rst | 4 ++ docs/source/configuration.rst | 4 ++ docs/source/execution.rst | 4 ++ docs/source/extending.rst | 4 ++ docs/source/index.rst | 9 ++-- docs/source/quickstart.rst | 8 ++- docs/source/tasks_graphs.rst | 4 ++ docs/source/usage.rst | 12 +++-- dplutils/pipeline/graph.py | 35 +++++++++++++ dplutils/pipeline/ray.py | 19 +++++++ dplutils/pipeline/stream.py | 99 ++++++++++++++++++++++++++++++++++- 14 files changed, 214 insertions(+), 13 deletions(-) create mode 100644 docs/source/api_streaming.rst create mode 100644 docs/source/command_line.rst create mode 100644 docs/source/configuration.rst create mode 100644 docs/source/execution.rst create mode 100644 docs/source/extending.rst create mode 100644 docs/source/tasks_graphs.rst diff --git a/docs/source/api.rst b/docs/source/api.rst index 3be39ec..0d0ae8d 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -7,3 +7,9 @@ API api_pipeline.rst api_observers.rst api_cli.rst + + +.. toctree:: + :hidden: + + api_streaming.rst diff --git a/docs/source/api_pipeline.rst b/docs/source/api_pipeline.rst index ce75150..c64daf7 100644 --- a/docs/source/api_pipeline.rst +++ b/docs/source/api_pipeline.rst @@ -10,11 +10,17 @@ Backbone :toctree: generated PipelineExecutor + PipelineTask + PipelineGraph + +Executors +--------- +.. currentmodule:: dplutils.pipeline.stream .. autosummary:: :toctree: generated - PipelineTask + StreamingGraphExecutor Ray Execution ------------- @@ -24,3 +30,4 @@ Ray Execution :toctree: generated RayDataPipelineExecutor + RayStreamGraphExecutor diff --git a/docs/source/api_streaming.rst b/docs/source/api_streaming.rst new file mode 100644 index 0000000..be0ff07 --- /dev/null +++ b/docs/source/api_streaming.rst @@ -0,0 +1,10 @@ +.. currentmodule:: dplutils.pipeline.stream +.. autosummary:: + :toctree: generated + + StreamingGraphExecutor.is_task_ready + StreamingGraphExecutor.poll_tasks + StreamingGraphExecutor.split_batch_submit + StreamingGraphExecutor.task_resolve_output + StreamingGraphExecutor.task_submit + StreamingGraphExecutor.task_submittable diff --git a/docs/source/command_line.rst b/docs/source/command_line.rst new file mode 100644 index 0000000..a65df7a --- /dev/null +++ b/docs/source/command_line.rst @@ -0,0 +1,4 @@ +Executing via CLI +================= + +TBD diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst new file mode 100644 index 0000000..ff53442 --- /dev/null +++ b/docs/source/configuration.rst @@ -0,0 +1,4 @@ +Configuring Pipelines +===================== + +TBD diff --git a/docs/source/execution.rst b/docs/source/execution.rst new file mode 100644 index 0000000..bb120b2 --- /dev/null +++ b/docs/source/execution.rst @@ -0,0 +1,4 @@ +Executing Pipelines +=================== + +TBD diff --git a/docs/source/extending.rst b/docs/source/extending.rst new file mode 100644 index 0000000..b917646 --- /dev/null +++ b/docs/source/extending.rst @@ -0,0 +1,4 @@ +Extending Functionality +======================= + +TBD diff --git a/docs/source/index.rst b/docs/source/index.rst index 1c89a4a..7fa10b4 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -1,10 +1,9 @@ -Welcome to Opsins Discovery's documentation! 
-============================================
+Welcome to the dplutils documentation!
+======================================
 
-**dplutils** is a Python library for...
+**dplutils** is a Python library for remote execution of compute-intensive streaming task graphs.
 
-Check out the :doc:`usage` section for further information, including
-how to :ref:`installation` the project.
+Check out the :doc:`quickstart` to get going immediately, the :doc:`usage` section for further information, and the :doc:`api` documentation for details on the classes and methods available.
 
 .. note::
 
diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst
index 08df791..702085e 100644
--- a/docs/source/quickstart.rst
+++ b/docs/source/quickstart.rst
@@ -4,7 +4,13 @@ Quick Start
 Installation
 ------------
 
-TBD
+Using pip::
+
+    pip install dplutils
+
+Using docker::
+
+    docker pull ghcr.io/ssec-jhu/dplutils:latest
 
 Define Pipeline
 ---------------
diff --git a/docs/source/tasks_graphs.rst b/docs/source/tasks_graphs.rst
new file mode 100644
index 0000000..62dd632
--- /dev/null
+++ b/docs/source/tasks_graphs.rst
@@ -0,0 +1,4 @@
+Tasks and Pipeline Graphs
+=========================
+
+TBD
diff --git a/docs/source/usage.rst b/docs/source/usage.rst
index 72b4d6d..c9c0e31 100644
--- a/docs/source/usage.rst
+++ b/docs/source/usage.rst
@@ -1,9 +1,11 @@
 Usage
 =====
 
-.. _installation:
+.. toctree::
+   :maxdepth: 1
 
-Installation
-------------
-
-To use dplutils, first install it using...:
+   tasks_graphs
+   configuration
+   execution
+   command_line
+   extending
diff --git a/dplutils/pipeline/graph.py b/dplutils/pipeline/graph.py
index 0bbdfb4..2551c29 100644
--- a/dplutils/pipeline/graph.py
+++ b/dplutils/pipeline/graph.py
@@ -9,6 +9,15 @@ class TRM(Enum):
 
 
 class PipelineGraph(DiGraph):
+    """Graph of pipeline tasks.
+
+    This class adds convenience functionality for task pipeline handing on top
+    of :class:`networkx.DiGraph` on which it is based.
+
+    Args:
+        graph: This is either a list of :class:`PipelineTask` objects representing a
+          simple-path graph, or anything that is legal input to :class:`networkx.DiGraph`.
+    """
     def __init__(self, graph=None):
         if isinstance(graph, list) and isinstance(graph[0], PipelineTask):
             graph = path_graph(graph, DiGraph)
@@ -29,6 +38,8 @@ def sink_tasks(self):
         return [n for n,d in self.out_degree() if d == 0]
 
     def to_list(self):
+        """Return list representation of the tasks iff the graph is a simple-path graph
+        """
         if len(self.source_tasks) != 1 or len(self.sink_tasks) != 1:
             raise ValueError('to_list requires a graph with only one start and end task')
         source = self.source_tasks[0]
@@ -59,7 +70,31 @@ def _sort_key(x):
                 yield node
 
     def walk_fwd(self, source=None, sort_key=None):
+        """Walk graph forward in breadth-first order from ``source``
+
+        This is a generator that yeilds tasks encountered as it walks along
+        edges in the forward direction, starting at ``source``, or at the set of
+        :attr:`source_tasks` if not supplied.
+
+        Args:
+            source: starting task of walk, defaults to :attr:`source_tasks`
+            sort_key: when multiple out-edges are encountered, sort the yeilded
+              tasks in order of callable `sort_key`, which should return a
+              sortable object given :class:`PipelineTask` as input. 
+        """
         return self._walk(source or TRM.source, back=False, sort_key=sort_key)
 
     def walk_back(self, source=None, sort_key=None):
+        """Walk graph backward in breadth-first order from ``source``
+
+        This is a generator that yeilds tasks encountered as it walks along
+        edges in the reverse direction, starting at ``source``, or at the set of
+        :attr:`sink_tasks` if not supplied.
+
+        Args:
+            source: starting task of walk, defaults to :attr:`sink_tasks`
+            sort_key: when multiple in-edges are encountered, sort the yeilded
+              tasks in order of callable `sort_key`, which should return a
+              sortable object given :class:`PipelineTask` as input.
+        """
         return self._walk(source or TRM.sink, back=True, sort_key=sort_key)
diff --git a/dplutils/pipeline/ray.py b/dplutils/pipeline/ray.py
index 4c63b50..08f0b13 100644
--- a/dplutils/pipeline/ray.py
+++ b/dplutils/pipeline/ray.py
@@ -135,6 +135,25 @@ class RemoteTracker:
 
 
 class RayStreamGraphExecutor(StreamingGraphExecutor):
+    """Ray-based implementation of stream graph executor.
+
+    All task outputs are kept in object store and only de-serialized as needed
+    for execution, until yeilded by :meth:`run`, where they are de-serialized on
+    the driver.
+
+    This executor will attempt to pack the cluster, irrespective of any other
+    workloads.
+
+    Note:
+        The Ray cluster will be initialized with defaults upon run if it hasn't
+        already been initialized.
+
+    Args:
+        ray_poll_timeout: After scheduling as many tasks as can fit, ``ray.wait`` on
+          all pending tasks for ``ray_poll_timeout`` seconds. The timeout gives an
+          opportunity to re-evaluate cluster resources in case the cluster has
+          expanded since the last scheduling loop.
+    """
     def __init__(self, *args, ray_poll_timeout: int = 20, **kwargs):
         super().__init__(*args, **kwargs)
         # bootstrap remote tasks at initialization and keep reference - this is
diff --git a/dplutils/pipeline/stream.py b/dplutils/pipeline/stream.py
index 71c4591..2c5b299 100644
--- a/dplutils/pipeline/stream.py
+++ b/dplutils/pipeline/stream.py
@@ -11,12 +11,24 @@
 
 @dataclass
 class StreamBatch:
+    """Container for task output tracking
+
+    Args:
+        length: length of the dataframe that is referenced by ``data``. This field
+          is required as in many cases ``data`` will be something that eventually
+          resolves to a dataframe but is not available to the driver.
+        data: data should contain a reference to a DataFrame in whatever way is
+          meaningful to the implementation. This field is not introspected, only
+          passed by the framework.
+    """
     length: int
     data: Any
 
 
 @dataclass
 class StreamTask:
+    """Internal task wrapper for :class:`StreamingGraphExecutor`
+    """
     task: PipelineTask
     data_in: list[StreamBatch] = field(default_factory=deque)
     pending: list = field(default_factory=deque)
@@ -35,6 +47,25 @@ def total_pending(self):
 
 
 class StreamingGraphExecutor(PipelineExecutor, ABC):
+    """Base class for implementing streaming remote graph execution
+
+    This class implements the :meth:`execute` method of
+    :class:`PipelineExecutor` and contains the logic necessary to schedule
+    tasks, prioritizing the completion of tasks that are closer to terminals.
+    It supports arbitrary pipeline graphs with branches, multiple inputs and
+    outputs. By default, for each run, it generates an indefinite stream of
+    input dataframes tagged with a monotonically incrementing batch id.
+
+    Implementations must override abstract methods for (remote) task submission
+    and polling. 
The following must be overridden, see their docs for more:
+
+    - :meth:`is_task_ready`
+    - :meth:`poll_tasks`
+    - :meth:`split_batch_submit`
+    - :meth:`task_resolve_output`
+    - :meth:`task_submit`
+    - :meth:`task_submittable`
+    """
     def __init__(self, graph, max_batches=None):
         super().__init__(graph)
         self.max_batches = max_batches
@@ -155,31 +186,97 @@ def execute_until_output(self):
 
     @abstractmethod
     def task_submit(self, task: PipelineTask, df_list: list[pd.DataFrame]) -> Any:
+        """Run or arrange for the running of ``task``
+
+        Implementations must override this method and arrange for the function
+        of ``task`` to be called on a dataframe made from the concatenation of
+        ``df_list``. The return value will be maintained in a pending queue, and
+        both ``task_resolve_output`` and ``is_task_ready`` will take these as
+        input, but will otherwise not be inspected. Typically the return value
+        would be a handle to the remote result, a future, or equivalent.
+
+        Note:
+            ``PipelineTask`` expects a single DataFrame as input, while this
+            function receives a batch of such. It MUST concatenate these into a
+            single DataFrame prior to execution (e.g. with
+            ``pd.concat(df_list)``). This is not done in the driver code as the
+            dataframes in ``df_list`` may not be local.
+        """
         pass
 
     @abstractmethod
     def task_resolve_output(self, pending_task: Any) -> StreamBatch:
+        """Return a :class:`StreamBatch` from a completed task
+
+        This function takes the output produced by either :meth:`task_submit` or
+        :meth:`split_batch_submit`, and returns a :class:`StreamBatch` object which
+        tracks the length of the returned dataframe(s) and the object which
+        references the underlying DataFrame.
+
+        The ``data`` member of the returned :class:`StreamBatch` will be either:
+
+        - passed to another call of :meth:`task_submit` in a list container, or
+        - yielded in the :meth:`execute` call (which yields in the user-called
+          ``run`` method). If any handling must be done prior to yield,
+          implementations should do so in an overloaded :meth:`execute`.
+        """
         pass
 
     @abstractmethod
     def is_task_ready(self, pending_task: Any) -> bool:
+        """Return ``True`` if pending task is ready
+
+        This method takes outputs from :meth:`task_submit` and
+        :meth:`split_batch_submit` and must return ``True`` if the task is complete
+        and can be passed to :meth:`task_resolve_output`, or ``False`` otherwise.
+        """
         pass
 
     @abstractmethod
     def task_submittable(self, task: PipelineTask, rank: int) -> bool:
+        """Preflight check if task can be submitted
+
+        Return ``True`` if current conditions enable the ``task`` to be
+        submitted. The ``rank`` argument is an indicator of relative importance,
+        and is incremented whenever the pending data for a given task meets the
+        batching requirements as the driver walks the task graph backward. Thus
+        ``rank=0`` represents the task furthest along and so the highest
+        priority for submission.
+        """
         pass
 
     @abstractmethod
     def split_batch_submit(self, batch: StreamBatch, max_rows: int) -> Any:
+        """Submit a task to split ``batch`` into parts of at most ``max_rows`` rows
+
+        Similar to :meth:`task_submit`, implementations should arrange, by
+        whatever means make sense, to take the dataframe referenced in
+        ``batch.data`` of :class:`StreamBatch`, given its length in
+        ``batch.length``, and split it into a number of parts that result in
+        no more than ``max_rows`` rows per part. The return value should be a
+        list of objects that can be processed by :meth:`is_task_ready` and
+        :meth:`task_resolve_output`.
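+
+        As an illustrative sketch only (assuming ``batch.data`` is a locally
+        available ``pd.DataFrame``, which need not hold for remote
+        implementations), a serial version could be::
+
+            import math
+            import numpy as np
+
+            def split_batch_submit(self, batch, max_rows):
+                # assumes batch.data is a local DataFrame; a remote executor
+                # would instead submit a splitting task and return handles
+                nsplits = math.ceil(batch.length / max_rows)
+                return list(np.array_split(batch.data, nsplits))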
+        """
         pass
 
     @abstractmethod
     def poll_tasks(self, pending_task_list: list[Any]) -> None:
+        """Wait for any change in status of tasks in ``pending_task_list``
+
+        This method will be called after submitting as many tasks as
+        possible. It gives implementations a chance to wait in an io-friendly
+        way, for example by waiting on async futures. The input is a list of
+        objects as returned by :meth:`task_submit` or :meth:`split_batch_submit`. The
+        return value is unused.
+        """
         pass
 
 
 class LocalSerialExecutor(StreamingGraphExecutor):
-    # for testing purposes
+    """Implementation for reference and testing purposes
+
+    This reference implementation demonstrates expected outputs for abstract
+    methods, feeding a single batch at a time from source to sink in the main thread.
+    """
     sflag = 0
 
     def task_submit(self, pt, df_list):

From 4b598ec95782658b87571b4658f722f411dbb5b1 Mon Sep 17 00:00:00 2001
From: Arik Mitschang
Date: Thu, 25 Jan 2024 15:23:46 -0500
Subject: [PATCH 2/7] Update dplutils/pipeline/graph.py

Co-authored-by: Jamie Noss
---
 dplutils/pipeline/graph.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dplutils/pipeline/graph.py b/dplutils/pipeline/graph.py
index 2551c29..c11679d 100644
--- a/dplutils/pipeline/graph.py
+++ b/dplutils/pipeline/graph.py
@@ -11,7 +11,7 @@ class TRM(Enum):
 class PipelineGraph(DiGraph):
     """Graph of pipeline tasks.
 
-    This class adds convenience functionality for task pipeline handing on top
+    This class adds convenience functionality for task pipeline handling on top
     of :class:`networkx.DiGraph` on which it is based.
 
     Args:

From 426a2d2fe8967400f9997aa36292cf2e38e33e4b Mon Sep 17 00:00:00 2001
From: Arik Mitschang
Date: Thu, 25 Jan 2024 15:25:00 -0500
Subject: [PATCH 3/7] Update dplutils/pipeline/graph.py

Co-authored-by: Jamie Noss
---
 dplutils/pipeline/graph.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dplutils/pipeline/graph.py b/dplutils/pipeline/graph.py
index c11679d..a1ec477 100644
--- a/dplutils/pipeline/graph.py
+++ b/dplutils/pipeline/graph.py
@@ -78,7 +78,7 @@ def walk_fwd(self, source=None, sort_key=None):
 
     Args:
         source: starting task of walk, defaults to :attr:`source_tasks`
-        sort_key: when multiple out-edges are encountered, sort the yeilded
+        sort_key: when multiple out-edges are encountered, sort the yielded
         tasks in order of callable `sort_key`, which should return a
         sortable object given :class:`PipelineTask` as input.

From 24f0c4c0317fec68444239e16761be815fe49055 Mon Sep 17 00:00:00 2001
From: Arik Mitschang
Date: Thu, 25 Jan 2024 15:25:14 -0500
Subject: [PATCH 4/7] Update dplutils/pipeline/graph.py

Co-authored-by: Jamie Noss
---
 dplutils/pipeline/graph.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dplutils/pipeline/graph.py b/dplutils/pipeline/graph.py
index a1ec477..67205ae 100644
--- a/dplutils/pipeline/graph.py
+++ b/dplutils/pipeline/graph.py
@@ -72,7 +72,7 @@ def walk_fwd(self, source=None, sort_key=None):
     """Walk graph forward in breadth-first order from ``source``
 
-    This is a generator that yeilds tasks encountered as it walks along
+    This is a generator that yields tasks encountered as it walks along
     edges in the forward direction, starting at ``source``, or at the set of
     :attr:`source_tasks` if not supplied. 
 
From 7afa1c49e83360c03cc45082894a6984624a1369 Mon Sep 17 00:00:00 2001
From: Arik Mitschang
Date: Thu, 25 Jan 2024 15:25:25 -0500
Subject: [PATCH 5/7] Update dplutils/pipeline/graph.py

Co-authored-by: Jamie Noss
---
 dplutils/pipeline/graph.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dplutils/pipeline/graph.py b/dplutils/pipeline/graph.py
index 67205ae..c123c26 100644
--- a/dplutils/pipeline/graph.py
+++ b/dplutils/pipeline/graph.py
@@ -87,7 +87,7 @@ def walk_fwd(self, source=None, sort_key=None):
     def walk_back(self, source=None, sort_key=None):
         """Walk graph backward in breadth-first order from ``source``
 
-        This is a generator that yeilds tasks encountered as it walks along
+        This is a generator that yields tasks encountered as it walks along
         edges in the reverse direction, starting at ``source``, or at the set of
         :attr:`sink_tasks` if not supplied.

From 2d0fed0066b8deb775a49242af6c496ade38b04a Mon Sep 17 00:00:00 2001
From: Arik Mitschang
Date: Thu, 25 Jan 2024 15:25:37 -0500
Subject: [PATCH 6/7] Update dplutils/pipeline/graph.py

Co-authored-by: Jamie Noss
---
 dplutils/pipeline/graph.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dplutils/pipeline/graph.py b/dplutils/pipeline/graph.py
index c123c26..5dd7150 100644
--- a/dplutils/pipeline/graph.py
+++ b/dplutils/pipeline/graph.py
@@ -93,7 +93,7 @@ def walk_back(self, source=None, sort_key=None):
 
     Args:
         source: starting task of walk, defaults to :attr:`sink_tasks`
-        sort_key: when multiple in-edges are encountered, sort the yeilded
+        sort_key: when multiple in-edges are encountered, sort the yielded
         tasks in order of callable `sort_key`, which should return a
         sortable object given :class:`PipelineTask` as input.

From 53f2768980fe88a1c6232a8c8e4b17b18bd22a0d Mon Sep 17 00:00:00 2001
From: Arik Mitschang
Date: Thu, 25 Jan 2024 15:25:46 -0500
Subject: [PATCH 7/7] Update dplutils/pipeline/ray.py

Co-authored-by: Jamie Noss
---
 dplutils/pipeline/ray.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dplutils/pipeline/ray.py b/dplutils/pipeline/ray.py
index 08f0b13..683138c 100644
--- a/dplutils/pipeline/ray.py
+++ b/dplutils/pipeline/ray.py
@@ -138,7 +138,7 @@ class RayStreamGraphExecutor(StreamingGraphExecutor):
     """Ray-based implementation of stream graph executor.
 
     All task outputs are kept in object store and only de-serialized as needed
-    for execution, until yeilded by :meth:`run`, where they are de-serialized on
+    for execution, until yielded by :meth:`run`, where they are de-serialized on
     the driver.
 
     This executor will attempt to pack the cluster, irrespective of any other
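
A minimal end-to-end usage sketch of the streaming executor API (illustrative
only: the import paths and ``PipelineTask`` constructor arguments here are
assumptions based on the file layout above, not confirmed by the patches)::

    import pandas as pd
    from dplutils.pipeline import PipelineTask
    from dplutils.pipeline.ray import RayStreamGraphExecutor

    # source tasks receive framework-generated input frames tagged with a
    # batch id; this hypothetical task ignores them and emits its own data
    def generate(df: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame({"value": range(10)})

    def double(df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(value=df["value"] * 2)

    # a plain list of tasks is interpreted as a simple-path graph
    pipeline = RayStreamGraphExecutor(
        [PipelineTask("generate", generate), PipelineTask("double", double)],
        max_batches=5,
    )
    for batch in pipeline.run():  # yields output batches as tasks complete
        print(batch)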