Skip to content

Commit

Permalink
Merge pull request #578 from finagle/vk/rerunnable
Browse files Browse the repository at this point in the history
Use Rerunnable
  • Loading branch information
vkostyukov committed Apr 3, 2016
2 parents 3ff0a93 + 26d7eff commit b047b1b
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.openjdk.jmh.annotations._

@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 6, time = 2, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 6, time = 2, timeUnit = TimeUnit.SECONDS)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(2)
abstract class FinchBenchmark
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ lazy val buildSettings = Seq(

lazy val finagleVersion = "6.34.0"
lazy val circeVersion = "0.4.0-RC1"
lazy val catbirdVersion = "0.3.0"
lazy val shapelessVersion = "2.3.0"
lazy val catsVersion = "0.4.1"
lazy val sprayVersion = "1.3.2"
Expand Down Expand Up @@ -42,6 +43,7 @@ val baseSettings = Seq(
"org.typelevel" %% "cats-core" % catsVersion,
"com.twitter" %% "finagle-http" % finagleVersion,
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"io.catbird" %% "catbird-util" % catbirdVersion,
compilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full)
) ++ testDependencies.map(_ % "test"),
resolvers ++= Seq(
Expand Down
75 changes: 26 additions & 49 deletions core/src/main/scala/io/finch/Endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package io.finch

import scala.reflect.ClassTag

import cats.{Alternative, Eval}
import cats.Alternative
import com.twitter.finagle.Service
import com.twitter.finagle.http.{Cookie, Request, Response, Status}
import com.twitter.util.{Future, Return, Throw, Try}
import io.catbird.util.Rerunnable
import io.finch.internal._
import shapeless._
import shapeless.ops.adjoin.Adjoin
Expand Down Expand Up @@ -90,8 +91,7 @@ trait Endpoint[A] { self =>
final def mapAsync[B](fn: A => Future[B]): Endpoint[B] = new Endpoint[B] {
def apply(input: Input): Endpoint.Result[B] =
self(input).map {
case (remainder, output) =>
(remainder, output.map(f => f.flatMap(oa => oa.traverse(a => fn(a)))))
case (remainder, output) => remainder -> output.flatMapF(oa => oa.traverse(a => fn(a)))
}

override def item = self.item
Expand All @@ -110,8 +110,7 @@ trait Endpoint[A] { self =>
final def mapOutputAsync[B](fn: A => Future[Output[B]]): Endpoint[B] = new Endpoint[B] {
def apply(input: Input): Endpoint.Result[B] =
self(input).map {
case (remainder, output) =>
(remainder, output.map(f => f.flatMap { oa =>
case (remainder, output) => remainder -> output.flatMapF { oa =>
val fob = oa.traverse(fn).map(oob => oob.flatten)

fob.map { ob =>
Expand All @@ -120,7 +119,7 @@ trait Endpoint[A] { self =>

ob2
}
}))
}
}

override def item = self.item
Expand All @@ -145,22 +144,21 @@ trait Endpoint[A] { self =>
final def transform[B](fn: Future[Output[A]] => Future[Output[B]]): Endpoint[B] = new Endpoint[B] {
override def apply(input: Input): Endpoint.Result[B] = {
self(input).map {
case (remainder, output) =>
(remainder, output.map(fn))
case (remainder, output) => remainder -> new Rerunnable[Output[B]] {
override def run: Future[Output[B]] = fn(output.run)
}
}
}
}

final def product[B](other: Endpoint[B]): Endpoint[(A, B)] = new Endpoint[(A, B)] {
private[this] def join(
foa: Future[Output[A]],
fob: Future[Output[B]]
): Future[Output[(A, B)]] = Future.join(foa.liftToTry, fob.liftToTry).flatMap {
case (Return(oa), Return(ob)) => Future.value(oa.flatMap(a => ob.map(b => (a, b))))
case (Throw(oa), Throw(ob)) => Future.exception(collectExceptions(oa, ob))
case (Throw(e), _) => Future.exception(e)
case (_, Throw(e)) => Future.exception(e)
}
private[this] def join(both: (Try[Output[A]], Try[Output[B]])): Future[Output[(A, B)]] =
both match {
case (Return(oa), Return(ob)) => Future.value(oa.flatMap(a => ob.map(b => (a, b))))
case (Throw(oa), Throw(ob)) => Future.exception(collectExceptions(oa, ob))
case (Throw(e), _) => Future.exception(e)
case (_, Throw(e)) => Future.exception(e)
}

private[this] def collectExceptions(a: Throwable, b: Throwable): Error.RequestErrors = {
def collect(e: Throwable): Seq[Throwable] = e match {
Expand All @@ -175,7 +173,7 @@ trait Endpoint[A] { self =>
self(input).flatMap {
case (remainder1, outputA) => other(remainder1).map {
case (remainder2, outputB) =>
(remainder2, for { ofa <- outputA; ofb <- outputB } yield join(ofa, ofb))
remainder2 -> outputA.liftToTry.product(outputB.liftToTry).flatMapF(join)
}
}

Expand Down Expand Up @@ -233,7 +231,7 @@ trait Endpoint[A] { self =>
*/
final def |[B >: A](other: Endpoint[B]): Endpoint[B] = new Endpoint[B] {
private[this] def aToB(o: Endpoint.Result[A]): Endpoint.Result[B] =
o.map { case (r, oo) => (r, oo.map(_.asInstanceOf[Future[Output[B]]])) }
o.asInstanceOf[Endpoint.Result[B]]

def apply(input: Input): Endpoint.Result[B] =
(self(input), other(input)) match {
Expand Down Expand Up @@ -282,7 +280,7 @@ trait Endpoint[A] { self =>

def apply(req: Request): Future[Response] = safeEndpoint(Input(req)) match {
case Some((remainder, output)) if remainder.isEmpty =>
output.map(f => f.map(o => o.toResponse[CT](req.version))).value
output.map(oa => oa.toResponse[CT](req.version)).run
case _ => Future.value(Response(req.version, Status.NotFound))
}
}
Expand All @@ -292,16 +290,7 @@ trait Endpoint[A] { self =>
* handle any matching throwable from the underlying future.
*/
final def rescue[B >: A](pf: PartialFunction[Throwable, Future[Output[B]]]): Endpoint[B] =
new Endpoint[B] {
def apply(input: Input): Endpoint.Result[B] =
self(input).map {
case (remainder, output) =>
(remainder, output.map(f => f.rescue(pf)))
}

override def item = self.item
override def toString = self.toString
}
transform(foa => foa.rescue(pf))

/**
* Recovers from any exception occurred in this endpoint by creating a new endpoint that will
Expand Down Expand Up @@ -368,40 +357,32 @@ trait Endpoint[A] { self =>
def apply(input: Input): Endpoint.Result[Option[A]] =
self(input).map {
case (remainder, output) =>
(remainder, output.map(f =>
f.liftToTry.map(f =>
f.toOption.fold(Output.None: Output[Option[A]])(o => o.map(Some.apply)))))
remainder -> output.liftToTry
.map(toa => toa.toOption.fold(Output.None: Output[Option[A]])(o => o.map(Some.apply)))
}

override def item = self.item
override def toString = self.toString
}

private[this] def withOutput[B](fn: Output[A] => Output[B]): Endpoint[B] = new Endpoint[B] {
def apply(input: Input): Endpoint.Result[B] =
self(input).map {
case (remainder, output) => (remainder, output.map(f => f.map(o => fn(o))))
}

override def item = self.item
override def toString = self.toString
}
private[this] def withOutput[B](fn: Output[A] => Output[B]): Endpoint[B] =
transform(foa => foa.map(oa => fn(oa)))
}

/**
* Provides extension methods for [[Endpoint]] to support coproduct and path syntax.
*/
object Endpoint {

type Result[A] = Option[(Input, Eval[Future[Output[A]]])]
type Result[A] = Option[(Input, Rerunnable[Output[A]])]

/**
* Creates an [[Endpoint]] from the given [[Output]].
*/
def apply(mapper: Mapper[shapeless.HNil]): Endpoint[mapper.Out] = mapper(/)

private[finch] val Empty: Endpoint[HNil] = embed(items.MultipleItems)(input =>
Some((input, Eval.now(Future.value(Output.payload(HNil: HNil)))))
Some(input -> Rerunnable(Output.payload(HNil: HNil)))
)

private[finch] def embed[A](i: items.RequestItem)(f: Input => Result[A]): Endpoint[A] =
Expand Down Expand Up @@ -553,11 +534,7 @@ object Endpoint {
override def product[A, B](fa: Endpoint[A], fb: Endpoint[B]): Endpoint[(A, B)] = fa.product(fb)

override def pure[A](x: A): Endpoint[A] = new Endpoint[A] {
override def apply(input: Input): Result[A] = Some(input -> Eval.now(Future.value(Ok(x))))
}

override def pureEval[A](x: Eval[A]): Endpoint[A] = new Endpoint[A] {
override def apply(input: Input): Result[A] = Some(input -> x.map(out => Future.value(Ok(out))))
override def apply(input: Input): Result[A] = Some(input -> Rerunnable(Output.payload(x)))
}

override def empty[A]: Endpoint[A] = new Endpoint[A] {
Expand Down
58 changes: 36 additions & 22 deletions core/src/main/scala/io/finch/Endpoints.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package io.finch

import java.util.UUID

import cats.Eval
import com.twitter.concurrent.AsyncStream
import com.twitter.finagle.http.{Cookie, Method, Request}
import com.twitter.finagle.http.exp.Multipart.FileUpload
import com.twitter.finagle.netty3.ChannelBufferBuf
import com.twitter.io.{Buf, Charsets}
import com.twitter.util.{Base64StringEncoder, Future, Try}
import io.catbird.util.Rerunnable
import io.finch.internal.TooFastString
import shapeless._

Expand All @@ -17,8 +17,9 @@ import shapeless._
*/
trait Endpoints {

private[this] val hnilFutureOutput: Eval[Future[Output[HNil]]] =
Eval.now(Future.value(Output.payload(HNil)))
private[this] val hnilFutureOutput: Rerunnable[Output[HNil]] = new Rerunnable[Output[HNil]] {
override val run = Future.value(Output.payload(HNil))
}

type Endpoint0 = Endpoint[HNil]
type Endpoint2[A, B] = Endpoint[A :: B :: HNil]
Expand All @@ -30,7 +31,7 @@ trait Endpoints {
private[finch] class Matcher(s: String) extends Endpoint[HNil] {
def apply(input: Input): Endpoint.Result[HNil] =
input.headOption.flatMap {
case `s` => Some((input.drop(1), hnilFutureOutput))
case `s` => Some(input.drop(1) -> hnilFutureOutput)
case _ => None
}

Expand All @@ -50,7 +51,9 @@ trait Endpoints {
for {
ss <- input.headOption
aa <- f(ss)
} yield (input.drop(1), Eval.now(Future.value(Output.payload(aa))))
} yield input.drop(1) -> new Rerunnable[Output[A]] {
override def run = Future.value(Output.payload(aa))
}

def apply(n: String): Endpoint[A] = copy[A](name = n)

Expand All @@ -59,7 +62,9 @@ trait Endpoints {

private[finch] case class StringExtractor(name: String) extends Endpoint[String] {
def apply(input: Input): Endpoint.Result[String] =
input.headOption.map(s => (input.drop(1), Eval.now(Future.value(Output.payload(s)))))
input.headOption.map(s => input.drop(1) -> new Rerunnable[Output[String]] {
override def run = Future.value(Output.payload(s))
})

def apply(n: String): Endpoint[String] = copy(name = n)

Expand All @@ -73,10 +78,12 @@ trait Endpoints {
name: String,
f: String => Option[A]) extends Endpoint[Seq[A]] {
def apply(input: Input): Endpoint.Result[Seq[A]] =
Some((input.copy(path = Nil), Eval.now(Future.value(Output.payload(for {
s <- input.path
a <- f(s)
} yield a)))))
Some(input.copy(path = Nil) -> new Rerunnable[Output[Seq[A]]] {
override def run = Future.value(Output.payload(for {
s <- input.path
a <- f(s)
} yield a))
})

def apply(n: String): Endpoint[Seq[A]] = copy[A](name = n)

Expand All @@ -87,8 +94,10 @@ trait Endpoints {
if (s.length != 36) None
else try Some(UUID.fromString(s)) catch { case _: Exception => None }

private[this] def result[A](i: Input, a: A): (Input, Eval[Future[Output[A]]]) =
(i.drop(1), Eval.now(Future.value(Output.payload(a))))
private[this] def result[A](i: Input, a: A): (Input, Rerunnable[Output[A]]) =
i.drop(1) -> new Rerunnable[Output[A]] {
override def run = Future.value(Output.payload(a))
}

/**
* A matching [[Endpoint]] that reads a string value from the current path segment.
Expand Down Expand Up @@ -170,7 +179,7 @@ trait Endpoints {
*/
object * extends Endpoint[HNil] {
def apply(input: Input): Endpoint.Result[HNil] =
Some((input.copy(path = Nil), hnilFutureOutput))
Some(input.copy(path = Nil) -> hnilFutureOutput)

override def toString: String = "*"
}
Expand All @@ -180,7 +189,7 @@ trait Endpoints {
*/
object / extends Endpoint[HNil] {
def apply(input: Input): Endpoint.Result[HNil] =
Some((input, hnilFutureOutput))
Some(input -> hnilFutureOutput)

override def toString: String = ""
}
Expand Down Expand Up @@ -287,20 +296,24 @@ trait Endpoints {

private[this] def option[A](item: items.RequestItem)(f: Request => A): Endpoint[A] =
Endpoint.embed(item)(input =>
Some((input, Eval.later(Future.value(Output.payload(f(input.request))))))
)
Some(input -> new Rerunnable[Output[A]] {
override def run = Future.value(Output.payload(f(input.request)))
}))

private[this] def exists[A](item: items.RequestItem)(f: Request => Option[A]): Endpoint[A] =
Endpoint.embed(item)(input =>
f(input.request).map(s => (input, Eval.now(Future.value(Output.payload(s)))))
f(input.request).map(s => input -> new Rerunnable[Output[A]] {
override def run = Future.value(Output.payload(s))
})
)

private[this] def matches[A]
(item: items.RequestItem)
(p: Request => Boolean)
(f: Request => A): Endpoint[A] = Endpoint.embed(item)(input =>
if (p(input.request)) Some((input, Eval.later(Future.value(Output.payload(f(input.request))))))
else None
if (p(input.request)) Some(input -> new Rerunnable[Output[A]] {
override def run = Future.value(Output.payload(f(input.request)))
}) else None
)

/**
Expand Down Expand Up @@ -443,13 +456,14 @@ trait Endpoints {
private[this] val expected = "Basic " + Base64StringEncoder.encode(userInfo.getBytes)

def apply[A](e: Endpoint[A]): Endpoint[A] = new Endpoint[A] {
private[this] val failedOutput: Eval[Future[Output[A]]] =
Eval.now(Future.value(Unauthorized(BasicAuthFailed)))
private[this] val failedOutput = new Rerunnable[Output[A]] {
override def run = Future.value(Unauthorized(BasicAuthFailed))
}

def apply(input: Input): Endpoint.Result[A] =
input.request.authorization.flatMap {
case `expected` => e(input)
case _ => Some((input.copy(path = Seq.empty), failedOutput))
case _ => Some(input.copy(path = Seq.empty) -> failedOutput)
}

override def toString: String = s"BasicAuth($e)"
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/io/finch/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ object Output {
}

implicit class EndpointResultOps[A](val o: Endpoint.Result[A]) extends AnyVal {
private[finch] def poll: Option[Try[A]] = o.flatMap(_._2.value.poll.map(_.map(_.value)))
private[finch] def output: Option[Output[A]] = o.map({ case (_, oa) => Await.result(oa.value) })
private[finch] def poll: Option[Try[A]] = o.flatMap(_._2.run.poll.map(_.map(_.value)))
private[finch] def output: Option[Output[A]] = o.map({ case (_, oa) => Await.result(oa.run) })
private[finch] def value: Option[A] = output.map(oa => oa.value)
private[finch] def remainder: Option[Input] = o.map(_._1)
}
Expand Down
Loading

0 comments on commit b047b1b

Please sign in to comment.