Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Produce request encoding performance improvement #69

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* 2.3.5
- Improve produce request encoding performance by 35%

* 2.3.4
- Insert sni only when missing in ssl options
- Ensure string() type sni in ssl options
Expand Down
4 changes: 2 additions & 2 deletions include/kpro.hrl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%%% Copyright (c) 2017, Klarna AB
%%% Copyright (c) 2017-2020, Klarna AB
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@
, api :: kpro:api()
, vsn :: kpro:vsn()
, no_ack = false :: boolean() %% set to true for fire-n-forget requests
, msg :: binary() | kpro:struct()
, msg :: iodata() | kpro:struct()
}).

-record(kpro_rsp,
Expand Down
4 changes: 1 addition & 3 deletions include/kpro_private.hrl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%%% Copyright (c) 2014-2017, Klarna AB
%%% Copyright (c) 2014-2020, Klarna AB
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,8 +77,6 @@

-define(SCHEMA_MODULE, kpro_schema).

-define(IS_STRUCT(S), (is_list(S) orelse is_map(S))).

-define(MIN_MAGIC_2_PRODUCE_API_VSN, 3). %% since kafka 0.11
-define(MIN_MAGIC_2_FETCH_API_VSN, 4). %% since kafka 0.11
-define(MIN_INCREMENTAL_FETCH_API_VSN, 7). %% since kafka 1.1.0
Expand Down
2 changes: 1 addition & 1 deletion src/kpro.erl
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ encode_request(ClientId, CorrId, Req) ->
%% @doc Encode message batch for produce request.
-spec encode_batch(magic(), batch_input(), compress_option()) -> binary().
encode_batch(Magic, Batch, Compression) ->
kpro_batch:encode(Magic, Batch, Compression).
iolist_to_binary(kpro_batch:encode(Magic, Batch, Compression)).

%% @doc The message-set is not decoded upon receiving (in connection process).
%% It is passed as binary to the consumer process and decoded there.
Expand Down
17 changes: 8 additions & 9 deletions src/kpro_batch.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%%% Copyright (c) 2018, Klarna AB
%%% Copyright (c) 2018-2020, Klarna AB
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,15 +38,15 @@
-define(NO_META, ?KPRO_NO_BATCH_META).

%% @doc Encode a list of batch inputs into byte stream.
-spec encode(magic(), batch_input(), compress_option()) -> binary().
-spec encode(magic(), batch_input(), compress_option()) -> iodata().
encode(_MagicVsn = 2, Batch, Compression) ->
FirstSequence = -1,
NonTxn = #{ producer_id => -1
, producer_epoch => -1
},
iolist_to_binary(encode_tx(Batch, Compression, FirstSequence, NonTxn));
encode_tx(Batch, Compression, FirstSequence, NonTxn);
encode(MagicVsn, Batch, Compression) ->
iolist_to_binary(kpro_batch_v01:encode(MagicVsn, Batch, Compression)).
kpro_batch_v01:encode(MagicVsn, Batch, Compression).

