Skip to content

Commit

Permalink
WIP: AsyncDecode added
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeykolbasov committed Jul 14, 2017
1 parent 6fe91d7 commit 63e13d5
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 107 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ lazy val circe = project
"io.circe" %% "circe-jackson28" % circeJacksonVersion
)
)
.dependsOn(core, jsonTest % "test")
.dependsOn(core, streaming, jsonTest % "test")

lazy val playjson = project
.settings(moduleName :="finch-playjson")
Expand Down
17 changes: 16 additions & 1 deletion circe/src/main/scala/io/finch/circe/Decoders.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package io.finch.circe

import com.twitter.util.{Return, Throw, Try}
import com.twitter.util.{Future, Return, Throw, Try}
import io.catbird.util._
import io.circe.Decoder
import io.circe.jawn._
import io.circe.streaming._
import io.finch.Decode
import io.finch.internal.HttpContent
import io.finch.iteratee.AsyncDecode
import java.nio.charset.StandardCharsets


trait Decoders {

/**
Expand All @@ -20,4 +24,15 @@ trait Decoders {

attemptJson.fold[Try[A]](Throw.apply, Return.apply)
}

implicit def asyncDecodeCirce[A : Decoder]: AsyncDecode.Json[A] = AsyncDecode.instance((enum, cs) => {
val parsed = cs match {
case StandardCharsets.UTF_8 =>
enum.map(_.asByteArray).through(byteParser[Future])
case _ =>
enum.map(_.asString(cs)).through(stringParser[Future])
}
parsed.through(decoder[Future, A])
})

}
1 change: 1 addition & 0 deletions circe/src/main/scala/io/finch/circe/Encoders.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.circe.{Encoder, Json}
import io.finch.Encode
import java.nio.charset.Charset


trait Encoders {

protected def print(json: Json, cs: Charset): Buf
Expand Down
16 changes: 0 additions & 16 deletions circe/src/main/scala/io/finch/circe/Streaming.scala

This file was deleted.

5 changes: 0 additions & 5 deletions circe/src/main/scala/io/finch/circe/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,5 @@ package object circe extends Encoders with Decoders {
Buf.ByteArray.Owned(io.circe.jackson.jacksonPrint(json).getBytes(cs.name))
}

/**
* Provides Enumeratee[Future, Json, A] for stream decoding
*/
object streaming extends Streaming

}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.finch.streaming
package io.finch.iteratee

import java.util.concurrent.atomic.AtomicLong

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.finch.streaming
package io.finch.iteratee

import com.twitter.util.Await
import io.finch.Input
Expand Down
39 changes: 39 additions & 0 deletions streaming/src/main/scala/io/finch/iteratee/AsyncDecode.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.finch.iteratee

import java.nio.charset.Charset

import com.twitter.io.Buf
import com.twitter.util.Future
import io.finch.{Application, Text}
import io.finch.internal._
import io.iteratee.Enumerator

trait AsyncDecode[A] {

type ContentType <: String

def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A]
}

object AsyncDecode extends DecodeInstances {

type Aux[A, CT <: String] = AsyncDecode[A] { type ContentType = CT }

type Json[A] = Aux[A, Application.Json]
type Text[A] = Aux[A, Text.Plain]
}

trait DecodeInstances {
def instance[A, CT](f: (Enumerator[Future, Buf], Charset) => Enumerator[Future, A]): AsyncDecode.Aux[A, CT] = {
new AsyncDecode[A] {
override type ContentType = CT
override def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A] = f(enumerator, cs)
}
}

implicit def buf2bufDecode[CT <: String]: AsyncDecode.Aux[Buf, CT] =
instance[Buf, CT]((enum, _) => enum)

implicit def buf2stringDecode[CT <: String]: AsyncDecode.Aux[String, CT] =
instance[String, CT]((enum,cs) => enum.map(_.asString(cs)))
}
68 changes: 68 additions & 0 deletions streaming/src/main/scala/io/finch/iteratee/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.finch

import java.nio.charset.Charset

import com.twitter.finagle.http.Response
import com.twitter.io.{Buf, Reader}
import com.twitter.util.Future
import io.catbird.util._
import io.finch.internal._
import io.finch.items.RequestItem
import io.iteratee.{Enumeratee, Enumerator, Iteratee}
import shapeless.Witness

/**
* Streaming module
*/
package object iteratee extends IterateeInstances {

private[finch] def enumeratorFromReader(reader: Reader, cs: Charset): Enumerator[Future, String] = {
Enumerator.liftM(reader.read(Int.MaxValue)).flatMap({
case None => Enumerator.empty[Future, String]
case Some(buf) => Enumerator.enumOne[Future, String](buf.asString(cs)).append(enumeratorFromReader(reader, cs))
})
}

/**
* An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def enumeratorBody[A](implicit ee: Enumeratee[Future, String, A]): Endpoint[Enumerator[Future, A]] = {
new Endpoint[Enumerator[Future, A]] {
final def apply(input: Input): Endpoint.Result[Enumerator[Future, A]] = {
if (!input.request.isChunked) {
EndpointResult.Skipped
} else {
val req = input.request
EndpointResult.Matched(
input,
Rerunnable(Output.payload(enumeratorFromReader(req.reader, req.charsetOrUtf8).through(ee)))
)
}
}

final override def item: RequestItem = items.BodyItem
final override def toString: String = "asyncJsonBody"
}
}

}

trait IterateeInstances {

implicit def enumeratorToResponse[A, CT <: String](implicit
encode: Encode.Aux[A, CT],
w: Witness.Aux[CT]
): ToResponse[Enumerator[Future, A]] = {
ToResponse.instance[Enumerator[Future, A], CT]((enum, cs) => {
val response = Response()
response.setChunked(true)
response.contentType = w.value
val writer = response.writer
val iteratee = Iteratee.foldM[Future, Buf, Unit](())((_, buf) => writer.write(buf)).ensure(writer.close())
enum.map(encode.apply(_, cs)).into(iteratee)
response
})
}

}
49 changes: 0 additions & 49 deletions streaming/src/main/scala/io/finch/streaming/package.scala

This file was deleted.

This file was deleted.

0 comments on commit 63e13d5

Please sign in to comment.