set_presence/7,
unset_presence/6,
close_session_unset_presence/5,
- set_offline_info/5,
- get_offline_info/4,
dirty_get_sessions_list/0,
dirty_get_my_sessions_list/0,
get_vh_session_list/1,
host_down/1,
make_sid/0,
clean_cache/1,
- config_reloaded/0,
- is_online/1
+ config_reloaded/0
]).
-export([init/1, handle_call/3, handle_cast/2,
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- Ss = online(get_sessions(Mod, LUser, LServer)),
+ Ss = get_sessions(Mod, LUser, LServer),
[element(3, S#session.usr) || S <- clean_session_list(Ss)].
-spec get_user_present_resources(binary(), binary()) -> [tuple()].
get_user_present_resources(LUser, LServer) ->
Mod = get_sm_backend(LServer),
- Ss = online(get_sessions(Mod, LUser, LServer)),
+ Ss = get_sessions(Mod, LUser, LServer),
[{S#session.priority, element(3, S#session.usr)}
|| S <- clean_session_list(Ss), is_integer(S#session.priority)].
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case online(get_sessions(Mod, LUser, LServer, LResource)) of
+ case get_sessions(Mod, LUser, LServer, LResource) of
[] ->
undefined;
Ss ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- Ss = online(get_sessions(Mod, LUser, LServer)),
+ Ss = get_sessions(Mod, LUser, LServer),
[{LResource, [{node, node(Pid)}, {ts, Ts}, {pid, Pid},
{priority, Priority} | Info]}
|| #session{usr = {_, _, LResource},
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case online(get_sessions(Mod, LUser, LServer, LResource)) of
+ case get_sessions(Mod, LUser, LServer, LResource) of
[] ->
offline;
Ss ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case online(get_sessions(Mod, LUser, LServer, LResource)) of
+ case get_sessions(Mod, LUser, LServer, LResource) of
[] ->
none;
Ss ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- Sessions = online(get_sessions(Mod, LUser, LServer)),
+ Sessions = get_sessions(Mod, LUser, LServer),
[SID || #session{sid = SID} <- Sessions].
--spec set_offline_info(sid(), binary(), binary(), binary(), info()) -> ok.
-
-set_offline_info(SID, User, Server, Resource, Info) ->
- LUser = jid:nodeprep(User),
- LServer = jid:nameprep(Server),
- LResource = jid:resourceprep(Resource),
- set_session(SID, LUser, LServer, LResource, undefined, [offline | Info]).
-
--spec get_offline_info(erlang:timestamp(), binary(), binary(),
- binary()) -> none | info().
-
-get_offline_info(Time, User, Server, Resource) ->
- LUser = jid:nodeprep(User),
- LServer = jid:nameprep(Server),
- LResource = jid:resourceprep(Resource),
- Mod = get_sm_backend(LServer),
- case get_sessions(Mod, LUser, LServer, LResource) of
- [#session{sid = {Time, _}, info = Info}] ->
- case proplists:get_bool(offline, Info) of
- true ->
- Info;
- false ->
- none
- end;
- _ ->
- none
- end.
-
-spec dirty_get_sessions_list() -> [ljid()].
dirty_get_sessions_list() ->
lists:flatmap(
fun(Mod) ->
- [S#session.usr || S <- online(get_sessions(Mod))]
+ [S#session.usr || S <- get_sessions(Mod)]
end, get_sm_backends()).
-spec dirty_get_my_sessions_list() -> [#session{}].
dirty_get_my_sessions_list() ->
lists:flatmap(
fun(Mod) ->
- [S || S <- online(get_sessions(Mod)),
+ [S || S <- get_sessions(Mod),
node(element(2, S#session.sid)) == node()]
end, get_sm_backends()).
get_vh_session_list(Server) ->
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- [S#session.usr || S <- online(get_sessions(Mod, LServer))].
+ [S#session.usr || S <- get_sessions(Mod, LServer)].
-spec get_all_pids() -> [pid()].
get_all_pids() ->
lists:flatmap(
fun(Mod) ->
- [element(2, S#session.sid) || S <- online(get_sessions(Mod))]
+ [element(2, S#session.sid) || S <- get_sessions(Mod)]
end, get_sm_backends()).
-spec get_vh_session_number(binary()) -> non_neg_integer().
get_vh_session_number(Server) ->
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- length(online(get_sessions(Mod, LServer))).
+ length(get_sessions(Mod, LServer)).
%% Why the hell do we have so many similar kicks?
c2s_handle_info(#{lang := Lang} = State, replaced) ->
ok
end.
--spec online([#session{}]) -> [#session{}].
-
-online(Sessions) ->
- lists:filter(fun is_online/1, Sessions).
-
--spec is_online(#session{}) -> boolean().
-
-is_online(#session{info = Info}) ->
- not proplists:get_bool(offline, Info).
-
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec do_route(jid(), term()) -> any().
do_route(#jid{lresource = <<"">>} = To, Term) ->
?DEBUG("broadcasting ~p to ~s", [Term, jid:encode(To)]),
{U, S, R} = jid:tolower(To),
Mod = get_sm_backend(S),
- case online(get_sessions(Mod, U, S, R)) of
+ case get_sessions(Mod, U, S, R) of
[] ->
?DEBUG("dropping broadcast to unavailable resourse: ~p", [Term]);
Ss ->
ejabberd_c2s:route(Pid, {route, Packet1});
(_) ->
ok
- end, online(get_sessions(Mod, LUser, LServer)));
+ end, get_sessions(Mod, LUser, LServer));
false ->
ok
end;
To = xmpp:get_to(Packet),
{LUser, LServer, LResource} = jid:tolower(To),
Mod = get_sm_backend(LServer),
- case online(get_sessions(Mod, LUser, LServer, LResource)) of
+ case get_sessions(Mod, LUser, LServer, LResource) of
[] ->
case Packet of
#message{type = T} when T == chat; T == normal ->
(P >= 0) and (Type == headline) ->
LResource = jid:resourceprep(R),
Mod = get_sm_backend(LServer),
- case online(get_sessions(Mod, LUser, LServer,
- LResource)) of
+ case get_sessions(Mod, LUser, LServer,
+ LResource) of
[] ->
ok; % Race condition
Ss ->
check_existing_resources(LUser, LServer, LResource) ->
Mod = get_sm_backend(LServer),
Ss = get_sessions(Mod, LUser, LServer, LResource),
- {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss),
- lists:foreach(fun(S) ->
- delete_session(Mod, S)
- end, OfflineSs),
- if OnlineSs == [] -> ok;
+ if Ss == [] -> ok;
true ->
- SIDs = [SID || #session{sid = SID} <- OnlineSs],
+ SIDs = [SID || #session{sid = SID} <- Ss],
MaxSID = lists:max(SIDs),
lists:foreach(fun ({_, Pid} = S) when S /= MaxSID ->
ejabberd_c2s:route(Pid, replaced);
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- [S#session.sid || S <- online(get_sessions(Mod, LUser, LServer, LResource))].
+ [S#session.sid || S <- get_sessions(Mod, LUser, LServer, LResource)].
-spec check_max_sessions(binary(), binary()) -> ok | replaced.
check_max_sessions(LUser, LServer) ->
Mod = get_sm_backend(LServer),
Ss = get_sessions(Mod, LUser, LServer),
- {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss),
MaxSessions = get_max_user_sessions(LUser, LServer),
- if length(OnlineSs) =< MaxSessions -> ok;
+ if length(Ss) =< MaxSessions -> ok;
true ->
- #session{sid = {_, Pid}} = lists:min(OnlineSs),
+ #session{sid = {_, Pid}} = lists:min(Ss),
ejabberd_c2s:route(Pid, replaced)
- end,
- if length(OfflineSs) =< MaxSessions -> ok;
- true ->
- delete_session(Mod, lists:min(OfflineSs))
end.
%% Get the user_max_session setting
force_update_presence({LUser, LServer}) ->
Mod = get_sm_backend(LServer),
- Ss = online(get_sessions(Mod, LUser, LServer)),
+ Ss = get_sessions(Mod, LUser, LServer),
lists:foreach(fun (#session{sid = {_, Pid}}) ->
ejabberd_c2s:resend_presence(Pid)
end,
-include("logger.hrl").
-include("p1_queue.hrl").
+-define(STREAM_MGMT_CACHE, stream_mgmt_cache).
+
-define(is_sm_packet(Pkt),
is_record(Pkt, sm_enable) or
is_record(Pkt, sm_resume) or
%%%===================================================================
%%% API
%%%===================================================================
-start(Host, _Opts) ->
+start(Host, Opts) ->
+ init_cache(Opts),
ejabberd_hooks:add(c2s_init, ?MODULE, c2s_stream_init, 50),
ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE,
c2s_stream_started, 50),
[jid:encode(JID)]),
bounce_message_queue(),
{stop, State};
-c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, sid := SID,
- user := U, server := S, resource := R} = State, Reason) ->
- Result = case MgmtState of
- timeout ->
- Info = [{num_stanzas_in, In}],
- %% TODO: Usually, ejabberd_c2s:process_terminated/2 is
- %% called later in the hook chain. We swap the order so
- %% that the offline info won't be purged after we stored
- %% it. This should be fixed in a proper way.
- State1 = ejabberd_c2s:process_terminated(State, Reason),
- ejabberd_sm:set_offline_info(SID, U, S, R, Info),
- {stop, State1};
- _ ->
- State
- end,
+c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
+ sid := {Time, _}, jid := JID} = State, _Reason) ->
+ case MgmtState of
+ timeout ->
+ store_stanzas_in(jid:tolower(JID), Time, In);
+ _ ->
+ ok
+ end,
route_unacked_stanzas(State),
- Result;
+ State;
c2s_terminated(State, _Reason) ->
State.
{term, {R, Time}} ->
case ejabberd_sm:get_session_pid(U, S, R) of
none ->
- case ejabberd_sm:get_offline_info(Time, U, S, R) of
- none ->
+ case pop_stanzas_in({U, S, R}, Time) of
+ error ->
{error, <<"Previous session PID not found">>};
- Info ->
- case proplists:get_value(num_stanzas_in, Info) of
- undefined ->
- {error, <<"Previous session timed out">>};
- H ->
- {error, <<"Previous session timed out">>, H}
- end
+ {ok, H} ->
+ {error, <<"Previous session timed out">>, H}
end;
OldPID ->
OldSID = {Time, OldPID},
need_to_enqueue(State, _) ->
{false, State}.
+%%%===================================================================
+%%% Cache-like storage for last handled stanzas
+%%%===================================================================
+init_cache(Opts) ->
+ ets_cache:new(?STREAM_MGMT_CACHE, cache_opts(Opts)).
+
+cache_opts(Opts) ->
+ [{max_size, gen_mod:get_opt(cache_size, Opts)},
+ {life_time, infinity}].
+
+-spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean().
+store_stanzas_in(LJID, Time, Num) ->
+ ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num,
+ ejabberd_cluster:get_nodes()).
+
+-spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error.
+pop_stanzas_in(LJID, Time) ->
+ case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of
+ {ok, Val} ->
+ ets_cache:delete(?STREAM_MGMT_CACHE, {LJID, Time},
+ ejabberd_cluster:get_nodes()),
+ {ok, Val};
+ error ->
+ error
+ end.
+
%%%===================================================================
%%% Configuration processing
%%%===================================================================
fun(B) when is_boolean(B) -> B;
(if_offline) -> if_offline
end;
+mod_opt_type(cache_size) ->
+ fun(I) when is_integer(I), I>0 -> I;
+ (unlimited) -> infinity;
+ (infinity) -> infinity
+ end;
mod_opt_type(queue_type) ->
fun(ram) -> ram; (file) -> file end.
{resume_timeout, 300},
{max_resume_timeout, undefined},
{ack_timeout, 60},
+ {cache_size, ejabberd_config:cache_size(Host)},
{resend_on_timeout, false},
{queue_type, ejabberd_config:default_queue_type(Host)}].