diff --git a/build.sbt b/build.sbt index 9b01a80e5..eebff4978 100644 --- a/build.sbt +++ b/build.sbt @@ -178,21 +178,22 @@ lazy val finch = project.in(file(".")) "io.spray" %% "spray-json" % sprayVersion )) .aggregate( - core, streaming, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest, + core, iteratee, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest, oauth2, examples, sse ) - .dependsOn(core, streaming, generic, circe) + .dependsOn(core, iteratee, generic, circe) lazy val core = project .settings(moduleName := "finch-core") .settings(allSettings) -lazy val streaming = project - .settings(moduleName := "finch-streaming") +lazy val iteratee = project + .settings(moduleName := "finch-iteratee") .settings(allSettings) .settings( libraryDependencies ++= Seq( - "io.iteratee" %% "iteratee-core" % iterateeVersion + "io.iteratee" %% "iteratee-core" % iterateeVersion, + "io.iteratee" %% "iteratee-twitter" % iterateeVersion ) ) .dependsOn(core) @@ -261,7 +262,7 @@ lazy val circe = project "io.circe" %% "circe-jackson28" % circeJacksonVersion ) ) - .dependsOn(core, streaming, jsonTest % "test") + .dependsOn(core, iteratee, jsonTest % "test") lazy val playjson = project .settings(moduleName :="finch-playjson") diff --git a/circe/src/main/scala/io/finch/circe/Decoders.scala b/circe/src/main/scala/io/finch/circe/Decoders.scala index a641fd8c6..0a72537a1 100644 --- a/circe/src/main/scala/io/finch/circe/Decoders.scala +++ b/circe/src/main/scala/io/finch/circe/Decoders.scala @@ -5,7 +5,7 @@ import io.catbird.util._ import io.circe.Decoder import io.circe.jawn._ import io.circe.streaming._ -import io.finch.Decode +import io.finch.{Application, Decode} import io.finch.internal.HttpContent import io.finch.iteratee.AsyncDecode import java.nio.charset.StandardCharsets @@ -25,14 +25,16 @@ trait Decoders { attemptJson.fold[Try[A]](Throw.apply, Return.apply) } - implicit def asyncDecodeCirce[A : Decoder]: AsyncDecode.Json[A] = AsyncDecode.instance((enum, cs) => { - val parsed = cs match { - case StandardCharsets.UTF_8 => - enum.map(_.asByteArray).through(byteParser[Future]) - case _ => - enum.map(_.asString(cs)).through(stringParser[Future]) - } - parsed.through(decoder[Future, A]) - }) + implicit def asyncDecodeCirce[A : Decoder]: AsyncDecode.Json[A] = { + AsyncDecode.instance[A, Application.Json]((enum, cs) => { + val parsed = cs match { + case StandardCharsets.UTF_8 => + enum.map(_.asByteArray).through(byteParser[Future]) + case _ => + enum.map(_.asString(cs)).through(stringParser[Future]) + } + parsed.through(decoder[Future, A]) + }) + } } diff --git a/circe/src/main/scala/io/finch/circe/Encoders.scala b/circe/src/main/scala/io/finch/circe/Encoders.scala index 7f650dc14..eae909a11 100644 --- a/circe/src/main/scala/io/finch/circe/Encoders.scala +++ b/circe/src/main/scala/io/finch/circe/Encoders.scala @@ -1,11 +1,14 @@ package io.finch.circe import com.twitter.io.Buf +import io.catbird.util._ import io.circe.{Encoder, Json} -import io.finch.Encode +import io.finch.{Application, Encode} +import io.finch.iteratee.AsyncEncode import java.nio.charset.Charset + trait Encoders { protected def print(json: Json, cs: Charset): Buf @@ -15,4 +18,7 @@ trait Encoders { */ implicit def encodeCirce[A](implicit e: Encoder[A]): Encode.Json[A] = Encode.json((a, cs) => print(e(a), cs)) + + implicit def asyncEncodeCirce[A](implicit e: Encoder[A]): AsyncEncode.Json[A] = + AsyncEncode.instance[A, Application.Json]((enum, cs) => enum.map(a => print(e(a), cs))) } diff --git a/examples/src/main/scala/io/finch/iteratee/Main.scala b/examples/src/main/scala/io/finch/streaming/Main.scala similarity index 99% rename from examples/src/main/scala/io/finch/iteratee/Main.scala rename to examples/src/main/scala/io/finch/streaming/Main.scala index 7efa262a3..5f58fba70 100644 --- a/examples/src/main/scala/io/finch/iteratee/Main.scala +++ b/examples/src/main/scala/io/finch/streaming/Main.scala @@ -1,4 +1,4 @@ -package io.finch.iteratee +package io.finch.streaming import java.util.concurrent.atomic.AtomicLong diff --git a/examples/src/test/scala/io/finch/iteratee/StreamingSpec.scala b/examples/src/test/scala/io/finch/streaming/StreamingSpec.scala similarity index 97% rename from examples/src/test/scala/io/finch/iteratee/StreamingSpec.scala rename to examples/src/test/scala/io/finch/streaming/StreamingSpec.scala index 1d778e096..3b7d19e45 100644 --- a/examples/src/test/scala/io/finch/iteratee/StreamingSpec.scala +++ b/examples/src/test/scala/io/finch/streaming/StreamingSpec.scala @@ -1,4 +1,4 @@ -package io.finch.iteratee +package io.finch.streaming import com.twitter.util.Await import io.finch.Input diff --git a/iteratee/src/main/scala/io/finch/iteratee/AsyncDecode.scala b/iteratee/src/main/scala/io/finch/iteratee/AsyncDecode.scala new file mode 100644 index 000000000..2ec53846b --- /dev/null +++ b/iteratee/src/main/scala/io/finch/iteratee/AsyncDecode.scala @@ -0,0 +1,40 @@ +package io.finch.iteratee + +import java.nio.charset.Charset + +import com.twitter.io.Buf +import com.twitter.util.Future +import io.catbird.util._ +import io.finch.Application +import io.finch.internal._ +import io.iteratee.Enumerator + +/** + * Decodes an HTTP streamed payload represented as [[Enumerator]] (encoded with [[Charset]]) into + * an [[Enumerator]] of arbitrary type `A`. + */ +trait AsyncDecode[A] { + + type ContentType <: String + + def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A] +} + +object AsyncDecode extends DecodeInstances { + + type Aux[A, CT <: String] = AsyncDecode[A] {type ContentType = CT} + + type Json[A] = Aux[A, Application.Json] +} + +trait DecodeInstances { + def instance[A, CT <: String] + (f: (Enumerator[Future, Buf], Charset) => Enumerator[Future, A]): AsyncDecode.Aux[A, CT] = new AsyncDecode[A] { + override type ContentType = CT + + override def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A] = f(enumerator, cs) + } + + implicit def buf2bufDecode[CT <: String]: AsyncDecode.Aux[Buf, CT] = + instance[Buf, CT]((enum, _) => enum) +} diff --git a/iteratee/src/main/scala/io/finch/iteratee/AsyncEncode.scala b/iteratee/src/main/scala/io/finch/iteratee/AsyncEncode.scala new file mode 100644 index 000000000..0519fe69a --- /dev/null +++ b/iteratee/src/main/scala/io/finch/iteratee/AsyncEncode.scala @@ -0,0 +1,37 @@ +package io.finch.iteratee + +import java.nio.charset.Charset + +import com.twitter.io.Buf +import com.twitter.util.Future +import io.finch.Application +import io.iteratee.Enumerator + +trait AsyncEncode[A] { + + type ContentType <: String + + def apply(enumerator: Enumerator[Future, A], cs: Charset): Enumerator[Future, Buf] +} + +object AsyncEncode extends EncodeInstances { + + type Aux[A, CT <: String] = AsyncEncode[A] { type ContentType = CT } + + type Json[A] = Aux[A, Application.Json] +} + +trait EncodeInstances { + + def instance[A, CT <: String] + (f: (Enumerator[Future, A], Charset) => Enumerator[Future, Buf]): AsyncEncode.Aux[A, CT] = { + new AsyncEncode[A] { + + override type ContentType = CT + + override def apply(enumerator: Enumerator[Future, A], cs: Charset): Enumerator[Future, Buf] = f(enumerator, cs) + } + } + + implicit def buf2bufEncode[CT <: String]: AsyncEncode.Aux[Buf, CT] = instance((enum, _) => enum) +} diff --git a/iteratee/src/main/scala/io/finch/iteratee/package.scala b/iteratee/src/main/scala/io/finch/iteratee/package.scala new file mode 100644 index 000000000..dd0af11b8 --- /dev/null +++ b/iteratee/src/main/scala/io/finch/iteratee/package.scala @@ -0,0 +1,77 @@ +package io.finch + +import com.twitter.finagle.http.Response +import com.twitter.io.{Buf, Reader} +import com.twitter.util.Future +import io.catbird.util._ +import io.finch.internal._ +import io.finch.items.RequestItem +import io.finch.iteratee.AsyncEncode +import io.iteratee.Enumerator +import io.iteratee.twitter.FutureModule +import shapeless.Witness + +/** + * Iteratee module + */ +package object iteratee extends IterateeInstances { + + import syntax._ + + private[finch] def enumeratorFromReader(reader: Reader): Enumerator[Future, Buf] = { + reader.read(Int.MaxValue).intoEnumerator.flatMap({ + case None => empty[Buf] + case Some(buf) => enumOne(buf).append(enumeratorFromReader(reader)) + }) + } + + /** + * An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as + * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. + */ + def enumeratorBody[A, CT <: String](implicit decode: AsyncDecode.Aux[A, CT]): Endpoint[Enumerator[Future, A]] = { + new Endpoint[Enumerator[Future, A]] { + final def apply(input: Input): Endpoint.Result[Enumerator[Future, A]] = { + if (!input.request.isChunked) { + EndpointResult.Skipped + } else { + val req = input.request + EndpointResult.Matched( + input, + Rerunnable(Output.payload(decode(enumeratorFromReader(req.reader), req.charsetOrUtf8))) + ) + } + } + + final override def item: RequestItem = items.BodyItem + final override def toString: String = "asyncJsonBody" + } + } + + /** + * An evaluating [[Endpoint]] that reads a required chunked streaming JSON body, interpreted as + * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. + */ + def enumeratorJsonBody[A](implicit ad: AsyncDecode.Aux[A, Application.Json]): Endpoint[Enumerator[Future, A]] = + enumeratorBody[A, Application.Json] + +} + +trait IterateeInstances extends FutureModule { + + implicit def enumeratorToResponse[A, CT <: String](implicit + ae: AsyncEncode.Aux[A, CT], + w: Witness.Aux[CT] + ): ToResponse.Aux[Enumerator[Future, A], CT] = { + ToResponse.instance[Enumerator[Future, A], CT]((enum, cs) => { + val response = Response() + response.setChunked(true) + response.contentType = w.value + val writer = response.writer + val iteratee = foreachM((buf: Buf) => writer.write(buf)) + ae(enum, cs).into(iteratee).ensure(writer.close()) + response + }) + } + +} diff --git a/streaming/src/main/scala/io/finch/iteratee/AsyncDecode.scala b/streaming/src/main/scala/io/finch/iteratee/AsyncDecode.scala deleted file mode 100644 index 1f857a096..000000000 --- a/streaming/src/main/scala/io/finch/iteratee/AsyncDecode.scala +++ /dev/null @@ -1,39 +0,0 @@ -package io.finch.iteratee - -import java.nio.charset.Charset - -import com.twitter.io.Buf -import com.twitter.util.Future -import io.finch.{Application, Text} -import io.finch.internal._ -import io.iteratee.Enumerator - -trait AsyncDecode[A] { - - type ContentType <: String - - def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A] -} - -object AsyncDecode extends DecodeInstances { - - type Aux[A, CT <: String] = AsyncDecode[A] { type ContentType = CT } - - type Json[A] = Aux[A, Application.Json] - type Text[A] = Aux[A, Text.Plain] -} - -trait DecodeInstances { - def instance[A, CT](f: (Enumerator[Future, Buf], Charset) => Enumerator[Future, A]): AsyncDecode.Aux[A, CT] = { - new AsyncDecode[A] { - override type ContentType = CT - override def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A] = f(enumerator, cs) - } - } - - implicit def buf2bufDecode[CT <: String]: AsyncDecode.Aux[Buf, CT] = - instance[Buf, CT]((enum, _) => enum) - - implicit def buf2stringDecode[CT <: String]: AsyncDecode.Aux[String, CT] = - instance[String, CT]((enum,cs) => enum.map(_.asString(cs))) -} \ No newline at end of file diff --git a/streaming/src/main/scala/io/finch/iteratee/package.scala b/streaming/src/main/scala/io/finch/iteratee/package.scala deleted file mode 100644 index 8a016d45e..000000000 --- a/streaming/src/main/scala/io/finch/iteratee/package.scala +++ /dev/null @@ -1,68 +0,0 @@ -package io.finch - -import java.nio.charset.Charset - -import com.twitter.finagle.http.Response -import com.twitter.io.{Buf, Reader} -import com.twitter.util.Future -import io.catbird.util._ -import io.finch.internal._ -import io.finch.items.RequestItem -import io.iteratee.{Enumeratee, Enumerator, Iteratee} -import shapeless.Witness - -/** - * Streaming module - */ -package object iteratee extends IterateeInstances { - - private[finch] def enumeratorFromReader(reader: Reader, cs: Charset): Enumerator[Future, String] = { - Enumerator.liftM(reader.read(Int.MaxValue)).flatMap({ - case None => Enumerator.empty[Future, String] - case Some(buf) => Enumerator.enumOne[Future, String](buf.asString(cs)).append(enumeratorFromReader(reader, cs)) - }) - } - - /** - * An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as - * an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests. - */ - def enumeratorBody[A](implicit ee: Enumeratee[Future, String, A]): Endpoint[Enumerator[Future, A]] = { - new Endpoint[Enumerator[Future, A]] { - final def apply(input: Input): Endpoint.Result[Enumerator[Future, A]] = { - if (!input.request.isChunked) { - EndpointResult.Skipped - } else { - val req = input.request - EndpointResult.Matched( - input, - Rerunnable(Output.payload(enumeratorFromReader(req.reader, req.charsetOrUtf8).through(ee))) - ) - } - } - - final override def item: RequestItem = items.BodyItem - final override def toString: String = "asyncJsonBody" - } - } - -} - -trait IterateeInstances { - - implicit def enumeratorToResponse[A, CT <: String](implicit - encode: Encode.Aux[A, CT], - w: Witness.Aux[CT] - ): ToResponse[Enumerator[Future, A]] = { - ToResponse.instance[Enumerator[Future, A], CT]((enum, cs) => { - val response = Response() - response.setChunked(true) - response.contentType = w.value - val writer = response.writer - val iteratee = Iteratee.foldM[Future, Buf, Unit](())((_, buf) => writer.write(buf)).ensure(writer.close()) - enum.map(encode.apply(_, cs)).into(iteratee) - response - }) - } - -} \ No newline at end of file