Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize node cleanup for mod_last #4274

Merged
merged 19 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion big_tests/tests/last_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
-include_lib("escalus/include/escalus.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").
-include_lib("eunit/include/eunit.hrl").

-import(config_parser_helper, [mod_config_with_auto_backend/2]).

Expand All @@ -37,7 +38,7 @@ groups() ->
valid_test_cases() -> [online_user_query,
last_online_user,
last_offline_user,
last_server].
last_server, sessions_cleanup].

invalid_test_cases() -> [user_not_subscribed_receives_error].

Expand Down Expand Up @@ -169,6 +170,36 @@ user_not_subscribed_receives_error(Config) ->
ok
end).

sessions_cleanup(Config) ->
N = distributed_helper:mim(),
HostType = domain_helper:host_type(),
Server = domain_helper:domain(),
CreateUser = fun(Name) ->
SID = {erlang:system_time(microsecond), spawn(fun() -> ok end)},
JID = mongoose_helper:make_jid(Name, Server, <<"res">>),
Priority = 0,
Info = #{},
distributed_helper:rpc(N, ejabberd_sm, open_session, [HostType, SID, JID, Priority, Info])
end,
Names = [<<"user", (list_to_binary((integer_to_list(X))))/binary>> || X <- lists:seq(1, 345)],
measure("create users", fun() ->
lists:foreach(CreateUser, Names)
end),
%% Check that user3 is properly updated
%% User should be registered if we want to use mod_last_api
{ok, _} = distributed_helper:rpc(N, mongoose_account_api, register_user, [<<"user3">>, Server, <<"secret123">>]),
Jid3 = mongoose_helper:make_jid(<<"user3">>, Server, <<>>),
{ok, _} = distributed_helper:rpc(N, mod_last_api, set_last, [Jid3, 1714000000, <<"old status">>]),
{ok, #{timestamp := 1714000000}} = distributed_helper:rpc(N, mod_last_api, get_last, [Jid3]),
measure("node cleanup", fun() ->
distributed_helper:rpc(N#{timeout => timer:minutes(1)}, mongoose_hooks, node_cleanup, [node()])
end),
{ok, #{timestamp := TS, status := Status} = Data} = distributed_helper:rpc(N, mod_last_api, get_last, [Jid3]),
?assertNotEqual(TS, 1714000000, Data),
?assertEqual(Status, <<>>, Data),
distributed_helper:rpc(N, mongoose_metrics, update, [HostType, sessionCount, -345]),
{ok, _} = distributed_helper:rpc(N, mongoose_account_api, unregister_user, [<<"user3">>, Server]).


%%-----------------------------------------------------------------
%% Helpers
Expand All @@ -193,3 +224,8 @@ answer_last_activity(IQ = #xmlel{name = <<"iq">>}) ->

required_modules() ->
[{mod_last, mod_config_with_auto_backend(mod_last, #{iqdisc => one_queue})}].

measure(Text, F) ->
{Time, _} = timer:tc(F),
ct:pal("Time ~ts = ~p", [Text, Time]),
ok.
80 changes: 77 additions & 3 deletions big_tests/tests/rdbms_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ rdbms_queries_cases() ->
test_failed_transaction_with_execute_wrapped,
test_failed_wrapper_transaction,
test_incremental_upsert,
arguments_from_two_tables].
arguments_from_two_tables,
test_upsert_many1,
test_upsert_many2,
test_upsert_many1_replaces_existing,
test_upsert_many2_replaces_existing].

suite() ->
escalus:suite().
Expand Down Expand Up @@ -602,6 +606,63 @@ do_test_incremental_upsert(Config) ->
SelectResult = sql_query(Config, <<"SELECT timestamp FROM inbox">>),
?assertEqual({selected, [{<<"43">>}]}, selected_to_binary(SelectResult)).

test_upsert_many1(Config) ->
erase_last(Config),
sql_prepare_upsert_many(Config, 1, upsert_many_last1, last,
[<<"server">>, <<"username">>, <<"seconds">>, <<"state">>],
[<<"seconds">>, <<"state">>],
[<<"server">>, <<"username">>]),
Insert1 = [<<"localhost">>, <<"kate">>, 0, <<>>],
Update = [0, <<>>],
%% Records keys must be unique (i.e. we cannot insert alice twice)
{updated, 1} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update).

test_upsert_many2(Config) ->
erase_last(Config),
sql_prepare_upsert_many(Config, 2, upsert_many_last2, last,
[<<"server">>, <<"username">>, <<"seconds">>, <<"state">>],
[<<"seconds">>, <<"state">>],
[<<"server">>, <<"username">>]),
Insert1 = [<<"localhost">>, <<"alice">>, 0, <<>>],
Insert2 = [<<"localhost">>, <<"bob">>, 0, <<>>],
Update = [0, <<>>],
%% Records keys must be unique (i.e. we cannot insert alice twice)
{updated, 2} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update).

test_upsert_many1_replaces_existing(Config) ->
erase_last(Config),
sql_prepare_upsert_many(Config, 1, upsert_many_last1, last,
[<<"server">>, <<"username">>, <<"seconds">>, <<"state">>],
[<<"seconds">>, <<"state">>],
[<<"server">>, <<"username">>]),
Insert1 = [<<"localhost">>, <<"kate">>, 0, <<>>],
Update1 = [0, <<>>],
Insert2 = [<<"localhost">>, <<"kate">>, 10, <<>>],
Update2 = [10, <<>>],
%% Replace returns wrong numbers with MySQL (2 instead of 1, 4 instead of 2)
{updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert1, Update1),
{updated, _} = sql_execute_upsert_many(Config, upsert_many_last1, Insert2, Update2),
SelectResult = sql_query(Config, <<"SELECT seconds FROM last">>),
?assertEqual({selected, [{<<"10">>}]}, selected_to_binary(SelectResult)).

test_upsert_many2_replaces_existing(Config) ->
erase_last(Config),
sql_prepare_upsert_many(Config, 2, upsert_many_last2, last,
[<<"server">>, <<"username">>, <<"seconds">>, <<"state">>],
[<<"seconds">>, <<"state">>],
[<<"server">>, <<"username">>]),
Insert1 = [<<"localhost">>, <<"alice">>, 0, <<>>],
Insert3 = [<<"localhost">>, <<"alice">>, 10, <<>>],
Insert2 = [<<"localhost">>, <<"bob">>, 0, <<>>],
Insert4 = [<<"localhost">>, <<"bob">>, 10, <<>>],
Update1 = [0, <<>>],
Update3 = [10, <<>>],
%% Records keys must be unique (i.e. we cannot insert alice twice)
{updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert1 ++ Insert2, Update1),
{updated, _} = sql_execute_upsert_many(Config, upsert_many_last2, Insert3 ++ Insert4, Update3),
SelectResult = sql_query(Config, <<"SELECT seconds FROM last">>),
?assertEqual({selected, [{<<"10">>}, {<<"10">>}]}, selected_to_binary(SelectResult)).

%%--------------------------------------------------------------------
%% Text searching
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -635,6 +696,9 @@ sql_prepare(_Config, Name, Table, Fields, Query) ->
sql_prepare_upsert(_Config, Name, Table, Insert, Update, Unique, Incr) ->
escalus_ejabberd:rpc(rdbms_queries, prepare_upsert, [host_type(), Name, Table, Insert, Update, Unique, Incr]).

sql_prepare_upsert_many(_Config, RecordCount, Name, Table, Insert, Update, Unique) ->
escalus_ejabberd:rpc(rdbms_queries, prepare_upsert_many, [host_type(), RecordCount, Name, Table, Insert, Update, Unique]).

sql_execute(Config, Name, Parameters) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, execute, ScopeAndTag ++ [Name, Parameters]).
Expand Down Expand Up @@ -670,6 +734,10 @@ sql_execute_upsert(Config, Name, Insert, Update, Unique) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(rdbms_queries, execute_upsert, ScopeAndTag ++ [Name, Insert, Update, Unique]).

sql_execute_upsert_many(Config, Name, Insert, Update) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(rdbms_queries, execute_upsert_many, ScopeAndTag ++ [Name, Insert, Update]).

sql_query_request(Config, Query) ->
ScopeAndTag = scope_and_tag(Config),
slow_rpc(mongoose_rdbms, sql_query_request, ScopeAndTag ++ [Query]).
Expand Down Expand Up @@ -725,6 +793,9 @@ decode_boolean(_Config, Value) ->
erase_table(Config) ->
{updated, _} = sql_query(Config, <<"DELETE FROM test_types">>).

erase_last(Config) ->
{updated, _} = sql_query(Config, <<"DELETE FROM last">>).

erase_users(Config) ->
{updated, _} = sql_query(Config, <<"DELETE FROM users">>),
{updated, _} = sql_query(Config, <<"DELETE FROM last">>).
Expand Down Expand Up @@ -765,11 +836,14 @@ integer_to_binary_or_null(X) -> integer_to_binary(X).
%% Helper function to transform values to an uniform format.
%% Single tuple, single element case.
%% In ODBC int32 is integer, but int64 is binary.
selected_to_binary({selected, [{Value}]}) when is_integer(Value) ->
{selected, [{integer_to_binary(Value)}]};
selected_to_binary({selected, Rows}) ->
{selected, [row_to_binary(Row) || Row <- Rows]};
selected_to_binary(Other) ->
Other.

row_to_binary(Row) ->
list_to_tuple([value_to_binary(Value) || Value <- tuple_to_list(Row)]).

selected_to_sorted({selected, Rows}) ->
{selected, lists:sort(Rows)};
selected_to_sorted(Other) ->
Expand Down
39 changes: 36 additions & 3 deletions src/ejabberd_sm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
is_offline/1,
get_user_present_pids/2,
sync/0,
run_session_cleanup_hook/1,
session_cleanup/1,
sessions_cleanup/1,
terminate_session/2,
sm_backend/0
]).
Expand Down Expand Up @@ -354,8 +355,8 @@
ejabberd_sm ! {unregister_iq_handler, Host, XMLNS},
ok.

