
Request streaming support through circe-streaming #812

Merged: 17 commits merged from the request-streaming branch into finagle:master on Aug 23, 2017

Conversation

@sergeykolbasov (Collaborator) commented Jul 13, 2017

Related to #667

  • Added a project finch-iteratee that depends on io.iteratee
  • Added a default decoder atop Enumeratee[Future, Buf, A] to finch-circe

With this implementation, the user would need to add the following import:

import io.finch.iteratee._

to get access to the enumeratorBody[A, CT] and enumeratorJsonBody[A] endpoints (besides adding the new finch-iteratee dependency).
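For illustration, a minimal endpoint built on top of these imports might look like the following sketch (the Event case class, the route name, and the catbird import are assumptions for the example, not code from this PR):

import com.twitter.util.Future
import io.catbird.util._            // cats instances for Twitter's Future (assumed here)
import io.circe.generic.auto._
import io.finch._
import io.finch.circe._
import io.finch.iteratee._
import io.iteratee.Enumerator

case class Event(id: Long)

// Consumes a chunked JSON request body as a stream of Events and replies with their count.
val countEvents: Endpoint[Int] =
  post("events" :: enumeratorJsonBody[Event]) { events: Enumerator[Future, Event] =>
    events.toVector.map(all => Ok(all.size))
  }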

@codecov-io commented Jul 13, 2017

Codecov Report

Merging #812 into master will decrease coverage by 0.31%.
The diff coverage is 71.11%.

@@            Coverage Diff             @@
##           master     #812      +/-   ##
==========================================
- Coverage   80.25%   79.94%   -0.32%     
==========================================
  Files          46       49       +3     
  Lines         699      738      +39     
  Branches       32       35       +3     
==========================================
+ Hits          561      590      +29     
- Misses        138      148      +10
Impacted Files Coverage Δ
...amples/src/main/scala/io/finch/iteratee/Main.scala 0% <0%> (ø)
circe/src/main/scala/io/finch/circe/package.scala 100% <100%> (ø) ⬆️
...e/src/main/scala/io/finch/iteratee/Enumerate.scala 100% <100%> (ø)
...ut/src/main/scala/io/finch/argonaut/Decoders.scala 100% <100%> (+16.66%) ⬆️
circe/src/main/scala/io/finch/circe/Decoders.scala 100% <100%> (ø) ⬆️
...tee/src/main/scala/io/finch/iteratee/package.scala 95.45% <95.45%> (ø)
core/src/main/scala/io/finch/Endpoint.scala 74.8% <0%> (+1.57%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 0adbae8...f16ee5f.

@vkostyukov (Collaborator) left a comment

I think it's really cool @imliar! Just some nits from my side around the structure. I like this direction, and it seems the next good step(s) would be to allow serving Endpoint[Enumerator[Future, A]] (pretty much what Travis did) by:

  1. Providing Enumeratee[Future, A, Buf] from within finch-circe and
  2. Providing ToResponse[Enumerator[Future, A]] from within finch-iteratee.

Let me know if you want to do that as well (either as part of this PR or separately). This is good work!
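Roughly, the two suggested pieces could look like the sketch below (the method name, signatures, and printer choice are illustrative, not the API that ends up in this PR):

import com.twitter.io.Buf
import com.twitter.util.Future
import io.catbird.util._                      // cats instances for Twitter's Future
import io.circe.{Encoder, Printer}
import io.iteratee.Enumeratee

// 1. finch-circe side: encode each streamed A into a Buf.
def encodeEnumeratee[A](implicit e: Encoder[A]): Enumeratee[Future, A, Buf] =
  Enumeratee.map[Future, A, Buf](a => Buf.Utf8(Printer.noSpaces.pretty(e(a))))

// 2. finch-iteratee side: a ToResponse[Enumerator[Future, A]] instance would then
//    write those Bufs out as a chunked response (elided here).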

build.sbt Outdated

lazy val core = project
.settings(moduleName := "finch-core")
.settings(allSettings)

lazy val streaming = project
Collaborator:

Let's call it iteratee instead (since it basically provides support for it).

import io.iteratee.Enumeratee

trait Streaming {
implicit def decoderEnumeratee[A : Decoder]: Enumeratee[Future, Buf, A] =
Collaborator:

I think it's totally fine if we provide this instance under just io.finch.circe._.

/**
* Streaming module
*/
package object streaming {
Collaborator:

Should we extend Module[Future] (as Travis did here: #557)?

Collaborator Author:

I guess there is no need for it

Collaborator:

What about syntax?

Collaborator Author:

Well, I guess it would save a few symbols when lifting into an Enumerator, but it would also add some overhead here and there, such as defining a monad instead of picking it up from imports.
Not sure if it's worth it.

Collaborator:

We can just point the monad instance to the one defined in catbird. I'm OK with not doing this now (extending the module), but I think it's the right thing to do at the end of the day.


* An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as
* an `Enumerator[Future, Buf]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def asyncBufBody: Endpoint[Enumerator[Future, Buf]] = new Endpoint[Enumerator[Future, Buf]] {
Collaborator:

I think it's fine not to expose this endpoint. Let's just expose one that deals with A (not Buf).

Collaborator Author:

Yep, instead I'll add an implicit identity Enumeratee[Future, Buf, Buf] in scope, just in case someone would like to use this endpoint for uploading binary data.

Collaborator:

I like it. Good idea!
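For reference, the identity instance being discussed could be as small as this sketch (built on io.iteratee's Enumeratee.map; not necessarily the exact code that landed):

import cats.Applicative
import com.twitter.io.Buf
import com.twitter.util.Future
import io.iteratee.Enumeratee

// Pass Bufs through untouched so the generic enumerator endpoint can serve raw binary uploads.
def identityEnumeratee(implicit F: Applicative[Future]): Enumeratee[Future, Buf, Buf] =
  Enumeratee.map[Future, Buf, Buf](identity)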

* An evaluating [[Endpoint]] that reads a required chunked streaming binary body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def asyncJsonBody[A](implicit ee: Enumeratee[Future, Buf, A]): Endpoint[Enumerator[Future, A]] = {
Collaborator:

This isn't really coupled to JSON as far as I can tell and can work with anything that knows how to go from Buf to A. How do you feel about renaming it to enumeratorBody?

Collaborator Author:

True, agree on that

@sergeykolbasov (Collaborator Author):

@vkostyukov
I think I'll add it under the same PR here

@sergeykolbasov force-pushed the request-streaming branch 2 times, most recently from 319d052 to e2a6b52 on July 14, 2017 22:59
def apply(enumerator: Enumerator[Future, A], cs: Charset): Enumerator[Future, Buf]
}

object AsyncEncode extends EncodeInstances {
Collaborator:

Thinking about this more, maybe it's overkill?

Looks like we can go from Enumerator[Future, A] to Enumerator[Future, Buf] just by using Encode.Aux[CT, A] (it takes charset). Maybe we don't really need this type class? I understand it makes things symmetric, but I think if we remove it and rename AsyncDecode to something like Enumerate it should all line up pretty nicely.
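The mapping described here is essentially a one-liner; a sketch (the helper name is made up for illustration):

import java.nio.charset.Charset

import cats.Monad
import com.twitter.io.Buf
import com.twitter.util.Future
import io.finch.Encode
import io.iteratee.Enumerator

// Turn a stream of As into a stream of Bufs using the regular Encode type class.
def toBufs[A, CT <: String](as: Enumerator[Future, A], cs: Charset)(implicit
  e: Encode.Aux[A, CT],
  F: Monad[Future]
): Enumerator[Future, Buf] =
  as.map(a => e(a, cs))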

@vkostyukov (Collaborator) left a comment

Thanks a lot @imliar! I think it looks pretty great already and I'm really happy with how things are lined up.

I added some small nits to the review. Once they are addressed, this is ready to get merged (I'm totally fine if we decide to test it in a separate commit). I'd like to see this go out public as part of 0.16-M2.


trait IterateeInstances extends FutureModule {

implicit def enumeratorToResponse[A, CT <: String](implicit
Collaborator:

Why does this have to go in its own trait (do we need to de-prioritize it)?

Collaborator Author:

@vkostyukov For the sake of API cleanliness and readability only, to separate "internal" stuff from the user API.
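For context, when de-prioritization is wanted, the usual Scala pattern looks like this sketch (the names and the toy type class are illustrative); here the split is organisational only:

trait Show[A] { def show(a: A): String }

// Lower-priority ("internal") instances live in a parent trait.
trait LowPriorityInstances {
  implicit val fallbackIntShow: Show[Int] = new Show[Int] {
    def show(a: Int): String = s"Int($a)"
  }
}

// Instances defined directly in the object are preferred during implicit search,
// which is how de-prioritization is normally achieved.
object instances extends LowPriorityInstances {
  implicit val intShow: Show[Int] = new Show[Int] {
    def show(a: Int): String = a.toString
  }
}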


import syntax._

private[finch] def enumeratorFromReader(reader: Reader): Enumerator[Future, Buf] = {
Collaborator:

Can we embed this under the defined enumeratorBody endpoint? I don't think it's used anywhere else.

Collaborator Author:

Used for testing

import syntax._

private[finch] def enumeratorFromReader(reader: Reader): Enumerator[Future, Buf] = {
reader.read(Int.MaxValue).intoEnumerator.flatMap({
Collaborator:

nit: No need for wrapping (). Just .flatMap {} should work.

* An evaluating [[Endpoint]] that reads a required chunked streaming JSON body, interpreted as
* an `Enumerator[Future, A]`. The returned [[Endpoint]] only matches chunked (streamed) requests.
*/
def enumeratorJsonBody[A](implicit ad: Enumerate.Aux[A, Application.Json]): Endpoint[Enumerator[Future, A]] =
Collaborator:

Love it!

response.setChunked(true)
response.contentType = w.value
val writer = response.writer
val iteratee = foreachM((buf: Buf) => writer.write(buf))
Collaborator:

Should we add new-line delimiters between each streamed object? I think we do that for AsyncStream, but I'm not sure what the right answer is here.

Collaborator Author:

Dunno, what if it's a binary Buf stream?

Collaborator Author:

Also, circe-streaming supports decoding only arrays or objects, so they should be valid JSON structures. I assume that the response structure and behaviour should be symmetrical.

Collaborator:

Yeah, I think I'm fine with that - let's just make sure to document it.

Collaborator Author:

Thinking about it more, it's going to be an invalid construction currently, just concatenated JSON objects. Maybe \n is not a bad idea in the end.

Collaborator:

FWIW, Twitter's streaming API is new-line delimited as well.

@sergeykolbasov (Collaborator Author), Jul 16, 2017:

Yeah, I know, but the confusing part for me is that circe-streaming works in a different manner, expecting a correct JSON structure as input.
So when we're talking about JSON streaming, the request and response formats are going to be different.

Collaborator:

I don't think they have to. I'm pretty sure circe-streaming (along with Jawn) can parse line-delimited JSON objects just fine.

Collaborator:

Actually, I take that back - I'm not so sure. Let me play with it a little bit. If it doesn't work, I think it's our responsibility to add/fix it in Circe.
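For illustration, new-line delimiting the outbound stream could be done with a small combinator along these lines (a sketch, not code from this PR):

import cats.Monad
import com.twitter.io.Buf
import com.twitter.util.Future
import io.iteratee.Enumerator

val newLine: Buf = Buf.Utf8("\n")

// Append a "\n" chunk after every encoded element of the outbound stream.
def delimited(bufs: Enumerator[Future, Buf])(implicit F: Monad[Future]): Enumerator[Future, Buf] =
  bufs.flatMap(buf => Enumerator.enumVector[Future, Buf](Vector(buf, newLine)))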

trait EnumerateInstances {
def instance[A, CT <: String]
(f: (Enumerator[Future, Buf], Charset) => Enumerator[Future, A]): Enumerate.Aux[A, CT] = new Enumerate[A] {
override type ContentType = CT
Collaborator:

nit: I don't think it overrides anything.

(f: (Enumerator[Future, Buf], Charset) => Enumerator[Future, A]): Enumerate.Aux[A, CT] = new Enumerate[A] {
override type ContentType = CT

override def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A] = f(enumerator, cs)
Collaborator:

Same here: no need for override.

@sergeykolbasov (Collaborator Author):

@vkostyukov I'd like to add tests under this PR. It'll take a day or so. Is there any rush?

@vkostyukov (Collaborator):

@imliar No rush at all! Just let me know when you think it's ready!

@vkostyukov (Collaborator):

@imliar I have a quick question / topic for discussion. One of the pain points of AsyncStream was the difficulty of signaling mid-stream that we're no longer interested in sending/receiving data. Do you think there are ways for us to do that with iteratee? I'd imagine we need to use ensure (to close a reader) for an inbound Enumerator, such that when we throw/raise (I guess, enumerate an error element, i.e., Future.exception) on it, it will close the reader. Do you think we can do the same for the outbound stream?

@vkostyukov requested a review from rpless on July 18, 2017 17:06
@rpless (Collaborator) left a comment

Wow, a lot of great work here @imliar!

I'm not sure how I feel about adding iteratee as a dependency to the circe module, but I suppose it's unavoidable if we want to provide a streaming mechanism for circe.

The only other thing we might want to add (probably doesn't have to be in this PR) is an enumeratorText, to match jsonAsyncStreamToResponse and textAsyncStreamToResponse that we provide in the core module.

@sergeykolbasov (Collaborator Author):

@vkostyukov

Could you clarify, please?
Are we talking about "backpressure" in the sense of how it could be implemented in HTTP/1.1? Do you want to close the connection when there is an exception during the reader.read() process?

@sergeykolbasov (Collaborator Author) commented Jul 19, 2017

@rpless io.iteratee is already in use by circe-streaming anyway.

Not sure if it's meaningful to have a default enumerator for text; I've followed the principle of YAGNI in this PR.

@sergeykolbasov (Collaborator Author):

Anyway, I think we have an external dependency for now.

import syntax._

private[finch] def enumeratorFromReader(reader: Reader): Enumerator[Future, Buf] = {
reader.read(Int.MaxValue).intoEnumerator.flatMap {
@vkostyukov (Collaborator), Jul 19, 2017:

I mentioned that in the main thread, but I think we should do ensure(reader.discard()) here.

Collaborator Author:

Not here, but after the function call. There is no sense in adding .ensure for every chunk.

Collaborator:

Oh yeah, that makes sense.

@vkostyukov (Collaborator):

@imliar I wouldn't think of that as back-pressure; it's more like an API for users to control the stream. And you're right - there is not much we can do in HTTP/1.1 besides just cutting the connection (and it's totally fine).

I think we should be fine on the outbound path: essentially, failing an Enumerator should result in a closed connection. The inbound path is slightly more complicated, which is why I decided to start this discussion. Assuming we have ensure(reader.discard()) in place, I was wondering if we can trigger that mid-stream somehow.

@sergeykolbasov (Collaborator Author) commented Jul 19, 2017

@vkostyukov I'm afraid there is not much room for inbound stream control.
Enumerator is immutable, and enumeratee & iteratee have no backpressure, so there is no way to send an exception back to the enumerator. I mean, futures give some kind of control over how fast the reader is processed, but there is no way to stop processing entirely on the enumerator side.

Maybe @travisbrown could give some advice on that.

@vkostyukov (Collaborator):

I see. I think we still need ensure(reader.discard()) to make sure we cut the connection if one of the reader.read() calls fails (I guess that should trigger it).

@vkostyukov (Collaborator):

@imliar Do you want to wait for circe/circe#712 to make sure we match the inbound/outbound streaming approach, or do you think it's fine to ship this now as is?

@vkostyukov (Collaborator) left a comment

Just some nits from me - I really like where things end up!


object Enumerate extends EnumerateInstances {

type Aux[A, CT <: String] = Enumerate[A] {type ContentType = CT}
Collaborator:

Can we add implicitNotFound here to produce a friendlier compile error message?
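Something along these lines would do (the annotation text and the exact trait shape here are illustrative):

import java.nio.charset.Charset

import com.twitter.io.Buf
import com.twitter.util.Future
import io.iteratee.Enumerator
import scala.annotation.implicitNotFound

@implicitNotFound("An Enumerate instance for ${A} is missing; import io.finch.circe._ or define one.")
trait Enumerate[A] {
  type ContentType <: String
  def apply(enumerator: Enumerator[Future, Buf], cs: Charset): Enumerator[Future, A]
}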

}

final override def item: RequestItem = items.BodyItem
final override def toString: String = "asyncJsonBody"
Collaborator:

nit: I think it's named differently now.


implicit def enumeratorToJsonResponse[A](implicit
e: Encode.Aux[A, Application.Json],
w: Witness.Aux[Application.Json]
Collaborator:

This indentation looks a bit weird to my taste.

@sergeykolbasov (Collaborator Author):

@vkostyukov I think it won't hurt anybody if we wait for circe first. Meanwhile, I could think about closing the stream.

@@ -62,4 +101,9 @@ object JsonLaws {
new DecodeJsonLaws[A] {
val decode: Decode.Json[A] = implicitly[Decode.Json[A]]
}

def enumeratorDecoding[A : Enumerate.Json]: EnumerateJsonLaws[A] =
Collaborator:

enumerating?

EndpointResult.Skipped
} else {
val req = input.request
def enum = enumeratorFromReader(req.reader).ensureEval(Eval.later(Future.value(req.reader.discard())))
Collaborator:

I'd probably prefer if enumeratorFromReader returned an enumerator that already ensures to discard the reader.
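In other words, something like the following sketch, where the helper wraps whatever raw read-loop enumerator already exists (the wrapper name is made up for illustration):

import cats.{Eval, Monad}
import com.twitter.io.{Buf, Reader}
import com.twitter.util.Future
import io.iteratee.Enumerator

// Guarantee the reader is discarded once the stream terminates, so call sites
// don't have to remember to add ensureEval themselves.
def discardingOnEnd(raw: Enumerator[Future, Buf], reader: Reader)(implicit
  F: Monad[Future]
): Enumerator[Future, Buf] =
  raw.ensureEval(Eval.later(Future.value(reader.discard())))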

@sergeykolbasov force-pushed the request-streaming branch 2 times, most recently from 2822b71 to 1daecc8 on August 22, 2017 19:47
Update specs and some outdated code
@sergeykolbasov (Collaborator Author):

@vkostyukov There is a binary incompatibility between the latest circe and argonaut (different jawn version dependencies). It hurts! And it also breaks tests.

@travisbrown (Collaborator):

@imliar I've got a PR open for that on the Argonaut side.

Is there evidence that using argonaut-jawn is significantly better in the context of Finch?

@travisbrown (Collaborator):

Also there is a jawn-argonaut artifact published from jawn that should be more or less a drop-in replacement for argonaut-jawn and has the most recent versions (also Erik just said he may be removing this in the longer term).

@vkostyukov (Collaborator):

I don't see a reason to stick with Jawn in the finch-argonaut context. If it helps, let's just drop this dependency now.

@sergeykolbasov (Collaborator Author):

Removed argonaut-jawn. Tests passed. Yay.

@vkostyukov (Collaborator):

Thanks a lot for all the hard work here, @imliar! I can "squash-and-merge" unless you're willing to update your branch (so it's one commit).

@rpless Mind taking a look as well?

@rpless (Collaborator) commented Aug 23, 2017

@vkostyukov +1 from me. Great work @imliar!

@vkostyukov merged commit 02990b4 into finagle:master on Aug 23, 2017
@sergeykolbasov (Collaborator Author):

Happy to help 😄

@sergeykolbasov mentioned this pull request on Oct 8, 2017
jguitana pushed a commit to jguitana/finch that referenced this pull request Nov 24, 2017