Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FlyteCoPilot: Raw container support [Alpha] #107

Merged
merged 27 commits into from
Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flytekit/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class SdkTaskType(object):
SIDECAR_TASK = "sidecar"
SENSOR_TASK = "sensor-task"
PRESTO_TASK = "presto"
RAW_CONTAINER_TASK = "raw_container"

GLOBAL_INPUT_NODE_ID = ''

Expand Down
12 changes: 5 additions & 7 deletions flytekit/common/tasks/presto_task.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
from __future__ import absolute_import

import datetime as _datetime

from google.protobuf.json_format import MessageToDict as _MessageToDict
from flytekit import __version__

from flytekit import __version__
from flytekit.common import constants as _constants
from flytekit.common import interface as _interface
from flytekit.common.exceptions import scopes as _exception_scopes
from flytekit.common.tasks import task as _base_task
from flytekit.models import (
interface as _interface_model
)
from flytekit.models import literals as _literals, types as _types, \
task as _task_model

from flytekit.common import interface as _interface
import datetime as _datetime
from flytekit.models import presto as _presto_models
from flytekit.common.exceptions.user import \
FlyteValueException as _FlyteValueException
from flytekit.common.exceptions import scopes as _exception_scopes


class SdkPrestoTask(_base_task.SdkTask):
Expand Down
255 changes: 255 additions & 0 deletions flytekit/common/tasks/raw_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
from __future__ import absolute_import

import datetime as _datetime
from typing import Dict, List

from flytekit import __version__
from flytekit.common import constants as _constants
from flytekit.common import interface as _interface
from flytekit.common.exceptions import scopes as _exception_scopes
from flytekit.common.tasks import task as _base_task
from flytekit.common.types.base_sdk_types import FlyteSdkType
from flytekit.configuration import resources as _resource_config
from flytekit.models import literals as _literals, task as _task_models
from flytekit.models.interface import Variable


def types_to_variable(t: Dict[str, FlyteSdkType]) -> Dict[str, Variable]:
var = {}
if t:
for k, v in t.items():
var[k] = Variable(v.to_flyte_literal_type(), "")
return var


def _get_container_definition(
image: str,
command: List[str],
args: List[str],
storage_request: str = None,
cpu_request: str = None,
gpu_request: str = None,
memory_request: str = None,
storage_limit: str = None,
cpu_limit: str = None,
gpu_limit: str = None,
memory_limit: str = None,
environment: Dict[str, str] = None,
) -> _task_models.Container:
storage_limit = storage_limit or _resource_config.DEFAULT_STORAGE_LIMIT.get()
storage_request = storage_request or _resource_config.DEFAULT_STORAGE_REQUEST.get()
cpu_limit = cpu_limit or _resource_config.DEFAULT_CPU_LIMIT.get()
cpu_request = cpu_request or _resource_config.DEFAULT_CPU_REQUEST.get()
gpu_limit = gpu_limit or _resource_config.DEFAULT_GPU_LIMIT.get()
gpu_request = gpu_request or _resource_config.DEFAULT_GPU_REQUEST.get()
memory_limit = memory_limit or _resource_config.DEFAULT_MEMORY_LIMIT.get()
memory_request = memory_request or _resource_config.DEFAULT_MEMORY_REQUEST.get()

requests = []
if storage_request:
requests.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.STORAGE,
storage_request
)
)
if cpu_request:
requests.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.CPU,
cpu_request
)
)
if gpu_request:
requests.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.GPU,
gpu_request
)
)
if memory_request:
requests.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.MEMORY,
memory_request
)
)

limits = []
if storage_limit:
limits.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.STORAGE,
storage_limit
)
)
if cpu_limit:
limits.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.CPU,
cpu_limit
)
)
if gpu_limit:
limits.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.GPU,
gpu_limit
)
)
if memory_limit:
limits.append(
_task_models.Resources.ResourceEntry(
_task_models.Resources.ResourceName.MEMORY,
memory_limit
)
)

if environment is None:
environment = {}

return _task_models.Container(
image=image,
command=command,
args=args,
resources=_task_models.Resources(limits=limits, requests=requests),
env=environment,
config={}
)


class SdkRawContainerTask(_base_task.SdkTask):
"""
This class includes the logic for building a task that executes as a Presto task.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a few paragraphs of documentation here or somewhere else on 1) how to use this and 2) how it works?

For 1) I've left you a page to fill in here: https://github.com/lyft/flytesnacks/blob/cookbook/cookbook/workflows/recipe_5/index.rst

Feel free to check out that branch and commit... or make your own PR if you want the street cred.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice :D will do

"""

METADATA_FORMAT_PROTO = "pb"
METADATA_FORMAT_JSON = "json"
METADATA_FORMAT_YAML = "yaml"
_METADATA_FORMAT = frozenset([METADATA_FORMAT_JSON, METADATA_FORMAT_PROTO, METADATA_FORMAT_YAML])

