Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request streaming support through circe-streaming #812

Merged
merged 17 commits into from
Aug 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions argonaut/src/main/scala/io/finch/argonaut/Decoders.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package io.finch.argonaut

import argonaut.{CursorHistory, DecodeJson, Json}
import argonaut.JawnParser._
import com.twitter.util.{Return, Throw, Try}
import argonaut._
import argonaut.DecodeJson
import cats.syntax.either._
import com.twitter.util.{Return, Throw}
import io.finch._
import io.finch.internal.HttpContent
import java.nio.charset.StandardCharsets
import jawn.Parser
import scala.util.{Failure, Success}

trait Decoders {

Expand All @@ -16,18 +14,9 @@ trait Decoders {
*/
implicit def decodeArgonaut[A](implicit d: DecodeJson[A]): Decode.Json[A] =
Decode.json { (b, cs) =>
val err: (String, CursorHistory) => Try[A] = { (str, hist) => Throw(new Exception(str)) }

val attemptJson = cs match {
case StandardCharsets.UTF_8 =>
Parser.parseFromByteBuffer[Json](b.asByteBuffer)(facade)
case _ =>
Parser.parseFromString[Json](b.asString(cs))(facade)
}

attemptJson match {
case Success(value) => d.decodeJson(value).fold(err, Return.apply)
case Failure(error) => Throw(error)
Parse.parse(b.asString(cs)).flatMap(_.as[A].result.leftMap(_._1)) match {
case Right(result) => Return(result)
case Left(error) => Throw(new Exception(error))
}
}
}
35 changes: 24 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ lazy val finagleVersion = "6.45.0"
lazy val twitterServerVersion = "1.30.0"
lazy val finagleOAuth2Version = "0.6.45"
lazy val finagleHttpAuthVersion = "0.1.0"
lazy val circeVersion = "0.8.0"
lazy val circeJacksonVersion = "0.8.0"
lazy val circeVersion = "0.9.0-M1"
lazy val circeJacksonVersion = "0.9.0-M1"
lazy val catbirdVersion = "0.15.0"
lazy val shapelessVersion = "2.3.2"
lazy val catsVersion = "0.9.0"
lazy val catsVersion = "1.0.0-MF"
lazy val sprayVersion = "1.3.3"
lazy val playVersion = "2.6.0-RC2"
lazy val jacksonVersion = "2.8.8"
lazy val argonautVersion = "6.2"
lazy val json4sVersion = "3.5.2"
lazy val iterateeVersion = "0.13.0"

lazy val compilerOptions = Seq(
"-deprecation",
Expand Down Expand Up @@ -177,15 +178,26 @@ lazy val finch = project.in(file("."))
"io.spray" %% "spray-json" % sprayVersion
))
.aggregate(
core, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest,
core, iteratee, generic, argonaut, jackson, json4s, circe, playjson, sprayjson, benchmarks, test, jsonTest,
oauth2, examples, sse
)
.dependsOn(core, generic, circe)
.dependsOn(core, iteratee, generic, circe)

lazy val core = project
.settings(moduleName := "finch-core")
.settings(allSettings)

lazy val iteratee = project
.settings(moduleName := "finch-iteratee")
.settings(allSettings)
.settings(
libraryDependencies ++= Seq(
"io.iteratee" %% "iteratee-core" % iterateeVersion,
"io.iteratee" %% "iteratee-twitter" % iterateeVersion
)
)
.dependsOn(core % "compile->compile;test->test")

lazy val generic = project
.settings(moduleName := "finch-generic")
.settings(allSettings)
Expand All @@ -206,17 +218,17 @@ lazy val jsonTest = project.in(file("json-test"))
.settings(
libraryDependencies ++= Seq(
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-jawn" % circeVersion
"io.circe" %% "circe-jawn" % circeVersion,
"io.circe" %% "circe-streaming" % circeVersion
) ++ testDependencies
)
.dependsOn(core)
.dependsOn(core, iteratee)

lazy val argonaut = project
.settings(moduleName := "finch-argonaut")
.settings(allSettings)
.settings(libraryDependencies ++= Seq(
"io.argonaut" %% "argonaut" % argonautVersion,
"io.argonaut" %% "argonaut-jawn" % argonautVersion
"io.argonaut" %% "argonaut" % argonautVersion
))
.dependsOn(core, jsonTest % "test")

Expand Down Expand Up @@ -244,12 +256,13 @@ lazy val circe = project
.settings(
libraryDependencies ++= Seq(
"io.circe" %% "circe-core" % circeVersion,
"io.circe" %% "circe-streaming" % circeVersion,
"io.circe" %% "circe-jawn" % circeVersion,
"io.circe" %% "circe-generic" % circeVersion % "test",
"io.circe" %% "circe-jackson28" % circeJacksonVersion
)
)
.dependsOn(core, jsonTest % "test")
.dependsOn(core, iteratee, jsonTest % "test")

lazy val playjson = project
.settings(moduleName :="finch-playjson")
Expand Down Expand Up @@ -318,7 +331,7 @@ lazy val examples = project
"com.github.finagle" %% "finagle-oauth2" % finagleOAuth2Version
)
)
.dependsOn(core, circe, jackson, oauth2)
.dependsOn(core, circe, jackson, oauth2, iteratee)

