Skip to content

Commit

Permalink
Request streaming support through circe-streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeykolbasov committed Jul 13, 2017
1 parent 27f1b6b commit 6fe91d7
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 2 deletions.
16 changes: 14 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ lazy val playVersion = "2.6.0-RC2"
lazy val jacksonVersion = "2.8.8"
lazy val argonautVersion = "6.2"
lazy val json4sVersion = "3.5.2"
lazy val iterateeVersion = "0.12.0"

lazy val compilerOptions = Seq(
"-deprecation",
Expand Down Expand Up @@ -177,15 +178,25 @@ lazy val finch = project.in(file("."))
"io.spray" %% "spray-json" % sprayVersion
))
.aggregate(
core, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest,
core, streaming, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest,
oauth2, examples, sse
)
.dependsOn(core, generic, circe)
.dependsOn(core, streaming, generic, circe)

lazy val core = project
.settings(moduleName := "finch-core")
.settings(allSettings)

lazy val streaming = project
.settings(moduleName := "finch-streaming")
.settings(allSettings)
.settings(
libraryDependencies ++= Seq(
"io.iteratee" %% "iteratee-core" % iterateeVersion
)
)
.dependsOn(core)

lazy val generic = project
.settings(moduleName := "finch-generic")
.settings(allSettings)
Expand Down Expand Up @@ -244,6 +255,7 @@ lazy val circe = project
.settings(
libraryDependencies ++= Seq(
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-streaming" % circeVersion,
"io.circe" %% "circe-jawn" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion % "test",
"io.circe" %% "circe-jackson28" % circeJacksonVersion
Expand Down
16 changes: 16 additions & 0 deletions circe/src/main/scala/io/finch/circe/Streaming.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.finch.circe

import com.twitter.io.Buf
import com.twitter.util.Future
import io.catbird.util._
import io.circe.Decoder
import io.circe.streaming.{byteParser, decoder}
import io.iteratee.Enumeratee

trait Streaming {
implicit def decoderEnumeratee[A : Decoder]: Enumeratee[Future, Buf, A] =
Enumeratee
.map[Future, Buf, Array[Byte]](Buf.ByteArray.Owned.extract)
.andThen(byteParser[Future])
.andThen(decoder[Future, A])
}
6 changes: 6 additions & 0 deletions circe/src/main/scala/io/finch/circe/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,11 @@ package object circe extends Encoders with Decoders {
else
Buf.ByteArray.Owned(io.circe.jackson.jacksonPrint(json).getBytes(cs.name))
}

/**
* Provides Enumeratee[Future, Json, A] for stream decoding
*/
object streaming extends Streaming

}

49 changes: 49 additions & 0 deletions streaming/src/main/scala/io/finch/streaming/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.finch

import com.twitter.io.{Buf, Reader}
import com.twitter.util.Future
import io.catbird.util._
import io.finch.Endpoint.Result
import io.finch.items.RequestItem
import io.iteratee.{Enumeratee, Enumerator}

/**
* Streaming module
*/
package object streaming {

private[finch] def enumeratorFromReader(reader: Reader): Enumerator[Future, Buf] = {
Enumerator.liftM(reader.read(Int.MaxValue)).flatMap({
case None => Enumerator.empty[Future, Buf]
case Some(buf) => Enumerator.enumOne[Future, Buf](buf).append(enumeratorFromReader(reader))
})
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as
* an `Enumerator[Future, Buf]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def asyncBufBody: Endpoint[Enumerator[Future, Buf]] = new Endpoint[Enumerator[Future, Buf]] {
final def apply(input: Input): Endpoint.Result[Enumerator[Future, Buf]] =
if (!input.request.isChunked) EndpointResult.Skipped
else EndpointResult.Matched(input,
Rerunnable(Output.payload(enumeratorFromReader(input.request.reader))))

final override def item: RequestItem = items.BodyItem
final override def toString: String = "asyncBufBody"
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def asyncJsonBody[A](implicit ee: Enumeratee[Future, Buf, A]): Endpoint[Enumerator[Future, A]] = {
new Endpoint[Enumerator[Future, A]] {
final def apply(input: Input): Result[Enumerator[Future, A]] = asyncBufBody.map(_.through(ee))(input)

final override def item: RequestItem = items.BodyItem
final override def toString: String = "asyncJsonBody"
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.finch.streaming

import com.twitter.finagle.http.Request
import com.twitter.io.Buf
import com.twitter.util.Await
import io.catbird.util._
import io.finch.Input
import org.scalatest.{FlatSpec, Matchers}

class EndpointStreamingSpec extends FlatSpec with Matchers {

"asyncBufBody" should "interpret chunked request as Enumerator[Future, Buf]" in {
val req = Request()
req.setChunked(chunked = true)
val writer = req.writer

val vector = Vector("foo", "bar")

def write(vector: Vector[String]): Unit = {
if (vector.isEmpty) {
writer.close()
} else {
writer.write(Buf.Utf8(vector.head)).foreach(_ => write(vector.tail))
}
}
write(vector)

val enumerator = asyncBufBody(Input(req, Seq.empty)).awaitValueUnsafe().get

Await.result(enumerator.map(Buf.Utf8.unapply).map(_.get).toVector) shouldBe vector
}

}

0 comments on commit 6fe91d7

Please sign in to comment.