diff --git a/changelog.md b/changelog.md index 02ed420..9322581 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,6 @@ +* 2.3.5 + - Improve produce request encoding performance by 35% + * 2.3.4 - Insert sni only when missing in ssl options - Ensure string() type sni in ssl options diff --git a/include/kpro.hrl b/include/kpro.hrl index 4bd3016..a461fbd 100644 --- a/include/kpro.hrl +++ b/include/kpro.hrl @@ -1,4 +1,4 @@ -%%% Copyright (c) 2017, Klarna AB +%%% Copyright (c) 2017-2020, Klarna AB %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ , api :: kpro:api() , vsn :: kpro:vsn() , no_ack = false :: boolean() %% set to true for fire-n-forget requests - , msg :: binary() | kpro:struct() + , msg :: iodata() | kpro:struct() }). -record(kpro_rsp, diff --git a/include/kpro_private.hrl b/include/kpro_private.hrl index 9756d1d..612c34b 100644 --- a/include/kpro_private.hrl +++ b/include/kpro_private.hrl @@ -1,4 +1,4 @@ -%%% Copyright (c) 2014-2017, Klarna AB +%%% Copyright (c) 2014-2020, Klarna AB %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License. @@ -77,8 +77,6 @@ -define(SCHEMA_MODULE, kpro_schema). --define(IS_STRUCT(S), (is_list(S) orelse is_map(S))). - -define(MIN_MAGIC_2_PRODUCE_API_VSN, 3). %% since kafka 0.11 -define(MIN_MAGIC_2_FETCH_API_VSN, 4). %% since kafka 0.11 -define(MIN_INCREMENTAL_FETCH_API_VSN, 7). %% since kafka 1.1.0 diff --git a/src/kpro.erl b/src/kpro.erl index 494445e..464db85 100644 --- a/src/kpro.erl +++ b/src/kpro.erl @@ -288,7 +288,7 @@ encode_request(ClientId, CorrId, Req) -> %% @doc Encode message batch for produce request. -spec encode_batch(magic(), batch_input(), compress_option()) -> binary(). encode_batch(Magic, Batch, Compression) -> - kpro_batch:encode(Magic, Batch, Compression). 
+ iolist_to_binary(kpro_batch:encode(Magic, Batch, Compression)). %% @doc The message-set is not decoded upon receiving (in connection process). %% It is passed as binary to the consumer process and decoded there. diff --git a/src/kpro_batch.erl b/src/kpro_batch.erl index 5f09611..8f405f5 100644 --- a/src/kpro_batch.erl +++ b/src/kpro_batch.erl @@ -1,4 +1,4 @@ -%%% Copyright (c) 2018, Klarna AB +%%% Copyright (c) 2018-2020, Klarna AB %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License. @@ -38,15 +38,15 @@ -define(NO_META, ?KPRO_NO_BATCH_META). %% @doc Encode a list of batch inputs into byte stream. --spec encode(magic(), batch_input(), compress_option()) -> binary(). +-spec encode(magic(), batch_input(), compress_option()) -> iodata(). encode(_MagicVsn = 2, Batch, Compression) -> FirstSequence = -1, NonTxn = #{ producer_id => -1 , producer_epoch => -1 }, - iolist_to_binary(encode_tx(Batch, Compression, FirstSequence, NonTxn)); + encode_tx(Batch, Compression, FirstSequence, NonTxn); encode(MagicVsn, Batch, Compression) -> - iolist_to_binary(kpro_batch_v01:encode(MagicVsn, Batch, Compression)). + kpro_batch_v01:encode(MagicVsn, Batch, Compression). %% @doc Encode a batch of magic version 2. % RecordBatch => @@ -93,15 +93,14 @@ encode_tx([FirstMsg | _] = Batch, Compression, FirstSequence, , enc(int32, Count) % {Count, T8} = dec(int32, T7), , EncodedBatch ], - Body1 = iolist_to_binary(Body0), - CRC = crc32cer:nif(Body1), + CRC = crc32cer:nif(Body0), Body = [ enc(int32, PartitionLeaderEpoch) , enc(int8, Magic) , enc(int32, CRC) - , Body1 + , Body0 ], - Size = kpro_lib:data_size(Body), + Size = iolist_size(Body), [ enc(int64, FirstOffset) , enc(int32, Size) | Body @@ -280,7 +279,7 @@ enc_record(Offset, TsBase, #{value := Value} = M) -> , enc(bytes, Value) , enc_headers(Headers) ], - Size = kpro_lib:data_size(Body), + Size = iolist_size(Body), [enc(varint, Size), Body]. 
enc_headers(Headers) -> diff --git a/src/kpro_batch_v01.erl b/src/kpro_batch_v01.erl index 0d9e693..dd803d7 100644 --- a/src/kpro_batch_v01.erl +++ b/src/kpro_batch_v01.erl @@ -1,4 +1,4 @@ -%%% Copyright (c) 2014-2018, Klarna AB +%%% Copyright (c) 2014-2020, Klarna AB %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License. @@ -185,7 +185,7 @@ encode_message(Magic, Codec, Ts, Key, Value, Offset) -> , enc(bytes, Value) ], Crc = enc(int32, erlang:crc32(Body)), - Size = kpro_lib:data_size([Crc, Body]), + Size = iolist_size([Crc, Body]), [enc(int64, Offset), enc(int32, Size), Crc, Body diff --git a/src/kpro_connection.erl b/src/kpro_connection.erl index 149c793..a176dca 100644 --- a/src/kpro_connection.erl +++ b/src/kpro_connection.erl @@ -1,5 +1,5 @@ %%% -%%% Copyright (c) 2014-2018, Klarna AB +%%% Copyright (c) 2014-2020, Klarna AB %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License. @@ -412,10 +412,10 @@ handle_msg({From, {send, Request}}, #kpro_req{ref = Ref} -> kpro_sent_reqs:add(Requests, Caller, Ref, API, Vsn) end, - RequestBin = kpro_req_lib:encode(ClientId, CorrId, Request), + RequestIoData = kpro_req_lib:encode(ClientId, CorrId, Request), Res = case Mod of - gen_tcp -> gen_tcp:send(Sock, RequestBin); - ssl -> ssl:send(Sock, RequestBin) + gen_tcp -> gen_tcp:send(Sock, RequestIoData); + ssl -> ssl:send(Sock, RequestIoData) end, case Res of ok -> diff --git a/src/kpro_lib.erl b/src/kpro_lib.erl index 680cfa2..e059eb0 100644 --- a/src/kpro_lib.erl +++ b/src/kpro_lib.erl @@ -1,5 +1,5 @@ %%% -%%% Copyright (c) 2018, Klarna AB +%%% Copyright (c) 2018-2020, Klarna AB %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ -module(kpro_lib). 
-export([ copy_bytes/2 - , data_size/1 , decode/2 , decode_corr_id/1 , encode/2 @@ -77,9 +76,9 @@ send_and_recv_raw(Req, Sock, Mod, Timeout) -> send_and_recv(#kpro_req{api = API, vsn = Vsn} = Req, Sock, Mod, ClientId, Timeout) -> CorrId = make_corr_id(), - ReqBin = kpro_req_lib:encode(ClientId, CorrId, Req), + ReqIoData = kpro_req_lib:encode(ClientId, CorrId, Req), try - RspBin = send_and_recv_raw(ReqBin, Sock, Mod, Timeout), + RspBin = send_and_recv_raw(ReqIoData, Sock, Mod, Timeout), {CorrId, Body} = decode_corr_id(RspBin), %% assert match CorrId #kpro_rsp{api = API, vsn = Vsn, msg = Msg} = %% assert match API and Vsn kpro_rsp_lib:decode(API, Vsn, Body, _DummyRef = false), @@ -126,11 +125,6 @@ parse_endpoints(Protocol, Str) -> end end, [], string:tokens(Str, ",\n")). -%% @doc Return number of bytes in the given `iodata()'. --spec data_size(iodata()) -> count(). -data_size(IoData) -> - iolist_size(IoData). - %% @doc Encode primitives. -spec encode(primitive_type(), kpro:primitive()) -> iodata(). 
encode(boolean, true) -> <<1:8/?INT>>; @@ -144,15 +138,12 @@ encode(nullable_string, ?null) -> <<-1:16/?INT>>; encode(nullable_string, Str) -> encode(string, Str); encode(string, Atom) when is_atom(Atom) -> encode(string, atom_to_binary(Atom, utf8)); -encode(string, <<>>) -> <<0:16/?INT>>; -encode(string, L) when is_list(L) -> - encode(string, iolist_to_binary(L)); -encode(string, B) when is_binary(B) -> - Length = size(B), - <<Length:16/?INT, B/binary>>; +encode(string, Str) -> + Length = iolist_size(Str), + [encode(int16, Length), Str]; encode(bytes, ?null) -> <<-1:32/?INT>>; encode(bytes, B) when is_binary(B) orelse is_list(B) -> - Size = data_size(B), + Size = iolist_size(B), case Size =:= 0 of true -> <<-1:32/?INT>>; false -> [<<Size:32/?INT>>, B] diff --git a/src/kpro_req_lib.erl b/src/kpro_req_lib.erl index f7ec733..cf690c0 100644 --- a/src/kpro_req_lib.erl +++ b/src/kpro_req_lib.erl @@ -1,4 +1,4 @@ -%%% Copyright (c) 2018, Klarna Bank AB (publ) +%%% Copyright (c) 2018-2020, Klarna Bank AB (publ) %%% %%% Licensed under the Apache License, Version 2.0 (the "License"); %%% you may not use this file except in compliance with the License. @@ -54,6 +54,9 @@ -define(DEFAULT_ACK_TIMEOUT, 10000). -define(FIELD_ENCODE_ERROR(Reason, EncoderStack), {field_encode_error, Reason, EncoderStack}). +-define(IS_NON_EMPTY_KV_LIST(L), (is_list(L) andalso L =/= [] andalso is_tuple(hd(L)))). +-define(IS_STRUCT_DATA(S), (is_map(S) orelse ?IS_NON_EMPTY_KV_LIST(S))). +-define(IS_STRUCT_SCHEMA(S), (is_list(S) orelse is_map(S))). -type vsn() :: kpro:vsn(). -type topic() :: kpro:topic(). @@ -190,6 +193,7 @@ produce(Vsn, Topic, Partition, Batch) -> -spec produce(vsn(), topic(), partition(), binary() | batch_input(), produce_opts()) -> req().
produce(Vsn, Topic, Partition, Batch, Opts) -> + ok = assert_known_api_and_vsn(produce, Vsn), RequiredAcks = required_acks(maps:get(required_acks, Opts, all_isr)), Compression = maps:get(compression, Opts, ?no_compression), AckTimeout = maps:get(ack_timeout, Opts, ?DEFAULT_ACK_TIMEOUT), @@ -209,18 +213,22 @@ produce(Vsn, Topic, Partition, Batch, Opts) -> true = FirstSequence >= 0, %% assert kpro_batch:encode_tx(Batch, Compression, FirstSequence, TxnCtx) end, - Fields = - [{transactional_id, transactional_id(TxnCtx)}, - {acks, RequiredAcks}, - {timeout, AckTimeout}, - {topic_data, [[{topic, Topic}, - {data, [[{partition, Partition}, - {record_set, EncodedBatch} - ]]} - ]]} + Msg = + [ [encode(string, transactional_id(TxnCtx)) || Vsn > 2] + , encode(int16, RequiredAcks) + , encode(int32, AckTimeout) + , encode(int32, 1) %% topic array header + , encode(string, Topic) + , encode(int32, 1) %% partition array header + , encode(int32, Partition) + , encode(bytes, EncodedBatch) ], - Req = make(produce, Vsn, Fields), - Req#kpro_req{no_ack = RequiredAcks =:= 0}. + #kpro_req{ api = produce + , vsn = Vsn + , msg = Msg + , ref = make_ref() + , no_ack = RequiredAcks =:= 0 + }. %% @doc Make `end_txn' request. -spec end_txn(txn_ctx(), commit | abort) -> req(). @@ -352,16 +360,17 @@ required_acks(leader_only) -> 1; required_acks(all_isr) -> -1; required_acks(I) when I >= -1 andalso I =< 1 -> I. -encode_struct(_API, _Vsn, Bin) when is_binary(Bin) -> Bin; -encode_struct(API, Vsn, Fields) -> +encode_struct(API, Vsn, Fields) when ?IS_STRUCT_DATA(Fields) -> Schema = kpro_lib:get_req_schema(API, Vsn), try - bin(enc_struct(Schema, Fields, [{API, Vsn}])) + enc_struct(Schema, Fields, [{API, Vsn}]) catch throw : ?FIELD_ENCODE_ERROR(Reason, Stack) ?BIND_STACKTRACE(Trace) -> ?GET_STACKTRACE(Trace), erlang:raise(error, {Reason, Stack, Fields}, Trace) - end. + end; +encode_struct(_API, _Vsn, IoData) -> + IoData. %% Encode struct. 
enc_struct([], _Values, _Stack) -> []; @@ -391,7 +400,7 @@ enc_struct_field({array, Schema}, Values, Stack) -> false -> erlang:throw(?FIELD_ENCODE_ERROR(not_array, Stack)) end; -enc_struct_field(Schema, Value, Stack) when ?IS_STRUCT(Schema) -> +enc_struct_field(Schema, Value, Stack) when ?IS_STRUCT_SCHEMA(Schema) -> enc_struct(Schema, Value, Stack); enc_struct_field(Primitive, Value, Stack) when is_atom(Primitive) -> try diff --git a/test/kpro_produce_req_encode_benchmark_tests.erl b/test/kpro_produce_req_encode_benchmark_tests.erl new file mode 100644 index 0000000..39ab417 --- /dev/null +++ b/test/kpro_produce_req_encode_benchmark_tests.erl @@ -0,0 +1,49 @@ +%%% Copyright (c) 2020, Klarna AB +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% + +-module(kpro_produce_req_encode_benchmark_tests). +-include_lib("eunit/include/eunit.hrl"). +-include("kpro.hrl"). + +-define(ITERATIONS, 49999). + +run_test_() -> + Batch = make_batch(1), + %{Min, Max} = kpro_schema:vsn_range(produce), + [{"encode benchmark test for version " ++ integer_to_list(V), + {timeout, timer:seconds(30), + fun() -> + run_test(Batch, V) + end}} || V <- lists:seq(2, 3)]. + +run_test(Batch, Vsn) -> + lists:foreach(fun(_) -> + make_req(Vsn, Batch) + end, lists:seq(0, ?ITERATIONS)). + +make_req(Vsn, Batch) -> + kpro_req_lib:produce(Vsn, <<"dummy-topic-name">>, 0, Batch). 
+
+%% Batch of BatchSize messages keyed <<"key1">>, <<"key2">>, ... in order.
+make_batch(BatchSize) ->
+  Seqs = lists:seq(1, BatchSize),
+  [ #{ key => iolist_to_binary(["key", integer_to_list(N)])
+     , value => make_value()
+     } || N <- Seqs ].
+
+%% Message payload: 1024 bytes obtained from crypto:strong_rand_bytes/1.
+make_value() ->
+  crypto:strong_rand_bytes(1024).
+