parEvalMap + Sync => interrupt called for Scope that cannot be interrupted #2371

Closed
taig opened this issue Apr 20, 2021 · 4 comments

@taig
Contributor

taig commented Apr 20, 2021

import cats.effect.{Concurrent, IO, IOApp, Sync}
import fs2.Stream

object Main extends IOApp.Simple {
  def source[F[_]](implicit F: Concurrent[F]): Stream[F, Int] =
    Stream.emits(1 to 100).covary[F].parEvalMap(maxConcurrent = 2)(x => F.pure(x))

  def runF[F[_]: Sync](source: Stream[F, Int]): F[Int] = source.compile.lastOrError

  override def run: IO[Unit] = runF[IO](source[IO]).void
}

java.lang.IllegalStateException: Scope#interrupt called for Scope that cannot be interrupted
	at fs2.internal.Scope.interruptWhen(Scope.scala:400)
	at fs2.Pull$.$anonfun$compile$12(Pull.scala:1013)
	at fs2.internal.Scope.$anonfun$acquireResource$2(Scope.scala:189)
	at cats.effect.IOFiber.runLoop(IOFiber.scala:493)
	at cats.effect.IOFiber.execR(IOFiber.scala:1077)
	at cats.effect.IOFiber.run(IOFiber.scala:137)
	at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:372)

This is an error I encountered while upgrading to fs2 3. Note that when replacing the Sync constraint with Concurrent (and thereby probably also the inferred compiler instance), the code works as expected. Also, parEvalMapUnordered does not trigger the error.
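For reference, here is a minimal sketch of the variant described above as working: it is the same reproduction with the only change being that runF asks for Concurrent instead of Sync, so the compiler instance is presumably resolved from the Concurrent evidence. It assumes fs2 3.x and Cats Effect 3 on the classpath.

```scala
import cats.effect.{Concurrent, IO, IOApp}
import fs2.Stream

object Main extends IOApp.Simple {
  def source[F[_]](implicit F: Concurrent[F]): Stream[F, Int] =
    Stream.emits(1 to 100).covary[F].parEvalMap(maxConcurrent = 2)(x => F.pure(x))

  // Only change from the failing version: the constraint is Concurrent, not Sync,
  // so `compile` no longer has to fall back to the Sync-based compiler.
  def runF[F[_]: Concurrent](source: Stream[F, Int]): F[Int] = source.compile.lastOrError

  override def run: IO[Unit] = runF[IO](source[IO]).void
}
```

The trade-off is that every caller of runF must now be able to provide a Concurrent[F], which is a stronger requirement than Sync[F].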

Edit: In the meantime I tried to come up with a naive replacement that just enqueues a bunch of Deferreds, but was surprised to see the exact same stack trace when using the usual fs2.Stream combinators such as concurrently or merge. So the issue is probably more fundamental than parEvalMap.

@mpilquist
Member

Yeah, this happens any time a stream containing interruptWhen is compiled with the "synchronous compiler" provided here:

private final class SyncTarget[F[_]: Sync] extends MonadCancelTarget[F] {
  private[fs2] def unique: F[Unique.Token] = Sync[F].unique
  private[fs2] def ref[A](a: A): F[Ref[F, A]] = Ref[F].of(a)
  private[fs2] def interruptContext(root: Unique.Token): Option[F[InterruptContext[F]]] = None
}

With a full Concurrent[F], you get the fully featured compiler here:

private final class ConcurrentTarget[F[_]](
    protected implicit val F: Concurrent[F]
) extends MonadCancelTarget[F]()(F) {
  private[fs2] def unique: F[Unique.Token] = Concurrent[F].unique
  private[fs2] def ref[A](a: A): F[Ref[F, A]] = F.ref(a)
  private[fs2] def interruptContext(root: Unique.Token): Option[F[InterruptContext[F]]] = Some(
    InterruptContext(root, F.unit)
  )
}
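To make the difference between the two targets concrete, here is a toy model of the failure mode. It is not fs2's actual internals: Target, SyncLike, ConcurrentLike and ToyScope are hypothetical names that only mirror the shape of the snippets above, with a String standing in for the real InterruptContext.

```scala
// Toy model only: these names are hypothetical and are NOT fs2's real types.
sealed trait Target {
  // None means "scopes compiled with this target cannot be interrupted".
  def interruptContext: Option[String]
}

// Stands in for the Sync-based target, which provides no interrupt context.
object SyncLike extends Target {
  val interruptContext: Option[String] = None
}

// Stands in for the Concurrent-based target, which does provide one.
object ConcurrentLike extends Target {
  val interruptContext: Option[String] = Some("interrupt-context")
}

object ToyScope {
  // Interruption needs an interrupt context; without one the only option is to fail.
  def interruptWhen(target: Target): Either[Throwable, String] =
    target.interruptContext match {
      case Some(ctx) => Right(s"interruptible via $ctx")
      case None =>
        Left(
          new IllegalStateException(
            "Scope#interrupt called for Scope that cannot be interrupted"
          )
        )
    }
}
```

In this model, ToyScope.interruptWhen(SyncLike) yields a Left carrying the IllegalStateException, while ToyScope.interruptWhen(ConcurrentLike) yields a Right.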

Conceptually, interruptWhen could capture a Concurrent[F] constraint, but then translation/mapK gets much more difficult (from 2.5.x:

def translate[F[_], G[_], O](
    stream: FreeC[F, O, Unit],
    fK: F ~> G
)(implicit G: TranslateInterrupt[G]): FreeC[G, O, Unit] = {
  val concurrent: Option[Concurrent[G]] = G.concurrentInstance
  def translateAlgEffect[R](self: AlgEffect[F, R]): AlgEffect[G, R] =
    self match {
      // safe to cast, used in translate only
      // if interruption has to be supported concurrent for G has to be passed
      case a: Acquire[F, r] =>
        Acquire[G, r](fK(a.resource), (r, ec) => fK(a.release(r, ec)))
      case e: Eval[F, R] => Eval[G, R](fK(e.value))
      case OpenScope(_) => OpenScope[G](concurrent)
      case c: CloseScope => c
      case g: GetScope[_] => g
      case i: InterruptWhen[_] => InterruptWhen[G](fK(i.haltOnSignal))
    }
  def translateStep[X](next: FreeC[F, X, Unit], isMainLevel: Boolean): FreeC[G, X, Unit] =
    next.viewL match {
      case result: Result[Unit] => result
      case view: ViewL.View[F, X, y, Unit] =>
        view.step match {
          case output: Output[X] =>
            output.transformWith {
              case r @ Result.Pure(_) if isMainLevel =>
                translateStep(view.next(r), isMainLevel)
              case r @ Result.Pure(_) =>
                // Cast is safe here, as at this point the evaluation of this Step will end
                // and the remainder of the free will be passed as a result in Bind. As such
                // next Step will have this to evaluate, and will try to translate again.
                view.next(r).asInstanceOf[FreeC[G, X, Unit]]
              case r @ Result.Fail(_) => translateStep(view.next(r), isMainLevel)
              case r @ Result.Interrupted(_, _) => translateStep(view.next(r), isMainLevel)
            }
          case stepU: Step[f, x] =>
            val step: Step[F, x] = stepU.asInstanceOf[Step[F, x]]
            Step[G, x](
              stream = translateStep[x](step.stream, false),
              scope = step.scope
            ).transformWith { r =>
              translateStep[X](view.next(r.asInstanceOf[Result[y]]), isMainLevel)
            }
          case alg: AlgEffect[F, r] =>
            translateAlgEffect(alg)
              .transformWith(r =>
                translateStep(view.next(r.asInstanceOf[Result[y]]), isMainLevel)
              )
        }
    }
  translateStep[O](stream, true)
}
)

@taig
Contributor Author

taig commented May 10, 2021

Thank you for the detailed explanation.

@kubukoz
Member

kubukoz commented May 28, 2021

What do you think about having this in the error message?

@rossabaker
Member

This one is a menace with Doobie. Sharing in case it saves anyone else some time.

I couldn't find interruptWhen in our code or in Doobie's. I started tearing up fs2, and came up with these functions that can embed an InterruptWhen in the stream. More or less the ones that require Concurrent, which isn't surprising in hindsight:

    221:  broadcastThrough
    542:  concurrently
    636:  debounce
    874:  either
   1061:  evalFilterAsync
   1081:  evalFilterNotAsync
   1498:  hold
   1506:  holdOption
   1511:  holdResource
   1524:  holdOptionResource
   1584:  interruptAfter
   1604:  interruptWhen
   1633:  interruptWhen
   1639:  interruptWhen
   1646:  interruptWhen
   1744:  mapAsync
   1753:  mapAsyncUnordered
   1800:  switchMap
   1859:  merge
   1920:  mergeHaltBoth
   1929:  mergeHaltL
   1938:  mergeHaltR
   2065:  parEvalMapUnordered
   2164:  parZip
   2172:  parZipWith
   2178:  pauseWhen
   2205:  prefetch
   2211:  prefetchN
   2642:  timeout
   3785:  observe
   3789:  observeAsync
   3839:  observeEither
   3937:  parJoin
   4063:  parJoinUnbounded
   4465:  timed

WeakAsync.liftK to get an IO ~> ConnectionIO gets us demoted to the Sync compiler, which doobie uses internally in a couple places. We called its PHLOS with a stream sourced from mapAsync and ate flaming death.


4 participants