From 54625225302d11463f11c0cdb7db6d2fca53e123 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 16 Aug 2024 16:05:48 -0700 Subject: [PATCH] Job ordering updates (#789) Renamed StartWorkflow to InitializeWorkflow Now ordering is `init workflow -> patches -> random-seed-updates -> signals/updates -> other -> queries -> evictions` --- core/src/core_tests/activity_tasks.rs | 2 +- core/src/core_tests/local_activities.rs | 4 +- core/src/core_tests/queries.rs | 4 +- core/src/core_tests/updates.rs | 4 +- core/src/core_tests/workflow_cancels.rs | 6 +- core/src/core_tests/workflow_tasks.rs | 83 +++++----- core/src/ephemeral_server/mod.rs | 8 +- .../machines/activity_state_machine.rs | 2 +- .../machines/cancel_workflow_state_machine.rs | 2 +- .../workflow/machines/patch_state_machine.rs | 5 +- core/src/worker/workflow/mod.rs | 20 ++- .../workflow_activation.proto | 10 +- sdk-core-protos/src/lib.rs | 10 +- sdk/src/lib.rs | 4 +- sdk/src/workflow_context.rs | 6 + sdk/src/workflow_future.rs | 20 +-- test-utils/src/lib.rs | 3 + tests/integ_tests/queries_tests.rs | 78 ++-------- tests/integ_tests/update_tests.rs | 25 +-- tests/integ_tests/visibility_tests.rs | 2 +- tests/integ_tests/workflow_tests.rs | 2 +- tests/integ_tests/workflow_tests/resets.rs | 143 +++++++++++++++++- .../workflow_tests/upsert_search_attrs.rs | 32 ++-- tests/runner.rs | 9 +- 24 files changed, 296 insertions(+), 188 deletions(-) diff --git a/core/src/core_tests/activity_tasks.rs b/core/src/core_tests/activity_tasks.rs index 13c4eb339..46f92ecad 100644 --- a/core/src/core_tests/activity_tasks.rs +++ b/core/src/core_tests/activity_tasks.rs @@ -462,7 +462,7 @@ async fn activity_timeout_no_double_resolve() { WorkflowCachingPolicy::NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { seq: activity_id, activity_id: activity_id.to_string(), diff --git a/core/src/core_tests/local_activities.rs b/core/src/core_tests/local_activities.rs index 74b1377ed..db40ac309 100644 --- a/core/src/core_tests/local_activities.rs +++ b/core/src/core_tests/local_activities.rs @@ -636,7 +636,7 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer assert_matches!( task.jobs.as_slice(), &[WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), },] ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( @@ -1239,7 +1239,7 @@ async fn queries_can_be_received_while_heartbeating() { assert_matches!( task.jobs.as_slice(), &[WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), },] ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( diff --git a/core/src/core_tests/queries.rs b/core/src/core_tests/queries.rs index b1a70c9a1..e8847ac8d 100644 --- a/core/src/core_tests/queries.rs +++ b/core/src/core_tests/queries.rs @@ -474,7 +474,7 @@ async fn query_cache_miss_causes_page_fetch_dont_reply_wft_too_early( assert_matches!( task.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( @@ -696,7 +696,7 @@ async fn new_query_fail() { assert_matches!( task.jobs[0], WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), } ); diff --git a/core/src/core_tests/updates.rs b/core/src/core_tests/updates.rs index 5ce1bdf07..a33fb6172 100644 --- a/core/src/core_tests/updates.rs +++ b/core/src/core_tests/updates.rs @@ -46,10 +46,10 @@ async fn replay_with_empty_first_task() { task.jobs.as_slice(), [ WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::DoUpdate(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }, WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::DoUpdate(_)), }, ] ); diff --git a/core/src/core_tests/workflow_cancels.rs b/core/src/core_tests/workflow_cancels.rs index 895784d36..b88e28498 100644 --- a/core/src/core_tests/workflow_cancels.rs +++ b/core/src/core_tests/workflow_cancels.rs @@ -58,7 +58,7 @@ async fn timer_then_cancel_req( NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![start_timer_cmd(timer_seq, Duration::from_secs(1))], ), gen_assert_and_reply( @@ -84,7 +84,7 @@ async fn timer_then_cancel_req_then_timer_then_cancelled() { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![start_timer_cmd(1, Duration::from_secs(1))], ), gen_assert_and_reply( @@ -114,7 +114,7 @@ async fn immediate_cancel() { NonSticky, &[gen_assert_and_reply( &job_assert!( - workflow_activation_job::Variant::StartWorkflow(_), + workflow_activation_job::Variant::InitializeWorkflow(_), workflow_activation_job::Variant::CancelWorkflow(_) ), vec![CancelWorkflowExecution {}.into()], diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index a7f0b706b..2a30265f6 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -42,8 +42,8 @@ use temporal_sdk_core_protos::{ activity_result::{self as ar, activity_resolution, ActivityResolution}, common::VersioningIntent, workflow_activation::{ - remove_from_cache::EvictionReason, workflow_activation_job, FireTimer, ResolveActivity, - StartWorkflow, UpdateRandomSeed, WorkflowActivationJob, + remove_from_cache::EvictionReason, workflow_activation_job, FireTimer, + InitializeWorkflow, ResolveActivity, UpdateRandomSeed, WorkflowActivationJob, }, workflow_commands::{ update_response::Response, workflow_command, ActivityCancellationType, CancelTimer, @@ -112,7 +112,7 @@ async fn single_timer(#[case] worker: Worker, #[case] evict: WorkflowCachingPoli evict, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![start_timer_cmd(1, Duration::from_secs(1))], ), gen_assert_and_reply( @@ -137,7 +137,7 @@ async fn single_activity_completion(worker: Worker) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { activity_id: "fake_activity".to_string(), ..default_act_sched() @@ -171,7 +171,7 @@ async fn parallel_timer_test_across_wf_bridge(hist_batches: &'static [usize]) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ start_timer_cmd(timer_1_id, Duration::from_secs(1)), start_timer_cmd(timer_2_id, Duration::from_secs(1)), @@ -223,7 +223,7 @@ async fn timer_cancel(hist_batches: &'static [usize]) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ start_timer_cmd(cancel_timer_id, Duration::from_secs(1)), start_timer_cmd(timer_id, Duration::from_secs(1)), @@ -260,7 +260,7 @@ async fn scheduled_activity_cancellation_try_cancel(hist_batches: &'static [usiz NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { seq: activity_seq, activity_id: activity_id.to_string(), @@ -297,7 +297,7 @@ async fn scheduled_activity_timeout(hist_batches: &'static [usize]) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { seq: activity_seq, activity_id: activity_id.to_string(), @@ -350,7 +350,7 @@ async fn started_activity_timeout(hist_batches: &'static [usize]) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { seq: activity_seq, activity_id: activity_seq.to_string(), @@ -405,7 +405,7 @@ async fn cancelled_activity_timeout(hist_batches: &'static [usize]) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { seq: activity_seq, activity_id: activity_id.to_string(), @@ -557,7 +557,7 @@ async fn verify_activity_cancellation( NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { seq: activity_seq, activity_id: activity_seq.to_string(), @@ -625,7 +625,7 @@ async fn verify_activity_cancellation_wait_for_cancellation(activity_id: u32, wo NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { seq: activity_id, activity_id: activity_id.to_string(), @@ -682,8 +682,8 @@ async fn workflow_update_random_seed_on_workflow_reset() { assert_matches!( res.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow( - StartWorkflow{randomness_seed, ..} + variant: Some(workflow_activation_job::Variant::InitializeWorkflow( + InitializeWorkflow{randomness_seed, ..} )), }] => { randomness_seed_from_start.store(*randomness_seed, Ordering::SeqCst); @@ -692,17 +692,19 @@ async fn workflow_update_random_seed_on_workflow_reset() { }, vec![start_timer_cmd(timer_1_id, Duration::from_secs(1))], ), + // The random seed update should always be the first job gen_assert_and_reply( &|res| { assert_matches!( res.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::FireTimer(_),), - }, - WorkflowActivationJob { variant: Some(workflow_activation_job::Variant::UpdateRandomSeed( UpdateRandomSeed{randomness_seed})), - }] => { + }, + WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::FireTimer(_),), + }, + ] => { assert_ne!(randomness_seed_from_start.load(Ordering::SeqCst), *randomness_seed); } @@ -731,7 +733,7 @@ async fn cancel_timer_before_sent_wf_bridge() { &core, NonSticky, &[gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ start_timer_cmd(cancel_timer_id, Duration::from_secs(1)), CancelTimer { @@ -802,7 +804,7 @@ async fn simple_timer_fail_wf_execution(hist_batches: &'static [usize]) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![start_timer_cmd(timer_id, Duration::from_secs(1))], ), gen_assert_and_reply( @@ -833,7 +835,7 @@ async fn two_signals(hist_batches: &'static [usize]) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), // Task is completed with no commands vec![], ), @@ -957,7 +959,7 @@ async fn activity_not_canceled_on_replay_repro(hist_batches: &'static [usize]) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), // Start timer and activity vec![ ScheduleActivity { @@ -1003,7 +1005,7 @@ async fn activity_not_canceled_when_also_completed_repro(hist_batches: &'static NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { seq: activity_id, activity_id: "act-1".to_string(), @@ -1061,7 +1063,7 @@ async fn lots_of_workflows() { while let Ok(wft) = worker.poll_workflow_activation().await { let job = &wft.jobs[0]; let reply = match job.variant { - Some(workflow_activation_job::Variant::StartWorkflow(_)) => { + Some(workflow_activation_job::Variant::InitializeWorkflow(_)) => { start_timer_cmd(1, Duration::from_secs(1)) } Some(workflow_activation_job::Variant::RemoveFromCache(_)) => { @@ -1104,7 +1106,7 @@ async fn wft_timeout_repro(hist_batches: &'static [usize]) { NonSticky, &[ gen_assert_and_reply( - &job_assert!(workflow_activation_job::Variant::StartWorkflow(_)), + &job_assert!(workflow_activation_job::Variant::InitializeWorkflow(_)), vec![ScheduleActivity { seq: activity_id, activity_id: activity_id.to_string(), @@ -1226,7 +1228,7 @@ async fn new_server_work_while_eviction_outstanding_doesnt_overwrite_activation( let start_again = core.poll_workflow_activation().await.unwrap(); assert_matches!( start_again.jobs[0].variant, - Some(workflow_activation_job::Variant::StartWorkflow(_)) + Some(workflow_activation_job::Variant::InitializeWorkflow(_)) ); } @@ -1441,7 +1443,7 @@ async fn lang_slower_than_wft_timeouts() { let start_again = core.poll_workflow_activation().await.unwrap(); assert_matches!( start_again.jobs[0].variant, - Some(workflow_activation_job::Variant::StartWorkflow(_)) + Some(workflow_activation_job::Variant::InitializeWorkflow(_)) ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( start_again.run_id, @@ -1602,7 +1604,7 @@ async fn cache_miss_will_fetch_history() { assert_matches!( activation.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ); // Force an eviction (before complete matters, so that we will be sure the eviction is queued @@ -1631,7 +1633,7 @@ async fn cache_miss_will_fetch_history() { assert_matches!( activation.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ); } @@ -1829,7 +1831,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() { for (i, p_res) in [&p1, &p2, &p3].into_iter().enumerate() { assert_matches!( &p_res.jobs[0].variant, - Some(workflow_activation_job::Variant::StartWorkflow(sw)) + Some(workflow_activation_job::Variant::InitializeWorkflow(sw)) if sw.workflow_id == format!("wf-{}", i + 1) ); } @@ -1874,7 +1876,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() { let res = core.poll_workflow_activation().await.unwrap(); assert_matches!( &res.jobs[0].variant, - Some(workflow_activation_job::Variant::StartWorkflow(sw)) + Some(workflow_activation_job::Variant::InitializeWorkflow(sw)) if sw.workflow_id == format!("wf-{}", 4) ); res @@ -1919,7 +1921,7 @@ async fn poll_faster_than_complete_wont_overflow_cache() { let res = core.poll_workflow_activation().await.unwrap(); assert_matches!( &res.jobs[0].variant, - Some(workflow_activation_job::Variant::StartWorkflow(sw)) + Some(workflow_activation_job::Variant::InitializeWorkflow(sw)) if sw.workflow_id == "wf-5" ); }; @@ -2004,7 +2006,7 @@ async fn autocompletes_wft_no_work() { assert_matches!( act.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( @@ -2187,7 +2189,7 @@ async fn ignorable_events_are_ok(#[values(true, false)] attribs_unset: bool) { let act = core.poll_workflow_activation().await.unwrap(); assert_matches!( act.jobs[0].variant, - Some(workflow_activation_job::Variant::StartWorkflow(_)) + Some(workflow_activation_job::Variant::InitializeWorkflow(_)) ); } @@ -2235,7 +2237,7 @@ async fn fetching_to_continue_replay_works() { let act = core.poll_workflow_activation().await.unwrap(); assert_matches!( act.jobs[0].variant, - Some(workflow_activation_job::Variant::StartWorkflow(_)) + Some(workflow_activation_job::Variant::InitializeWorkflow(_)) ); core.complete_workflow_activation(WorkflowActivationCompletion::empty(act.run_id)) .await @@ -2286,7 +2288,7 @@ async fn fetching_error_evicts_wf() { let act = core.poll_workflow_activation().await.unwrap(); assert_matches!( act.jobs[0].variant, - Some(workflow_activation_job::Variant::StartWorkflow(_)) + Some(workflow_activation_job::Variant::InitializeWorkflow(_)) ); core.complete_workflow_activation(WorkflowActivationCompletion::empty(act.run_id)) .await @@ -2441,10 +2443,10 @@ async fn lang_internal_flag_with_update() { act.jobs.as_slice(), [ WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::DoUpdate(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }, WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::DoUpdate(_)), }, ] ); @@ -2639,14 +2641,13 @@ async fn jobs_are_in_appropriate_order() { let core = mock_worker(mock); let act = core.poll_workflow_activation().await.unwrap(); - // Patch notifications always come first assert_matches!( act.jobs[0].variant.as_ref().unwrap(), - workflow_activation_job::Variant::NotifyHasPatch(_) + workflow_activation_job::Variant::InitializeWorkflow(_) ); assert_matches!( act.jobs[1].variant.as_ref().unwrap(), - workflow_activation_job::Variant::StartWorkflow(_) + workflow_activation_job::Variant::NotifyHasPatch(_) ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( act.run_id, diff --git a/core/src/ephemeral_server/mod.rs b/core/src/ephemeral_server/mod.rs index e2f4c4df6..850b2c1af 100644 --- a/core/src/ephemeral_server/mod.rs +++ b/core/src/ephemeral_server/mod.rs @@ -46,7 +46,7 @@ pub struct TemporalDevServerConfig { /// Log format and level #[builder(default = "(\"pretty\".to_owned(), \"warn\".to_owned())")] pub log: (String, String), - /// Additional arguments to Temporalite. + /// Additional arguments to Temporal dev server. #[builder(default)] pub extra_args: Vec, } @@ -567,8 +567,10 @@ async fn download_and_extract( #[cfg(test)] mod tests { use super::get_free_port; - use std::collections::HashSet; - use std::net::{TcpListener, TcpStream}; + use std::{ + collections::HashSet, + net::{TcpListener, TcpStream}, + }; #[test] fn get_free_port_no_double() { diff --git a/core/src/worker/workflow/machines/activity_state_machine.rs b/core/src/worker/workflow/machines/activity_state_machine.rs index 8b8bf2cef..e6bcadcd1 100644 --- a/core/src/worker/workflow/machines/activity_state_machine.rs +++ b/core/src/worker/workflow/machines/activity_state_machine.rs @@ -838,7 +838,7 @@ mod test { assert_matches!( a.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ) }); diff --git a/core/src/worker/workflow/machines/cancel_workflow_state_machine.rs b/core/src/worker/workflow/machines/cancel_workflow_state_machine.rs index 97831b881..c4e6d336e 100644 --- a/core/src/worker/workflow/machines/cancel_workflow_state_machine.rs +++ b/core/src/worker/workflow/machines/cancel_workflow_state_machine.rs @@ -133,7 +133,7 @@ mod tests { assert_matches!( a.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ) }); diff --git a/core/src/worker/workflow/machines/patch_state_machine.rs b/core/src/worker/workflow/machines/patch_state_machine.rs index 34d13dd27..938ed8f98 100644 --- a/core/src/worker/workflow/machines/patch_state_machine.rs +++ b/core/src/worker/workflow/machines/patch_state_machine.rs @@ -551,10 +551,11 @@ mod tests { let mut aai = ActivationAssertionsInterceptor::default(); aai.then(move |act| { - // replaying cases should immediately get a resolve change activation when marker is present + // replaying cases should immediately get a resolve change activation when marker is + // present if replaying && marker_type != MarkerType::NoMarker { assert_matches!( - &act.jobs[0], + &act.jobs[1], WorkflowActivationJob { variant: Some(workflow_activation_job::Variant::NotifyHasPatch( NotifyHasPatch { diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 8a4f4fa33..ddb00bbbe 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -1331,7 +1331,7 @@ impl LocalActivityRequestSink for LAReqSink { /// activations must uphold. /// /// ## Ordering -/// `patches -> signals/updates -> other -> queries -> evictions` +/// `init workflow -> patches -> random-seed-updates -> signals/updates -> other -> queries -> evictions` /// /// ## Invariants: /// * Queries always go in their own activation @@ -1363,17 +1363,19 @@ fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) { } fn variant_ordinal(v: &workflow_activation_job::Variant) -> u8 { match v { + workflow_activation_job::Variant::InitializeWorkflow(_) => 0, workflow_activation_job::Variant::NotifyHasPatch(_) => 1, - workflow_activation_job::Variant::SignalWorkflow(_) => 2, - workflow_activation_job::Variant::DoUpdate(_) => 2, + workflow_activation_job::Variant::UpdateRandomSeed(_) => 2, + workflow_activation_job::Variant::SignalWorkflow(_) => 3, + workflow_activation_job::Variant::DoUpdate(_) => 3, // In principle we should never actually need to sort these with the others, since // queries always get their own activation, but, maintaining the semantic is // reasonable. - workflow_activation_job::Variant::QueryWorkflow(_) => 4, + workflow_activation_job::Variant::QueryWorkflow(_) => 5, // Also shouldn't ever end up anywhere but the end by construction, but no harm in // double-checking. - workflow_activation_job::Variant::RemoveFromCache(_) => 5, - _ => 3, + workflow_activation_job::Variant::RemoveFromCache(_) => 6, + _ => 4, } } variant_ordinal(j1v).cmp(&variant_ordinal(j2v)) @@ -1418,6 +1420,11 @@ mod tests { Default::default(), )), }, + WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::UpdateRandomSeed( + Default::default(), + )), + }, WorkflowActivationJob { variant: Some(workflow_activation_job::Variant::SignalWorkflow( SignalWorkflow { @@ -1439,6 +1446,7 @@ mod tests { variants.as_slice(), &[ workflow_activation_job::Variant::NotifyHasPatch(_), + workflow_activation_job::Variant::UpdateRandomSeed(_), workflow_activation_job::Variant::SignalWorkflow(ref s1), workflow_activation_job::Variant::DoUpdate(_), workflow_activation_job::Variant::SignalWorkflow(ref s2), diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto index 30e444cdd..eb7415f4c 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto @@ -23,7 +23,7 @@ import "temporal/sdk/core/common/common.proto"; // ## Job ordering guarantees and semantics // // Core will, by default, order jobs within the activation as follows: -// `patches -> signals/updates -> other -> queries -> evictions` +// `init-workflow -> patches -> random-seed-updates -> signals/updates -> other -> queries -> evictions` // // This is because: // * Patches are expected to apply to the entire activation @@ -74,8 +74,8 @@ message WorkflowActivation { message WorkflowActivationJob { oneof variant { - // Begin a workflow for the first time - StartWorkflow start_workflow = 1; + // A workflow is starting, record all of the information from its start event + InitializeWorkflow initialize_workflow = 1; // A timer has fired, allowing whatever was waiting on it (if anything) to proceed FireTimer fire_timer = 2; // Workflow was reset. The randomness seed must be updated. @@ -110,8 +110,8 @@ message WorkflowActivationJob { } } -// Start a new workflow -message StartWorkflow { +// Initialize a new workflow +message InitializeWorkflow { // The identifier the lang-specific sdk uses to execute workflow code string workflow_type = 1; // The workflow id used on the temporal server diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index ef488aaf9..268841aa6 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -560,8 +560,8 @@ pub mod coresdk { impl Display for workflow_activation_job::Variant { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - workflow_activation_job::Variant::StartWorkflow(_) => { - write!(f, "StartWorkflow") + workflow_activation_job::Variant::InitializeWorkflow(_) => { + write!(f, "InitializeWorkflow") } workflow_activation_job::Variant::FireTimer(t) => { write!(f, "FireTimer({})", t.seq) @@ -662,14 +662,14 @@ pub mod coresdk { } } - /// Create a [StartWorkflow] job from corresponding event attributes + /// Create a [InitializeWorkflow] job from corresponding event attributes pub fn start_workflow_from_attribs( attrs: WorkflowExecutionStartedEventAttributes, workflow_id: String, randomness_seed: u64, start_time: Timestamp, - ) -> StartWorkflow { - StartWorkflow { + ) -> InitializeWorkflow { + InitializeWorkflow { workflow_type: attrs.workflow_type.map(|wt| wt.name).unwrap_or_default(), workflow_id, arguments: Vec::from_payloads(attrs.input), diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index b7e455d96..76c3601c4 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -396,10 +396,10 @@ impl WorkflowHalf { let mut res = None; let run_id = activation.run_id.clone(); - // If the activation is to start a workflow, create a new workflow driver for it, + // If the activation is to init a workflow, create a new workflow driver for it, // using the function associated with that workflow id if let Some(sw) = activation.jobs.iter().find_map(|j| match j.variant { - Some(Variant::StartWorkflow(ref sw)) => Some(sw), + Some(Variant::InitializeWorkflow(ref sw)) => Some(sw), _ => None, }) { let workflow_type = &sw.workflow_type; diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index 11d4b30c6..a788a5d3d 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -129,6 +129,11 @@ impl WfContext { RwLockReadGuard::map(self.shared.read(), |s| &s.search_attributes) } + /// Return the workflow's randomness seed + pub fn random_seed(&self) -> u64 { + self.shared.read().random_seed + } + /// A future that resolves if/when the workflow is cancelled pub async fn cancelled(&self) { if *self.am_cancelled.borrow() { @@ -419,6 +424,7 @@ pub(crate) struct WfContextSharedData { pub(crate) history_length: u32, pub(crate) current_build_id: Option, pub(crate) search_attributes: SearchAttributes, + pub(crate) random_seed: u64, } /// Helper Wrapper that can drain the channel into a Vec in a blocking way. Useful diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index deb977d9c..fa7a3a340 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -175,8 +175,8 @@ impl WorkflowFuture { ) -> Result { if let Some(v) = variant { match v { - Variant::StartWorkflow(_) => { - // Don't do anything in here. Start workflow is looked at earlier, before + Variant::InitializeWorkflow(_) => { + // Don't do anything in here. Init workflow is looked at earlier, before // jobs are handled, and may have information taken out of it to avoid clones. } Variant::FireTimer(FireTimer { seq }) => { @@ -201,7 +201,9 @@ impl WorkflowFuture { seq, Box::new(result.context("Child Workflow execution must have a result")?), ))?, - Variant::UpdateRandomSeed(_) => (), + Variant::UpdateRandomSeed(rs) => { + self.wf_ctx.shared.write().random_seed = rs.randomness_seed; + } Variant::QueryWorkflow(q) => { error!( "Queries are not implemented in the Rust SDK. Got query '{}'", @@ -342,26 +344,26 @@ impl Future for WorkflowFuture { let mut activation_cmds = vec![]; // Assign initial state from start workflow job if let Some(start_info) = activation.jobs.iter_mut().find_map(|j| { - if let Some(Variant::StartWorkflow(s)) = j.variant.as_mut() { + if let Some(Variant::InitializeWorkflow(s)) = j.variant.as_mut() { Some(s) } else { None } }) { - // TODO: Can assign randomness seed whenever needed - self.wf_ctx.shared.write().search_attributes = - start_info.search_attributes.take().unwrap_or_default(); + let mut wlock = self.wf_ctx.shared.write(); + wlock.random_seed = start_info.randomness_seed; + wlock.search_attributes = start_info.search_attributes.take().unwrap_or_default(); }; // Lame hack to avoid hitting "unregistered" update handlers in a situation where // the history has no commands until an update is accepted. Will go away w/ SDK redesign if activation .jobs .iter() - .any(|j| matches!(j.variant, Some(Variant::StartWorkflow(_)))) + .any(|j| matches!(j.variant, Some(Variant::InitializeWorkflow(_)))) && activation.jobs.iter().all(|j| { matches!( j.variant, - Some(Variant::StartWorkflow(_) | Variant::DoUpdate(_)) + Some(Variant::InitializeWorkflow(_) | Variant::DoUpdate(_)) ) }) { diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 576f790b4..310e3305b 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -73,11 +73,14 @@ pub const INTEG_USE_TLS_ENV_VAR: &str = "TEMPORAL_USE_TLS"; pub const INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR: &str = "INTEG_TEMPORAL_DEV_SERVER_ON"; /// This env var is set (to any value) if the test server is in use pub const INTEG_TEST_SERVER_USED_ENV_VAR: &str = "INTEG_TEST_SERVER_ON"; +pub static SEARCH_ATTR_TXT: &str = "CustomTextField"; +pub static SEARCH_ATTR_INT: &str = "CustomIntField"; /// If set, turn export traces and metrics to the OTel collector at the given URL const OTEL_URL_ENV_VAR: &str = "TEMPORAL_INTEG_OTEL_URL"; /// If set, enable direct scraping of prom metrics on the specified port const PROM_ENABLE_ENV_VAR: &str = "TEMPORAL_INTEG_PROM_PORT"; + #[macro_export] macro_rules! prost_dur { ($dur_call:ident $args:tt) => { diff --git a/tests/integ_tests/queries_tests.rs b/tests/integ_tests/queries_tests.rs index 52d68f056..f9dbbbd8b 100644 --- a/tests/integ_tests/queries_tests.rs +++ b/tests/integ_tests/queries_tests.rs @@ -12,7 +12,7 @@ use temporal_sdk_core_protos::{ temporal::api::{failure::v1::Failure, query::v1::WorkflowQuery}, }; use temporal_sdk_core_test_utils::{ - drain_pollers_and_shutdown, init_core_and_create_wf, WorkerTestHelpers, + drain_pollers_and_shutdown, init_core_and_create_wf, CoreWfStarter, WorkerTestHelpers, }; use tokio::join; @@ -163,7 +163,7 @@ async fn query_after_execution_complete(#[case] do_evict: bool) { if matches!( task.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ) { core.complete_timer(&task.run_id, 1, Duration::from_millis(500)) @@ -215,30 +215,15 @@ async fn query_after_execution_complete(#[case] do_evict: bool) { #[tokio::test] async fn fail_legacy_query() { let query_err = "oh no broken"; - let mut starter = init_core_and_create_wf("fail_legacy_query").await; + let mut starter = CoreWfStarter::new("fail_legacy_query"); let core = starter.get_worker().await; + starter.workflow_options.task_timeout = Some(Duration::from_secs(1)); + starter.start_wf().await; let workflow_id = starter.get_task_queue().to_string(); let task = core.poll_workflow_activation().await.unwrap(); - let t1_resp = vec![ - StartTimer { - seq: 1, - start_to_fire_timeout: Some(prost_dur!(from_millis(500))), - } - .into(), - StartTimer { - seq: 2, - start_to_fire_timeout: Some(prost_dur!(from_secs(3))), - } - .into(), - ]; - core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( - task.run_id.clone(), - t1_resp.clone(), - )) - .await - .unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - // Query after timer should have fired and there should be new WFT + // Queries are *always* legacy on closed workflows, so that's the easiest way to ensure that + // path is used. + core.complete_execution(&task.run_id).await; let query_fut = async { starter .get_client() @@ -255,33 +240,14 @@ async fn fail_legacy_query() { .await .unwrap_err() }; - let workflow_completions_future = async { - // Give query a beat to get going - tokio::time::sleep(Duration::from_millis(400)).await; - // This poll *should* have the `queries` field populated, but doesn't, seemingly due to - // a server bug. So, complete the WF task of the first timer firing with empty commands + let query_responder = async { let task = core.poll_workflow_activation().await.unwrap(); - assert_matches!( - task.jobs.as_slice(), - [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::FireTimer(_)), - }] - ); - core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( - task.run_id, - vec![], - )) - .await - .unwrap(); - let task = core.poll_workflow_activation().await.unwrap(); - // Poll again, and we end up getting a `query` field query response assert_matches!( task.jobs.as_slice(), [WorkflowActivationJob { variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)), }] => q ); - // Fail this task core.complete_workflow_activation(WorkflowActivationCompletion::fail( task.run_id, Failure { @@ -292,26 +258,8 @@ async fn fail_legacy_query() { )) .await .unwrap(); - // Finish the workflow (handling cache removal) - let task = core.poll_workflow_activation().await.unwrap(); - core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id)) - .await - .unwrap(); - let task = core.poll_workflow_activation().await.unwrap(); - core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( - task.run_id, - t1_resp.clone(), - )) - .await - .unwrap(); - let task = core.poll_workflow_activation().await.unwrap(); - core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id)) - .await - .unwrap(); - let task = core.poll_workflow_activation().await.unwrap(); - core.complete_execution(&task.run_id).await; }; - let (q_resp, _) = join!(query_fut, workflow_completions_future); + let (q_resp, _) = join!(query_fut, query_responder); // Ensure query response is a failure and has the right message assert_eq!(q_resp.message(), query_err); } @@ -507,7 +455,7 @@ async fn query_should_not_be_sent_if_wft_about_to_fail() { assert_matches!( task.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ); core.complete_workflow_activation(WorkflowActivationCompletion::fail( @@ -522,7 +470,6 @@ async fn query_should_not_be_sent_if_wft_about_to_fail() { .unwrap(); let task = core.poll_workflow_activation().await.unwrap(); // Should *not* get a query here. If the bug wasn't fixed, this job would have a query. - dbg!(&task); assert_matches!( task.jobs.as_slice(), [WorkflowActivationJob { @@ -538,12 +485,11 @@ async fn query_should_not_be_sent_if_wft_about_to_fail() { assert_matches!( task.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ); core.complete_execution(&task.run_id).await; let task = core.poll_workflow_activation().await.unwrap(); - dbg!(&task); let qid = assert_matches!( task.jobs.as_slice(), [WorkflowActivationJob { diff --git a/tests/integ_tests/update_tests.rs b/tests/integ_tests/update_tests.rs index b6de2a491..cb688f1f6 100644 --- a/tests/integ_tests/update_tests.rs +++ b/tests/integ_tests/update_tests.rs @@ -78,10 +78,9 @@ async fn update_workflow(#[values(FailUpdate::Yes, FailUpdate::No)] will_fail: F .unwrap(); let with_id = HistoryForReplay::new(history, workflow_id.to_string()); let replay_worker = init_core_replay_preloaded(workflow_id, [with_id]); - handle_update(will_fail, CompleteWorkflow::Yes, replay_worker.as_ref()).await; + handle_update(will_fail, CompleteWorkflow::Yes, replay_worker.as_ref(), 1).await; } -#[rstest::rstest] #[tokio::test] async fn reapplied_updates_due_to_reset() { let mut starter = init_core_and_create_wf("update_workflow").await; @@ -121,7 +120,9 @@ async fn reapplied_updates_due_to_reset() { .into_inner(); // Accept and complete the reapplied update - handle_update(FailUpdate::No, CompleteWorkflow::No, core.as_ref()).await; + // Index here is 2 because there will be start workflow & update random seed (from the reset) + // first. + handle_update(FailUpdate::No, CompleteWorkflow::No, core.as_ref(), 2).await; // Send a second update and complete the workflow let post_reset_run_id = send_and_handle_update( @@ -149,12 +150,19 @@ async fn reapplied_updates_due_to_reset() { // We now recapitulate the actions that the worker took on first execution above, pretending // that we always followed the post-reset history. // First, we handled the post-reset reapplied update and did not complete the workflow. - handle_update(FailUpdate::No, CompleteWorkflow::No, replay_worker.as_ref()).await; + handle_update( + FailUpdate::No, + CompleteWorkflow::No, + replay_worker.as_ref(), + 2, + ) + .await; // Then the client sent a second update; we handled it and completed the workflow. handle_update( FailUpdate::No, CompleteWorkflow::Yes, replay_worker.as_ref(), + 0, ) .await; @@ -199,7 +207,7 @@ async fn send_and_handle_update( }; // Accept update, complete update and complete workflow - let processing_task = handle_update(fail_update, complete_workflow, core); + let processing_task = handle_update(fail_update, complete_workflow, core, 0); let (ur, _) = join!(update_task, processing_task); let v = ur.outcome.unwrap().value.unwrap(); @@ -218,12 +226,11 @@ async fn handle_update( fail_update: FailUpdate, complete_workflow: CompleteWorkflow, core: &dyn Worker, + update_job_index: usize, ) { let act = core.poll_workflow_activation().await.unwrap(); - // On replay, the first activation has update & start workflow, but on first execution, it does - // not - can happen if update is waiting on some condition. let pid = assert_matches!( - &act.jobs[0], + &act.jobs[update_job_index], WorkflowActivationJob { variant: Some(workflow_activation_job::Variant::DoUpdate(d)), } => &d.protocol_instance_id @@ -573,7 +580,7 @@ async fn update_speculative_wft() { assert_matches!( res.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ); core.complete_workflow_activation(WorkflowActivationCompletion::empty(res.run_id)) diff --git a/tests/integ_tests/visibility_tests.rs b/tests/integ_tests/visibility_tests.rs index 8fca527ba..7285d244e 100644 --- a/tests/integ_tests/visibility_tests.rs +++ b/tests/integ_tests/visibility_tests.rs @@ -29,7 +29,7 @@ async fn client_list_open_closed_workflow_executions() { assert_matches!( task.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ); diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index ea4bce3e1..702c6ae95 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -79,7 +79,7 @@ async fn parallel_workflows_same_queue() { assert_matches!( task.jobs.as_slice(), [WorkflowActivationJob { - variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + variant: Some(workflow_activation_job::Variant::InitializeWorkflow(_)), }] ); worker diff --git a/tests/integ_tests/workflow_tests/resets.rs b/tests/integ_tests/workflow_tests/resets.rs index 86a7e9697..c9b0fc923 100644 --- a/tests/integ_tests/workflow_tests/resets.rs +++ b/tests/integ_tests/workflow_tests/resets.rs @@ -1,9 +1,19 @@ +use crate::integ_tests::activity_functions::echo; use futures::StreamExt; -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions, WorkflowService}; -use temporal_sdk::WfContext; -use temporal_sdk_core_protos::temporal::api::{ - common::v1::WorkflowExecution, workflowservice::v1::ResetWorkflowExecutionRequest, +use temporal_sdk::{LocalActivityOptions, WfContext}; +use temporal_sdk_core_protos::{ + coresdk::AsJsonPayloadExt, + temporal::api::{ + common::v1::WorkflowExecution, workflowservice::v1::ResetWorkflowExecutionRequest, + }, }; use temporal_sdk_core_test_utils::{CoreWfStarter, NAMESPACE}; use tokio::sync::Notify; @@ -92,3 +102,128 @@ async fn reset_workflow() { let (_, rr) = tokio::join!(resetter_fut, run_fut); rr.unwrap(); } + +#[tokio::test] +async fn reset_randomseed() { + let wf_name = "reset_randomseed"; + let mut starter = CoreWfStarter::new(wf_name); + starter.worker_config.no_remote_activities(true); + let mut worker = starter.worker().await; + worker.fetch_results = false; + let notify = Arc::new(Notify::new()); + + const POST_FAIL_SIG: &str = "post-fail"; + static DID_FAIL: AtomicBool = AtomicBool::new(false); + static RAND_SEED: AtomicU64 = AtomicU64::new(0); + + let wf_notify = notify.clone(); + worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| { + let notify = wf_notify.clone(); + async move { + let _ = RAND_SEED.compare_exchange( + 0, + ctx.random_seed(), + Ordering::Relaxed, + Ordering::Relaxed, + ); + // Make a couple workflow tasks + ctx.timer(Duration::from_millis(100)).await; + ctx.timer(Duration::from_millis(100)).await; + if DID_FAIL + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + // Tell outer scope to send the post-task-failure-signal + notify.notify_one(); + panic!("Ahh"); + } + // Make a command that is one thing with the initial seed, but another after reset + if RAND_SEED.load(Ordering::Relaxed) == ctx.random_seed() { + ctx.timer(Duration::from_millis(100)).await; + } else { + ctx.local_activity(LocalActivityOptions { + activity_type: "echo".to_string(), + input: "hi!".as_json_payload().expect("serializes fine"), + ..Default::default() + }) + .await; + } + // Wait for the post-task-fail signal + let _ = ctx.make_signal_channel(POST_FAIL_SIG).next().await.unwrap(); + // Tell outer scope to send the reset + notify.notify_one(); + let _ = ctx + .make_signal_channel(POST_RESET_SIG) + .next() + .await + .unwrap(); + Ok(().into()) + } + }); + worker.register_activity("echo", echo); + + let run_id = worker + .submit_wf( + wf_name.to_owned(), + wf_name.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + + let mut client = starter.get_client().await; + let client = Arc::make_mut(&mut client); + let client_fur = async { + notify.notified().await; + WorkflowClientTrait::signal_workflow_execution( + client, + wf_name.to_owned(), + run_id.clone(), + POST_FAIL_SIG.to_string(), + None, + None, + ) + .await + .unwrap(); + notify.notified().await; + // Reset the workflow to be after first timer has fired + client + .reset_workflow_execution(ResetWorkflowExecutionRequest { + namespace: NAMESPACE.to_owned(), + workflow_execution: Some(WorkflowExecution { + workflow_id: wf_name.to_owned(), + run_id: run_id.clone(), + }), + workflow_task_finish_event_id: 14, + request_id: "test-req-id".to_owned(), + ..Default::default() + }) + .await + .unwrap(); + + // Unblock the workflow by sending the signal. Run ID will have changed after reset so + // we use empty run id + WorkflowClientTrait::signal_workflow_execution( + client, + wf_name.to_owned(), + "".to_owned(), + POST_RESET_SIG.to_owned(), + None, + None, + ) + .await + .unwrap(); + + // Wait for the now-reset workflow to finish + client + .get_untyped_workflow_handle(wf_name.to_owned(), "") + .get_workflow_result(Default::default()) + .await + .unwrap(); + starter.shutdown().await; + }; + let run_fut = worker.run_until_done(); + let (_, rr) = tokio::join!(client_fur, run_fut); + rr.unwrap(); +} diff --git a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs index 4bc797b50..c608c1e76 100644 --- a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs +++ b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs @@ -1,32 +1,26 @@ use assert_matches::assert_matches; -use std::{collections::HashMap, env, time::Duration}; +use std::{collections::HashMap, time::Duration}; use temporal_client::{ GetWorkflowResultOpts, WfClientExt, WorkflowClientTrait, WorkflowExecutionResult, WorkflowOptions, }; use temporal_sdk::{WfContext, WfExitValue, WorkflowResult}; use temporal_sdk_core_protos::coresdk::{AsJsonPayloadExt, FromJsonPayloadExt}; -use temporal_sdk_core_test_utils::{CoreWfStarter, INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR}; -use tracing::warn; +use temporal_sdk_core_test_utils::{CoreWfStarter, SEARCH_ATTR_INT, SEARCH_ATTR_TXT}; use uuid::Uuid; -// These are initialized on the server as part of the autosetup container which we -// use for integration tests. -static TXT_ATTR: &str = "CustomTextField"; -static INT_ATTR: &str = "CustomIntField"; - async fn search_attr_updater(ctx: WfContext) -> WorkflowResult<()> { let mut int_val = ctx .search_attributes() .indexed_fields - .get(INT_ATTR) + .get(SEARCH_ATTR_INT) .cloned() .unwrap_or_default(); let orig_val = int_val.data[0]; int_val.data[0] += 1; ctx.upsert_search_attributes([ - (TXT_ATTR.to_string(), "goodbye".as_json_payload()?), - (INT_ATTR.to_string(), int_val), + (SEARCH_ATTR_TXT.to_string(), "goodbye".as_json_payload()?), + (SEARCH_ATTR_INT.to_string(), int_val), ]); // 49 is ascii 1 if orig_val == 49 { @@ -43,11 +37,6 @@ async fn sends_upsert() { let mut starter = CoreWfStarter::new(wf_name); starter.worker_config.no_remote_activities(true); let mut worker = starter.worker().await; - // TODO: this should be supported in server 1.20, remove this condition when CLI is upgraded. - if env::var(INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR).is_ok() { - warn!("skipping sends_upsert -- does not work on temporal dev server"); - return; - } worker.register_wf(wf_name, search_attr_updater); worker @@ -57,8 +46,11 @@ async fn sends_upsert() { vec![], WorkflowOptions { search_attributes: Some(HashMap::from([ - (TXT_ATTR.to_string(), "hello".as_json_payload().unwrap()), - (INT_ATTR.to_string(), 1.as_json_payload().unwrap()), + ( + SEARCH_ATTR_TXT.to_string(), + "hello".as_json_payload().unwrap(), + ), + (SEARCH_ATTR_INT.to_string(), 1.as_json_payload().unwrap()), ])), execution_timeout: Some(Duration::from_secs(4)), ..Default::default() @@ -78,8 +70,8 @@ async fn sends_upsert() { .search_attributes .unwrap() .indexed_fields; - let txt_attr_payload = search_attrs.get(TXT_ATTR).unwrap(); - let int_attr_payload = search_attrs.get(INT_ATTR).unwrap(); + let txt_attr_payload = search_attrs.get(SEARCH_ATTR_TXT).unwrap(); + let int_attr_payload = search_attrs.get(SEARCH_ATTR_INT).unwrap(); for payload in [txt_attr_payload, int_attr_payload] { assert!(payload.is_json_payload()); } diff --git a/tests/runner.rs b/tests/runner.rs index d0a2dc94f..cf895a6b8 100644 --- a/tests/runner.rs +++ b/tests/runner.rs @@ -10,7 +10,7 @@ use temporal_sdk_core::ephemeral_server::{ }; use temporal_sdk_core_test_utils::{ default_cached_download, INTEG_SERVER_TARGET_ENV_VAR, INTEG_TEMPORAL_DEV_SERVER_USED_ENV_VAR, - INTEG_TEST_SERVER_USED_ENV_VAR, + INTEG_TEST_SERVER_USED_ENV_VAR, SEARCH_ATTR_INT, SEARCH_ATTR_TXT, }; use tokio::{self, process::Command}; @@ -70,11 +70,16 @@ async fn main() -> Result<(), anyhow::Error> { ServerKind::TemporalCLI => { let config = TemporalDevServerConfigBuilder::default() .exe(default_cached_download()) - // TODO: Delete when temporalCLI enables it by default. .extra_args(vec![ + // TODO: Delete when temporalCLI enables it by default. "--dynamic-config-value".to_string(), "system.enableEagerWorkflowStart=true".to_string(), + "--search-attribute".to_string(), + format!("{SEARCH_ATTR_TXT}=Text"), + "--search-attribute".to_string(), + format!("{SEARCH_ATTR_INT}=Int"), ]) + .ui(true) .build()?; println!("Using temporal CLI"); (