Skip to content

Commit

Permalink
Rename module to io.finch.iteratee
Browse files Browse the repository at this point in the history
Add AsyncEncode
  • Loading branch information
sergeykolbasov committed Jul 14, 2017
1 parent 63e13d5 commit 319d052
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 126 deletions.
13 changes: 7 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,22 @@ lazy val finch = project.in(file("."))
"io.spray" %% "spray-json" % sprayVersion
))
.aggregate(
core, streaming, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest,
core, iteratee, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest,
oauth2, examples, sse
)
.dependsOn(core, streaming, generic, circe)
.dependsOn(core, iteratee, generic, circe)

lazy val core = project
.settings(moduleName := "finch-core")
.settings(allSettings)

lazy val streaming = project
.settings(moduleName := "finch-streaming")
lazy val iteratee = project
.settings(moduleName := "finch-iteratee")
.settings(allSettings)
.settings(
libraryDependencies ++= Seq(
"io.iteratee" %% "iteratee-core" % iterateeVersion
"io.iteratee" %% "iteratee-core" % iterateeVersion,
"io.iteratee" %% "iteratee-twitter" % iterateeVersion
)
)
.dependsOn(core)
Expand Down Expand Up @@ -261,7 +262,7 @@ lazy val circe = project
"io.circe" %% "circe-jackson28" % circeJacksonVersion
)
)
.dependsOn(core, streaming, jsonTest % "test")
.dependsOn(core, iteratee, jsonTest % "test")

lazy val playjson = project
.settings(moduleName :="finch-playjson")
Expand Down
22 changes: 12 additions & 10 deletions circe/src/main/scala/io/finch/circe/Decoders.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.catbird.util._
import io.circe.Decoder
import io.circe.jawn._
import io.circe.streaming._
import io.finch.Decode
import io.finch.{Application, Decode}
import io.finch.internal.HttpContent
import io.finch.iteratee.AsyncDecode
import java.nio.charset.StandardCharsets
Expand All @@ -25,14 +25,16 @@ trait Decoders {
attemptJson.fold[Try[A]](Throw.apply, Return.apply)
}

implicit def asyncDecodeCirce[A : Decoder]: AsyncDecode.Json[A] = AsyncDecode.instance((enum, cs) => {
val parsed = cs match {
case StandardCharsets.UTF_8 =>
enum.map(_.asByteArray).through(byteParser[Future])
case _ =>
enum.map(_.asString(cs)).through(stringParser[Future])
}
parsed.through(decoder[Future, A])
})
implicit def asyncDecodeCirce[A : Decoder]: AsyncDecode.Json[A] = {
AsyncDecode.instance[A, Application.Json]((enum, cs) => {
val parsed = cs match {
case StandardCharsets.UTF_8 =>
enum.map(_.asByteArray).through(byteParser[Future])
case _ =>
enum.map(_.asString(cs)).through(stringParser[Future])
}
parsed.through(decoder[Future, A])
})
}

}
8 changes: 7 additions & 1 deletion circe/src/main/scala/io/finch/circe/Encoders.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package io.finch.circe

import com.twitter.io.Buf
import io.catbird.util._
import io.circe.{Encoder, Json}
import io.finch.Encode
import io.finch.{Application, Encode}
import io.finch.iteratee.AsyncEncode
import java.nio.charset.Charset



trait Encoders {

protected def print(json: Json, cs: Charset): Buf
Expand All @@ -15,4 +18,7 @@ trait Encoders {
*/
implicit def encodeCirce[A](implicit e: Encoder[A]): Encode.Json[A] =
Encode.json((a, cs) => print(e(a), cs))

implicit def asyncEncodeCirce[A](implicit e: Encoder[A]): AsyncEncode.Json[A] =
AsyncEncode.instance[A, Application.Json]((enum, cs) => enum.map(a => print(e(a), cs)))
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.finch.iteratee
package io.finch.streaming

import java.util.concurrent.atomic.AtomicLong

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.finch.iteratee
package io.finch.streaming

import com.twitter.util.Await
import io.finch.Input
Expand Down
40 changes: 40 additions & 0 deletions iteratee/src/main/scala/io/finch/iteratee/AsyncDecode.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.finch.iteratee

import java.nio.charset.Charset

import com.twitter.io.Buf
import com.twitter.util.Future
import io.catbird.util._
import io.finch.Application
import io.finch.internal._
import io.iteratee.Enumerator

/**
* Decodes an HTTP streamed payload represented as [[Enumerator]] (encoded with [[Charset]]) into
* an [[Enumerator]] of arbitrary type `A`.
*/
trait AsyncDecode[A] {

type ContentType <: String

def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A]
}

object AsyncDecode extends DecodeInstances {

type Aux[A, CT <: String] = AsyncDecode[A] {type ContentType = CT}

type Json[A] = Aux[A, Application.Json]
}

trait DecodeInstances {
def instance[A, CT <: String]
(f: (Enumerator[Future, Buf], Charset) => Enumerator[Future, A]): AsyncDecode.Aux[A, CT] = new AsyncDecode[A] {
override type ContentType = CT

override def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A] = f(enumerator, cs)
}

implicit def buf2bufDecode[CT <: String]: AsyncDecode.Aux[Buf, CT] =
instance[Buf, CT]((enum, _) => enum)
}
37 changes: 37 additions & 0 deletions iteratee/src/main/scala/io/finch/iteratee/AsyncEncode.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.finch.iteratee

