Skip to content

Commit

Permalink
Merge pull request #486 from MAIF/fix-cacheablequeue
Browse files Browse the repository at this point in the history
Improve cacheable queue (#472)
  • Loading branch information
pierrebruninmaif authored Dec 17, 2020
2 parents f977f10 + bce59a5 commit 4582f6e
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 22 deletions.
51 changes: 29 additions & 22 deletions izanami-server/app/libs/streams/streams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package libs.streams

import akka.NotUsed
import akka.stream.OverflowStrategy.backpressure
import akka.stream.scaladsl.{Broadcast, BroadcastHub, Flow, GraphDSL, Keep, Source, SourceQueueWithComplete, Zip}
import akka.stream.{FlowShape, Materializer, QueueOfferResult}
import akka.stream.scaladsl.{Broadcast, BroadcastHub, Flow, GraphDSL, Keep, Sink, Source, SourceQueueWithComplete, Zip}
import akka.stream.{FlowShape, Materializer, OverflowStrategies, OverflowStrategy, QueueOfferResult}
import cats.implicits._
import domains.Key
import libs.streams.CacheableQueue.{Element, QueueElement}
Expand All @@ -24,9 +24,8 @@ object syntax {
{ err =>
IzanamiLogger.error(s"Error parsing $v : $err")
List.empty[(Key, V)]
}, { v =>
List((k, v))
}
},
v => List((k, v))
)
}
}
Expand All @@ -41,9 +40,7 @@ object Flows {

val bcast = b.add(Broadcast[In](2))
val zip = b.add(Zip[Out, Int]())
val count = Flow[In].fold(0) { (acc, _) =>
acc + 1
}
val count = Flow[In].fold(0)((acc, _) => acc + 1)

bcast ~> count ~> zip.in1
bcast ~> aFlow ~> zip.in0
Expand All @@ -54,10 +51,11 @@ object Flows {

}

case class CacheableQueue[T](queue: SourceQueueWithComplete[QueueElement[T]],
sourceWithCache: Source[T, NotUsed],
rawSource: Source[T, NotUsed])
extends SourceQueueWithComplete[T] {
case class CacheableQueue[T](
queue: SourceQueueWithComplete[QueueElement[T]],
sourceWithCache: Source[T, NotUsed],
rawSource: Source[T, NotUsed]
) extends SourceQueueWithComplete[T] {

override def offer(elem: T): Future[QueueOfferResult] = queue.offer(Element(elem))
override def watchCompletion() = queue.watchCompletion()
Expand Down Expand Up @@ -92,13 +90,18 @@ object CacheableQueue {
case class State[T](current: T, elements: Seq[T] = Seq.empty, capacity: Int) extends QueueState[T]
case class Starter[T](elements: Seq[T] = Seq.empty, capacity: Int) extends QueueState[T]

def apply[T](capacity: Int, queueBufferSize: Int = 50, broadcastCapacity: Int = 256)(
def apply[T](
capacity: Int,
queueBufferSize: Int = 50,
broadcastCapacity: Int = 256,
overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure
)(
implicit mat: Materializer
): CacheableQueue[T] = {

val (queue, rawSource: Source[QueueElement[T], NotUsed]) =
Source
.queue[QueueElement[T]](queueBufferSize, backpressure)
.queue[QueueElement[T]](queueBufferSize, OverflowStrategy.backpressure)
.toMat(BroadcastHub.sink(2))(Keep.both)
.run()

Expand Down Expand Up @@ -132,16 +135,20 @@ object CacheableQueue {
}
}

val sourceWithCache = source.mapMaterializedValue { n =>
queue.offer(Fake[T]())
n
}
val nativeSource = rawSource
.collect {
case Element(e) => e
}
sourceWithCache.runWith(Sink.ignore)
nativeSource.runWith(Sink.ignore)
CacheableQueue(
queue,
source.mapMaterializedValue { n =>
queue.offer(Fake[T]())
n
},
rawSource
.collect {
case Element(e) => e
}
sourceWithCache,
nativeSource
)
}

Expand Down
86 changes: 86 additions & 0 deletions izanami-server/test/libs/streams/CacheableQueueTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package libs.streams

import akka.actor.ActorSystem
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.{MustMatchers, OptionValues, WordSpec}

import java.util.concurrent.atomic.AtomicBoolean
import scala.util.{Failure, Try}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}

class CacheableQueueTest extends WordSpec with MustMatchers with OptionValues {

implicit val system = ActorSystem()
implicit val mat = Materializer(system)
import system.dispatcher

"Cacheable queue" must {

"handle heavy concurent publish fail with backpressure strategy" in {
val queue = CacheableQueue[String](500, queueBufferSize = 500)

val hasFailed = new AtomicBoolean(false)

val allOffer = Future.sequence((0 to 700).map { i =>
val result = queue.offer(s"message-$i")
result.onComplete {
case Failure(e) => hasFailed.set(true)
case _ =>
}
result
})

Try {
Await.result(allOffer, 20.seconds)
}

// Fail because too many concurrent offer
hasFailed.get() mustBe true
}

"handle heavy publish with consumer" in {
val queue = CacheableQueue[String](500, queueBufferSize = 500)

queue.rawSource.runWith(Sink.ignore)
queue.sourceWithCache.runWith(Sink.ignore)

val hasFailed = new AtomicBoolean(false)

val allOffer = Source((0 to 1000).toList).mapAsync(1) { i =>
val result = queue.offer(s"message-$i")
result.onComplete {
case Failure(e) => hasFailed.set(true)
case _ =>
}
result
}

Await.result(allOffer.runWith(Sink.ignore), 20.seconds)

hasFailed.get() mustBe false
}

"handle normmal publish without consumer" in {
val queue = CacheableQueue[String](500, queueBufferSize = 500)

val hasFailed = new AtomicBoolean(false)

val allOffer = Source((0 to 1000).toList).mapAsync(1) { i =>
val result = queue.offer(s"message-$i")
result.onComplete {
case Failure(e) => hasFailed.set(true)
case _ =>
}
result
}

Await.result(allOffer.runWith(Sink.ignore), 20.seconds)

hasFailed.get() mustBe false
}

}

}

0 comments on commit 4582f6e

Please sign in to comment.