Skip to content

Commit

Permalink
Support for workflow ID conflict policy (#579)
Browse files Browse the repository at this point in the history
Fixes #504
  • Loading branch information
cretz authored Jul 12, 2024
1 parent bcbacc2 commit c57df81
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 0 deletions.
21 changes: 21 additions & 0 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -328,6 +329,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -360,6 +362,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -392,6 +395,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -422,6 +426,7 @@ async def start_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -455,6 +460,10 @@ async def start_workflow(
run_timeout: Timeout of a single workflow run.
task_timeout: Timeout of a single workflow task.
id_reuse_policy: How already-existing IDs are treated.
id_conflict_policy: How already-running workflows of the same ID are
treated. Default is unspecified which effectively means fail the
start attempt. This cannot be set if ``id_reuse_policy`` is set
to terminate if running.
retry_policy: Retry policy for the workflow.
cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
memo: Memo for the workflow.
Expand Down Expand Up @@ -510,6 +519,7 @@ async def start_workflow(
run_timeout=run_timeout,
task_timeout=task_timeout,
id_reuse_policy=id_reuse_policy,
id_conflict_policy=id_conflict_policy,
retry_policy=retry_policy,
cron_schedule=cron_schedule,
memo=memo,
Expand Down Expand Up @@ -537,6 +547,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -567,6 +578,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -599,6 +611,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -631,6 +644,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -661,6 +675,7 @@ async def execute_workflow(
run_timeout: Optional[timedelta] = None,
task_timeout: Optional[timedelta] = None,
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
Expand Down Expand Up @@ -696,6 +711,7 @@ async def execute_workflow(
run_timeout=run_timeout,
task_timeout=task_timeout,
id_reuse_policy=id_reuse_policy,
id_conflict_policy=id_conflict_policy,
retry_policy=retry_policy,
cron_schedule=cron_schedule,
memo=memo,
Expand Down Expand Up @@ -4487,6 +4503,7 @@ class StartWorkflowInput:
run_timeout: Optional[timedelta]
task_timeout: Optional[timedelta]
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy
retry_policy: Optional[temporalio.common.RetryPolicy]
cron_schedule: str
memo: Optional[Mapping[str, Any]]
Expand Down Expand Up @@ -5008,6 +5025,10 @@ async def start_workflow(
"temporalio.api.enums.v1.WorkflowIdReusePolicy.ValueType",
int(input.id_reuse_policy),
)
req.workflow_id_conflict_policy = cast(
"temporalio.api.enums.v1.WorkflowIdConflictPolicy.ValueType",
int(input.id_conflict_policy),
)
if input.retry_policy is not None:
input.retry_policy.apply_to_proto(req.retry_policy)
req.cron_schedule = input.cron_schedule
Expand Down
20 changes: 20 additions & 0 deletions temporalio/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ class WorkflowIDReusePolicy(IntEnum):
)


class WorkflowIDConflictPolicy(IntEnum):
"""How already-running workflows of the same ID are handled on start.
See :py:class:`temporalio.api.enums.v1.WorkflowIdConflictPolicy`.
"""

UNSPECIFIED = int(
temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
)
FAIL = int(
temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL
)
USE_EXISTING = int(
temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
)
TERMINATE_EXISTING = int(
temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
)


class QueryRejectCondition(IntEnum):
"""Whether a query should be rejected in certain conditions.
Expand Down
67 changes: 67 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
SearchAttributes,
SearchAttributeValues,
TypedSearchAttributes,
WorkflowIDConflictPolicy,
)
from temporalio.converter import (
DataConverter,
Expand Down Expand Up @@ -5505,3 +5506,69 @@ def _unfinished_handler_warning_cls(self) -> Type:
"update": workflow.UnfinishedUpdateHandlersWarning,
"signal": workflow.UnfinishedSignalHandlersWarning,
}[self.handler_type]


@workflow.defn
class IDConflictWorkflow:
# Just run forever
@workflow.run
async def run(self) -> None:
await workflow.wait_condition(lambda: False)


async def test_workflow_id_conflict(client: Client):
async with new_worker(client, IDConflictWorkflow) as worker:
# Start a workflow
handle = await client.start_workflow(
IDConflictWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
handle = client.get_workflow_handle_for(
IDConflictWorkflow.run, handle.id, run_id=handle.result_run_id
)

# Confirm another fails by default
with pytest.raises(WorkflowAlreadyStartedError):
await client.start_workflow(
IDConflictWorkflow.run,
id=handle.id,
task_queue=worker.task_queue,
)

# Confirm fails if explicitly given that option
with pytest.raises(WorkflowAlreadyStartedError):
await client.start_workflow(
IDConflictWorkflow.run,
id=handle.id,
task_queue=worker.task_queue,
id_conflict_policy=WorkflowIDConflictPolicy.FAIL,
)

# Confirm gives back same handle if requested
new_handle = await client.start_workflow(
IDConflictWorkflow.run,
id=handle.id,
task_queue=worker.task_queue,
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
)
new_handle = client.get_workflow_handle_for(
IDConflictWorkflow.run, new_handle.id, run_id=new_handle.result_run_id
)
assert new_handle.run_id == handle.run_id
assert (await handle.describe()).status == WorkflowExecutionStatus.RUNNING
assert (await new_handle.describe()).status == WorkflowExecutionStatus.RUNNING

# Confirm terminates and starts new if requested
new_handle = await client.start_workflow(
IDConflictWorkflow.run,
id=handle.id,
task_queue=worker.task_queue,
id_conflict_policy=WorkflowIDConflictPolicy.TERMINATE_EXISTING,
)
new_handle = client.get_workflow_handle_for(
IDConflictWorkflow.run, new_handle.id, run_id=new_handle.result_run_id
)
assert new_handle.run_id != handle.run_id
assert (await handle.describe()).status == WorkflowExecutionStatus.TERMINATED
assert (await new_handle.describe()).status == WorkflowExecutionStatus.RUNNING

0 comments on commit c57df81

Please sign in to comment.