From 4f55451ce20aa8c5f192b23de7b1aca458913ed5 Mon Sep 17 00:00:00 2001 From: Zach Klippenstein Date: Sat, 25 Jan 2020 14:49:39 -0800 Subject: [PATCH] Pass acceptOutput function to WorkflowNode constructor instead of every tick pass. This doesn't change any behavior, it is just more efficient: there's no need to define the function at the tick pass, so this is not only more efficient but also just easier to read, since there are fewer moving parts. This is also required for #910. --- .../workflow/internal/SubtreeManager.kt | 21 +- .../workflow/internal/WorkflowLoop.kt | 2 +- .../workflow/internal/WorkflowNode.kt | 40 ++-- .../workflow/internal/SubtreeManagerTest.kt | 37 ++-- .../workflow/internal/WorkflowNodeTest.kt | 186 ++++++++++++++++-- 5 files changed, 224 insertions(+), 62 deletions(-) diff --git a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/SubtreeManager.kt b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/SubtreeManager.kt index 451cf1573..f90835350 100644 --- a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/SubtreeManager.kt +++ b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/SubtreeManager.kt @@ -92,6 +92,7 @@ import kotlin.coroutines.CoroutineContext */ internal class SubtreeManager( private val contextForChildren: CoroutineContext, + private val emitActionToParent: (WorkflowAction) -> Any?, private val parentDiagnosticId: Long, private val diagnosticListener: WorkflowDiagnosticListener? = null, private val idCounter: IdCounter? = null @@ -150,16 +151,9 @@ internal class SubtreeManager( * Uses [selector] to invoke [WorkflowNode.tick] for every running child workflow this instance * is managing. */ - fun tickChildren( - selector: SelectBuilder, - handler: (WorkflowAction) -> T? - ) { + fun tickChildren(selector: SelectBuilder) { children.forEachActive { child -> - child.workflowNode.tick(selector) { output -> - val componentUpdate = child.acceptChildOutput(output) - @Suppress("UNCHECKED_CAST") - return@tick handler(componentUpdate as WorkflowAction) - } + child.workflowNode.tick(selector) } } @@ -187,16 +181,25 @@ internal class SubtreeManager( handler: (ChildOutputT) -> WorkflowAction ): WorkflowChildNode { val id = child.id(key) + lateinit var node: WorkflowChildNode + + fun acceptChildOutput(output: ChildOutputT): Any? { + val action = node.acceptChildOutput(output) + return emitActionToParent(action) + } + val workflowNode = WorkflowNode( id, child.asStatefulWorkflow(), initialProps, snapshotCache[id], contextForChildren, + ::acceptChildOutput, parentDiagnosticId, diagnosticListener, idCounter ) return WorkflowChildNode(child, handler, workflowNode) + .also { node = it } } } diff --git a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt index b1702c433..a33d89fb0 100644 --- a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt +++ b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowLoop.kt @@ -121,7 +121,7 @@ internal open class RealWorkflowLoop : WorkflowLoop { } // Tick the workflow tree. - rootNode.tick(this) { it } + rootNode.tick(this) } } // Compiler gets confused, and thinks both that this throw is unreachable, and without the diff --git a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt index 51537a75d..14ac2457f 100644 --- a/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt +++ b/kotlin/workflow-runtime/src/main/java/com/squareup/workflow/internal/WorkflowNode.kt @@ -28,7 +28,6 @@ import com.squareup.workflow.diagnostic.createId import com.squareup.workflow.internal.RealRenderContext.WorkerRunner import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel @@ -40,6 +39,8 @@ import kotlin.coroutines.CoroutineContext /** * A node in a state machine tree. Manages the actual state for a given [Workflow]. * + * @param emitOutputToParent A function that this node will call when it needs to emit an output + * value to its parent. Returns either the output to be emitted from the root workflow, or null. * @param initialState Allows unit tests to start the node from a given state, instead of calling * [StatefulWorkflow.initialState]. */ @@ -50,6 +51,7 @@ internal class WorkflowNode( initialProps: PropsT, snapshot: ByteString?, baseContext: CoroutineContext, + private val emitOutputToParent: (OutputT) -> Any? = { it }, parentDiagnosticId: Long? = null, private val diagnosticListener: WorkflowDiagnosticListener? = null, private val idCounter: IdCounter? = null, @@ -69,8 +71,9 @@ internal class WorkflowNode( */ internal val diagnosticId = idCounter.createId() - private val subtreeManager = - SubtreeManager(coroutineContext, diagnosticId, diagnosticListener, idCounter) + private val subtreeManager = SubtreeManager( + coroutineContext, ::applyAction, diagnosticId, diagnosticListener, idCounter + ) private val workers = ActiveStagingList>() @@ -158,20 +161,9 @@ internal class WorkflowNode( * * It is an error to call this method after calling [cancel]. */ - @UseExperimental(InternalCoroutinesApi::class) - fun tick( - selector: SelectBuilder, - handler: (OutputT) -> T? - ) { - fun acceptUpdate(action: WorkflowAction): T? { - val (newState, output) = action.applyTo(state) - diagnosticListener?.onWorkflowAction(diagnosticId, action, state, newState, output) - state = newState - return output?.let(handler) - } - + fun tick(selector: SelectBuilder) { // Listen for any child workflow updates. - subtreeManager.tickChildren(selector, ::acceptUpdate) + subtreeManager.tickChildren(selector) // Listen for any subscription updates. workers.forEachActive { child -> @@ -188,7 +180,7 @@ internal class WorkflowNode( } else { val update = child.acceptUpdate(valueOrDone.value) @Suppress("UNCHECKED_CAST") - acceptUpdate(update as WorkflowAction) + return@onReceive applyAction(update as WorkflowAction) } } } @@ -198,7 +190,7 @@ internal class WorkflowNode( with(selector) { eventActionsChannel.onReceive { action -> diagnosticListener?.onSinkReceived(diagnosticId, action) - acceptUpdate(action) + return@onReceive applyAction(action) } } } @@ -256,6 +248,18 @@ internal class WorkflowNode( lastProps = newProps } + /** + * Applies [action] to this workflow's [state] and + * [emits an output to its parent][emitOutputToParent] if necessary. + */ + private fun applyAction(action: WorkflowAction): T? { + val (newState, output) = action.applyTo(state) + diagnosticListener?.onWorkflowAction(diagnosticId, action, state, newState, output) + state = newState + @Suppress("UNCHECKED_CAST") + return output?.let(emitOutputToParent) as T? + } + private fun createWorkerNode( worker: Worker, key: String, diff --git a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/SubtreeManagerTest.kt b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/SubtreeManagerTest.kt index 958a72ec0..36d698297 100644 --- a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/SubtreeManagerTest.kt +++ b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/SubtreeManagerTest.kt @@ -99,8 +99,7 @@ class SubtreeManagerTest { private val context = Unconfined @Test fun `render starts new child`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() manager.render(workflow, "props", key = "", handler = { fail() }) @@ -108,8 +107,7 @@ class SubtreeManagerTest { } @Test fun `render doesn't start existing child`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() fun render() = manager.render(workflow, "props", key = "", handler = { fail() }) .also { manager.commitRenderedChildren() } @@ -121,8 +119,7 @@ class SubtreeManagerTest { } @Test fun `render restarts child after tearing down`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() fun render() = manager.render(workflow, "props", key = "", handler = { fail() }) .also { manager.commitRenderedChildren() } @@ -138,8 +135,7 @@ class SubtreeManagerTest { } @Test fun `render throws on duplicate key`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() manager.render(workflow, "props", "foo", handler = { fail() }) @@ -153,8 +149,7 @@ class SubtreeManagerTest { } @Test fun `render returns child rendering`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() val (composeProps, composeState) = manager.render( @@ -165,8 +160,7 @@ class SubtreeManagerTest { } @Test fun `tick children handles child output`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() val handler: StringHandler = { output -> action { setOutput("case output:$output") } @@ -189,8 +183,7 @@ class SubtreeManagerTest { } @Test fun `render updates child's output handler`() { - val manager = - SubtreeManager(context, parentDiagnosticId = 0, diagnosticListener = null) + val manager = subtreeManagerForTest() val workflow = TestWorkflow() fun render(handler: StringHandler) = manager.render(workflow, "props", key = "", handler = handler) @@ -219,7 +212,7 @@ class SubtreeManagerTest { // See https://github.com/square/workflow/issues/404 @Test fun `createChildSnapshot snapshots eagerly`() { - val manager = SubtreeManager(Unconfined, parentDiagnosticId = 0) + val manager = subtreeManagerForTest() val workflow = SnapshotTestWorkflow() assertEquals(0, workflow.snapshots) @@ -232,7 +225,7 @@ class SubtreeManagerTest { // See https://github.com/square/workflow/issues/404 @Test fun `createChildSnapshot serializes lazily`() { - val manager = SubtreeManager(Unconfined, parentDiagnosticId = 0) + val manager = subtreeManagerForTest() val workflow = SnapshotTestWorkflow() assertEquals(0, workflow.serializes) @@ -246,11 +239,9 @@ class SubtreeManagerTest { assertEquals(1, workflow.serializes) } - private suspend fun SubtreeManager.tickAction(): WorkflowAction? { - return select { - tickChildren(this) { update -> - return@tickChildren update - } - } - } + private suspend fun SubtreeManager.tickAction(): WorkflowAction? = + select { tickChildren(this) } + + private fun subtreeManagerForTest() = + SubtreeManager(context, emitActionToParent = { it }, parentDiagnosticId = 0) } diff --git a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkflowNodeTest.kt b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkflowNodeTest.kt index 928006bba..0878a1588 100644 --- a/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkflowNodeTest.kt +++ b/kotlin/workflow-runtime/src/test/java/com/squareup/workflow/internal/WorkflowNodeTest.kt @@ -21,12 +21,14 @@ import com.squareup.workflow.RenderContext import com.squareup.workflow.Sink import com.squareup.workflow.Snapshot import com.squareup.workflow.StatefulWorkflow +import com.squareup.workflow.Worker import com.squareup.workflow.Workflow import com.squareup.workflow.WorkflowAction import com.squareup.workflow.WorkflowAction.Companion.emitOutput import com.squareup.workflow.WorkflowAction.Updater import com.squareup.workflow.action import com.squareup.workflow.asWorker +import com.squareup.workflow.contraMap import com.squareup.workflow.makeEventSink import com.squareup.workflow.parse import com.squareup.workflow.readUtf8WithLength @@ -165,7 +167,7 @@ class WorkflowNodeTest { return "" } } - val node = WorkflowNode(workflow.id(), workflow, "", null, context) + val node = WorkflowNode(workflow.id(), workflow, "", null, context, { "tick:$it" }) node.render(workflow, "") sink.send("event") @@ -173,7 +175,7 @@ class WorkflowNodeTest { val result = runBlocking { withTimeout(10) { select { - node.tick(this) { "tick:$it" } + node.tick(this) } } } @@ -200,7 +202,7 @@ class WorkflowNodeTest { return "" } } - val node = WorkflowNode(workflow.id(), workflow, "", null, context) + val node = WorkflowNode(workflow.id(), workflow, "", null, context, { "tick:$it" }) node.render(workflow, "") sink.send("event") @@ -208,14 +210,14 @@ class WorkflowNodeTest { val result = runBlocking { withTimeout(10) { - List(2) { i -> + List(2) { select { - node.tick(this) { "tick$i:$it" } + node.tick(this) } } } } - assertEquals(listOf("tick0:event", "tick1:event2"), result) + assertEquals(listOf("tick:event", "tick:event2"), result) } @Test fun `send allows subsequent events on same rendering`() { @@ -312,7 +314,7 @@ class WorkflowNodeTest { try { withTimeout(1) { select { - node.tick(this) { it } + node.tick(this) } } fail("Expected exception") @@ -324,7 +326,7 @@ class WorkflowNodeTest { withTimeout(1) { select { - node.tick(this) { it } + node.tick(this) } } } @@ -379,7 +381,7 @@ class WorkflowNodeTest { // This tick will process the event handler, it won't close the channel yet. withTimeout(1) { select { - node.tick(this) { it } + node.tick(this) } } @@ -749,7 +751,7 @@ class WorkflowNodeTest { // update. launch(start = UNDISPATCHED) { select { - node.tick(this) { null } + node.tick(this) } } yield() @@ -805,7 +807,7 @@ class WorkflowNodeTest { // update. launch(start = UNDISPATCHED) { select { - node.tick(this) { null } + node.tick(this) } } yield() @@ -873,4 +875,166 @@ class WorkflowNodeTest { error.message ) } + + @Test fun `actionSink action changes state`() { + val workflow = Workflow.stateful>>( + initialState = { "initial" }, + render = { _, state -> + state to actionSink.contraMap { + action { nextState = "$nextState->$it" } + } + } + ) + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined + ) + val (_, sink) = node.render(workflow.asStatefulWorkflow(), Unit) + + sink.send("hello") + + runBlocking { + select { + node.tick(this) + } + } + + val (state, _) = node.render(workflow.asStatefulWorkflow(), Unit) + assertEquals("initial->hello", state) + } + + @Test fun `actionSink action emits output`() { + val workflow = Workflow.stateless> { + actionSink.contraMap { action { setOutput(it) } } + } + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined, + emitOutputToParent = { "output:$it" } + ) + val rendering = node.render(workflow.asStatefulWorkflow(), Unit) + + rendering.send("hello") + + val output = runBlocking { + select { + node.tick(this) + } + } + + assertEquals("output:hello", output) + } + + @Test fun `worker action changes state`() { + val workflow = Workflow.stateful( + initialState = { "initial" }, + render = { _, state -> + runningWorker(Worker.from { "hello" }) { action { nextState = "$nextState->$it" } } + return@stateful state + } + ) + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined + ) + node.render(workflow.asStatefulWorkflow(), Unit) + + runBlocking { + select { + node.tick(this) + } + } + + val state = node.render(workflow.asStatefulWorkflow(), Unit) + assertEquals("initial->hello", state) + } + + @Test fun `worker action emits output`() { + val workflow = Worker.from { "hello" } + .asWorkflow() + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined, + emitOutputToParent = { "output:$it" } + ) + node.render(workflow.asStatefulWorkflow(), Unit) + + val output = runBlocking { + select { + node.tick(this) + } + } + + assertEquals("output:hello", output) + } + + @Test fun `child action changes state`() { + val child = Worker.from { "hello" } + .asWorkflow() + val workflow = Workflow.stateful( + initialState = { "initial" }, + render = { _, state -> + renderChild(child) { action { nextState = "$nextState->$it" } } + return@stateful state + } + ) + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined + ) + node.render(workflow.asStatefulWorkflow(), Unit) + + runBlocking { + select { + node.tick(this) + } + } + + val state = node.render(workflow.asStatefulWorkflow(), Unit) + assertEquals("initial->hello", state) + } + + @Test fun `child action emits output`() { + val child = Worker.from { "hello" } + .asWorkflow() + val workflow = Workflow.stateless { + renderChild(child) { action { setOutput("child:$it") } } + } + val node = WorkflowNode( + workflow.id(), + workflow.asStatefulWorkflow(), + initialProps = Unit, + snapshot = null, + baseContext = Unconfined, + emitOutputToParent = { "output:$it" } + ) + node.render(workflow.asStatefulWorkflow(), Unit) + + val output = runBlocking { + select { + node.tick(this) + } + } + + assertEquals("output:child:hello", output) + } + + private fun Worker.asWorkflow() = Workflow.stateless { + runningWorker(this@asWorkflow) { action { setOutput(it) } } + } }