Add docstrings for new stream class, update sphinx docs
amitschang committed Jan 25, 2024
1 parent 05b00bb commit 71598d1
Showing 14 changed files with 214 additions and 13 deletions.
6 changes: 6 additions & 0 deletions docs/source/api.rst
@@ -7,3 +7,9 @@ API
api_pipeline.rst
api_observers.rst
api_cli.rst


.. toctree::
:hidden:

api_streaming.rst
9 changes: 8 additions & 1 deletion docs/source/api_pipeline.rst
@@ -10,11 +10,17 @@ Backbone
:toctree: generated

PipelineExecutor
PipelineTask
PipelineGraph

Executors
---------

.. currentmodule:: dplutils.pipeline.stream
.. autosummary::
:toctree: generated

PipelineTask
StreamingGraphExecutor

Ray Execution
-------------
@@ -24,3 +30,4 @@ Ray Execution
:toctree: generated

RayDataPipelineExecutor
RayStreamGraphExecutor
10 changes: 10 additions & 0 deletions docs/source/api_streaming.rst
@@ -0,0 +1,10 @@
.. currentmodule:: dplutils.pipeline.stream
.. autosummary::
:toctree: generated

StreamingGraphExecutor.is_task_ready
StreamingGraphExecutor.poll_tasks
StreamingGraphExecutor.split_batch_submit
StreamingGraphExecutor.task_resolve_output
StreamingGraphExecutor.task_submit
StreamingGraphExecutor.task_submittable
4 changes: 4 additions & 0 deletions docs/source/command_line.rst
@@ -0,0 +1,4 @@
Executing via CLI
=================

TBD
4 changes: 4 additions & 0 deletions docs/source/configuration.rst
@@ -0,0 +1,4 @@
Configuring Pipelines
=====================

TBD
4 changes: 4 additions & 0 deletions docs/source/execution.rst
@@ -0,0 +1,4 @@
Executing Pipelines
===================

TBD
4 changes: 4 additions & 0 deletions docs/source/extending.rst
@@ -0,0 +1,4 @@
Extending Functionality
=======================

TBD
9 changes: 4 additions & 5 deletions docs/source/index.rst
@@ -1,10 +1,9 @@
Welcome to Opsins Discovery's documentation!
============================================
Welcome to dplutils documentation!
==================================

**dplutils** is a Python library for...
**dplutils** is a Python library for remote execution of compute-intensive streaming task graphs.

Check out the :doc:`usage` section for further information, including
how to :ref:`installation` the project.
Check out the :doc:`quickstart` to get going immediately, the :doc:`usage` section for further information, and the :doc:`api` documentation for details on the classes and methods available.

.. note::

8 changes: 7 additions & 1 deletion docs/source/quickstart.rst
@@ -4,7 +4,13 @@ Quick Start
Installation
------------

TBD
Using pip::

pip install dplutils

Using docker::

docker pull ghcr.io/ssec-jhu/dplutils:latest

Define Pipeline
---------------
4 changes: 4 additions & 0 deletions docs/source/tasks_graphs.rst
@@ -0,0 +1,4 @@
Tasks and Pipeline Graphs
=========================

TBD
12 changes: 7 additions & 5 deletions docs/source/usage.rst
@@ -1,9 +1,11 @@
Usage
=====

.. _installation:
.. toctree::
:maxdepth: 1

Installation
------------

To use dplutils, first install it using...:
tasks_graphs
configuration
execution
command_line
extending
35 changes: 35 additions & 0 deletions dplutils/pipeline/graph.py
@@ -9,6 +9,15 @@ class TRM(Enum):


class PipelineGraph(DiGraph):
"""Graph of pipeline tasks.
This class adds convenience functionality for task pipeline handling on top
of :class:`networkx.DiGraph`, on which it is based.
Args:
graph: This is either a list of :class:`PipelineTask` objects representing a
simple path graph, or anything that is legal input to :class:`networkx.DiGraph`.
"""
def __init__(self, graph=None):
if isinstance(graph, list) and isinstance(graph[0], PipelineTask):
graph = path_graph(graph, DiGraph)
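For illustration, a minimal sketch of building a pipeline from a list (assuming, per the rest of this commit, that ``PipelineTask`` pairs a name with a dataframe-transforming function; both import paths here are assumptions of this sketch):

    from dplutils.pipeline import PipelineTask
    from dplutils.pipeline.graph import PipelineGraph

    # two tasks chained into a simple path graph: generate -> scale
    gen = PipelineTask("generate", lambda df: df.assign(x=1))
    scale = PipelineTask("scale", lambda df: df.assign(x=df["x"] * 10))
    graph = PipelineGraph([gen, scale])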
@@ -29,6 +38,8 @@ def sink_tasks(self):
return [n for n,d in self.out_degree() if d == 0]

def to_list(self):
"""Return list representation of task iff it is a simple-path graph
"""
if len(self.source_tasks) != 1 or len(self.sink_tasks) != 1:
raise ValueError('to_list requires a graph with only one start and end task')
source = self.source_tasks[0]
@@ -59,7 +70,31 @@ def _sort_key(x):
yield node

def walk_fwd(self, source=None, sort_key=None):
"""Walk graph forward in breadth-first order from ``source``
This is a generator that yields tasks encountered as it walks along
edges in the forward direction, starting at ``source``, or at the set of
:attr:`source_tasks` if not supplied.
Args:
source: starting task of walk, defaults to :attr:`source_tasks`
sort_key: when multiple out-edges are encountered, sort the yielded
tasks in order of the callable ``sort_key``, which should return a
sortable object given a :class:`PipelineTask` as input.
"""
return self._walk(source or TRM.source, back=False, sort_key=sort_key)

def walk_back(self, source=None, sort_key=None):
"""Walk graph backward in breadth-first order from ``source``
This is a generator that yields tasks encountered as it walks along
edges in the reverse direction, starting at ``source``, or at the set of
:attr:`sink_tasks` if not supplied.
Args:
source: starting task of walk, defaults to :attr:`sink_tasks`
sort_key: when multiple in-edges are encountered, sort the yielded
tasks in order of the callable ``sort_key``, which should return a
sortable object given a :class:`PipelineTask` as input.
"""
return self._walk(source or TRM.sink, back=True, sort_key=sort_key)
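Continuing the sketch above, walking that graph in either direction (``sort_key`` as described in the docstrings):

    # forward from the sources; ties among out-edges broken by task name
    for task in graph.walk_fwd(sort_key=lambda t: t.name):
        print(task.name)

    # backward from the sinks
    for task in graph.walk_back():
        print(task.name)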
19 changes: 19 additions & 0 deletions dplutils/pipeline/ray.py
@@ -135,6 +135,25 @@ class RemoteTracker:


class RayStreamGraphExecutor(StreamingGraphExecutor):
"""Ray-based implementation of stream graph executor.
All task outputs are kept in the object store and only de-serialized as needed
for execution, until yielded by :meth:`run`, at which point they are
de-serialized on the driver.
This executor will attempt to pack the cluster, irrespective of any other
workloads.
Note:
The Ray cluster will be initialized with defaults upon run if it hasn't
already been initialized.
Args:
ray_poll_timeout: After scheduling as many tasks as can fit, ray.wait on
all pending tasks for ``ray_poll_timeout`` seconds. The timeout gives an
opportunity to re-evaluate cluster resources in case the cluster has
expanded since the last scheduling loop.
"""
def __init__(self, *args, ray_poll_timeout: int = 20, **kwargs):
super().__init__(*args, **kwargs)
# bootstrap remote tasks at initialization and keep reference - this is
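As a usage sketch (assuming the base-class constructor signature ``(graph, max_batches=None)`` shown later in this commit, and that ``run`` yields output batches as the stream module docstrings describe):

    from dplutils.pipeline.ray import RayStreamGraphExecutor

    executor = RayStreamGraphExecutor(graph, max_batches=8, ray_poll_timeout=30)
    for batch in executor.run():  # ray is initialized with defaults if needed
        print(batch)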
99 changes: 98 additions & 1 deletion dplutils/pipeline/stream.py
@@ -11,12 +11,24 @@

@dataclass
class StreamBatch:
"""Container for task output tracking
Args:
length: length of the dataframe referenced by ``data``. This field is
required since in many cases ``data`` will be something that eventually
resolves to a dataframe but is not available to the driver.
data: a reference to a DataFrame in whatever way is meaningful to the
implementation. This field is not introspected, only passed through by
the framework.
"""
length: int
data: Any
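For example, a purely local implementation might reference a concrete dataframe directly, while a remote one might store a future or object reference in ``data``:

    import pandas as pd

    df = pd.DataFrame({"x": range(4)})
    batch = StreamBatch(length=len(df), data=df)  # data could equally be a future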


@dataclass
class StreamTask:
"""Internal task wrapper for :class:`StreamingGraphExecutor`
"""
task: PipelineTask
data_in: list[StreamBatch] = field(default_factory=deque)
pending: list = field(default_factory=deque)
@@ -35,6 +47,25 @@ def total_pending(self):


class StreamingGraphExecutor(PipelineExecutor, ABC):
"""Base class for implementing streaming remote graph execution
This class implements the :meth:`execute` method of
:class:`PipelineExecutor` and contains logic necessary to schedule tasks,
prioritizing completing those that are closer to terminals. It supports
arbitrary pipeline graphs with branches, multiple inputs and outputs. By
default, for each run, it generates an indefinite stream of input dataframes
tagged with a monotonically incrementing batch id.
Implementations must override abstract methods for (remote) task submission
and polling. The following must be overridden; see their docs for more:
- :meth:`is_task_ready`
- :meth:`poll_tasks`
- :meth:`split_batch_submit`
- :meth:`task_resolve_output`
- :meth:`task_submit`
- :meth:`task_submittable`
"""
def __init__(self, graph, max_batches=None):
super().__init__(graph)
self.max_batches = max_batches
@@ -155,31 +186,97 @@ def execute_until_output(self):

@abstractmethod
def task_submit(self, task: PipelineTask, df_list: list[pd.DataFrame]) -> Any:
"""Run or arrange for the running of task
Implementations must override this method and arrange for the function
of ``task`` to be called on a dataframe made from the concatenation of
``df_list``. The return value will be maintained in a pending queue, and
both ``task_resolve_output`` and ``is_task_ready`` will take these as
input, but will otherwise not be inspected. Typically the return value
would be a handle to the remote result or a future, or equivalent.
Note:
``PipelineTask`` expects a single DataFrame as input, while this
function receives a batch of such. It MUST concatenate these into a
single DataFrame prior to execution (e.g. with
``pd.concat(df_list)``). This is not done in the driver code as the
dataframes in ``df_list`` may not be local.
"""
pass
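As a non-authoritative sketch (assuming a thread-pool backend with ``Future`` objects as the opaque pending handles, and that the task's callable is available as ``task.func``; the ``pool`` here is an assumption of this sketch, which continues after each abstract method below):

    import pandas as pd
    from concurrent.futures import ThreadPoolExecutor

    pool = ThreadPoolExecutor(max_workers=4)  # hypothetical local backend

    def task_submit(self, task, df_list):
        # concatenate the batch into a single dataframe as required, then
        # run the task function in a worker; the Future is the pending handle
        return pool.submit(task.func, pd.concat(df_list))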

@abstractmethod
def task_resolve_output(self, pending_task: Any) -> StreamBatch:
"""Return a :class:`StreamBatch` from completed task
This function takes the output produced by either :meth:`task_submit` or
:meth:`split_batch_submit`, and returns a :class:`StreamBatch` object which
tracks the length of returned dataframe(s) and the object which
references the underlying DataFrame.
The ``data`` member of the returned :class:`StreamBatch` will be either:
- passed to another call of :meth:`task_submit` in a list container, or
- yielded in the :meth:`execute` call (which yields in the user-called
``run`` method). If any handling must be done prior to yield, the
implementation should do so in an overloaded :meth:`execute`.
"""
pass
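Continuing the thread-pool sketch above, resolution just unwraps the completed ``Future``:

    def task_resolve_output(self, pending_task):
        df = pending_task.result()  # completed Future -> concrete DataFrame
        return StreamBatch(length=len(df), data=df)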

@abstractmethod
def is_task_ready(self, pending_task: Any) -> bool:
"""Return true if pending task is ready
This method takes outputs from :meth:`task_submit` and
:meth:`split_batch_submit` and must return ``True`` if the task is complete
and its output can be passed to :meth:`task_resolve_output`, and ``False`` otherwise.
"""
pass
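In the thread-pool sketch this reduces to checking ``Future`` completion:

    def is_task_ready(self, pending_task):
        return pending_task.done()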

@abstractmethod
def task_submittable(self, task: PipelineTask, rank: int) -> bool:
"""Preflight check if task can be submitted
Return ``True`` if current conditions enable the ``task`` to be
submitted. The ``rank`` argument is an indicator of relative importance,
and is incremented whenever the pending data for a given task meets the
batching requirements as the driver walks the task graph backward. Thus
``rank=0`` represents the task furthest along, and so the highest
priority for submission.
"""
pass
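A minimal sketch: always admit the highest-priority work and cap in-flight tasks otherwise (the ``in_flight`` counter is hypothetical, maintained by the submit/resolve methods of the sketch):

    def task_submittable(self, task, rank):
        # in_flight would be incremented on submit, decremented on resolve
        return rank == 0 or self.in_flight < 8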

@abstractmethod
def split_batch_submit(self, batch: StreamBatch, max_rows: int) -> Any:
"""Submit a task to split batch into at most ``max_rows``
Similart to task_submit, implementations should arrange by whatever
means make sense to take the dataframe reference in ``batch.data`` of
:class:`StreamBatch`, given its length in ``batch.length`` and split
into a number of parts that result in no more than ``max_rows`` per
part. The return value should be a list of objects that can be processed
by :meth:`is_task_ready` and :meth:`task_resolve_output`.
"""
pass
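Continuing the thread-pool sketch, a local split via :func:`numpy.array_split`, with ceiling division for the part count:

    import numpy as np

    def split_batch_submit(self, batch, max_rows):
        n_parts = -(-batch.length // max_rows)  # ceil(length / max_rows)
        parts = np.array_split(batch.data, n_parts)  # batch.data is local here
        return [pool.submit(lambda df: df, part) for part in parts]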

@abstractmethod
def poll_tasks(self, pending_task_list: list[Any]) -> None:
"""Wait for any change in status to ``pending_task_list``
This method will be called after submitting as many tasks as
possible. It gives a chance for implementations to wait in a io-friendly
way, for example by waiting on async futures. The input is a list of
objects as returned by :meth:`task_submit` or :meth:`split_batch_submit`. The
return value is unused.
"""
pass
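And to round out the thread-pool sketch, an IO-friendly wait for any change in status:

    from concurrent.futures import FIRST_COMPLETED, wait

    def poll_tasks(self, pending_task_list):
        # block until at least one pending Future completes (or timeout)
        wait(pending_task_list, timeout=10, return_when=FIRST_COMPLETED)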


class LocalSerialExecutor(StreamingGraphExecutor):
# for testing purposes
"""Implementation for reference and testing purposes
This reference implementation demonstrates expected outputs for abstract
methods, feeding a single batch at a time from source to sink in the main thread.
"""
sflag = 0

def task_submit(self, pt, df_list):
