Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-klippenstein committed Jan 25, 2020
1 parent 6f59dc0 commit 00011b8
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ import kotlin.coroutines.CoroutineContext
*/
internal class SubtreeManager<StateT, OutputT : Any>(
private val contextForChildren: CoroutineContext,
private val emitActionToParent: (WorkflowAction<StateT, OutputT>) -> Any?,
private val parentDiagnosticId: Long,
private val diagnosticListener: WorkflowDiagnosticListener? = null,
private val idCounter: IdCounter? = null
Expand Down Expand Up @@ -150,16 +151,9 @@ internal class SubtreeManager<StateT, OutputT : Any>(
* Uses [selector] to invoke [WorkflowNode.tick] for every running child workflow this instance
* is managing.
*/
fun <T : Any> tickChildren(
selector: SelectBuilder<T?>,
handler: (WorkflowAction<StateT, OutputT>) -> T?
) {
fun <T : Any> tickChildren(selector: SelectBuilder<T?>) {
children.forEachActive { child ->
child.workflowNode.tick(selector) { output ->
val componentUpdate = child.acceptChildOutput(output)
@Suppress("UNCHECKED_CAST")
return@tick handler(componentUpdate as WorkflowAction<StateT, OutputT>)
}
child.workflowNode.tick(selector)
}
}

Expand Down Expand Up @@ -187,16 +181,25 @@ internal class SubtreeManager<StateT, OutputT : Any>(
handler: (ChildOutputT) -> WorkflowAction<StateT, OutputT>
): WorkflowChildNode<ChildPropsT, ChildOutputT, StateT, OutputT> {
val id = child.id(key)
lateinit var node: WorkflowChildNode<ChildPropsT, ChildOutputT, StateT, OutputT>

fun acceptChildOutput(output: ChildOutputT): Any? {
val action = node.acceptChildOutput(output)
return emitActionToParent(action)
}

val workflowNode = WorkflowNode(
id,
child.asStatefulWorkflow(),
initialProps,
snapshotCache[id],
contextForChildren,
::acceptChildOutput,
parentDiagnosticId,
diagnosticListener,
idCounter
)
return WorkflowChildNode(child, handler, workflowNode)
.also { node = it }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ internal open class RealWorkflowLoop : WorkflowLoop {
}

// Tick the workflow tree.
rootNode.tick(this) { it }
rootNode.tick(this)
}
}
// Compiler gets confused, and thinks both that this throw is unreachable, and without the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import com.squareup.workflow.diagnostic.createId
import com.squareup.workflow.internal.RealRenderContext.WorkerRunner
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
Expand All @@ -40,6 +39,8 @@ import kotlin.coroutines.CoroutineContext
/**
* A node in a state machine tree. Manages the actual state for a given [Workflow].
*
* @param emitOutputToParent A function that this node will call when it needs to emit an output
* value to its parent. Returns either the output to be emitted from the root workflow, or null.
* @param initialState Allows unit tests to start the node from a given state, instead of calling
* [StatefulWorkflow.initialState].
*/
Expand All @@ -50,6 +51,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
initialProps: PropsT,
snapshot: ByteString?,
baseContext: CoroutineContext,
private val emitOutputToParent: (OutputT) -> Any? = { it },
parentDiagnosticId: Long? = null,
private val diagnosticListener: WorkflowDiagnosticListener? = null,
private val idCounter: IdCounter? = null,
Expand All @@ -70,7 +72,9 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
internal val diagnosticId = idCounter.createId()

private val subtreeManager =
SubtreeManager<StateT, OutputT>(coroutineContext, diagnosticId, diagnosticListener, idCounter)
SubtreeManager<StateT, OutputT>(
coroutineContext, ::applyAction, diagnosticId, diagnosticListener, idCounter
)

private val workers = ActiveStagingList<WorkerChildNode<*, *, *>>()

Expand Down Expand Up @@ -158,20 +162,9 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
*
* It is an error to call this method after calling [cancel].
*/
@UseExperimental(InternalCoroutinesApi::class)
fun <T : Any> tick(
selector: SelectBuilder<T?>,
handler: (OutputT) -> T?
) {
fun acceptUpdate(action: WorkflowAction<StateT, OutputT>): T? {
val (newState, output) = action.applyTo(state)
diagnosticListener?.onWorkflowAction(diagnosticId, action, state, newState, output)
state = newState
return output?.let(handler)
}

fun <T : Any> tick(selector: SelectBuilder<T?>) {
// Listen for any child workflow updates.
subtreeManager.tickChildren(selector, ::acceptUpdate)
subtreeManager.tickChildren(selector)

// Listen for any subscription updates.
workers.forEachActive { child ->
Expand All @@ -188,7 +181,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
} else {
val update = child.acceptUpdate(valueOrDone.value)
@Suppress("UNCHECKED_CAST")
acceptUpdate(update as WorkflowAction<StateT, OutputT>)
return@onReceive applyAction(update as WorkflowAction<StateT, OutputT>)
}
}
}
Expand All @@ -198,7 +191,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
with(selector) {
eventActionsChannel.onReceive { action ->
diagnosticListener?.onSinkReceived(diagnosticId, action)
acceptUpdate(action)
return@onReceive applyAction(action)
}
}
}
Expand Down Expand Up @@ -256,6 +249,18 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
lastProps = newProps
}

