-export([get_presence/1, set_presence/2, resend_presence/1, resend_presence/2,
open_session/1, call/3, cast/2, send/2, close/1, close/2, stop/1,
reply/2, copy_state/2, set_timeout/2, route/2,
- host_up/1, host_down/1, send_ws_ping/1]).
+ host_up/1, host_down/1, send_ws_ping/1, bounce_message_queue/2]).
-include("xmpp.hrl").
-include("logger.hrl").
ejabberd_sm:close_session(SID, U, S, R),
State
end,
- bounce_message_queue(),
+ bounce_message_queue(SID, JID),
State1;
process_terminated(#{socket := Socket,
stop_reason := {tls, _}} = State, Reason) ->
{accept_resource, Rnew}
end.
--spec bounce_message_queue() -> ok.
-bounce_message_queue() ->
- receive {route, Pkt} ->
- ejabberd_router:route(Pkt),
- bounce_message_queue()
- after 0 ->
- ok
+-spec bounce_message_queue(ejabberd_sm:sid(), jid:jid()) -> ok.
+bounce_message_queue(SID, JID) ->
+ {U, S, R} = jid:tolower(JID),
+ SIDs = ejabberd_sm:get_session_sids(U, S, R),
+ case lists:member(SID, SIDs) of
+ true ->
+ ?WARNING_MSG("The session for ~s@~s/~s is supposed to "
+ "be unregistered, but session identifier ~p "
+ "still presents in the 'session' table",
+ [U, S, R]);
+ false ->
+ receive {route, Pkt} ->
+ ejabberd_router:route(Pkt),
+ bounce_message_queue(SID, JID)
+ after 0 ->
+ ok
+ end
end.
-spec new_uniq_id() -> binary().
_ -> State#{stop_reason => internal_failure}
end,
State2 = bounce_queue(State1),
- bounce_message_queue(State2).
+ bounce_message_queue({LServer, RServer}, State2).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
bounce_packet(Pkt, AccState)
end, State).
--spec bounce_message_queue(state()) -> state().
-bounce_message_queue(State) ->
- receive {route, Pkt} ->
- State1 = bounce_packet(Pkt, State),
- bounce_message_queue(State1)
- after 0 ->
- State
+-spec bounce_message_queue({binary(), binary()}, state()) -> state().
+bounce_message_queue({LServer, RServer} = FromTo, State) ->
+ Pids = ejabberd_s2s:get_connections_pids(FromTo),
+ case lists:member(self(), Pids) of
+ true ->
+ ?WARNING_MSG("Outgoing s2s connection ~s -> ~s is supposed "
+ "to be unregistered, but pid ~p still presents "
+ "in 's2s' table", [LServer, RServer, self()]),
+ State;
+ false ->
+ receive {route, Pkt} ->
+ State1 = bounce_packet(Pkt, State),
+ bounce_message_queue(FromTo, State1)
+ after 0 ->
+ State
+ end
end.
-spec bounce_packet(xmpp_element(), state()) -> state().
get_session_pid/3,
get_session_sid/3,
get_session_sids/2,
+ get_session_sids/3,
get_user_info/2,
get_user_info/3,
set_user_info/5,
Sessions = get_sessions(Mod, LUser, LServer),
[SID || #session{sid = SID} <- Sessions].
+-spec get_session_sids(binary(), binary(), binary()) -> [sid()].
+
+get_session_sids(User, Server, Resource) ->
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
+ LResource = jid:resourceprep(Resource),
+ Mod = get_sm_backend(LServer),
+ Sessions = get_sessions(Mod, LUser, LServer, LResource),
+ [SID || #session{sid = SID} <- Sessions].
+
-spec dirty_get_sessions_list() -> [ljid()].
dirty_get_sessions_list() ->
c2s_closed(State, _Reason) ->
State.
-c2s_terminated(#{mgmt_state := resumed, jid := JID} = State, _Reason) ->
+c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason) ->
?DEBUG("Closing former stream of resumed session for ~s",
[jid:encode(JID)]),
- bounce_message_queue(),
+ ejabberd_c2s:bounce_message_queue(SID, JID),
{stop, State};
c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
sid := {Time, _}, jid := JID} = State, _Reason) ->
cancel_ack_timer(State) ->
State.
--spec bounce_message_queue() -> ok.
-bounce_message_queue() ->
- receive {route, Pkt} ->
- ejabberd_router:route(Pkt),
- bounce_message_queue()
- after 0 ->
- ok
- end.
-
-spec need_to_enqueue(state(), xmlel() | stanza()) -> {boolean(), state()}.
need_to_enqueue(State, Pkt) when ?is_stanza(Pkt) ->
{not xmpp:get_meta(Pkt, mgmt_is_resent, false), State};