def __init__(
self,
inputs: Dict[str, FlyteSdkType],
image: str,
outputs: Dict[str, FlyteSdkType]=None,
input_data_dir: str = None,
output_data_dir: str = None,
metadata_format: str = METADATA_FORMAT_JSON,
command: List[str] = None,
args: List[str] = None,
storage_request: str = None,
cpu_request: str = None,
gpu_request: str = None,
memory_request: str = None,
storage_limit: str = None,
cpu_limit: str = None,
gpu_limit: str = None,
memory_limit: str = None,
environment: Dict[str, str] = None,
interruptible: bool = False,
discoverable: bool = False,
discovery_version: str = None,
retries: int = 1,
timeout: _datetime.timedelta = None,
):
"""
:param inputs:
:param outputs:
:param image:
:param command:
:param args:
:param storage_request:
:param cpu_request:
:param gpu_request:
:param memory_request:
:param storage_limit:
:param cpu_limit:
:param gpu_limit:
:param memory_limit:
:param environment:
:param interruptible:
:param discoverable:
:param discovery_version:
:param retries:
:param timeout:
:param input_data_dir: This is the directory where data will be downloaded to
:param output_data_dir: This is the directory where data will be uploaded from
:param metadata_format: Format in which the metadata will be available for the script
"""

# Set as class fields which are used down below to configure implicit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these aren't class fields though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.input_data_dir = input_data_dir self.output_data_dir = output_data_dir self.metadata_format = metadata_format

# parameters
self.input_data_dir = input_data_dir
self.output_data_dir = output_data_dir
self.metadata_format = metadata_format
if metadata_format not in self._METADATA_FORMAT:
raise Exception("{} Illegal metadata format, only [{}] metadata formats are supported".format(metadata_format, ",".join(self._METADATA_FORMAT)))

metadata = _task_models.TaskMetadata(
discoverable,
# This needs to have the proper version reflected in it
_task_models.RuntimeMetadata(
_task_models.RuntimeMetadata.RuntimeType.FLYTE_SDK, __version__,
"python"),
timeout or _datetime.timedelta(seconds=0),
_literals.RetryStrategy(retries),
interruptible,
discovery_version,
"This is deprecated!"
)

# Here we set the routing_group, catalog, and schema as implicit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leftover comment from presto?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true!

# parameters for caching purposes
i = _interface.TypedInterface(inputs=types_to_variable(inputs), outputs=types_to_variable(outputs))

# TODO create custom proto to store data dir and other things
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no not remove, add the custom proto I guess?


super(SdkRawContainerTask, self).__init__(
_constants.SdkTaskType.RAW_CONTAINER_TASK,
metadata,
i,
None,
container=_get_container_definition(
image=image,
args=args,
command=command,
storage_request=storage_request,
cpu_request=cpu_request,
gpu_request=gpu_request,
memory_request=memory_request,
storage_limit=storage_limit,
cpu_limit=cpu_limit,
gpu_limit=gpu_limit,
memory_limit=memory_limit,
environment=environment,
)
)

# Override method in order to set the implicit inputs
def __call__(self, *args, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

return super(SdkRawContainerTask, self).__call__(
*args, **kwargs
)

@_exception_scopes.system_entry_point
def add_inputs(self, inputs: Dict[str, Variable]):
"""
Adds the inputs to this task. This can be called multiple times, but it will fail if an input with a given
name is added more than once, a name collides with an output, or if the name doesn't exist as an arg name in
the wrapped function.
:param dict[Text, flytekit.models.interface.Variable] inputs: names and variables
"""
self._validate_inputs(inputs)
self.interface.inputs.update(inputs)

@_exception_scopes.system_entry_point
def add_outputs(self, outputs: Dict[str, Variable]):
"""
Adds the inputs to this task. This can be called multiple times, but it will fail if an input with a given
name is added more than once, a name collides with an output, or if the name doesn't exist as an arg name in
the wrapped function.
:param dict[Text, flytekit.models.interface.Variable] outputs: names and variables
"""
self._validate_outputs(outputs)
self.interface.outputs.update(outputs)
25 changes: 25 additions & 0 deletions tests/flytekit/common/workflows/raw_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import absolute_import, division, print_function

from flytekit.common.tasks.raw_container import SdkRawContainerTask
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Input

square = SdkRawContainerTask(
inputs={"val": Types.Integer},
outputs={"out": Types.Integer},
image="alpine",
command=["sh", "-c", "cd /var/flyte/data; mkdir outputs; paste ./inputs/val | awk '{print ($1 * $1)}' > ./outputs/out"],
)

echo = SdkRawContainerTask(
inputs={"x": Types.Integer},
image="alpine",
command=["echo", "{{.inputs.x}}"],
)


@workflow_class
class RawContainerWorkflow(object):
val = Input(Types.Integer)
sq = square(val=val)
ec = echo(x=sq.outputs.out)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from flytekit.common.tasks.raw_container import SdkRawContainerTask
from flytekit.sdk.types import Types


def test_raw_container_task_definition():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you do a test in your test harness, and maybe add one here, where you have a workflow with two of these raw container tasks and the second one consumes the output of the first one? Or is that not possible to do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is possible, good point. I actually have a real workflow, just scroll up. But i will write an test workflow too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually i cannot write a test today, as it will execute it locally, which we do not want

tk = SdkRawContainerTask(
inputs={"x": Types.Integer},
outputs={"y": Types.Integer},
image="my-image",
command=["echo", "hello, world!"],
gpu_limit="1",
gpu_request="1",
)
assert not tk.serialize() is None



def test_raw_container_task_definition_no_outputs():
tk = SdkRawContainerTask(
inputs={"x": Types.Integer},
image="my-image",
command=["echo", "hello, world!"],
gpu_limit="1",
gpu_request="1",
)
assert not tk.serialize() is None