/**
* Applies [action] to this workflow's [state] and
* [emits an output to its parent][emitOutputToParent] if necessary.
*/
private fun <T : Any> applyAction(action: WorkflowAction<StateT, OutputT>): T? {
val (newState, output) = action.applyTo(state)
diagnosticListener?.onWorkflowAction(diagnosticId, action, state, newState, output)
state = newState
@Suppress("UNCHECKED_CAST")
return output?.let(emitOutputToParent) as T?
}

private fun <T> createWorkerNode(
worker: Worker<T>,
key: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,15 @@ class SubtreeManagerTest {
private val context = Unconfined

@Test fun `render starts new child`() {
val manager =
SubtreeManager<String, String>(context, parentDiagnosticId = 0, diagnosticListener = null)
val manager = subtreeManagerForTest<String, String>()
val workflow = TestWorkflow()

manager.render(workflow, "props", key = "", handler = { fail() })
assertEquals(1, workflow.started)
}

@Test fun `render doesn't start existing child`() {
val manager =
SubtreeManager<String, String>(context, parentDiagnosticId = 0, diagnosticListener = null)
val manager = subtreeManagerForTest<String, String>()
val workflow = TestWorkflow()
fun render() = manager.render(workflow, "props", key = "", handler = { fail() })
.also { manager.commitRenderedChildren() }
Expand All @@ -121,8 +119,7 @@ class SubtreeManagerTest {
}

@Test fun `render restarts child after tearing down`() {
val manager =
SubtreeManager<String, String>(context, parentDiagnosticId = 0, diagnosticListener = null)
val manager = subtreeManagerForTest<String, String>()
val workflow = TestWorkflow()
fun render() = manager.render(workflow, "props", key = "", handler = { fail() })
.also { manager.commitRenderedChildren() }
Expand All @@ -138,8 +135,7 @@ class SubtreeManagerTest {
}

@Test fun `render throws on duplicate key`() {
val manager =
SubtreeManager<String, String>(context, parentDiagnosticId = 0, diagnosticListener = null)
val manager = subtreeManagerForTest<String, String>()
val workflow = TestWorkflow()
manager.render(workflow, "props", "foo", handler = { fail() })

Expand All @@ -153,8 +149,7 @@ class SubtreeManagerTest {
}

@Test fun `render returns child rendering`() {
val manager =
SubtreeManager<String, String>(context, parentDiagnosticId = 0, diagnosticListener = null)
val manager = subtreeManagerForTest<String, String>()
val workflow = TestWorkflow()

val (composeProps, composeState) = manager.render(
Expand All @@ -165,8 +160,7 @@ class SubtreeManagerTest {
}

@Test fun `tick children handles child output`() {
val manager =
SubtreeManager<String, String>(context, parentDiagnosticId = 0, diagnosticListener = null)
val manager = subtreeManagerForTest<String, String>()
val workflow = TestWorkflow()
val handler: StringHandler = { output ->
action { setOutput("case output:$output") }
Expand All @@ -189,8 +183,7 @@ class SubtreeManagerTest {
}

@Test fun `render updates child's output handler`() {
val manager =
SubtreeManager<String, String>(context, parentDiagnosticId = 0, diagnosticListener = null)
val manager = subtreeManagerForTest<String, String>()
val workflow = TestWorkflow()
fun render(handler: StringHandler) =
manager.render(workflow, "props", key = "", handler = handler)
Expand Down Expand Up @@ -219,7 +212,7 @@ class SubtreeManagerTest {

// See https://github.com/square/workflow/issues/404
@Test fun `createChildSnapshot snapshots eagerly`() {
val manager = SubtreeManager<Unit, Nothing>(Unconfined, parentDiagnosticId = 0)
val manager = subtreeManagerForTest<Unit, Nothing>()
val workflow = SnapshotTestWorkflow()
assertEquals(0, workflow.snapshots)

Expand All @@ -232,7 +225,7 @@ class SubtreeManagerTest {

// See https://github.com/square/workflow/issues/404
@Test fun `createChildSnapshot serializes lazily`() {
val manager = SubtreeManager<Unit, Nothing>(Unconfined, parentDiagnosticId = 0)
val manager = subtreeManagerForTest<Unit, Nothing>()
val workflow = SnapshotTestWorkflow()
assertEquals(0, workflow.serializes)

Expand All @@ -246,11 +239,9 @@ class SubtreeManagerTest {
assertEquals(1, workflow.serializes)
}

private suspend fun <S, O : Any> SubtreeManager<S, O>.tickAction(): WorkflowAction<S, O>? {
return select {
tickChildren(this) { update ->
return@tickChildren update
}
}
}
private suspend fun <S, O : Any> SubtreeManager<S, O>.tickAction(): WorkflowAction<S, O>? =
select { tickChildren(this) }

private fun <S, O : Any> subtreeManagerForTest() =
SubtreeManager<S, O>(context, emitActionToParent = { it }, parentDiagnosticId = 0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,15 @@ class WorkflowNodeTest {
return ""
}
}
val node = WorkflowNode(workflow.id(), workflow, "", null, context)
val node = WorkflowNode(workflow.id(), workflow, "", null, context, { "tick:$it" })
node.render(workflow, "")

sink.send("event")

val result = runBlocking {
withTimeout(10) {
select<String?> {
node.tick(this) { "tick:$it" }
node.tick(this)
}
}
}
Expand All @@ -200,22 +200,22 @@ class WorkflowNodeTest {
return ""
}
}
val node = WorkflowNode(workflow.id(), workflow, "", null, context)
val node = WorkflowNode(workflow.id(), workflow, "", null, context, { "tick:$it" })
node.render(workflow, "")

sink.send("event")
sink.send("event2")

val result = runBlocking {
withTimeout(10) {
List(2) { i ->
List(2) {
select<String?> {
node.tick(this) { "tick$i:$it" }
node.tick(this)
}
}
}
}
assertEquals(listOf("tick0:event", "tick1:event2"), result)
assertEquals(listOf("tick:event", "tick:event2"), result)
}

@Test fun `send allows subsequent events on same rendering`() {
Expand Down Expand Up @@ -312,7 +312,7 @@ class WorkflowNodeTest {
try {
withTimeout(1) {
select<String?> {
node.tick(this) { it }
node.tick(this)
}
}
fail("Expected exception")
Expand All @@ -324,7 +324,7 @@ class WorkflowNodeTest {

withTimeout(1) {
select<String?> {
node.tick(this) { it }
node.tick(this)
}
}
}
Expand Down Expand Up @@ -379,7 +379,7 @@ class WorkflowNodeTest {
// This tick will process the event handler, it won't close the channel yet.
withTimeout(1) {
select<String?> {
node.tick(this) { it }
node.tick(this)
}
}

Expand Down Expand Up @@ -739,7 +739,8 @@ class WorkflowNodeTest {
snapshot = null,
baseContext = Unconfined,
parentDiagnosticId = 42,
diagnosticListener = listener
diagnosticListener = listener,
emitOutputToParent = { null }
)
node.render(workflow.asStatefulWorkflow(), "new props")
listener.consumeEvents()
Expand All @@ -749,7 +750,7 @@ class WorkflowNodeTest {
// update.
launch(start = UNDISPATCHED) {
select<String?> {
node.tick(this) { null }
node.tick(this)
}
}
yield()
Expand Down Expand Up @@ -795,7 +796,8 @@ class WorkflowNodeTest {
snapshot = null,
baseContext = Unconfined,
parentDiagnosticId = 42,
diagnosticListener = listener
diagnosticListener = listener,
emitOutputToParent = { null }
)
val rendering = node.render(workflow.asStatefulWorkflow(), "new props")
listener.consumeEvents()
Expand All @@ -805,7 +807,7 @@ class WorkflowNodeTest {
// update.
launch(start = UNDISPATCHED) {
select<String?> {
node.tick(this) { null }
node.tick(this)
}
}
yield()
Expand Down

0 comments on commit 00011b8

Please sign in to comment.