Skip to content

Commit

Permalink
Adding how to chain subworkflows (flyteorg#753)
Browse files Browse the repository at this point in the history
* Adding how to chain subworkflows

Signed-off-by: Alekhya Sai Punnamaraju <alekhyasaip@gmail.com>

* Updated the docs based on doc stds and moved the code to chain tasks

Signed-off-by: Alekhya Sai Punnamaraju <alekhyasaip@gmail.com>

* Apply suggestions from code review

Co-authored-by: Samhita Alla <aallasamhita@gmail.com>

* Updated as per review suggestions.

Signed-off-by: Alekhya Sai Punnamaraju <alekhyasaip@gmail.com>

* Update cookbook/core/control_flow/chain_tasks.py

Co-authored-by: Samhita Alla <aallasamhita@gmail.com>

* Apply suggestions from code review

Co-authored-by: Samhita Alla <aallasamhita@gmail.com>

* Modifying chain_tasks to chain_entities

Signed-off-by: Alekhya Sai Punnamaraju <alekhyasaip@gmail.com>

* Updated as per review suggestions

Signed-off-by: Alekhya Sai Punnamaraju <alekhyasaip@gmail.com>

* working version

Signed-off-by: Samhita Alla <aallasamhita@gmail.com>

Co-authored-by: Samhita Alla <aallasamhita@gmail.com>
  • Loading branch information
AlekhyaSasi and samhita-alla authored Aug 10, 2022
1 parent 97afc31 commit 1b86bdd
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,9 @@
}
],
"metadata": {
"interpreter": {
"hash": "dc3fb656270592a65285897570586647c2377dd9205211f8c7206e5246caf1a6"
},
"kernelspec": {
"display_name": "Python 3.8.10 64-bit ('feast': pyenv)",
"display_name": "Python 3.10.4 64-bit ('flytesnacks')",
"language": "python",
"name": "python3"
},
"language_info": {
Expand All @@ -296,7 +294,12 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
"version": "3.10.4"
},
"vscode": {
"interpreter": {
"hash": "3463a4f17fc1455f4f7ef6772298fdddae48e9dcb4cae2b2fb546e84138ea758"
}
}
},
"nbformat": 4,
Expand Down
Empty file.
15 changes: 15 additions & 0 deletions cookbook/core/control_flow/_run/run_chain_entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

from chain_entities import chain_tasks_wf

remote = FlyteRemote(
config=Config.auto(),
default_project="flytesnacks",
default_domain="development",
)

registered_workflow = remote.register_script(chain_tasks_wf)

execution = remote.execute(registered_workflow, inputs={})
print(f"Execution successfully started: {execution.id.name}")
1 change: 1 addition & 0 deletions cookbook/core/control_flow/_run/run_chain_entities.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pyflyte run --remote --image ghcr.io/flyteorg/flytecookbook:feast_integration-latest chain_entities.py chain_tasks_wf
134 changes: 134 additions & 0 deletions cookbook/core/control_flow/chain_entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""
Chain Flyte Entities
--------------------
Data passing between tasks or workflows need not necessarily happen through parameters.
In such a case, if you want to explicitly construct the dependency, flytekit provides a mechanism to chain Flyte entities using the ``>>`` operator.
Tasks
^^^^^
Let's enforce an order for ``read()`` to happen after ``write()``, and for ``write()`` to happen after ``create_bucket()``.
.. note::
To run the example locally, spin up the demo cluster using ``flytectl demo start``.
"""

# %%
# Import the necessary dependencies.
import logging
from io import StringIO

import boto3
import pandas as pd
from flytekit import task, workflow

logger = logging.getLogger(__file__)

CSV_FILE = "iris.csv"
BUCKET_NAME = "chain-flyte-entities"


BOTO3_CLIENT = boto3.client(
"s3",
aws_access_key_id="minio",
aws_secret_access_key="miniostorage",
use_ssl=False,
endpoint_url="http://minio.flyte:9000",
)

# %%
# Create an s3 bucket.
# This task exists just for the sandbox case.
@task(cache=True, cache_version="1.0")
def create_bucket():
try:
BOTO3_CLIENT.create_bucket(Bucket=BUCKET_NAME)
except BOTO3_CLIENT.exceptions.BucketAlreadyOwnedByYou:
logger.info(f"Bucket {BUCKET_NAME} has already been created by you.")
pass


# %%
# Define a ``read()`` task to read from the s3 bucket.
@task
def read() -> pd.DataFrame:
data = pd.read_csv(
BOTO3_CLIENT.get_object(Bucket=BUCKET_NAME, Key=CSV_FILE)["Body"]
)
return data


# %%
# Define a ``write()`` task to write the dataframe to a CSV file in the s3 bucket.
@task(cache=True, cache_version="1.0")
def write():
df = pd.DataFrame( # noqa : F841
data={
"sepal_length": [5.3],
"sepal_width": [3.8],
"petal_length": [0.1],
"petal_width": [0.3],
"species": ["setosa"],
}
)
csv_buffer = StringIO()
df.to_csv(csv_buffer)
BOTO3_CLIENT.put_object(
Body=csv_buffer.getvalue(), Bucket=BUCKET_NAME, Key=CSV_FILE
)


# %%
# We want to enforce an order here: ``create_bucket()`` followed by ``write()`` followed by ``read()``.
# Since no data-passing happens between the tasks, use ``>>`` operator on the tasks.
@workflow
def chain_tasks_wf() -> pd.DataFrame:
create_bucket_promise = create_bucket()
write_promise = write()
read_promise = read()

create_bucket_promise >> write_promise
write_promise >> read_promise

return read_promise


# %%
# SubWorkflows
# ^^^^^^^^^^^^
#
# Similar to tasks, you can chain :ref:`subworkflows <subworkflows>`.
@workflow
def write_sub_workflow():
write()


@workflow
def read_sub_workflow() -> pd.DataFrame:
return read()


# %%
# Use ``>>`` to chain the subworkflows.
@workflow
def chain_workflows_wf() -> pd.DataFrame:
create_bucket_promise = create_bucket()
write_sub_wf = write_sub_workflow()
read_sub_wf = read_sub_workflow()

create_bucket_promise >> write_sub_wf
write_sub_wf >> read_sub_wf

return read_sub_wf


#%%
# Run the workflows locally.
if __name__ == "__main__":
print(f"Running {__file__} main...")
print(f"Running chain_tasks_wf()... {chain_tasks_wf()}")
print(f"Running chain_workflows_wf()... {chain_workflows_wf()}")

# %%
# .. run-example-cmds::
73 changes: 0 additions & 73 deletions cookbook/core/control_flow/chain_tasks.py

This file was deleted.

Loading

0 comments on commit 1b86bdd

Please sign in to comment.