]> granicus.if.org Git - ejabberd/commitdiff
Make sure queue bouncing doesn't yield into infinite recursion
authorEvgeny Khramtsov <ekhramtsov@process-one.net>
Wed, 26 Jun 2019 08:56:25 +0000 (11:56 +0300)
committerEvgeny Khramtsov <ekhramtsov@process-one.net>
Wed, 26 Jun 2019 08:56:25 +0000 (11:56 +0300)
src/ejabberd_c2s.erl
src/ejabberd_s2s_out.erl
src/ejabberd_sm.erl
src/mod_stream_mgmt.erl

index 3f5b3aa0c48b68f5b85fa01afa86e50479e4aaf3..a72117ac2c2440239fe47b36d0b80dbbea401b1e 100644 (file)
@@ -47,7 +47,7 @@
 -export([get_presence/1, set_presence/2, resend_presence/1, resend_presence/2,
         open_session/1, call/3, cast/2, send/2, close/1, close/2, stop/1,
         reply/2, copy_state/2, set_timeout/2, route/2,
-        host_up/1, host_down/1, send_ws_ping/1]).
+        host_up/1, host_down/1, send_ws_ping/1, bounce_message_queue/2]).
 
 -include("xmpp.hrl").
 -include("logger.hrl").
@@ -299,7 +299,7 @@ process_terminated(#{sid := SID, socket := Socket,
                     ejabberd_sm:close_session(SID, U, S, R),
                     State
             end,
-    bounce_message_queue(),
+    bounce_message_queue(SID, JID),
     State1;
 process_terminated(#{socket := Socket,
                     stop_reason := {tls, _}} = State, Reason) ->
@@ -882,13 +882,23 @@ resource_conflict_action(U, S, R) ->
            {accept_resource, Rnew}
     end.
 
--spec bounce_message_queue() -> ok.
-bounce_message_queue() ->
-    receive {route, Pkt} ->
-           ejabberd_router:route(Pkt),
-           bounce_message_queue()
-    after 0 ->
-           ok
+-spec bounce_message_queue(ejabberd_sm:sid(), jid:jid()) -> ok.
+bounce_message_queue(SID, JID) ->
+    {U, S, R} = jid:tolower(JID),
+    SIDs = ejabberd_sm:get_session_sids(U, S, R),
+    case lists:member(SID, SIDs) of
+       true ->
+           ?WARNING_MSG("The session for ~s@~s/~s is supposed to "
+                        "be unregistered, but session identifier ~p "
+                        "still presents in the 'session' table",
+                        [U, S, R]);
+       false ->
+           receive {route, Pkt} ->
+                   ejabberd_router:route(Pkt),
+                   bounce_message_queue(SID, JID)
+           after 0 ->
+                   ok
+           end
     end.
 
 -spec new_uniq_id() -> binary().
index 0741690d6b456b8ec3044162643558d9efab320d..5ee2a6dfbf58cef72c0fc08d73daf4c0901ed355 100644 (file)
@@ -310,7 +310,7 @@ terminate(Reason, #{server := LServer,
                 _ -> State#{stop_reason => internal_failure}
     end,
     State2 = bounce_queue(State1),
-    bounce_message_queue(State2).
+    bounce_message_queue({LServer, RServer}, State2).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -332,13 +332,22 @@ bounce_queue(State) ->
              bounce_packet(Pkt, AccState)
       end, State).
 
--spec bounce_message_queue(state()) -> state().
-bounce_message_queue(State) ->
-    receive {route, Pkt} ->
-           State1 = bounce_packet(Pkt, State),
-           bounce_message_queue(State1)
-    after 0 ->
-           State
+-spec bounce_message_queue({binary(), binary()}, state()) -> state().
+bounce_message_queue({LServer, RServer} = FromTo, State) ->
+    Pids = ejabberd_s2s:get_connections_pids(FromTo),
+    case lists:member(self(), Pids) of
+       true ->
+           ?WARNING_MSG("Outgoing s2s connection ~s -> ~s is supposed "
+                        "to be unregistered, but pid ~p still presents "
+                        "in 's2s' table", [LServer, RServer, self()]),
+           State;
+       false ->
+           receive {route, Pkt} ->
+                   State1 = bounce_packet(Pkt, State),
+                   bounce_message_queue(FromTo, State1)
+           after 0 ->
+                   State
+           end
     end.
 
 -spec bounce_packet(xmpp_element(), state()) -> state().
index b4f7a6e4031a5b6c0806e7c36a9a86754e15a0b5..03218dc3e5eaf3af73995217fa30b26456637e9e 100644 (file)
@@ -63,6 +63,7 @@
         get_session_pid/3,
         get_session_sid/3,
         get_session_sids/2,
+        get_session_sids/3,
         get_user_info/2,
         get_user_info/3,
         set_user_info/5,
@@ -400,6 +401,16 @@ get_session_sids(User, Server) ->
     Sessions = get_sessions(Mod, LUser, LServer),
     [SID || #session{sid = SID} <- Sessions].
 
+-spec get_session_sids(binary(), binary(), binary()) -> [sid()].
+
+get_session_sids(User, Server, Resource) ->
+    LUser = jid:nodeprep(User),
+    LServer = jid:nameprep(Server),
+    LResource = jid:resourceprep(Resource),
+    Mod = get_sm_backend(LServer),
+    Sessions = get_sessions(Mod, LUser, LServer, LResource),
+    [SID || #session{sid = SID} <- Sessions].
+
 -spec dirty_get_sessions_list() -> [ljid()].
 
 dirty_get_sessions_list() ->
index 1777b1216c7eceb50e43cdad0b427a1f7baedf41..b0abf1ffb0bab23d40c65a8ed378620f659e7e8f 100644 (file)
@@ -266,10 +266,10 @@ c2s_closed(#{mgmt_state := active} = State, _Reason) ->
 c2s_closed(State, _Reason) ->
     State.
 
-c2s_terminated(#{mgmt_state := resumed, jid := JID} = State, _Reason) ->
+c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason) ->
     ?DEBUG("Closing former stream of resumed session for ~s",
           [jid:encode(JID)]),
-    bounce_message_queue(),
+    ejabberd_c2s:bounce_message_queue(SID, JID),
     {stop, State};
 c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
                 sid := {Time, _}, jid := JID} = State, _Reason) ->
@@ -705,15 +705,6 @@ cancel_ack_timer(#{mgmt_ack_timer := TRef} = State) ->
 cancel_ack_timer(State) ->
     State.
 
--spec bounce_message_queue() -> ok.
-bounce_message_queue() ->
-    receive {route, Pkt} ->
-           ejabberd_router:route(Pkt),
-           bounce_message_queue()
-    after 0 ->
-           ok
-    end.
-
 -spec need_to_enqueue(state(), xmlel() | stanza()) -> {boolean(), state()}.
 need_to_enqueue(State, Pkt) when ?is_stanza(Pkt) ->
     {not xmpp:get_meta(Pkt, mgmt_is_resent, false), State};