Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send partial bodies #55

Merged
merged 4 commits into from
Apr 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions src/http2_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
-export([
send_headers/3,
send_body/3,
send_body/4,
is_push/1,
new_stream/1,
new_stream/2,
Expand Down Expand Up @@ -73,7 +74,8 @@
pid :: pid(),
send_window_size :: non_neg_integer(),
recv_window_size :: non_neg_integer(),
queued_data :: undefined | done | binary()
queued_data :: undefined | done | binary(),
body_complete = false :: boolean()
}).
-type stream() :: #stream{}.

Expand All @@ -100,6 +102,11 @@

-type connection() :: #connection{}.

-type send_body_option() :: {send_end_stream, boolean()}.
-type send_body_opts() :: [send_body_option()].

-export_type([send_body_option/0, send_body_opts/0]).

-spec start_client_link(gen_tcp | ssl,
inet:ip_address() | inet:hostname(),
inet:port_number(),
Expand Down Expand Up @@ -208,9 +215,15 @@ send_headers(Pid, StreamId, Headers) ->

-spec send_body(pid(), stream_id(), binary()) -> ok.
send_body(Pid, StreamId, Body) ->
gen_fsm:send_all_state_event(Pid, {send_body, StreamId, Body}),
gen_fsm:send_all_state_event(Pid, {send_body, StreamId, Body, []}),
ok.
-spec send_body(pid(), stream_id(), binary(), send_body_opts()) -> ok.
send_body(Pid, StreamId, Body, Opts) ->
gen_fsm:send_all_state_event(Pid, {send_body, StreamId, Body, Opts}),
ok.



-spec get_peer(pid()) ->
{ok, {inet:ip_address(), inet:port_number()}} | {error, term()}.
get_peer(Pid) ->
Expand Down Expand Up @@ -427,8 +440,10 @@ route_frame({H, Payload},
Delta =
case proplists:get_value(?SETTINGS_INITIAL_WINDOW_SIZE, PList) of
undefined ->
lager:debug("[~p] IWS undefined", [Conn#connection.type]),
0;
NewIWS ->
lager:debug("old IWS: ~p new IWS: ~p", [OldIWS, NewIWS]),
OldIWS - NewIWS
end,
NewSendSettings = http2_frame_settings:overlay(SS, Payload),
Expand Down Expand Up @@ -902,10 +917,14 @@ s_send_what_we_can(SWS, MFS, Stream) ->
{Frame, SentBytes, NewS} =
case MaxToSend > QueueSize of
true ->
Flags = case Stream#stream.body_complete of
true -> ?FLAG_END_STREAM;
false -> 0
end,
%% We have the power to send everything
{{#frame_header{
stream_id=Stream#stream.id,
flags=?FLAG_END_STREAM,
flags=Flags,
type=?DATA,
length=QueueSize
},
Expand Down Expand Up @@ -982,17 +1001,24 @@ handle_event({send_headers, StreamId, Headers},
Conn#connection{
encode_context=NewContext
}};
handle_event({send_body, StreamId, Body},
handle_event({send_body, StreamId, Body, Opts},
StateName,
#connection{}=Conn) ->
lager:debug("[~p] Send Body Stream ~p",
[Conn#connection.type, StreamId]),
Stream = get_stream(StreamId, Conn#connection.streams),
BodyComplete = proplists:get_value(send_end_stream, Opts, true),
OldBody = Stream#stream.queued_data,
NewBody = case is_binary(OldBody) of
true -> <<OldBody/binary, Body/binary>>;
false -> Body
end,
{NewSWS, NewS} =
s_send_what_we_can(Conn#connection.send_window_size,
Conn#connection.send_settings#settings.max_frame_size,
Stream#stream{
queued_data=Body
queued_data=NewBody,
body_complete=BodyComplete
}),

{next_state, StateName,
Expand Down
38 changes: 33 additions & 5 deletions test/client_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
all() ->
[
{group, default_handler},
{group, peer_handler}
{group, peer_handler},
{group, double_body_handler}
].

groups() -> [{default_handler, [complex_request,
upgrade_tcp_connection,
basic_push]},
{peer_handler, [get_peer_in_handler]}
{peer_handler, [get_peer_in_handler]},
{double_body_handler, [send_body_opts]}
].

init_per_suite(Config) ->
Expand All @@ -23,11 +25,16 @@ init_per_group(default_handler, Config) ->
%% We'll start up a chatterbox server once, with this data_dir.
NewConfig = [{www_root, data_dir},{initial_window_size,99999999}|Config],
chatterbox_test_buddy:start(NewConfig);
init_per_group(_, Config) ->
NewConfig = [{stream_callback_mod, peer_test_handler},
init_per_group(double_body_handler, Config) ->
NewConfig = [{stream_callback_mod, double_body_handler},
{initial_window_size,99999999}|Config],
chatterbox_test_buddy:start(NewConfig),
Config.
Config;
init_per_group(peer_handler, Config) ->
NewConfig = [{stream_callback_mod, peer_test_handler},
{initial_window_size,99999999}|Config],
chatterbox_test_buddy:start(NewConfig);
init_per_group(_, Config) -> Config.

init_per_testcase(_, Config) ->
Config.
Expand Down Expand Up @@ -118,3 +125,24 @@ get_peer_in_handler(_Config) ->
ct:pal("Response Headers: ~p", [ResponseHeaders]),
ct:pal("Response Body: ~p", [ResponseBody]),
ok.

send_body_opts(_Config) ->
{ok, Client} = http2_client:start_link(),
RequestHeaders =
[
{<<":method">>, <<"GET">>},
{<<":path">>, <<"/index.html">>},
{<<":scheme">>, <<"https">>},
{<<":authority">>, <<"localhost:8080">>},
{<<"accept">>, <<"*/*">>},
{<<"accept-encoding">>, <<"gzip, deflate">>},
{<<"user-agent">>, <<"chattercli/0.0.1 :D">>}
],

ExpectedResponseBody = <<"BodyPart1\nBodyPart2">>,

{ok, {ResponseHeaders, ResponseBody}} = http2_client:sync_request(Client, RequestHeaders, <<>>),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a better way to test this using http2c. It will allow you to inspect the response as a series of frames. You can then assert that the response is made of two data frames, and that they're broken up correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a level of abstraction lower than we need to test. as a user I don't care how you send my data, just that you do as fast as possible.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really doing that though. For better or worse, this test would pass if the server held back all the response body until it was ready to send it all.

It might mean that we have to add some kind of streaming functionality to the http2_client, so that we can see the response coming in pieces. We might have to add some kind of timer delay to the double_body_handler to take advantage of that.

ct:pal("Response Headers: ~p", [ResponseHeaders]),
ct:pal("Response Body: ~p", [ResponseBody]),
?assertEqual(ExpectedResponseBody, iolist_to_binary(ResponseBody)),
ok.
51 changes: 51 additions & 0 deletions test/double_body_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
-module(double_body_handler).

-include_lib("chatterbox/include/http2.hrl").

-behaviour(http2_stream).

-export([
init/2,
on_receive_request_headers/2,
on_send_push_promise/2,
on_receive_request_data/2,
on_request_end_stream/1
]).

-record(state, {conn_pid :: pid(),
stream_id :: integer()
}).

-spec init(pid(), integer()) -> {ok, any()}.
init(ConnPid, StreamId) -> {ok, #state{conn_pid=ConnPid,
stream_id=StreamId}}.

-spec on_receive_request_headers(
Headers :: hpack:headers(),
CallbackState :: any()) -> {ok, NewState :: any()}.
on_receive_request_headers(_Headers, State) -> {ok, State}.

-spec on_send_push_promise(
Headers :: hpack:headers(),
CallbackState :: any()) -> {ok, NewState :: any()}.
on_send_push_promise(_Headers, State) -> {ok, State}.

-spec on_receive_request_data(
iodata(),
CallbackState :: any())-> {ok, NewState :: any()}.
on_receive_request_data(_Data, State) -> {ok, State}.

-spec on_request_end_stream(
CallbackState :: any()) ->
{ok, NewState :: any()}.
on_request_end_stream(State=#state{conn_pid=ConnPid,
stream_id=StreamId}) ->
ResponseHeaders = [
{<<":status">>,<<"200">>}
],
http2_connection:send_headers(ConnPid, StreamId, ResponseHeaders),
http2_connection:send_body(ConnPid, StreamId, <<"BodyPart1\n">>,
[{send_end_stream, false}]),
http2_connection:send_body(ConnPid, StreamId, <<"BodyPart2">>),
{ok, State}.

57 changes: 56 additions & 1 deletion test/flow_control_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
all() ->
[
exceed_server_connection_receive_window,
exceed_server_stream_receive_window
exceed_server_stream_receive_window,
server_buffer_response
].

init_per_suite(Config) ->
Expand Down Expand Up @@ -38,6 +39,14 @@ init_per_testcase(
{flow_control, manual}
|Config],
chatterbox_test_buddy:start(PreChatterConfig);
init_per_testcase(server_buffer_response, Config) ->
PreChatterConfig =
[
{stream_callback_mod, flow_control_handler},
{initial_window_size, 64},
{flow_control, manual}
|Config],
chatterbox_test_buddy:start(PreChatterConfig);
init_per_testcase(_, Config) ->
Config.

Expand Down Expand Up @@ -77,6 +86,52 @@ exceed_server_stream_receive_window(_Config) ->
?assertEqual(?FLOW_CONTROL_ERROR, RstStream#rst_stream.error_code),
ok.

server_buffer_response(_Config) ->
WindowSize = 64,
application:load(chatterbox),
application:set_env(chatterbox, client_initial_window_size, WindowSize),
Headers = [{<<":path">>, <<"/">>},
{<<":method">>, <<"GET">>}],
{ok, Client} = http2c:start_link(),
{ok, {HeadersBin, _EC}} = hpack:encode(Headers, hpack:new_context()),
HF = {#frame_header{length=byte_size(HeadersBin),
type=?HEADERS,
flags=?FLAG_END_HEADERS bor ?FLAG_END_STREAM,
stream_id=3},
#headers{block_fragment=HeadersBin}},
http2c:send_unaltered_frames(Client, [HF]),

timer:sleep(300),
Resp1 = http2c:get_frames(Client, 3),
Size1 = data_frame_size(Resp1),
?assertEqual(WindowSize, Size1),
send_window_update(Client, 64),

timer:sleep(200),
Resp2 = http2c:get_frames(Client, 3),
Size2 = data_frame_size(Resp2),
?assertEqual(WindowSize, Size2),

send_window_update(Client, 64),
timer:sleep(200),
Resp3 = http2c:get_frames(Client, 3),
Size3 = data_frame_size(Resp3),
%% (68 * 2) - (64 * 2) = 8
?assertEqual(8, Size3).

data_frame_size(Frames) ->
DataFrames = lists:filter(fun({#frame_header{type=?DATA}, _}) -> true;
(_) -> false end, Frames),
lists:foldl(fun({_FH, #data{data=Data}}, Acc) ->
Acc + byte_size(Data) end, 0, DataFrames).

send_window_update(Client, Size) ->
http2c:send_unaltered_frames(Client,
[{#frame_header{length=4,
type=?WINDOW_UPDATE,
stream_id=3},
#window_update{window_size_increment=Size}}
]).
send_n_bytes(N) ->
%% We're up and running with a ridiculously small connection
%% window size of 64 bytes. The problem is that each stream will
Expand Down
55 changes: 55 additions & 0 deletions test/flow_control_handler.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-module(flow_control_handler).

-include_lib("chatterbox/include/http2.hrl").

-define(SEND_BYTES, 68).

-behaviour(http2_stream).

-export([
init/2,
on_receive_request_headers/2,
on_send_push_promise/2,
on_receive_request_data/2,
on_request_end_stream/1
]).

-record(state, {conn_pid :: pid(),
stream_id :: integer()
}).

-spec init(pid(), integer()) -> {ok, any()}.
init(ConnPid, StreamId) ->
{ok, #state{conn_pid=ConnPid,
stream_id=StreamId}}.

-spec on_receive_request_headers(
Headers :: hpack:headers(),
CallbackState :: any()) -> {ok, NewState :: any()}.
on_receive_request_headers(_Headers, State) -> {ok, State}.

-spec on_send_push_promise(
Headers :: hpack:headers(),
CallbackState :: any()) -> {ok, NewState :: any()}.
on_send_push_promise(_Headers, State) -> {ok, State}.

-spec on_receive_request_data(
iodata(),
CallbackState :: any())-> {ok, NewState :: any()}.
on_receive_request_data(_Data, State) -> {ok, State}.

-spec on_request_end_stream(
CallbackState :: any()) ->
{ok, NewState :: any()}.
on_request_end_stream(State=#state{conn_pid=ConnPid,
stream_id=StreamId}) ->
ResponseHeaders = [
{<<":status">>,<<"200">>}
],
http2_connection:send_headers(ConnPid, StreamId, ResponseHeaders),
http2_connection:send_body(ConnPid, StreamId, crypto:rand_bytes(?SEND_BYTES),
[{send_end_stream, false}]),
timer:sleep(200),
http2_connection:send_body(ConnPid, StreamId, crypto:rand_bytes(?SEND_BYTES)),
{ok, State}.

3 changes: 2 additions & 1 deletion test/http2c.erl
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ init([]) ->
{_SSH, ServerSettings} = http2_frame:read({Transport, Socket}, 1000),
http2_frame_settings:ack({Transport, Socket}),

ClientSettings = #settings{},
ClientSettings = chatterbox:settings(client),
lager:debug("[client] settings: ~p", [http2_settings:to_proplist(ClientSettings)]),

BinToSend = http2_frame_settings:send(#settings{}, ClientSettings),
Transport:send(Socket, BinToSend),
Expand Down