Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribe to state changes in wait_for_flow_run #17243

Merged
merged 7 commits into from
Feb 27, 2025

Conversation

bnaul
Copy link
Contributor

@bnaul bnaul commented Feb 21, 2025

Sort of a precursor to #17228; the current implementation polls for state changes (and logs on each poll, even when the state has not changed). As a half step towards also streaming other events / logs, I wanted to start by just swapping the polling for an event subscriber. The result is almost the same except for de-duping the repeated logs from the same state.

I went back and forth on whether to remove the poll_interval/watch_interval argument or warn that it's deprecated or just have it do nothing, I'm not sure if you have a specific policy on little CLI changes like that but either seems fine to me.

Before:

Creating flow run for deployment 'hello-flow/hello-flow'...
Created flow run 'belligerent-lobster'.
└── UUID: 322266a2-b2df-488f-9d84-6db32230c8a7
└── Parameters: {}
└── Job Variables: {}
└── Scheduled start time: 2025-02-21 17:15:28 EST (now)
└── URL: https://app.prefect.cloud/account/1e4d7e04-0fb7-4aa3-8ef5-746e9f404f4f/workspace/849a6829-4afb-48c3-9cc2-2dc6b262fd9c/runs/flow-run/322266a2-b2df-488f-9d84-6db32230c8a7
Watching flow run 'belligerent-lobster'...
17:15:28.614 | INFO    | prefect - Flow run is in state 'Scheduled'
17:15:33.718 | INFO    | prefect - Flow run is in state 'Scheduled'
17:15:38.815 | INFO    | prefect - Flow run is in state 'Pending'
17:15:43.932 | INFO    | prefect - Flow run is in state 'Pending'
17:15:49.024 | INFO    | prefect - Flow run is in state 'Pending'
17:15:54.166 | INFO    | prefect - Flow run is in state 'Pending'
17:15:59.262 | INFO    | prefect - Flow run is in state 'Pending'
17:16:04.464 | INFO    | prefect - Flow run is in state 'Pending'
17:16:09.576 | INFO    | prefect - Flow run is in state 'Completed'
Flow run finished successfully in 'Completed'.

After:

prefect deployment run hello-flow/hello-flow --watch
Creating flow run for deployment 'hello-flow/hello-flow'...
Created flow run 'petite-firefly'.
└── UUID: 688e205f-588d-4503-912e-fd25864e0ea2
└── Parameters: {}
└── Job Variables: {}
└── Scheduled start time: 2025-02-21 17:14:57 EST (now)
└── URL: https://app.prefect.cloud/account/1e4d7e04-0fb7-4aa3-8ef5-746e9f404f4f/workspace/849a6829-4afb-48c3-9cc2-2dc6b262fd9c/runs/flow-run/688e205f-588d-4503-912e-fd25864e0ea2
Watching flow run 'petite-firefly'...
17:14:59.288 | INFO    | prefect - Flow run is in state 'Scheduled'
17:14:59.667 | INFO    | prefect - Flow run is in state 'Pending'
17:15:42.142 | INFO    | prefect - Flow run is in state 'Running'
17:16:21.295 | INFO    | prefect - Flow run is in state 'Completed'
Flow run finished successfully in 'Completed'.

@github-actions github-actions bot added the enhancement An improvement of an existing feature label Feb 21, 2025
@@ -116,15 +118,30 @@ async def main(num_runs: int):
"""
assert client is not None, "Client injection failed"
logger = get_logger()

flow_run = await client.read_flow_run(flow_run_id)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually don't know that this is even necessary to check before we enter the subscriber iterable, maybe we can remove this whole block?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I think to avoid any missed events / race conditions due to the time it takes to make this call, we should check for a final state immediately after entering the subscriber (but before the async for) , but yeah we should be able to remove that first one outside the subscriber

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, let me know if the new version is what you had in mind

Copy link

codspeed-hq bot commented Feb 21, 2025

CodSpeed Performance Report

Merging #17243 will not alter performance

Comparing bnaul:watch_flow_run_stream (be1ba63) with main (abef75f)

Summary

✅ 2 untouched benchmarks

Copy link
Collaborator

@zzstoatzz zzstoatzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the PR @bnaul 🎉

a couple comments but I really like the idea of the change. For later, I think there's some fun nuance with the eventual goal of interleaving logs and events since I think we'll need 2 async iterators (events come from websocket, logs from REST)

happy to hash out details in the issue you opened!

@@ -116,15 +118,30 @@ async def main(num_runs: int):
"""
assert client is not None, "Client injection failed"
logger = get_logger()

flow_run = await client.read_flow_run(flow_run_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I think to avoid any missed events / race conditions due to the time it takes to make this call, we should check for a final state immediately after entering the subscriber (but before the async for) , but yeah we should be able to remove that first one outside the subscriber

@@ -54,7 +56,6 @@
async def wait_for_flow_run(
flow_run_id: UUID,
timeout: int | None = 10800,
poll_interval: int = 5,
Copy link
Collaborator

@zzstoatzz zzstoatzz Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically this is a breaking change (unfortunately since its likely infrequently used) so I think we should keep it and if a non-default value is provided we log some warning to say it has no effect and will be removed in a future release

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, that's actually how I had it before so I just reverted 53a1242

@bnaul
Copy link
Contributor Author

bnaul commented Feb 26, 2025

For later, I think there's some fun nuance with the eventual goal of interleaving logs and events since I think we'll need 2 async iterators (events come from websocket, logs from REST)

Yeah I did not realize that until digging into the subscriber stuff a bit more...I think this is a nice little change on its own but the full goal of #17228 is a little more than I initially bargained for 😅

Copy link
Collaborator

@zzstoatzz zzstoatzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sweet! this LGTM, thanks for the PR @bnaul 🎉

for reference, I'll put a WIP of the interleaving stuff that more or less does the right thing in the original issue in case you do end up wanting to tackle that

@zzstoatzz
Copy link
Collaborator

@bnaul oh also, please run pre-commit run --all-files so we can pass this static analysis check

@bnaul
Copy link
Contributor Author

bnaul commented Feb 26, 2025

thanks @zzstoatzz, believe those are both taken care of!

@zzstoatzz zzstoatzz merged commit 29d8afd into PrefectHQ:main Feb 27, 2025
45 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement An improvement of an existing feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants