diff --git a/build.sbt b/build.sbt index 99e871fbf..ec0009ee1 100644 --- a/build.sbt +++ b/build.sbt @@ -21,6 +21,7 @@ lazy val playVersion = "2.6.0-RC2" lazy val jacksonVersion = "2.8.8" lazy val argonautVersion = "6.2" lazy val json4sVersion = "3.5.2" +lazy val iterateeVersion = "0.12.0" lazy val compilerOptions = Seq( "-deprecation", @@ -177,15 +178,25 @@ lazy val finch = project.in(file(".")) "io.spray" %% "spray-json" % sprayVersion )) .aggregate( - core, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest, + core, streaming, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest, oauth2, examples, sse ) - .dependsOn(core, generic, circe) + .dependsOn(core, streaming, generic, circe) lazy val core = project .settings(moduleName := "finch-core") .settings(allSettings) +lazy val streaming = project + .settings(moduleName := "finch-streaming") + .settings(allSettings) + .settings( + libraryDependencies ++= Seq( + "io.iteratee" %% "iteratee-core" % iterateeVersion + ) + ) + .dependsOn(core) + lazy val generic = project .settings(moduleName := "finch-generic") .settings(allSettings) @@ -244,6 +255,7 @@ lazy val circe = project .settings( libraryDependencies ++= Seq( "io.circe" %% "circe-core" % circeVersion, + "io.circe" %% "circe-streaming" % circeVersion, "io.circe" %% "circe-jawn" % circeVersion, "io.circe" %% "circe-generic" % circeVersion % "test", "io.circe" %% "circe-jackson28" % circeJacksonVersion diff --git a/circe/src/main/scala/io/finch/circe/Streaming.scala b/circe/src/main/scala/io/finch/circe/Streaming.scala new file mode 100644 index 000000000..d3de29e4d --- /dev/null +++ b/circe/src/main/scala/io/finch/circe/Streaming.scala @@ -0,0 +1,16 @@ +package io.finch.circe + +import com.twitter.io.Buf +import com.twitter.util.Future +import io.catbird.util._ +import io.circe.Decoder +import io.circe.streaming.{byteParser, decoder} +import io.iteratee.Enumeratee + +trait Streaming { + implicit def decoderEnumeratee[A : Decoder]: Enumeratee[Future, Buf, A] = + Enumeratee + .map[Future, Buf, Array[Byte]](Buf.ByteArray.Owned.extract) + .andThen(byteParser[Future]) + .andThen(decoder[Future, A]) +} diff --git a/circe/src/main/scala/io/finch/circe/package.scala b/circe/src/main/scala/io/finch/circe/package.scala index 807462d18..e5dba26a4 100644 --- a/circe/src/main/scala/io/finch/circe/package.scala +++ b/circe/src/main/scala/io/finch/circe/package.scala @@ -28,5 +28,11 @@ package object circe extends Encoders with Decoders { else Buf.ByteArray.Owned(io.circe.jackson.jacksonPrint(json).getBytes(cs.name)) } + + /** + * Provides Enumeratee[Future, Json, A] for stream decoding + */ + object streaming extends Streaming + } diff --git a/streaming/src/main/scala/io/finch/streaming/package.scala b/streaming/src/main/scala/io/finch/streaming/package.scala new file mode 100644 index 000000000..fffb726b8 --- /dev/null +++ b/streaming/src/main/scala/io/finch/streaming/package.scala @@ -0,0 +1,49 @@ +package io.finch + +import com.twitter.io.{Buf, Reader} +import com.twitter.util.Future +import io.catbird.util._ +import io.finch.Endpoint.Result +import io.finch.items.RequestItem +import io.iteratee.{Enumeratee, Enumerator} + +/** + * Streaming module + */ +package object streaming { + + private[finch] def enumeratorFromReader(reader: Reader): Enumerator[Future, Buf] = { + Enumerator.liftM(reader.read(Int.MaxValue)).flatMap({ + case None => Enumerator.empty[Future, Buf] + case Some(buf) => Enumerator.enumOne[Future, Buf](buf).append(enumeratorFromReader(reader)) + }) + } + + /** + * An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as + * an `Enumerator[Future, Buf]`. The returned [[Endpoint]] only matches chunked (streamed) requests. + */ + def asyncBufBody: Endpoint[Enumerator[Future, Buf]] = new Endpoint[Enumerator[Future, Buf]] { + final def apply(input: Input): Endpoint.Result[Enumerator[Future, Buf]] = + if (!input.request.isChunked) EndpointResult.Skipped + else EndpointResult.Matched(input, + Rerunnable(Output.payload(enumeratorFromReader(input.request.reader)))) + + final override def item: RequestItem = items.BodyItem + final override def toString: String = "asyncBufBody" + } + + /** + * An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as + * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. + */ + def asyncJsonBody[A](implicit ee: Enumeratee[Future, Buf, A]): Endpoint[Enumerator[Future, A]] = { + new Endpoint[Enumerator[Future, A]] { + final def apply(input: Input): Result[Enumerator[Future, A]] = asyncBufBody.map(_.through(ee))(input) + + final override def item: RequestItem = items.BodyItem + final override def toString: String = "asyncJsonBody" + } + } + +} diff --git a/streaming/src/test/scala/io/finch/streaming/EndpointStreamingSpec.scala b/streaming/src/test/scala/io/finch/streaming/EndpointStreamingSpec.scala new file mode 100644 index 000000000..faa02afe3 --- /dev/null +++ b/streaming/src/test/scala/io/finch/streaming/EndpointStreamingSpec.scala @@ -0,0 +1,33 @@ +package io.finch.streaming + +import com.twitter.finagle.http.Request +import com.twitter.io.Buf +import com.twitter.util.Await +import io.catbird.util._ +import io.finch.Input +import org.scalatest.{FlatSpec, Matchers} + +class EndpointStreamingSpec extends FlatSpec with Matchers { + + "asyncBufBody" should "interpret chunked request as Enumerator[Future, Buf]" in { + val req = Request() + req.setChunked(chunked = true) + val writer = req.writer + + val vector = Vector("foo", "bar") + + def write(vector: Vector[String]): Unit = { + if (vector.isEmpty) { + writer.close() + } else { + writer.write(Buf.Utf8(vector.head)).foreach(_ => write(vector.tail)) + } + } + write(vector) + + val enumerator = asyncBufBody(Input(req, Seq.empty)).awaitValueUnsafe().get + + Await.result(enumerator.map(Buf.Utf8.unapply).map(_.get).toVector) shouldBe vector + } + +}