Skip to content

Commit

Permalink
WIP Simplified action queueing implementation.
Browse files Browse the repository at this point in the history
Based on #916.

Implements and closes #910.
  • Loading branch information
zach-klippenstein committed Jan 25, 2020
1 parent 4f55451 commit 010f9fc
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.squareup.workflow.internal

/**
* TODO write documentation
*/
internal typealias PendingUpdate = () -> Any?

interface PendingUpdateSink {
/**
* Enqueues [update] and suspends until the update has been accepted for execution.
*
* Use this method to accept backpressure from the runtime when there is contention.
*/
suspend fun update(update: PendingUpdate)

/**
* Enqueues [update] and returns immediately.
*
* Use this method for updates triggered from non-coroutine contexts, such as UI events.
*/
fun enqueueUpdate(update: PendingUpdate)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import com.squareup.workflow.Sink
import com.squareup.workflow.Worker
import com.squareup.workflow.Workflow
import com.squareup.workflow.WorkflowAction
import kotlinx.coroutines.channels.SendChannel

/**
* An implementation of [RenderContext] that builds a [Behavior] via [freeze].
Expand All @@ -33,7 +32,7 @@ import kotlinx.coroutines.channels.SendChannel
class RealRenderContext<StateT, OutputT : Any>(
private val renderer: Renderer<StateT, OutputT>,
private val workerRunner: WorkerRunner<StateT, OutputT>,
private val eventActionsChannel: SendChannel<WorkflowAction<StateT, OutputT>>
private val pendingUpdateSink: PendingUpdateSink
) : RenderContext<StateT, OutputT>, Sink<WorkflowAction<StateT, OutputT>> {

interface Renderer<StateT, OutputT : Any> {
Expand Down Expand Up @@ -72,7 +71,7 @@ class RealRenderContext<StateT, OutputT : Any>(
// Run the handler synchronously, so we only have to emit the resulting action and don't
// need the update channel to be generic on each event type.
val action = handler(event)
eventActionsChannel.offer(action)
pendingUpdateSink.enqueueUpdate { TODO() }
}
}

Expand All @@ -82,7 +81,7 @@ class RealRenderContext<StateT, OutputT : Any>(
"Expected sink to not be sent to until after the render pass. Received action: $value"
)
}
eventActionsChannel.offer(value)
pendingUpdateChannel.offer(value)
}

override fun <ChildPropsT, ChildOutputT : Any, ChildRenderingT> renderChild(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus

/**
Expand All @@ -42,23 +40,33 @@ import kotlinx.coroutines.plus
internal fun <T> CoroutineScope.launchWorker(
worker: Worker<T>,
key: String,
pendingUpdateSink: PendingUpdateSink,
workerDiagnosticId: Long,
workflowDiagnosticId: Long,
diagnosticListener: WorkflowDiagnosticListener?
): ReceiveChannel<ValueOrDone<T>> = worker.runWithNullCheck()
.wireUpDebugger(workerDiagnosticId, workflowDiagnosticId, diagnosticListener)
.transformToValueOrDone()
.catch { e ->
// Workers that failed (as opposed to just cancelled) should have their failure reason
// re-thrown from the workflow runtime. If we don't unwrap the cause here, they'll just
// cause the runtime to cancel.
val cancellationCause = e.unwrapCancellationCause()
throw cancellationCause ?: e
) {
val workerFlow = worker.runWithNullCheck()
.wireUpDebugger(workerDiagnosticId, workflowDiagnosticId, diagnosticListener)
.transformToValueOrDone()
.catch { e ->
// Workers that failed (as opposed to just cancelled) should have their failure reason
// re-thrown from the workflow runtime. If we don't unwrap the cause here, they'll just
// cause the runtime to cancel.
val cancellationCause = e.unwrapCancellationCause()
throw cancellationCause ?: e
}
val workerScope = createWorkerScope(worker, key)

workerScope.launch {
workerFlow.collect {
pendingUpdateSink.update { TODO() }
}
// produceIn implicitly creates a buffer (it uses a Channel to bridge between contexts). This
// operator is required to override the default buffer size.
.buffer(RENDEZVOUS)
.produceIn(createWorkerScope(worker, key))
}
}
// // produceIn implicitly creates a buffer (it uses a Channel to bridge between contexts). This
// // operator is required to override the default buffer size.
// .buffer(RENDEZVOUS)
// .produceIn(createWorkerScope(worker, key))

/**
* In unit tests, if you use a mocking library to create a Worker, the run method will return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.selects.SelectBuilder
import okio.ByteString
import kotlin.coroutines.CoroutineContext
Expand All @@ -51,6 +49,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
initialProps: PropsT,
snapshot: ByteString?,
baseContext: CoroutineContext,
private val pendingUpdateSink: PendingUpdateSink,
private val emitOutputToParent: (OutputT) -> Any? = { it },
parentDiagnosticId: Long? = null,
private val diagnosticListener: WorkflowDiagnosticListener? = null,
Expand Down Expand Up @@ -81,8 +80,6 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(

private var lastProps: PropsT = initialProps

private val eventActionsChannel = Channel<WorkflowAction<StateT, OutputT>>(capacity = UNLIMITED)

init {
var restoredFromSnapshot = false
state = if (initialState != null) {
Expand Down Expand Up @@ -222,7 +219,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
val context = RealRenderContext(
renderer = subtreeManager,
workerRunner = this,
eventActionsChannel = eventActionsChannel
pendingUpdateChannel = pendingUpdateChannel
)
diagnosticListener?.onBeforeWorkflowRendered(diagnosticId, props, state)
val rendering = workflow.render(props, state, context)
Expand Down Expand Up @@ -270,8 +267,11 @@ internal class WorkflowNode<PropsT, StateT, OutputT : Any, RenderingT>(
workerId = idCounter.createId()
diagnosticListener.onWorkerStarted(workerId, diagnosticId, key, worker.toString())
}
val workerChannel = launchWorker(worker, key, workerId, diagnosticId, diagnosticListener)
return WorkerChildNode(worker, key, workerChannel, handler = handler)
// val workerChannel = launchWorker(worker, key, workerId, diagnosticId, diagnosticListener)
// return WorkerChildNode(worker, key, workerChannel, handler = handler)
val pendingUpdate = {

}
}

private fun ByteString.restoreState(): Snapshot? {
Expand Down

0 comments on commit 010f9fc

Please sign in to comment.