import java.nio.charset.Charset

import com.twitter.io.Buf
import com.twitter.util.Future
import io.finch.Application
import io.iteratee.Enumerator

trait AsyncEncode[A] {

type ContentType <: String

def apply(enumerator: Enumerator[Future, A], cs: Charset): Enumerator[Future, Buf]
}

object AsyncEncode extends EncodeInstances {

type Aux[A, CT <: String] = AsyncEncode[A] { type ContentType = CT }

type Json[A] = Aux[A, Application.Json]
}

trait EncodeInstances {

def instance[A, CT <: String]
(f: (Enumerator[Future, A], Charset) => Enumerator[Future, Buf]): AsyncEncode.Aux[A, CT] = {
new AsyncEncode[A] {

override type ContentType = CT

override def apply(enumerator: Enumerator[Future, A], cs: Charset): Enumerator[Future, Buf] = f(enumerator, cs)
}
}

implicit def buf2bufEncode[CT <: String]: AsyncEncode.Aux[Buf, CT] = instance((enum, _) => enum)
}
77 changes: 77 additions & 0 deletions iteratee/src/main/scala/io/finch/iteratee/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.finch

import com.twitter.finagle.http.Response
import com.twitter.io.{Buf, Reader}
import com.twitter.util.Future
import io.catbird.util._
import io.finch.internal._
import io.finch.items.RequestItem
import io.finch.iteratee.AsyncEncode
import io.iteratee.Enumerator
import io.iteratee.twitter.FutureModule
import shapeless.Witness

/**
* Iteratee module
*/
package object iteratee extends IterateeInstances {

import syntax._

private[finch] def enumeratorFromReader(reader: Reader): Enumerator[Future, Buf] = {
reader.read(Int.MaxValue).intoEnumerator.flatMap({
case None => empty[Buf]
case Some(buf) => enumOne(buf).append(enumeratorFromReader(reader))
})
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def enumeratorBody[A, CT <: String](implicit decode: AsyncDecode.Aux[A, CT]): Endpoint[Enumerator[Future, A]] = {
new Endpoint[Enumerator[Future, A]] {
final def apply(input: Input): Endpoint.Result[Enumerator[Future, A]] = {
if (!input.request.isChunked) {
EndpointResult.Skipped
} else {
val req = input.request
EndpointResult.Matched(
input,
Rerunnable(Output.payload(decode(enumeratorFromReader(req.reader), req.charsetOrUtf8)))
)
}
}

final override def item: RequestItem = items.BodyItem
final override def toString: String = "asyncJsonBody"
}
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming JSON body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def enumeratorJsonBody[A](implicit ad: AsyncDecode.Aux[A, Application.Json]): Endpoint[Enumerator[Future, A]] =
enumeratorBody[A, Application.Json]

}

trait IterateeInstances extends FutureModule {

implicit def enumeratorToResponse[A, CT <: String](implicit
ae: AsyncEncode.Aux[A, CT],
w: Witness.Aux[CT]
): ToResponse.Aux[Enumerator[Future, A], CT] = {
ToResponse.instance[Enumerator[Future, A], CT]((enum, cs) => {
val response = Response()
response.setChunked(true)
response.contentType = w.value
val writer = response.writer
val iteratee = foreachM((buf: Buf) => writer.write(buf))
ae(enum, cs).into(iteratee).ensure(writer.close())
response
})
}

}
39 changes: 0 additions & 39 deletions streaming/src/main/scala/io/finch/iteratee/AsyncDecode.scala

This file was deleted.

68 changes: 0 additions & 68 deletions streaming/src/main/scala/io/finch/iteratee/package.scala

This file was deleted.

0 comments on commit 319d052

Please sign in to comment.