%% @doc Encode a batch of magic version 2.
% RecordBatch =>
Expand Down Expand Up @@ -93,15 +93,14 @@ encode_tx([FirstMsg | _] = Batch, Compression, FirstSequence,
, enc(int32, Count) % {Count, T8} = dec(int32, T7),
, EncodedBatch
],
Body1 = iolist_to_binary(Body0),
CRC = crc32cer:nif(Body1),
CRC = crc32cer:nif(Body0),
Body =
[ enc(int32, PartitionLeaderEpoch)
, enc(int8, Magic)
, enc(int32, CRC)
, Body1
, Body0
],
Size = kpro_lib:data_size(Body),
Size = iolist_size(Body),
[ enc(int64, FirstOffset)
, enc(int32, Size)
| Body
Expand Down Expand Up @@ -280,7 +279,7 @@ enc_record(Offset, TsBase, #{value := Value} = M) ->
, enc(bytes, Value)
, enc_headers(Headers)
],
Size = kpro_lib:data_size(Body),
Size = iolist_size(Body),
[enc(varint, Size), Body].

enc_headers(Headers) ->
Expand Down
4 changes: 2 additions & 2 deletions src/kpro_batch_v01.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%%% Copyright (c) 2014-2018, Klarna AB
%%% Copyright (c) 2014-2020, Klarna AB
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -185,7 +185,7 @@ encode_message(Magic, Codec, Ts, Key, Value, Offset) ->
, enc(bytes, Value)
],
Crc = enc(int32, erlang:crc32(Body)),
Size = kpro_lib:data_size([Crc, Body]),
Size = iolist_size([Crc, Body]),
[enc(int64, Offset),
enc(int32, Size),
Crc, Body
Expand Down
8 changes: 4 additions & 4 deletions src/kpro_connection.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%%
%%% Copyright (c) 2014-2018, Klarna AB
%%% Copyright (c) 2014-2020, Klarna AB
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -412,10 +412,10 @@ handle_msg({From, {send, Request}},
#kpro_req{ref = Ref} ->
kpro_sent_reqs:add(Requests, Caller, Ref, API, Vsn)
end,
RequestBin = kpro_req_lib:encode(ClientId, CorrId, Request),
RequestIoData = kpro_req_lib:encode(ClientId, CorrId, Request),
Res = case Mod of
gen_tcp -> gen_tcp:send(Sock, RequestBin);
ssl -> ssl:send(Sock, RequestBin)
gen_tcp -> gen_tcp:send(Sock, RequestIoData);
ssl -> ssl:send(Sock, RequestIoData)
end,
case Res of
ok ->
Expand Down
23 changes: 7 additions & 16 deletions src/kpro_lib.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%%
%%% Copyright (c) 2018, Klarna AB
%%% Copyright (c) 2018-2020, Klarna AB
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
-module(kpro_lib).

-export([ copy_bytes/2
, data_size/1
, decode/2
, decode_corr_id/1
, encode/2
Expand Down Expand Up @@ -77,9 +76,9 @@ send_and_recv_raw(Req, Sock, Mod, Timeout) ->
send_and_recv(#kpro_req{api = API, vsn = Vsn} = Req,
Sock, Mod, ClientId, Timeout) ->
CorrId = make_corr_id(),
ReqBin = kpro_req_lib:encode(ClientId, CorrId, Req),
ReqIoData = kpro_req_lib:encode(ClientId, CorrId, Req),
try
RspBin = send_and_recv_raw(ReqBin, Sock, Mod, Timeout),
RspBin = send_and_recv_raw(ReqIoData, Sock, Mod, Timeout),
{CorrId, Body} = decode_corr_id(RspBin), %% assert match CorrId
#kpro_rsp{api = API, vsn = Vsn, msg = Msg} = %% assert match API and Vsn
kpro_rsp_lib:decode(API, Vsn, Body, _DummyRef = false),
Expand Down Expand Up @@ -126,11 +125,6 @@ parse_endpoints(Protocol, Str) ->
end
end, [], string:tokens(Str, ",\n")).

%% @doc Return number of bytes in the given `iodata()'.
-spec data_size(iodata()) -> count().
data_size(IoData) ->
iolist_size(IoData).

%% @doc Encode primitives.
-spec encode(primitive_type(), kpro:primitive()) -> iodata().
encode(boolean, true) -> <<1:8/?INT>>;
Expand All @@ -144,15 +138,12 @@ encode(nullable_string, ?null) -> <<-1:16/?INT>>;
encode(nullable_string, Str) -> encode(string, Str);
encode(string, Atom) when is_atom(Atom) ->
encode(string, atom_to_binary(Atom, utf8));
encode(string, <<>>) -> <<0:16/?INT>>;
encode(string, L) when is_list(L) ->
encode(string, iolist_to_binary(L));
encode(string, B) when is_binary(B) ->
Length = size(B),
<<Length:16/?INT, B/binary>>;
encode(string, Str) ->
Length = iolist_size(Str),
[encode(int16, Length), Str];
encode(bytes, ?null) -> <<-1:32/?INT>>;
encode(bytes, B) when is_binary(B) orelse is_list(B) ->
Size = data_size(B),
Size = iolist_size(B),
case Size =:= 0 of
true -> <<-1:32/?INT>>;
false -> [<<Size:32/?INT>>, B]
Expand Down
43 changes: 26 additions & 17 deletions src/kpro_req_lib.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%%% Copyright (c) 2018, Klarna Bank AB (publ)
%%% Copyright (c) 2018-2020, Klarna Bank AB (publ)
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,6 +54,9 @@
-define(DEFAULT_ACK_TIMEOUT, 10000).
-define(FIELD_ENCODE_ERROR(Reason, EncoderStack),
{field_encode_error, Reason, EncoderStack}).
-define(IS_NON_EMPTY_KV_LIST(L), (is_list(L) andalso L =/= [] andalso is_tuple(hd(L)))).
-define(IS_STRUCT_DATA(S), (is_map(S) orelse ?IS_NON_EMPTY_KV_LIST(S))).
-define(IS_STRUCT_SCHEMA(S), (is_list(S) orelse is_map(S))).

-type vsn() :: kpro:vsn().
-type topic() :: kpro:topic().
Expand Down Expand Up @@ -190,6 +193,7 @@ produce(Vsn, Topic, Partition, Batch) ->
-spec produce(vsn(), topic(), partition(),
binary() | batch_input(), produce_opts()) -> req().
produce(Vsn, Topic, Partition, Batch, Opts) ->
ok = assert_known_api_and_vsn(produce, Vsn),
RequiredAcks = required_acks(maps:get(required_acks, Opts, all_isr)),
Compression = maps:get(compression, Opts, ?no_compression),
AckTimeout = maps:get(ack_timeout, Opts, ?DEFAULT_ACK_TIMEOUT),
Expand All @@ -209,18 +213,22 @@ produce(Vsn, Topic, Partition, Batch, Opts) ->
true = FirstSequence >= 0, %% assert
kpro_batch:encode_tx(Batch, Compression, FirstSequence, TxnCtx)
end,
Fields =
[{transactional_id, transactional_id(TxnCtx)},
{acks, RequiredAcks},
{timeout, AckTimeout},
{topic_data, [[{topic, Topic},
{data, [[{partition, Partition},
{record_set, EncodedBatch}
]]}
]]}
Msg =
[ [encode(string, transactional_id(TxnCtx)) || Vsn > 2]
, encode(int16, RequiredAcks)
, encode(int32, AckTimeout)
, encode(int32, 1) %% topic array header
, encode(string, Topic)
, encode(int32, 1) %% partition array header
, encode(int32, Partition)
, encode(bytes, EncodedBatch)
],
Req = make(produce, Vsn, Fields),
Req#kpro_req{no_ack = RequiredAcks =:= 0}.
#kpro_req{ api = produce
, vsn = Vsn
, msg = Msg
, ref = make_ref()
, no_ack = RequiredAcks =:= 0
}.

%% @doc Make `end_txn' request.
-spec end_txn(txn_ctx(), commit | abort) -> req().
Expand Down Expand Up @@ -352,16 +360,17 @@ required_acks(leader_only) -> 1;
required_acks(all_isr) -> -1;
required_acks(I) when I >= -1 andalso I =< 1 -> I.

encode_struct(_API, _Vsn, Bin) when is_binary(Bin) -> Bin;
encode_struct(API, Vsn, Fields) ->
encode_struct(API, Vsn, Fields) when ?IS_STRUCT_DATA(Fields) ->
Schema = kpro_lib:get_req_schema(API, Vsn),
try
bin(enc_struct(Schema, Fields, [{API, Vsn}]))
enc_struct(Schema, Fields, [{API, Vsn}])
catch
throw : ?FIELD_ENCODE_ERROR(Reason, Stack) ?BIND_STACKTRACE(Trace) ->
?GET_STACKTRACE(Trace),
erlang:raise(error, {Reason, Stack, Fields}, Trace)
end.
end;
encode_struct(_API, _Vsn, IoData) ->
IoData.

%% Encode struct.
enc_struct([], _Values, _Stack) -> [];
Expand Down Expand Up @@ -391,7 +400,7 @@ enc_struct_field({array, Schema}, Values, Stack) ->
false ->
erlang:throw(?FIELD_ENCODE_ERROR(not_array, Stack))
end;
enc_struct_field(Schema, Value, Stack) when ?IS_STRUCT(Schema) ->
enc_struct_field(Schema, Value, Stack) when ?IS_STRUCT_SCHEMA(Schema) ->
enc_struct(Schema, Value, Stack);
enc_struct_field(Primitive, Value, Stack) when is_atom(Primitive) ->
try
Expand Down
49 changes: 49 additions & 0 deletions test/kpro_produce_req_encode_benchmark_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
%%% Copyright (c) 2020, Klarna AB
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%

-module(kpro_produce_req_encode_benchmark_tests).
-include_lib("eunit/include/eunit.hrl").
-include("kpro.hrl").

-define(ITERATIONS, 49999).

%% EUnit test generator: yields one benchmark case per produce API version.
%%
%% Versions 2 and 3 are exercised. kpro_req_lib:produce/5 only emits the
%% transactional_id field for versions above 2, so this pair covers both
%% request layouts.
%%
%% The same single-message batch is shared by all cases so that batch
%% construction (including random payload generation) is not part of the
%% measured loop.
run_test_() ->
  Batch = make_batch(1),
  %% EUnit's `{timeout, Time, Test}' expects Time in SECONDS. Passing
  %% milliseconds (e.g. timer:seconds(30) =:= 30000) would stretch the limit
  %% to several hours and make the timeout useless as a regression guard.
  [{"encode benchmark test for version " ++ integer_to_list(V),
    {timeout, 30,
     fun() ->
         run_test(Batch, V)
     end}} || V <- lists:seq(2, 3)].

%% Encode a produce request for `Batch' at API version `Vsn' once for every
%% integer in 0..?ITERATIONS (inclusive on both ends). The encoded requests
%% are discarded; only the time spent matters. Returns `ok'.
run_test(Batch, Vsn) ->
  Rounds = lists:seq(0, ?ITERATIONS),
  EncodeOnce = fun(_Round) -> make_req(Vsn, Batch) end,
  lists:foreach(EncodeOnce, Rounds).

%% Build one produce request for `Batch' at API version `Vsn', always
%% targeting the same dummy topic and partition 0.
make_req(Vsn, Batch) ->
  Topic = <<"dummy-topic-name">>,
  Partition = 0,
  kpro_req_lib:produce(Vsn, Topic, Partition, Batch).

%% Build a batch of `BatchSize' messages. Message N (counting from 1) gets
%% the key <<"keyN">> and a freshly generated value from make_value/0.
make_batch(BatchSize) ->
  [ #{ key   => iolist_to_binary(["key", integer_to_list(Seq)])
     , value => make_value()
     }
    || Seq <- lists:seq(1, BatchSize)
  ].

%% Produce a 1024-byte binary of cryptographically strong random bytes to
%% use as a message value.
make_value() ->
  ValueBytes = 1024,
  crypto:strong_rand_bytes(ValueBytes).