lazy val benchmarks = project
.settings(moduleName := "finch-benchmarks")
Expand Down
21 changes: 19 additions & 2 deletions circe/src/main/scala/io/finch/circe/Decoders.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package io.finch.circe

import com.twitter.util.{Return, Throw, Try}
import com.twitter.util.{Future, Return, Throw, Try}
import io.catbird.util._
import io.circe.Decoder
import io.circe.jawn._
import io.finch.Decode
import io.circe.streaming._
import io.finch.{Application, Decode}
import io.finch.internal.HttpContent
import io.finch.iteratee.Enumerate
import java.nio.charset.StandardCharsets


trait Decoders {

/**
Expand All @@ -20,4 +24,17 @@ trait Decoders {

attemptJson.fold[Try[A]](Throw.apply, Return.apply)
}

implicit def enumerateCirce[A : Decoder]: Enumerate.Json[A] = {
Enumerate.instance[A, Application.Json]((enum, cs) => {
val parsed = cs match {
case StandardCharsets.UTF_8 =>
enum.map(_.asByteArray).through(byteStreamParser[Future])
case _ =>
enum.map(_.asString(cs)).through(stringStreamParser[Future])
}
parsed.through(decoder[Future, A])
})
}

}
5 changes: 2 additions & 3 deletions circe/src/main/scala/io/finch/circe/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ package object circe extends Encoders with Decoders {
/**
* Provides a [[Printer]] that drops null keys.
*/
object dropNullKeys extends Encoders with Decoders {
private[this] val printer: Printer = Printer.noSpaces.copy(dropNullKeys = true)
object dropNullValues extends Encoders with Decoders {
private[this] val printer: Printer = Printer.noSpaces.copy(dropNullValues = true)
protected def print(json: Json, cs: Charset): Buf =
Buf.ByteBuffer.Owned(printer.prettyByteBuffer(json, cs))
}
Expand All @@ -29,4 +29,3 @@ package object circe extends Encoders with Decoders {
Buf.ByteArray.Owned(io.circe.jackson.jacksonPrint(json).getBytes(cs.name))
}
}

3 changes: 2 additions & 1 deletion circe/src/test/scala/io/finch/circe/test/CirceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.finch.test.AbstractJsonSpec
class CirceSpec extends AbstractJsonSpec {
import io.finch.circe._
checkJson("circe")
checkEnumerateJson("circe")
}

class CirceJacksonSpec extends AbstractJsonSpec {
Expand All @@ -14,6 +15,6 @@ class CirceJacksonSpec extends AbstractJsonSpec {
}

class CirceDropNullKeysSpec extends AbstractJsonSpec {
import io.finch.circe.dropNullKeys._
import io.finch.circe.dropNullValues._
checkJson("circe-dropNullKeys")
}
73 changes: 73 additions & 0 deletions examples/src/main/scala/io/finch/iteratee/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.finch.iteratee

import scala.util.Random

import com.twitter.finagle.Http
import com.twitter.util.{Await, Future}
import io.catbird.util._
import io.circe.generic.auto._
import io.finch._
import io.finch.circe._
import io.iteratee.{Enumerator, Iteratee}

/**
* A Finch application featuring iteratee-based streaming support.
* This approach is more advanced and performant then basic [[com.twitter.concurrent.AsyncStream]]
*
* There are three endpoints in this example:
*
* 1. `sumJson` - streaming request
* 2. `streamJson` - streaming response
* 3. `isPrime` - end-to-end (request - response) streaming
*
* Use the following sbt command to run the application.
*
* {{{
* $ sbt 'examples/runMain io.finch.iteratee.Main'
* }}}
*
* Use the following HTTPie/curl commands to test endpoints.
*
* {{{
* $ curl -X POST --header "Transfer-Encoding: chunked" -d '{"i": 40} {"i": 2}' localhost:8081/sumJson
*
* $ http --stream GET :8081/streamJson
*
* $ curl -X POST --header "Transfer-Encoding: chunked" -d '{"i": 40} {"i": 42}' localhost:8081/streamPrime
* }}}
*/
object Main {

private val stream: Stream[Int] = Stream(Random.nextInt()).flatMap(_ => stream)

val sumJson: Endpoint[Result] = post("sumJson" :: enumeratorJsonBody[Number]) { (enum: Enumerator[Future, Number]) =>
enum.into(Iteratee.fold[Future, Number, Result](Result(0))(_ add _)).map(Ok)
}

val streamJson: Endpoint[Enumerator[Future, Number]] = get("streamJson") {
Ok(Enumerator.enumStream[Future, Int](stream).map(Number.apply))
}

val isPrime: Endpoint[Enumerator[Future, IsPrime]] = post("streamPrime" :: enumeratorJsonBody[Number]) {
(enum: Enumerator[Future, Number]) => Ok(enum.map(_.isPrime))
}

def main(args: Array[String]): Unit =
Await.result(Http.server
.withStreaming(enabled = true)
.serve(":8081", (sumJson :+: streamJson :+: isPrime).toServiceAs[Application.Json])
)

}

case class Result(result: Int) {
def add(n: Number): Result = copy(result = result + n.i)
}

case class Number(i: Int) {

def isPrime: IsPrime = IsPrime(!(2 +: (3 to Math.sqrt(i.toDouble).toInt by 2) exists (i % _ == 0)))

}

case class IsPrime(isPrime: Boolean)
48 changes: 48 additions & 0 deletions iteratee/src/main/scala/io/finch/iteratee/Enumerate.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.finch.iteratee

import java.nio.charset.Charset
import scala.annotation.implicitNotFound

import com.twitter.io.Buf
import com.twitter.util.Future
import io.finch.Application
import io.iteratee.Enumerator

/**
* Enumerate HTTP streamed payload represented as [[Enumerator]] (encoded with [[Charset]]) into
* an [[Enumerator]] of arbitrary type `A`.
*/
trait Enumerate[A] {

type ContentType <: String

def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A]
}

object Enumerate extends EnumerateInstances {

@implicitNotFound(
"""An Enumerator endpoint requires implicit Enumerate instance in scope, probably decoder for ${A} is missing.

Make sure ${A} is one of the following:

* A com.twitter.io.Buf
* A value of a type with an io.finch.iteratee.Enumerate instance (with the corresponding content-type)
"""
)
type Aux[A, CT <: String] = Enumerate[A] {type ContentType = CT}

type Json[A] = Aux[A, Application.Json]
}

trait EnumerateInstances {
def instance[A, CT <: String]
(f: (Enumerator[Future, Buf], Charset) => Enumerator[Future, A]): Enumerate.Aux[A, CT] = new Enumerate[A] {
type ContentType = CT

def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A] = f(enumerator, cs)
}

implicit def buf2bufDecode[CT <: String]: Enumerate.Aux[Buf, CT] =
instance[Buf, CT]((enum, _) => enum)
}
Loading