]> granicus.if.org Git - ejabberd/commitdiff
Keep last handled stanzas number in cache rather than session table
authorEvgeny Khramtsov <ekhramtsov@process-one.net>
Fri, 30 Nov 2018 13:18:49 +0000 (16:18 +0300)
committerEvgeny Khramtsov <ekhramtsov@process-one.net>
Fri, 30 Nov 2018 13:19:00 +0000 (16:19 +0300)
rebar.config
src/ejabberd_sm.erl
src/mod_stream_mgmt.erl

index e7e9120ae002676685d9d087219f614beba9662c..b03a6bba7e1cf172061532d5aa8a74e8b1cb331c 100644 (file)
@@ -20,7 +20,7 @@
 
 {deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", "3.6.7"}},
         {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.13"}}},
-        {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.16"}}},
+        {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "6493974"}}},
         {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.26"}}},
         {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.14"}}},
         {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.34"}}},
index d9e211656eeca70d7994dfaa45efdc65abea7e4e..de871a11c5d139faba3923445d89fa658e0c9a8f 100644 (file)
@@ -50,8 +50,6 @@
         set_presence/7,
         unset_presence/6,
         close_session_unset_presence/5,
-        set_offline_info/5,
-        get_offline_info/4,
         dirty_get_sessions_list/0,
         dirty_get_my_sessions_list/0,
         get_vh_session_list/1,
@@ -78,8 +76,7 @@
         host_down/1,
         make_sid/0,
         clean_cache/1,
-        config_reloaded/0,
-        is_online/1
+        config_reloaded/0
        ]).
 
 -export([init/1, handle_call/3, handle_cast/2,
@@ -211,14 +208,14 @@ get_user_resources(User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
     Mod = get_sm_backend(LServer),
-    Ss = online(get_sessions(Mod, LUser, LServer)),
+    Ss = get_sessions(Mod, LUser, LServer),
     [element(3, S#session.usr) || S <- clean_session_list(Ss)].
 
 -spec get_user_present_resources(binary(), binary()) -> [tuple()].
 
 get_user_present_resources(LUser, LServer) ->
     Mod = get_sm_backend(LServer),
-    Ss = online(get_sessions(Mod, LUser, LServer)),
+    Ss = get_sessions(Mod, LUser, LServer),
     [{S#session.priority, element(3, S#session.usr)}
      || S <- clean_session_list(Ss), is_integer(S#session.priority)].
 
@@ -229,7 +226,7 @@ get_user_ip(User, Server, Resource) ->
     LServer = jid:nameprep(Server),
     LResource = jid:resourceprep(Resource),
     Mod = get_sm_backend(LServer),
-    case online(get_sessions(Mod, LUser, LServer, LResource)) of
+    case get_sessions(Mod, LUser, LServer, LResource) of
        [] ->
            undefined;
        Ss ->
@@ -242,7 +239,7 @@ get_user_info(User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
     Mod = get_sm_backend(LServer),
-    Ss = online(get_sessions(Mod, LUser, LServer)),
+    Ss = get_sessions(Mod, LUser, LServer),
     [{LResource, [{node, node(Pid)}, {ts, Ts}, {pid, Pid},
                  {priority, Priority} | Info]}
      || #session{usr = {_, _, LResource},
@@ -257,7 +254,7 @@ get_user_info(User, Server, Resource) ->
     LServer = jid:nameprep(Server),
     LResource = jid:resourceprep(Resource),
     Mod = get_sm_backend(LServer),
-    case online(get_sessions(Mod, LUser, LServer, LResource)) of
+    case get_sessions(Mod, LUser, LServer, LResource) of
        [] ->
            offline;
        Ss ->
@@ -316,7 +313,7 @@ get_session_sid(User, Server, Resource) ->
     LServer = jid:nameprep(Server),
     LResource = jid:resourceprep(Resource),
     Mod = get_sm_backend(LServer),
-    case online(get_sessions(Mod, LUser, LServer, LResource)) of
+    case get_sessions(Mod, LUser, LServer, LResource) of
        [] ->
            none;
        Ss ->
@@ -330,43 +327,15 @@ get_session_sids(User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
     Mod = get_sm_backend(LServer),
-    Sessions = online(get_sessions(Mod, LUser, LServer)),
+    Sessions = get_sessions(Mod, LUser, LServer),
     [SID || #session{sid = SID} <- Sessions].
 
--spec set_offline_info(sid(), binary(), binary(), binary(), info()) -> ok.
-
-set_offline_info(SID, User, Server, Resource, Info) ->
-    LUser = jid:nodeprep(User),
-    LServer = jid:nameprep(Server),
-    LResource = jid:resourceprep(Resource),
-    set_session(SID, LUser, LServer, LResource, undefined, [offline | Info]).
-
--spec get_offline_info(erlang:timestamp(), binary(), binary(),
-                       binary()) -> none | info().
-
-get_offline_info(Time, User, Server, Resource) ->
-    LUser = jid:nodeprep(User),
-    LServer = jid:nameprep(Server),
-    LResource = jid:resourceprep(Resource),
-    Mod = get_sm_backend(LServer),
-    case get_sessions(Mod, LUser, LServer, LResource) of
-       [#session{sid = {Time, _}, info = Info}] ->
-           case proplists:get_bool(offline, Info) of
-               true ->
-                   Info;
-               false ->
-                   none
-           end;
-       _ ->
-           none
-    end.
-
 -spec dirty_get_sessions_list() -> [ljid()].
 
 dirty_get_sessions_list() ->
     lists:flatmap(
       fun(Mod) ->
-             [S#session.usr || S <- online(get_sessions(Mod))]
+             [S#session.usr || S <- get_sessions(Mod)]
       end, get_sm_backends()).
 
 -spec dirty_get_my_sessions_list() -> [#session{}].
@@ -374,7 +343,7 @@ dirty_get_sessions_list() ->
 dirty_get_my_sessions_list() ->
     lists:flatmap(
       fun(Mod) ->
-             [S || S <- online(get_sessions(Mod)),
+             [S || S <- get_sessions(Mod),
                    node(element(2, S#session.sid)) == node()]
       end, get_sm_backends()).
 
@@ -383,14 +352,14 @@ dirty_get_my_sessions_list() ->
 get_vh_session_list(Server) ->
     LServer = jid:nameprep(Server),
     Mod = get_sm_backend(LServer),
-    [S#session.usr || S <- online(get_sessions(Mod, LServer))].
+    [S#session.usr || S <- get_sessions(Mod, LServer)].
 
 -spec get_all_pids() -> [pid()].
 
 get_all_pids() ->
     lists:flatmap(
       fun(Mod) ->
-             [element(2, S#session.sid) || S <- online(get_sessions(Mod))]
+             [element(2, S#session.sid) || S <- get_sessions(Mod)]
       end, get_sm_backends()).
 
 -spec get_vh_session_number(binary()) -> non_neg_integer().
@@ -398,7 +367,7 @@ get_all_pids() ->
 get_vh_session_number(Server) ->
     LServer = jid:nameprep(Server),
     Mod = get_sm_backend(LServer),
-    length(online(get_sessions(Mod, LServer))).
+    length(get_sessions(Mod, LServer)).
 
 %% Why the hell do we have so many similar kicks?
 c2s_handle_info(#{lang := Lang} = State, replaced) ->
@@ -579,16 +548,6 @@ delete_session(Mod, #session{usr = {LUser, LServer, _}} = Session) ->
            ok
     end.
 
--spec online([#session{}]) -> [#session{}].
-
-online(Sessions) ->
-    lists:filter(fun is_online/1, Sessions).
-
--spec is_online(#session{}) -> boolean().
-
-is_online(#session{info = Info}) ->
-    not proplists:get_bool(offline, Info).
-
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 -spec do_route(jid(), term()) -> any().
 do_route(#jid{lresource = <<"">>} = To, Term) ->
@@ -600,7 +559,7 @@ do_route(To, Term) ->
     ?DEBUG("broadcasting ~p to ~s", [Term, jid:encode(To)]),
     {U, S, R} = jid:tolower(To),
     Mod = get_sm_backend(S),
-    case online(get_sessions(Mod, U, S, R)) of
+    case get_sessions(Mod, U, S, R) of
        [] ->
            ?DEBUG("dropping broadcast to unavailable resourse: ~p", [Term]);
        Ss ->
@@ -631,7 +590,7 @@ do_route(#presence{to = To, type = T} = Packet)
                      ejabberd_c2s:route(Pid, {route, Packet1});
                 (_) ->
                      ok
-             end, online(get_sessions(Mod, LUser, LServer)));
+             end, get_sessions(Mod, LUser, LServer));
        false ->
            ok
     end;
@@ -660,7 +619,7 @@ do_route(Packet) ->
     To = xmpp:get_to(Packet),
     {LUser, LServer, LResource} = jid:tolower(To),
     Mod = get_sm_backend(LServer),
-    case online(get_sessions(Mod, LUser, LServer, LResource)) of
+    case get_sessions(Mod, LUser, LServer, LResource) of
        [] ->
            case Packet of
                #message{type = T} when T == chat; T == normal ->
@@ -708,8 +667,8 @@ route_message(#message{to = To, type = Type} = Packet) ->
                                          (P >= 0) and (Type == headline) ->
                                LResource = jid:resourceprep(R),
                                Mod = get_sm_backend(LServer),
-                               case online(get_sessions(Mod, LUser, LServer,
-                                                            LResource)) of
+                               case get_sessions(Mod, LUser, LServer,
+                                                 LResource) of
                                  [] ->
                                      ok; % Race condition
                                  Ss ->
@@ -780,13 +739,9 @@ check_for_sessions_to_replace(User, Server, Resource) ->
 check_existing_resources(LUser, LServer, LResource) ->
     Mod = get_sm_backend(LServer),
     Ss = get_sessions(Mod, LUser, LServer, LResource),
-    {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss),
-    lists:foreach(fun(S) ->
-                         delete_session(Mod, S)
-                 end, OfflineSs),
-    if OnlineSs == [] -> ok;
+    if Ss == [] -> ok;
        true ->
-          SIDs = [SID || #session{sid = SID} <- OnlineSs],
+          SIDs = [SID || #session{sid = SID} <- Ss],
           MaxSID = lists:max(SIDs),
           lists:foreach(fun ({_, Pid} = S) when S /= MaxSID ->
                                 ejabberd_c2s:route(Pid, replaced);
@@ -806,22 +761,17 @@ get_resource_sessions(User, Server, Resource) ->
     LServer = jid:nameprep(Server),
     LResource = jid:resourceprep(Resource),
     Mod = get_sm_backend(LServer),
-    [S#session.sid || S <- online(get_sessions(Mod, LUser, LServer, LResource))].
+    [S#session.sid || S <- get_sessions(Mod, LUser, LServer, LResource)].
 
 -spec check_max_sessions(binary(), binary()) -> ok | replaced.
 check_max_sessions(LUser, LServer) ->
     Mod = get_sm_backend(LServer),
     Ss = get_sessions(Mod, LUser, LServer),
-    {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss),
     MaxSessions = get_max_user_sessions(LUser, LServer),
-    if length(OnlineSs) =< MaxSessions -> ok;
+    if length(Ss) =< MaxSessions -> ok;
        true ->
-           #session{sid = {_, Pid}} = lists:min(OnlineSs),
+           #session{sid = {_, Pid}} = lists:min(Ss),
            ejabberd_c2s:route(Pid, replaced)
-    end,
-    if length(OfflineSs) =< MaxSessions -> ok;
-       true ->
-           delete_session(Mod, lists:min(OfflineSs))
     end.
 
 %% Get the user_max_session setting
@@ -843,7 +793,7 @@ get_max_user_sessions(LUser, Host) ->
 
 force_update_presence({LUser, LServer}) ->
     Mod = get_sm_backend(LServer),
-    Ss = online(get_sessions(Mod, LUser, LServer)),
+    Ss = get_sessions(Mod, LUser, LServer),
     lists:foreach(fun (#session{sid = {_, Pid}}) ->
                          ejabberd_c2s:resend_presence(Pid)
                  end,
index 1927afa95eca3bd9b8d60bdbb3902bf01b765ceb..546c45a69c35c870174b4aa2754503e77ec17a7b 100644 (file)
@@ -40,6 +40,8 @@
 -include("logger.hrl").
 -include("p1_queue.hrl").
 
+-define(STREAM_MGMT_CACHE, stream_mgmt_cache).
+
 -define(is_sm_packet(Pkt),
        is_record(Pkt, sm_enable) or
        is_record(Pkt, sm_resume) or
@@ -51,7 +53,8 @@
 %%%===================================================================
 %%% API
 %%%===================================================================
-start(Host, _Opts) ->
+start(Host, Opts) ->
+    init_cache(Opts),
     ejabberd_hooks:add(c2s_init, ?MODULE, c2s_stream_init, 50),
     ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE,
                       c2s_stream_started, 50),
@@ -284,23 +287,16 @@ c2s_terminated(#{mgmt_state := resumed, jid := JID} = State, _Reason) ->
           [jid:encode(JID)]),
     bounce_message_queue(),
     {stop, State};
-c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, sid := SID,
-                user := U, server := S, resource := R} = State, Reason) ->
-    Result = case MgmtState of
-                timeout ->
-                    Info = [{num_stanzas_in, In}],
-                    %% TODO: Usually, ejabberd_c2s:process_terminated/2 is
-                    %% called later in the hook chain.  We swap the order so
-                    %% that the offline info won't be purged after we stored
-                    %% it.  This should be fixed in a proper way.
-                    State1 = ejabberd_c2s:process_terminated(State, Reason),
-                    ejabberd_sm:set_offline_info(SID, U, S, R, Info),
-                    {stop, State1};
-                _ ->
-                    State
-            end,
+c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
+                sid := {Time, _}, jid := JID} = State, _Reason) ->
+    case MgmtState of
+       timeout ->
+           store_stanzas_in(jid:tolower(JID), Time, In);
+       _ ->
+           ok
+    end,
     route_unacked_stanzas(State),
-    Result;
+    State;
 c2s_terminated(State, _Reason) ->
     State.
 
@@ -641,16 +637,11 @@ inherit_session_state(#{user := U, server := S,
        {term, {R, Time}} ->
            case ejabberd_sm:get_session_pid(U, S, R) of
                none ->
-                   case ejabberd_sm:get_offline_info(Time, U, S, R) of
-                       none ->
+                   case pop_stanzas_in({U, S, R}, Time) of
+                       error ->
                            {error, <<"Previous session PID not found">>};
-                       Info ->
-                           case proplists:get_value(num_stanzas_in, Info) of
-                               undefined ->
-                                   {error, <<"Previous session timed out">>};
-                               H ->
-                                   {error, <<"Previous session timed out">>, H}
-                           end
+                       {ok, H} ->
+                           {error, <<"Previous session timed out">>, H}
                    end;
                OldPID ->
                    OldSID = {Time, OldPID},
@@ -750,6 +741,32 @@ need_to_enqueue(#{mgmt_force_enqueue := true} = State, #xmlel{}) ->
 need_to_enqueue(State, _) ->
     {false, State}.
 
+%%%===================================================================
+%%% Cache-like storage for last handled stanzas
+%%%===================================================================
+init_cache(Opts) ->
+    ets_cache:new(?STREAM_MGMT_CACHE, cache_opts(Opts)).
+
+cache_opts(Opts) ->
+    [{max_size, gen_mod:get_opt(cache_size, Opts)},
+     {life_time, infinity}].
+
+-spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean().
+store_stanzas_in(LJID, Time, Num) ->
+    ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num,
+                    ejabberd_cluster:get_nodes()).
+
+-spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error.
+pop_stanzas_in(LJID, Time) ->
+    case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of
+       {ok, Val} ->
+           ets_cache:delete(?STREAM_MGMT_CACHE, {LJID, Time},
+                            ejabberd_cluster:get_nodes()),
+           {ok, Val};
+       error ->
+           error
+    end.
+
 %%%===================================================================
 %%% Configuration processing
 %%%===================================================================
@@ -796,6 +813,11 @@ mod_opt_type(resend_on_timeout) ->
     fun(B) when is_boolean(B) -> B;
        (if_offline) -> if_offline
     end;
+mod_opt_type(cache_size) ->
+    fun(I) when is_integer(I), I>0 -> I;
+       (unlimited) -> infinity;
+       (infinity) -> infinity
+    end;
 mod_opt_type(queue_type) ->
     fun(ram) -> ram; (file) -> file end.
 
@@ -804,5 +826,6 @@ mod_options(Host) ->
      {resume_timeout, 300},
      {max_resume_timeout, undefined},
      {ack_timeout, 60},
+     {cache_size, ejabberd_config:cache_size(Host)},
      {resend_on_timeout, false},
      {queue_type, ejabberd_config:default_queue_type(Host)}].