Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @kubernetes decorator #17248

Merged
merged 5 commits into from
Feb 25, 2025
Merged

Add @kubernetes decorator #17248

merged 5 commits into from
Feb 25, 2025

Conversation

desertaxle
Copy link
Member

@desertaxle desertaxle commented Feb 22, 2025

This PR introduces a @kubernetes decorator that will bind a flow to run in a given work pool.

Here's what it looks like in action:

import asyncio

from prefect_kubernetes.experimental.decorators import kubernetes

from prefect import flow

# Thes are set on the work pool
UPLOAD_STEP = {
    "prefect_aws.experimental.bundles.upload": {
        "requires": "prefect-aws",
        "bucket": "test-bucket",
        "aws_credentials_block_name": "my-creds",
    }
}

EXECUTE_STEP = {
    "prefect_aws.experimental.bundles.execute": {
        "requires": "prefect-aws",
        "bucket": "test-bucket",
        "aws_credentials_block_name": "my-creds",
    }
}

# Results must be configured to retrieve the return value
@kubernetes(work_pool="olympic")
@flow(log_prints=True, result_storage="s3-bucket/results-bucket") 
def my_flow(name: str) -> str:
    print("Returning greeting...")
    return f"Hello, {name}!"


async def main():
    print(my_flow(name="John"))


if __name__ == "__main__":
    asyncio.run(main())

The goal of this decorator is to allow decorating flows and bind them to running in a Kubernetes cluster, but not require any code changes where the flow is called. This is achieved in this PR except for a few exceptions:

  1. You must configure result storage on the flow or execution will fail when attempting to retrieve the return value for the flow.
  2. The wait_for kwarg is unused right now.

Additional work will be necessary to address these limitations.

Related to #17218

future = await worker.submit(
flow=flow, parameters=parameters, job_variables=job_variables
)
return await future.aresult()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note from discussion: we probably want to consider adding a .submit method to the decorated flow object that returns this future directly when called and otherwise, when the new flow-object is called directly it automatically resolves the future as it does here.

Comment on lines 164 to 170
def decorator(flow: Flow[P, R]) -> InfrastructureBoundFlow[P, R]:
return InfrastructureBoundFlow.from_flow(
flow,
work_pool=work_pool,
job_variables=job_variables,
worker_cls=KubernetesWorker,
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking trim!

@desertaxle desertaxle marked this pull request as ready for review February 25, 2025 17:24
Copy link
Collaborator

@zzstoatzz zzstoatzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm!



def kubernetes(
work_pool: str, **job_variables: Any
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it'd be nice to have a TypedDict here to Unpack someday, not sure how we'd manage that, but just throwing it out there

Suggested change
work_pool: str, **job_variables: Any
work_pool: str, **job_variables: Any

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could see a world where we can generate a decorator for a specific work pool that includes the job variables as named and type kwargs. We're probably a ways off from that though.

Base automatically changed from k8s-worker-submit to main February 25, 2025 18:00
Copy link
Member

@cicdw cicdw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

love it

@desertaxle desertaxle merged commit cc5b133 into main Feb 25, 2025
14 checks passed
@desertaxle desertaxle deleted the kubernetes-decorator branch February 25, 2025 18:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants