-record(session, {sid, usr, us, priority, info}).
-record(session_counter, {vhost, count}).
--type sid() :: {erlang:timestamp(), pid()}.
+-type sid() :: {erlang:timestamp(), pid()} | {erlang:timestamp(), undefined}.
-type ip() :: {inet:ip_address(), inet:port_number()} | undefined.
-type info() :: [{conn, atom()} | {ip, ip()} | {node, atom()}
- | {oor, boolean()} | {auth_module, atom()}].
+ | {oor, boolean()} | {auth_module, atom()}
+ | {num_stanzas_in, non_neg_integer()}].
-type prio() :: undefined | integer().
-endif.
(Xmlns == ?NS_STREAM_MGMT_2) or
(Xmlns == ?NS_STREAM_MGMT_3)).
--define(MGMT_FAILED(Condition, Xmlns),
+-define(MGMT_FAILED(Condition, Attrs),
#xmlel{name = <<"failed">>,
- attrs = [{<<"xmlns">>, Xmlns}],
+ attrs = Attrs,
children = [#xmlel{name = Condition,
attrs = [{<<"xmlns">>, ?NS_STANZAS}],
children = []}]}).
-define(MGMT_BAD_REQUEST(Xmlns),
- ?MGMT_FAILED(<<"bad-request">>, Xmlns)).
-
--define(MGMT_ITEM_NOT_FOUND(Xmlns),
- ?MGMT_FAILED(<<"item-not-found">>, Xmlns)).
+ ?MGMT_FAILED(<<"bad-request">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_SERVICE_UNAVAILABLE(Xmlns),
- ?MGMT_FAILED(<<"service-unavailable">>, Xmlns)).
+ ?MGMT_FAILED(<<"service-unavailable">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_UNEXPECTED_REQUEST(Xmlns),
- ?MGMT_FAILED(<<"unexpected-request">>, Xmlns)).
+ ?MGMT_FAILED(<<"unexpected-request">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_UNSUPPORTED_VERSION(Xmlns),
- ?MGMT_FAILED(<<"unsupported-version">>, Xmlns)).
+ ?MGMT_FAILED(<<"unsupported-version">>, [{<<"xmlns">>, Xmlns}])).
+
+-define(MGMT_ITEM_NOT_FOUND(Xmlns),
+ ?MGMT_FAILED(<<"item-not-found">>, [{<<"xmlns">>, Xmlns}])).
+
+-define(MGMT_ITEM_NOT_FOUND_H(Xmlns, NumStanzasIn),
+ ?MGMT_FAILED(<<"item-not-found">>,
+ [{<<"xmlns">>, Xmlns},
+ {<<"h">>, jlib:integer_to_binary(NumStanzasIn)}])).
%%%----------------------------------------------------------------------
%%% API
wait_for_resume(timeout, StateData) ->
?DEBUG("Timed out waiting for resumption of stream for ~s",
[jid:to_string(StateData#state.jid)]),
- {stop, normal, StateData};
+ {stop, normal, StateData#state{mgmt_state = timeout}};
wait_for_resume(Event, StateData) ->
?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]),
fsm_next_state(wait_for_resume, StateData).
presence_broadcast(StateData, From,
StateData#state.pres_a, Packet)
end,
+ case StateData#state.mgmt_state of
+ timeout ->
+ Info = [{num_stanzas_in,
+ StateData#state.mgmt_stanzas_in}],
+ ejabberd_sm:set_offline_info(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ Info);
+ _ ->
+ ok
+ end,
handle_unacked_stanzas(StateData)
end,
bounce_messages();
case inherit_session_state(StateData, PrevID) of
{ok, InheritedState} ->
{ok, InheritedState, H};
+ {error, Err, InH} ->
+ {error, ?MGMT_ITEM_NOT_FOUND_H(Xmlns, InH), Err};
{error, Err} ->
{error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err}
end;
{term, {R, Time}} ->
case ejabberd_sm:get_session_pid(U, S, R) of
none ->
- {error, <<"Previous session PID not found">>};
+ case ejabberd_sm:get_offline_info(Time, U, S, R) of
+ none ->
+ {error, <<"Previous session PID not found">>};
+ Info ->
+ case proplists:get_value(num_stanzas_in, Info) of
+ undefined ->
+ {error, <<"Previous session timed out">>};
+ H ->
+ {error, <<"Previous session timed out">>, H}
+ end
+ end;
OldPID ->
OldSID = {Time, OldPID},
case catch resume_session(OldSID) of
set_presence/7,
unset_presence/6,
close_session_unset_presence/5,
+ set_offline_info/5,
+ get_offline_info/4,
dirty_get_sessions_list/0,
dirty_get_my_sessions_list/0,
get_vh_session_list/1,
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- Ss = Mod:get_sessions(LUser, LServer),
+ Ss = online(Mod:get_sessions(LUser, LServer)),
[element(3, S#session.usr) || S <- clean_session_list(Ss)].
-spec get_user_present_resources(binary(), binary()) -> [tuple()].
get_user_present_resources(LUser, LServer) ->
Mod = get_sm_backend(LServer),
- Ss = Mod:get_sessions(LUser, LServer),
+ Ss = online(Mod:get_sessions(LUser, LServer)),
[{S#session.priority, element(3, S#session.usr)}
|| S <- clean_session_list(Ss), is_integer(S#session.priority)].
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case Mod:get_sessions(LUser, LServer, LResource) of
+ case online(Mod:get_sessions(LUser, LServer, LResource)) of
[] ->
undefined;
Ss ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case Mod:get_sessions(LUser, LServer, LResource) of
+ case online(Mod:get_sessions(LUser, LServer, LResource)) of
[] ->
offline;
Ss ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case Mod:get_sessions(LUser, LServer, LResource) of
+ case online(Mod:get_sessions(LUser, LServer, LResource)) of
[#session{sid = {_, Pid}}] -> Pid;
_ -> none
end.
+-spec set_offline_info(sid(), binary(), binary(), binary(), info()) -> ok.
+
+set_offline_info({Time, _Pid}, User, Server, Resource, Info) ->
+ SID = {Time, undefined},
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
+ LResource = jid:resourceprep(Resource),
+ set_session(SID, LUser, LServer, LResource, undefined, Info).
+
+-spec get_offline_info(erlang:timestamp(), binary(), binary(),
+ binary()) -> none | info().
+
+get_offline_info(Time, User, Server, Resource) ->
+ SID = {Time, undefined},
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
+ LResource = jid:resourceprep(Resource),
+ Mod = get_sm_backend(LServer),
+ case Mod:get_sessions(LUser, LServer, LResource) of
+ [#session{sid = SID, info = Info}] ->
+ Info;
+ _ ->
+ none
+ end.
+
-spec dirty_get_sessions_list() -> [ljid()].
dirty_get_sessions_list() ->
lists:flatmap(
fun(Mod) ->
- [S#session.usr || S <- Mod:get_sessions()]
+ [S#session.usr || S <- online(Mod:get_sessions())]
end, get_sm_backends()).
-spec dirty_get_my_sessions_list() -> [#session{}].
dirty_get_my_sessions_list() ->
lists:flatmap(
fun(Mod) ->
- [S || S <- Mod:get_sessions(),
+ [S || S <- online(Mod:get_sessions()),
node(element(2, S#session.sid)) == node()]
end, get_sm_backends()).
get_vh_session_list(Server) ->
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- [S#session.usr || S <- Mod:get_sessions(LServer)].
+ [S#session.usr || S <- online(Mod:get_sessions(LServer))].
-spec get_all_pids() -> [pid()].
get_all_pids() ->
lists:flatmap(
fun(Mod) ->
- [element(2, S#session.sid) || S <- Mod:get_sessions()]
+ [element(2, S#session.sid) || S <- online(Mod:get_sessions())]
end, get_sm_backends()).
-spec get_vh_session_number(binary()) -> non_neg_integer().
get_vh_session_number(Server) ->
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- length(Mod:get_sessions(LServer)).
+ length(online(Mod:get_sessions(LServer))).
register_iq_handler(Host, XMLNS, Module, Fun) ->
ejabberd_sm ! {register_iq_handler, Host, XMLNS, Module, Fun}.
Mod:set_session(#session{sid = SID, usr = USR, us = US,
priority = Priority, info = Info}).
+-spec online([#session{}]) -> [#session{}].
+
+online(Sessions) ->
+ lists:filter(fun(#session{sid = {_, undefined}}) ->
+ false;
+ (_) ->
+ true
+ end, Sessions).
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
do_route(From, To, {broadcast, _} = Packet) ->
_ ->
{U, S, R} = jid:tolower(To),
Mod = get_sm_backend(S),
- case Mod:get_sessions(U, S, R) of
+ case online(Mod:get_sessions(U, S, R)) of
[] ->
?DEBUG("packet dropped~n", []);
Ss ->
_ -> ok
end;
_ ->
- Mod = get_sm_backend(LServer),
- case Mod:get_sessions(LUser, LServer, LResource) of
+ Mod = get_sm_backend(LServer),
+ case online(Mod:get_sessions(LUser, LServer, LResource)) of
[] ->
case Name of
<<"message">> ->
(P >= 0) and (Type == headline) ->
LResource = jid:resourceprep(R),
Mod = get_sm_backend(LServer),
- case Mod:get_sessions(LUser, LServer,
- LResource) of
+ case online(Mod:get_sessions(LUser, LServer,
+ LResource)) of
[] ->
ok; % Race condition
Ss ->
if SIDs == [] -> ok;
true ->
MaxSID = lists:max(SIDs),
- lists:foreach(fun ({_, Pid} = S) when S /= MaxSID ->
+ lists:foreach(fun ({_, undefined} = S) ->
+ Mod = get_sm_backend(LServer),
+ Mod:delete_session(LUser, LServer, LResource,
+ S);
+ ({_, Pid} = S) when S /= MaxSID ->
Pid ! replaced;
(_) -> ok
end,
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- [S#session.sid || S <- Mod:get_sessions(LUser, LServer, LResource)].
+ [S#session.sid || S <- online(Mod:get_sessions(LUser, LServer, LResource))].
check_max_sessions(LUser, LServer) ->
Mod = get_sm_backend(LServer),
- SIDs = [S#session.sid || S <- Mod:get_sessions(LUser, LServer)],
+ SIDs = [S#session.sid || S <- online(Mod:get_sessions(LUser, LServer))],
MaxSessions = get_max_user_sessions(LUser, LServer),
if length(SIDs) =< MaxSessions -> ok;
true -> {_, Pid} = lists:min(SIDs), Pid ! replaced
force_update_presence({LUser, LServer}) ->
Mod = get_sm_backend(LServer),
- Ss = Mod:get_sessions(LUser, LServer),
+ Ss = online(Mod:get_sessions(LUser, LServer)),
lists:foreach(fun (#session{sid = {_, Pid}}) ->
Pid ! {force_update_presence, LUser, LServer}
end,
auth(connect(Config));
sm_resume ->
auth(connect(Config));
+ sm_resume_failed ->
+ auth(connect(Config));
test_open_session ->
bind(auth(connect(Config)));
_ when IsMaster or IsSlave ->
stats,
sm,
sm_resume,
+ sm_resume_failed,
disco]},
{test_proxy65, [parallel],
[proxy65_master, proxy65_slave]}].
?recv1(#message{from = ServerJID, to = MyJID, body = [Txt]}),
?recv1(#sm_r{}),
send(Config, #sm_a{h = 1, xmlns = ?NS_STREAM_MGMT_3}),
+ %% Send another stanza to increment the server's 'h' for sm_resume_failed.
+ send(Config, #presence{to = ServerJID}),
+ close_socket(Config),
+ {save_config, set_opt(sm_previd, ID, Config)}.
+
+sm_resume_failed(Config) ->
+ {sm_resume, SMConfig} = ?config(saved_config, Config),
+ ID = ?config(sm_previd, SMConfig),
+ ct:sleep(5000), % Wait for session to time out.
+ send(Config, #sm_resume{previd = ID, h = 1, xmlns = ?NS_STREAM_MGMT_3}),
+ ?recv1(#sm_failed{reason = 'item-not-found', h = 4}),
disconnect(Config).
private(Config) ->
starttls: true
shaper: c2s_shaper
access: c2s
+ resume_timeout: 3
-
port: @@s2s_port@@
module: ejabberd_s2s_in
encode_sm_resumed(Resumed, []);
encode({sm_r, _} = R) -> encode_sm_r(R, []);
encode({sm_a, _, _} = A) -> encode_sm_a(A, []);
-encode({sm_failed, _, _} = Failed) ->
+encode({sm_failed, _, _, _} = Failed) ->
encode_sm_failed(Failed, []);
encode({offline_item, _, _} = Item) ->
encode_offline_item(Item,
pp(sm_resumed, 3) -> [h, previd, xmlns];
pp(sm_r, 1) -> [xmlns];
pp(sm_a, 2) -> [h, xmlns];
-pp(sm_failed, 2) -> [reason, xmlns];
+pp(sm_failed, 3) -> [reason, h, xmlns];
pp(offline_item, 2) -> [node, action];
pp(offline, 3) -> [items, purge, fetch];
pp(mix_join, 2) -> [jid, subscribe];
{xmlel, <<"failed">>, _attrs, _els}) ->
Reason = decode_sm_failed_els(__TopXMLNS, __IgnoreEls,
_els, undefined),
- Xmlns = decode_sm_failed_attrs(__TopXMLNS, _attrs,
- undefined),
- {sm_failed, Reason, Xmlns}.
+ {H, Xmlns} = decode_sm_failed_attrs(__TopXMLNS, _attrs,
+ undefined, undefined),
+ {sm_failed, Reason, H, Xmlns}.
decode_sm_failed_els(__TopXMLNS, __IgnoreEls, [],
Reason) ->
Reason).
decode_sm_failed_attrs(__TopXMLNS,
- [{<<"xmlns">>, _val} | _attrs], _Xmlns) ->
- decode_sm_failed_attrs(__TopXMLNS, _attrs, _val);
-decode_sm_failed_attrs(__TopXMLNS, [_ | _attrs],
+ [{<<"h">>, _val} | _attrs], _H, Xmlns) ->
+ decode_sm_failed_attrs(__TopXMLNS, _attrs, _val, Xmlns);
+decode_sm_failed_attrs(__TopXMLNS,
+ [{<<"xmlns">>, _val} | _attrs], H, _Xmlns) ->
+ decode_sm_failed_attrs(__TopXMLNS, _attrs, H, _val);
+decode_sm_failed_attrs(__TopXMLNS, [_ | _attrs], H,
Xmlns) ->
- decode_sm_failed_attrs(__TopXMLNS, _attrs, Xmlns);
-decode_sm_failed_attrs(__TopXMLNS, [], Xmlns) ->
- decode_sm_failed_attr_xmlns(__TopXMLNS, Xmlns).
+ decode_sm_failed_attrs(__TopXMLNS, _attrs, H, Xmlns);
+decode_sm_failed_attrs(__TopXMLNS, [], H, Xmlns) ->
+ {decode_sm_failed_attr_h(__TopXMLNS, H),
+ decode_sm_failed_attr_xmlns(__TopXMLNS, Xmlns)}.
-encode_sm_failed({sm_failed, Reason, Xmlns},
+encode_sm_failed({sm_failed, Reason, H, Xmlns},
_xmlns_attrs) ->
_els = lists:reverse('encode_sm_failed_$reason'(Reason,
[])),
_attrs = encode_sm_failed_attr_xmlns(Xmlns,
- _xmlns_attrs),
+ encode_sm_failed_attr_h(H,
+ _xmlns_attrs)),
{xmlel, <<"failed">>, _attrs, _els}.
'encode_sm_failed_$reason'(undefined, _acc) -> _acc;
<<"urn:ietf:params:xml:ns:xmpp-stanzas">>}])
| _acc].
+decode_sm_failed_attr_h(__TopXMLNS, undefined) ->
+ undefined;
+decode_sm_failed_attr_h(__TopXMLNS, _val) ->
+ case catch dec_int(_val, 0, infinity) of
+ {'EXIT', _} ->
+ erlang:error({xmpp_codec,
+ {bad_attr_value, <<"h">>, <<"failed">>, __TopXMLNS}});
+ _res -> _res
+ end.
+
+encode_sm_failed_attr_h(undefined, _acc) -> _acc;
+encode_sm_failed_attr_h(_val, _acc) ->
+ [{<<"h">>, enc_int(_val)} | _acc].
+
decode_sm_failed_attr_xmlns(__TopXMLNS, undefined) ->
undefined;
decode_sm_failed_attr_xmlns(__TopXMLNS, _val) -> _val.
-record(sasl_mechanisms, {list = [] :: [binary()]}).
-record(sm_failed, {reason :: atom() | #gone{} | #redirect{},
+ h :: non_neg_integer(),
xmlns :: binary()}).
-record(error, {type :: 'auth' | 'cancel' | 'continue' | 'modify' | 'wait',
-xml(sm_failed,
#elem{name = <<"failed">>,
xmlns = [<<"urn:xmpp:sm:2">>, <<"urn:xmpp:sm:3">>],
- result = {sm_failed, '$reason', '$xmlns'},
- attrs = [#attr{name = <<"xmlns">>}],
+ result = {sm_failed, '$reason', '$h', '$xmlns'},
+ attrs = [#attr{name = <<"h">>,
+ dec = {dec_int, [0, infinity]},
+ enc = {enc_int, []}},
+ #attr{name = <<"xmlns">>}],
refs = [#ref{name = error_bad_request,
min = 0, max = 1, label = '$reason'},
#ref{name = error_conflict,