Skip to content

Commit

Permalink
🐛 Update CacheableQueueTest (#472)
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrebruninmaif committed Dec 17, 2020
1 parent 3197c35 commit 1633ef2
Showing 1 changed file with 7 additions and 32 deletions.
39 changes: 7 additions & 32 deletions izanami-server/test/libs/streams/CacheableQueueTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,23 @@ class CacheableQueueTest extends WordSpec with MustMatchers with OptionValues {

implicit val system = ActorSystem()
implicit val mat = Materializer(system)

import system.dispatcher

"Cacheable queue" must {

"handle heavy concurent publish fail with backpressure strategy" in {
val queue = CacheableQueue[String](500, queueBufferSize = 500)
"handle publish fail with backpressure strategy" in {
val queue = CacheableQueue[String](50, queueBufferSize = 50)

val hasFailed = new AtomicBoolean(false)

val allOffer = Future.sequence((0 to 700).map { i =>
val allOffer = Future.sequence((0 to 60).map { i =>
val result = queue.offer(s"message-$i")
result.onComplete {
case Failure(e) => hasFailed.set(true)
case _ =>
}
result
})

Try {
Await.result(allOffer, 20.seconds)
}
Expand All @@ -40,34 +39,12 @@ class CacheableQueueTest extends WordSpec with MustMatchers with OptionValues {
hasFailed.get() mustBe true
}

"handle heavy publish with consumer" in {
val queue = CacheableQueue[String](500, queueBufferSize = 500)

queue.rawSource.runWith(Sink.ignore)
queue.sourceWithCache.runWith(Sink.ignore)

val hasFailed = new AtomicBoolean(false)

val allOffer = Source((0 to 1000).toList).mapAsync(1) { i =>
val result = queue.offer(s"message-$i")
result.onComplete {
case Failure(e) => hasFailed.set(true)
case _ =>
}
result
}

Await.result(allOffer.runWith(Sink.ignore), 20.seconds)

hasFailed.get() mustBe false
}

"handle normmal publish without consumer" in {
val queue = CacheableQueue[String](500, queueBufferSize = 500)
"handle heavy publish" in {
val queue = CacheableQueue[String](50, queueBufferSize = 1000)

val hasFailed = new AtomicBoolean(false)

val allOffer = Source((0 to 1000).toList).mapAsync(1) { i =>
val allOffer = Source((0 to 1000).toList).mapAsync(50) { i =>
val result = queue.offer(s"message-$i")
result.onComplete {
case Failure(e) => hasFailed.set(true)
Expand All @@ -80,7 +57,5 @@ class CacheableQueueTest extends WordSpec with MustMatchers with OptionValues {

hasFailed.get() mustBe false
}

}

}

0 comments on commit 1633ef2

Please sign in to comment.