Skip to content

Commit

Permalink
Support workflow start delay
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Oct 24, 2023
1 parent 4242dfb commit 67a46a7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
16 changes: 16 additions & 0 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -299,6 +300,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -325,6 +327,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -351,6 +354,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -375,6 +379,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -400,6 +405,7 @@ async def start_workflow(
cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
memo: Memo for the workflow.
search_attributes: Search attributes for the workflow.
start_delay: Amount of time to wait before starting the workflow.
start_signal: If present, this signal is sent as signal-with-start
instead of traditional workflow start.
start_signal_args: Arguments for start_signal if start_signal
Expand Down Expand Up @@ -444,6 +450,7 @@ async def start_workflow(
cron_schedule=cron_schedule,
memo=memo,
search_attributes=search_attributes,
start_delay=start_delay,
headers={},
start_signal=start_signal,
start_signal_args=start_signal_args,
Expand All @@ -469,6 +476,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -493,6 +501,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -519,6 +528,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -545,6 +555,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -569,6 +580,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand Down Expand Up @@ -597,6 +609,7 @@ async def execute_workflow(
cron_schedule=cron_schedule,
memo=memo,
search_attributes=search_attributes,
start_delay=start_delay,
start_signal=start_signal,
start_signal_args=start_signal_args,
rpc_metadata=rpc_metadata,
Expand Down Expand Up @@ -3753,6 +3766,7 @@ class StartWorkflowInput:
cron_schedule: str
memo: Optional[Mapping[str, Any]]
search_attributes: Optional[temporalio.common.SearchAttributes]
start_delay: Optional[timedelta]
headers: Mapping[str, temporalio.api.common.v1.Payload]
start_signal: Optional[str]
start_signal_args: Sequence[Any]
Expand Down Expand Up @@ -4233,6 +4247,8 @@ async def start_workflow(
temporalio.converter.encode_search_attributes(
input.search_attributes, req.search_attributes
)
if input.start_delay is not None:
req.workflow_start_delay.FromTimedelta(input.start_delay)
if input.headers is not None:
temporalio.common._apply_headers(input.headers, req.header.fields)

Expand Down
23 changes: 23 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,29 @@ async def test_start_with_signal(client: Client, worker: ExternalWorker):
assert "some signal arg" == await handle.result()


async def test_start_delay(
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
):
if env.supports_time_skipping:
pytest.skip("Java test server does not support start delay")
start_delay = timedelta(hours=1, minutes=20, seconds=30)
handle = await client.start_workflow(
"kitchen_sink",
KSWorkflowParams(
actions=[KSAction(result=KSResultAction(value="some result"))]
),
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
start_delay=start_delay,
)
# Check that first event has start delay
first_event = [e async for e in handle.fetch_history_events()][0]
assert (
start_delay
== first_event.workflow_execution_started_event_attributes.first_workflow_task_backoff.ToTimedelta()
)


async def test_result_follow_continue_as_new(
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
):
Expand Down

0 comments on commit 67a46a7

Please sign in to comment.