-spec run_session_cleanup_hook(#session{}) -> mongoose_acc:t().
run_session_cleanup_hook(#session{usr = {U, S, R}, sid = SID}) ->
-spec session_cleanup(#session{}) -> mongoose_acc:t().
session_cleanup(#session{usr = {U, S, R}, sid = SID}) ->
{ok, HostType} = mongoose_domain_api:get_domain_host_type(S),
Acc = mongoose_acc:new(
#{location => ?LOCATION,
Expand All @@ -364,6 +365,38 @@
element => undefined}),
mongoose_hooks:session_cleanup(S, Acc, U, R, SID).

-spec sessions_cleanup([#session{}]) -> ok.
sessions_cleanup(Sessions) ->
SerSess = [{Server, Session} || Session = #session{usr = {_, Server, _}} <- Sessions],
Servers = lists:usort([Server || {Server, _Session} <- SerSess]),
Map = maps:from_list([{Server, server_to_host_type(Server)} || Server <- Servers]),
HTSession = [{maps:get(Server, Map), Session} || {Server, Session} <- SerSess],
HT2Session = group_sessions(lists:sort(HTSession)),
[mongoose_hooks:sessions_cleanup(HostType, HTSessions)
|| {HostType, HTSessions} <- HT2Session, HostType =/= undefined],
ok.

%% Group sessions by HostType.
%% Sessions should be sorted.
group_sessions([{HostType, Session} | Sessions]) ->
{Acc, Sessions2} = group_sessions(HostType, [Session], Sessions),
[{HostType, Acc} | group_sessions(Sessions2)];
group_sessions([]) ->
[].

group_sessions(HostType, Acc, [{HostType, Session} | Sessions]) ->
group_sessions(HostType, [Session | Acc], Sessions);
group_sessions(_HostType, Acc, Sessions) ->
{lists:reverse(Acc), Sessions}.

server_to_host_type(Server) ->
case mongoose_domain_api:get_domain_host_type(Server) of
{ok, HostType} ->
HostType;
_ ->
undefined

Check warning on line 397 in src/ejabberd_sm.erl

View check run for this annotation

Codecov / codecov/patch

src/ejabberd_sm.erl#L397

Added line #L397 was not covered by tests
end.

-spec terminate_session(jid:jid() | pid(), binary()) -> ok | no_session.
terminate_session(#jid{} = Jid, Reason) ->
case get_session_pid(Jid) of
Expand Down
9 changes: 5 additions & 4 deletions src/ejabberd_sm_cets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ cleanup(Node) ->
%% This is a full table scan, but cleanup is rare.
Tuples = ets:select(?TABLE, [{R, [Guard], ['$_']}]),
cets:delete_many(?TABLE, [Key || {Key, _, _} <- Tuples]),
lists:foreach(fun(Tuple) ->
Session = tuple_to_session(Tuple),
ejabberd_sm:run_session_cleanup_hook(Session)
end, Tuples).
Sessions = tuples_to_sessions(Tuples),
ejabberd_sm:sessions_cleanup(Sessions),
lists:foreach(fun(Session) ->
ejabberd_sm:session_cleanup(Session)
end, Sessions).

-spec total_count() -> integer().
total_count() ->
Expand Down
3 changes: 2 additions & 1 deletion src/ejabberd_sm_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ cleanup(Node) ->
[{#session{sid = {'_', '$1'}, _ = '_'},
[{'==', {node, '$1'}, Node}],
['$_']}]),
ejabberd_sm:sessions_cleanup(Es),
lists:foreach(fun(#session{sid = SID} = Session) ->
mnesia:delete({session, SID}),
ejabberd_sm:run_session_cleanup_hook(Session)
ejabberd_sm:session_cleanup(Session)
end, Es)
end,
mnesia:async_dirty(F).
Expand Down
21 changes: 14 additions & 7 deletions src/ejabberd_sm_redis.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,19 @@ get_sessions(User, Server, Resource) ->
Session :: ejabberd_sm:session()) -> ok | {error, term()}.
set_session(User, Server, Resource, Session) ->
OldSessions = get_sessions(User, Server, Resource),
Node = sid_to_node(Session#session.sid),
case lists:keysearch(Session#session.sid, #session.sid, OldSessions) of
{value, OldSession} ->
BOldSession = term_to_binary(OldSession),
BSession = term_to_binary(Session),
mongoose_redis:cmds([["SADD", n(node()), hash(User, Server, Resource, Session#session.sid)],
mongoose_redis:cmds([["SADD", n(Node), hash(User, Server, Resource, Session#session.sid)],
["SREM", hash(User, Server), BOldSession],
["SREM", hash(User, Server, Resource), BOldSession],
["SADD", hash(User, Server), BSession],
["SADD", hash(User, Server, Resource), BSession]]);
false ->
BSession = term_to_binary(Session),
mongoose_redis:cmds([["SADD", n(node()), hash(User, Server, Resource, Session#session.sid)],
mongoose_redis:cmds([["SADD", n(Node), hash(User, Server, Resource, Session#session.sid)],
["SADD", hash(User, Server), BSession],
["SADD", hash(User, Server, Resource), BSession]])
end.
Expand All @@ -103,7 +104,7 @@ delete_session(SID, User, Server, Resource) ->
BSession = term_to_binary(Session),
mongoose_redis:cmds([["SREM", hash(User, Server), BSession],
["SREM", hash(User, Server, Resource), BSession],
["SREM", n(node()), hash(User, Server, Resource, SID)]]);
["SREM", n(sid_to_node(SID)), hash(User, Server, Resource, SID)]]);
false ->
ok
end.
Expand All @@ -116,7 +117,7 @@ cleanup(Node) ->
maybe_initial_cleanup(Node, Initial) ->
Hashes = mongoose_redis:cmd(["SMEMBERS", n(Node)]),
mongoose_redis:cmd(["DEL", n(Node)]),
lists:foreach(fun(H) ->
Sessions = lists:map(fun(H) ->
[_, U, S, R | SIDEncoded] = re:split(H, ":"),
%% Add possible removed ":" from encoded SID
SID = binary_to_term(mongoose_bin:join(SIDEncoded, <<":">>)),
Expand All @@ -125,10 +126,12 @@ maybe_initial_cleanup(Node, Initial) ->
true ->
ok;
false ->
ejabberd_sm:run_session_cleanup_hook(#session{usr = {U, S, R},
sid = SID})
Session = #session{usr = {U, S, R}, sid = SID},
ejabberd_sm:session_cleanup(Session),
Session
end
end, Hashes).
end, Hashes),
ejabberd_sm:sessions_cleanup(Sessions).

-spec total_count() -> integer().
total_count() ->
Expand Down Expand Up @@ -160,3 +163,7 @@ hash(Val1, Val2, Val3, Val4) ->
-spec n(atom()) -> iolist().
n(Node) ->
["n:", atom_to_list(Node)].

sid_to_node(SID) ->
{_, Pid} = SID,
node(Pid).
6 changes: 6 additions & 0 deletions src/hooks/mongoose_hooks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
remove_user/3,
resend_offline_messages_hook/2,
session_cleanup/5,
sessions_cleanup/2,
set_vcard/3,
unacknowledged_message/2,
filter_unacknowledged_messages/3,
Expand Down Expand Up @@ -369,6 +370,11 @@ session_cleanup(Server, Acc, User, Resource, SID) ->
HostType = mongoose_acc:host_type(Acc),
run_hook_for_host_type(session_cleanup, HostType, Acc, Params).

-spec sessions_cleanup(mongooseim:host_type(), [ejabberd_sm:session()]) -> map().
sessions_cleanup(HostType, Sessions) ->
Params = #{sessions => Sessions},
run_hook_for_host_type(sessions_cleanup, HostType, #{host_type => HostType}, Params).

%%% @doc The `set_vcard' hook is called when the caller wants to set the VCard.
-spec set_vcard(HostType, UserJID, VCard) -> Result when
HostType :: mongooseim:host_type(),
Expand Down
Loading