diff --git a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/SubtreeManager.kt b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/SubtreeManager.kt index 451cf1573..f90835350 100644 --- a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/SubtreeManager.kt +++ b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/SubtreeManager.kt @@ -92,6 +92,7 @@ import kotlin.coroutines.CoroutineContext */ internal class SubtreeManager( private val contextForChildren: CoroutineContext, + private val emitActionToParent: (WorkflowAction) -> Any?, private val parentDiagnosticId: Long, private val diagnosticListener: WorkflowDiagnosticListener? = null, private val idCounter: IdCounter? = null @@ -150,16 +151,9 @@ internal class SubtreeManager( * Uses [selector] to invoke [WorkflowNode.tick] for every running child workflow this instance * is managing. */ - fun tickChildren( - selector: SelectBuilder, - handler: (WorkflowAction) -> T? - ) { + fun tickChildren(selector: SelectBuilder) { children.forEachActive { child -> - child.workflowNode.tick(selector) { output -> - val componentUpdate = child.acceptChildOutput(output) - @Suppress("UNCHECKED_CAST") - return@tick handler(componentUpdate as WorkflowAction) - } + child.workflowNode.tick(selector) } } @@ -187,16 +181,25 @@ internal class SubtreeManager( handler: (ChildOutputT) -> WorkflowAction ): WorkflowChildNode { val id = child.id(key) + lateinit var node: WorkflowChildNode + + fun acceptChildOutput(output: ChildOutputT): Any? { + val action = node.acceptChildOutput(output) + return emitActionToParent(action) + } + val workflowNode = WorkflowNode( id, child.asStatefulWorkflow(), initialProps, snapshotCache[id], contextForChildren, + ::acceptChildOutput, parentDiagnosticId, diagnosticListener, idCounter ) return WorkflowChildNode(child, handler, workflowNode) + .also { node = it } } } diff --git a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt index b1702c433..a33d89fb0 100644 --- a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt +++ b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt @@ -121,7 +121,7 @@ internal open class RealWorkflowLoop : WorkflowLoop { } // Tick the workflow tree. - rootNode.tick(this) { it } + rootNode.tick(this) } } // Compiler gets confused, and thinks both that this throw is unreachable, and without the diff --git a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt index 51537a75d..14ac2457f 100644 --- a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt +++ b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt @@ -28,7 +28,6 @@ import com.squareup.workflow.diagnostic.createId import com.squareup.workflow.internal.RealRenderContext.WorkerRunner import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel @@ -40,6 +39,8 @@ import kotlin.coroutines.CoroutineContext /** * A node in a state machine tree. Manages the actual state for a given [Workflow]. * + * @param emitOutputToParent A function that this node will call when it needs to emit an output + * value to its parent. Returns either the output to be emitted from the root workflow, or null. * @param initialState Allows unit tests to start the node from a given state, instead of calling * [StatefulWorkflow.initialState]. */ @@ -50,6 +51,7 @@ internal class WorkflowNode( initialProps: PropsT, snapshot: ByteString?, baseContext: CoroutineContext, + private val emitOutputToParent: (OutputT) -> Any? = { it }, parentDiagnosticId: Long? = null, private val diagnosticListener: WorkflowDiagnosticListener? = null, private val idCounter: IdCounter? = null, @@ -69,8 +71,9 @@ internal class WorkflowNode( */ internal val diagnosticId = idCounter.createId() - private val subtreeManager = - SubtreeManager(coroutineContext, diagnosticId, diagnosticListener, idCounter) + private val subtreeManager = SubtreeManager( + coroutineContext, ::applyAction, diagnosticId, diagnosticListener, idCounter + ) private val workers = ActiveStagingList>() @@ -158,20 +161,9 @@ internal class WorkflowNode( * * It is an error to call this method after calling [cancel]. */ - @UseExperimental(InternalCoroutinesApi::class) - fun tick( - selector: SelectBuilder, - handler: (OutputT) -> T? - ) { - fun acceptUpdate(action: WorkflowAction): T? { - val (newState, output) = action.applyTo(state) - diagnosticListener?.onWorkflowAction(diagnosticId, action, state, newState, output) - state = newState - return output?.let(handler) - } - + fun tick(selector: SelectBuilder) { // Listen for any child workflow updates. - subtreeManager.tickChildren(selector, ::acceptUpdate) + subtreeManager.tickChildren(selector) // Listen for any subscription updates. workers.forEachActive { child -> @@ -188,7 +180,7 @@ internal class WorkflowNode( } else { val update = child.acceptUpdate(valueOrDone.value) @Suppress("UNCHECKED_CAST") - acceptUpdate(update as WorkflowAction) + return@onReceive applyAction(update as WorkflowAction) } } } @@ -198,7 +190,7 @@ internal class WorkflowNode( with(selector) { eventActionsChannel.onReceive { action -> diagnosticListener?.onSinkReceived(diagnosticId, action) - acceptUpdate(action) + return@onReceive applyAction(action) } } } @@ -256,6 +248,18 @@ internal class WorkflowNode( lastProps = newProps } + /** + * Applies [action] to this workflow's [state] and + * [emits an output to its parent][emitOutputToParent] if necessary. + */ + private fun applyAction(action: WorkflowAction): T? { + val (newState, output) = action.applyTo(state) + diagnosticListener?.onWorkflowAction(diagnosticId, action, state, newState, output) + state = newState + @Suppress("UNCHECKED_CAST") + return output?.let(emitOutputToParent) as T? + } + private fun createWorkerNode( worker: Worker, key: String, diff --git a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/SubtreeManagerTest.kt b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/SubtreeManagerTest.kt index 958a72ec0..36d698297 100644 --- a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/SubtreeManagerTest.kt +++ b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/SubtreeManagerTest.kt @@ -99,8 +99,7 @@ class SubtreeManagerTest { private val context = Unconfined @Test fun `render starts new child`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() manager.render(workflow, "props", key = "", handler = { fail() }) @@ -108,8 +107,7 @@ class SubtreeManagerTest { } @Test fun `render doesn't start existing child`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() fun render() = manager.render(workflow, "props", key = "", handler = { fail() }) .also { manager.commitRenderedChildren() } @@ -121,8 +119,7 @@ class SubtreeManagerTest { } @Test fun `render restarts child after tearing down`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() fun render() = manager.render(workflow, "props", key = "", handler = { fail() }) .also { manager.commitRenderedChildren() } @@ -138,8 +135,7 @@ class SubtreeManagerTest { } @Test fun `render throws on duplicate key`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() manager.render(workflow, "props", "foo", handler = { fail() }) @@ -153,8 +149,7 @@ class SubtreeManagerTest { } @Test fun `render returns child rendering`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() val (composeProps, composeState) = manager.render( @@ -165,8 +160,7 @@ class SubtreeManagerTest { } @Test fun `tick children handles child output`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() val handler: StringHandler = { output -> action { setOutput("case output:$output") } @@ -189,8 +183,7 @@ class SubtreeManagerTest { } @Test fun `render updates child's output handler`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() fun render(handler: StringHandler) = manager.render(workflow, "props", key = "", handler = handler) @@ -219,7 +212,7 @@ class SubtreeManagerTest { // See https://github.com/square/workflow/issues/404 @Test fun `createChildSnapshot snapshots eagerly`() { - val manager = SubtreeManager(Unconfined, parentDiagnosticId = 0) + val manager = subtreeManagerForTest() val workflow = SnapshotTestWorkflow() assertEquals(0, workflow.snapshots) @@ -232,7 +225,7 @@ class SubtreeManagerTest { // See https://github.com/square/workflow/issues/404 @Test fun `createChildSnapshot serializes lazily`() { - val manager = SubtreeManager(Unconfined, parentDiagnosticId = 0) + val manager = subtreeManagerForTest() val workflow = SnapshotTestWorkflow() assertEquals(0, workflow.serializes) @@ -246,11 +239,9 @@ class SubtreeManagerTest { assertEquals(1, workflow.serializes) } - private suspend fun SubtreeManager.tickAction(): WorkflowAction? { - return select { - tickChildren(this) { update -> - return@tickChildren update - } - } - } + private suspend fun SubtreeManager.tickAction(): WorkflowAction? = + select { tickChildren(this) } + + private fun subtreeManagerForTest() = + SubtreeManager(context, emitActionToParent = { it }, parentDiagnosticId = 0) } diff --git a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkflowNodeTest.kt b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkflowNodeTest.kt index 928006bba..0878a1588 100644 --- a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkflowNodeTest.kt +++ b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkflowNodeTest.kt @@ -21,12 +21,14 @@ import com.squareup.workflow.RenderContext import com.squareup.workflow.Sink import com.squareup.workflow.Snapshot import com.squareup.workflow.StatefulWorkflow +import com.squareup.workflow.Worker import com.squareup.workflow.Workflow import com.squareup.workflow.WorkflowAction import com.squareup.workflow.WorkflowAction.Companion.emitOutput import com.squareup.workflow.WorkflowAction.Updater import com.squareup.workflow.action import com.squareup.workflow.asWorker +import com.squareup.workflow.contraMap import com.squareup.workflow.makeEventSink import com.squareup.workflow.parse import com.squareup.workflow.readUtf8WithLength @@ -165,7 +167,7 @@ class WorkflowNodeTest { return "" } } - val node = WorkflowNode(workflow.id(), workflow, "", null, context) + val node = WorkflowNode(workflow.id(), workflow, "", null, context, { "tick:$it" }) node.render(workflow, "") sink.send("event") @@ -173,7 +175,7 @@ class WorkflowNodeTest { val result = runBlocking { withTimeout(10) { select { - node.tick(this) { "tick:$it" } + node.tick(this) } } } @@ -200,7 +202,7 @@ class WorkflowNodeTest { return "" } } - val node = WorkflowNode(workflow.id(), workflow, "", null, context) + val node = WorkflowNode(workflow.id(), workflow, "", null, context, { "tick:$it" }) node.render(workflow, "") sink.send("event") @@ -208,14 +210,14 @@ class WorkflowNodeTest { val result = runBlocking { withTimeout(10) { - List(2) { i -> + List(2) { select { - node.tick(this) { "tick$i:$it" } + node.tick(this) } } } } - assertEquals(listOf("tick0:event", "tick1:event2"), result) + assertEquals(listOf("tick:event", "tick:event2"), result) } @Test fun `send allows subsequent events on same rendering`() { @@ -312,7 +314,7 @@ class WorkflowNodeTest { try { withTimeout(1) { select { - node.tick(this) { it } + node.tick(this) } } fail("Expected exception") @@ -324,7 +326,7 @@ class WorkflowNodeTest { withTimeout(1) { select { - node.tick(this) { it } + node.tick(this) } } } @@ -379,7 +381,7 @@ class WorkflowNodeTest { // This tick will process the event handler, it won't close the channel yet. withTimeout(1) { select { - node.tick(this) { it } + node.tick(this) } } @@ -749,7 +751,7 @@ class WorkflowNodeTest { // update. launch(start = UNDISPATCHED) { select { - node.tick(this) { null } + node.tick(this) } } yield() @@ -805,7 +807,7 @@ class WorkflowNodeTest { // update. launch(start = UNDISPATCHED) { select { - node.tick(this) { null } + node.tick(this) } } yield() @@ -873,4 +875,166 @@ class WorkflowNodeTest { error.message ) } + + @Test fun `actionSink action changes state`() { + val workflow = Workflow.stateful>>( + initialState = { "initial" }, + render = { _, state -> + state to actionSink.contraMap { + action { nextState = "$nextState->$it" } + } + } + ) + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined + ) + val (_, sink) = node.render(workflow.asStatefulWorkflow(), Unit) + + sink.send("hello") + + runBlocking { + select { + node.tick(this) + } + } + + val (state, _) = node.render(workflow.asStatefulWorkflow(), Unit) + assertEquals("initial->hello", state) + } + + @Test fun `actionSink action emits output`() { + val workflow = Workflow.stateless> { + actionSink.contraMap { action { setOutput(it) } } + } + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined, + emitOutputToParent = { "output:$it" } + ) + val rendering = node.render(workflow.asStatefulWorkflow(), Unit) + + rendering.send("hello") + + val output = runBlocking { + select { + node.tick(this) + } + } + + assertEquals("output:hello", output) + } + + @Test fun `worker action changes state`() { + val workflow = Workflow.stateful( + initialState = { "initial" }, + render = { _, state -> + runningWorker(Worker.from { "hello" }) { action { nextState = "$nextState->$it" } } + return@stateful state + } + ) + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined + ) + node.render(workflow.asStatefulWorkflow(), Unit) + + runBlocking { + select { + node.tick(this) + } + } + + val state = node.render(workflow.asStatefulWorkflow(), Unit) + assertEquals("initial->hello", state) + } + + @Test fun `worker action emits output`() { + val workflow = Worker.from { "hello" } + .asWorkflow() + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined, + emitOutputToParent = { "output:$it" } + ) + node.render(workflow.asStatefulWorkflow(), Unit) + + val output = runBlocking { + select { + node.tick(this) + } + } + + assertEquals("output:hello", output) + } + + @Test fun `child action changes state`() { + val child = Worker.from { "hello" } + .asWorkflow() + val workflow = Workflow.stateful( + initialState = { "initial" }, + render = { _, state -> + renderChild(child) { action { nextState = "$nextState->$it" } } + return@stateful state + } + ) + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined + ) + node.render(workflow.asStatefulWorkflow(), Unit) + + runBlocking { + select { + node.tick(this) + } + } + + val state = node.render(workflow.asStatefulWorkflow(), Unit) + assertEquals("initial->hello", state) + } + + @Test fun `child action emits output`() { + val child = Worker.from { "hello" } + .asWorkflow() + val workflow = Workflow.stateless { + renderChild(child) { action { setOutput("child:$it") } } + } + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined, + emitOutputToParent = { "output:$it" } + ) + node.render(workflow.asStatefulWorkflow(), Unit) + + val output = runBlocking { + select { + node.tick(this) + } + } + + assertEquals("output:child:hello", output) + } + + private fun Worker.asWorkflow() = Workflow.stateless { + runningWorker(this@asWorkflow) { action { setOutput(it) } } + } }