Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow 的常用扩展 #14

Open
kkoshin opened this issue Sep 13, 2023 · 8 comments
Open

Flow 的常用扩展 #14

kkoshin opened this issue Sep 13, 2023 · 8 comments
Milestone

Comments

@kkoshin
Copy link
Member

kkoshin commented Sep 13, 2023

Flow API 相比 Rx 要少很多,对于一些常用的情况需要自己手动组合。

大致的情况是,给上游发送请求增加限制,比如,仅在一定窗口时间内。

对下游的处理来说,一般来说是考虑怎么挑选要处理的请求,比如,只要当前buffer里的最新的一个。

@kkoshin kkoshin changed the title Flow 的常见用法 Flow 的常用扩展 Sep 13, 2023
@kkoshin
Copy link
Member Author

kkoshin commented Oct 9, 2023

常用操作符:

  • debounce:上游发送的元素间隔之间间隔多少,把触发非常频繁的事件(比如按键)合并成一次执行。
    • 类比生活中的电梯,电梯关门之前,有人按的话,会打断电梯,等待电梯关上门后才会运行(电梯关门的时间就是所设定的时间)适用场景:快速按键、快速编辑
  • throttle: 充当定时器,每个时间窗口里(比如 16ms)最多执行一次,和上游发送的元素间隔无关,仅和收到这个元素的时间点有关。

@kkoshin kkoshin added this to the 1.0 milestone Jan 11, 2024
@kkoshin
Copy link
Member Author

kkoshin commented Jan 11, 2024

@kkoshin kkoshin modified the milestones: 1.0, 1.1 Jan 17, 2024
@kkoshin
Copy link
Member Author

kkoshin commented Mar 26, 2024

所谓 Hot/Cold 流,在 Kotlin 里的 Hot 流叫 SharedFlow(注意 Shared 一词),从使用场景出发就是避免在上游创建一堆的 Listenner,在下游订阅与上游链接之间,创建了一个“分发站”。

节省了上游的订阅压力,同时,将订阅时机的以及消息的缓存等颗粒度更小的调整掌握在了下游处。

有意思的是,因为下游的订阅者(此时称呼上从 Collector 改为 Subscriber 了)共享这一条流,所以也就叫 SharedFlow。

https://proandroiddev.com/seven-recipes-to-understand-flows-and-asynchrony-in-kotlin-1bd7fe041480

@kkoshin
Copy link
Member Author

kkoshin commented Mar 26, 2024

One Shot 的场景:适合直接使用 suspend function
Callback 的场景:才适合使用 Flow

Flow 建立在 Coroutine 的基础上,所以 Flow 的数据处理也会当 Scope 的 cancel 而 cancel。

Cold 的 Flow,是一个一对一的订阅关系,收集者之间没有干扰,每次都是一个新的,在调用 collect 时才算是建立了连接,数据才开始发送。
Unlike a cold flow built using the flow builder, a StateFlow is hot: collecting from the flow doesn't trigger any producer code.

常见误区(https://proandroiddev.com/are-you-sure-you-know-how-kotlin-flow-works-e070f6d00cbc):

  • Cold flow 里的收集者收集到的数据一定是一样的。(这个理解是错误的,Cold Flow 的上游如果是一个不受控制的 callbackFlow 产生的,在 collect 之前上游产生的数据都不可能收集到。容易有这个错误的认识是,常见的 cold flow 的例子都习惯用 flowOf,每次 collect 的数据集自然是一样的,和时机无关。)
  • 在 collect 方法后面再调用对应的 suspend function,后面的代码只有等这个 flow 全部结束后才有机会执行到(collect 是一个 suspend function,而 cold Flow 开始 collect 后就会处于 suspended 状态,在收集完所有的数据之前,后面的代码没有机会执行到,特别注意的是,有些 cold flow 的数据集一开始就确定下来,是有限的,而有些是无限的,不可能触发收集完的情况)
  • callbackFlow 无法停止?(调用 close 会关闭这个 flow,类似于 flowOf 的 emit 发送完毕了)
  • flowOf("a", "b", "c").shareIn(coroutineScope, SharingStarted.Eagerly) Eagerly 模式下会立刻触发,可以理解为是立刻收集完所有的,然后就结束了,而且因为默认 replayCount 为 0,最终导致下游收集不到任何数据

@kkoshin
Copy link
Member Author

kkoshin commented Mar 29, 2024

关于 flow,是建立在 kotlin coroutines,也就是本身也是遵循 structure concurrency,与之对应的是,RxJava 是建立在 Java 的 concurrency 上,所以实现细节会有所不同。

另外,Flow 不应该简单的理解为另一个 reactive toolkit,这样就丢失了 kotlin coroutines 背后的设计理念。更合适的理解是 “A suspending forEach”,suspend function 的多次调用,对于 competition 和 error 不需要特别的 event,概念上会简化不少。

推荐阅读:https://code.cash.app/rx-to-coroutines-concepts-cold-flows

@kkoshin
Copy link
Member Author

kkoshin commented Mar 29, 2024

public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
    collect { value ->
        // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
        return@collect transform(value)
    }
}

@kkoshin
Copy link
Member Author

kkoshin commented Mar 29, 2024

suspend fun pollForBoosts() {
  while(true) {
    try {
      currentBoosts.value = service.getLatestBoosts().boosts
    } catch (e: Exception) {
      Timber.e(e, "Unable to fetch boosts")
    }
    delay(TimeUnit.MINUTES.toMillis(5)
  }
}

var pollingJob: Job? = null
shouldBePolling.distinctUntilChanged().collect { isPolling ->
  pollingJob?.cancel()
  if (isPolling) {
    pollingJob = launch { pollForBoosts() }
  }
}

可以直接使用 collectLatest 来简化代码,唯一需要注意的是,collectLatest 里应该是一个真正的 suspend function,这样才有意义,因为 cancel 对于非 suspend function 不起作用。

shouldBePolling.distinctUntilChanged().collectLatest { isPolling ->
  if (isPolling) {
    pollForBoosts()
  }
}

@kkoshin
Copy link
Member Author

kkoshin commented Apr 1, 2024

However, if it’s a choice between using a complicated string of operators and writing the same thing with a single transform or flow, it is better to avoid the complicated string of operators. Straightline logic is easier to read than operator chaining.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant