auth_module = unknown,
ip,
aux_fields = [],
- sm_state,
- sm_xmlns,
- ack_queue,
- max_ack_queue,
- pending_since,
- resume_timeout,
- resend_on_timeout,
- n_stanzas_in = 0,
- n_stanzas_out = 0,
+ mgmt_state,
+ mgmt_xmlns,
+ mgmt_queue,
+ mgmt_max_queue,
+ mgmt_pending_since,
+ mgmt_timeout,
+ mgmt_resend,
+ mgmt_stanzas_in = 0,
+ mgmt_stanzas_out = 0,
lang}).
%-define(DBGFSM, true).
Name == <<"a">>;
Name == <<"r">>).
--define(IS_SUPPORTED_SM_XMLNS(Xmlns),
+-define(IS_SUPPORTED_MGMT_XMLNS(Xmlns),
Xmlns == ?NS_STREAM_MGMT_2;
Xmlns == ?NS_STREAM_MGMT_3).
--define(SM_FAILED(Condition, Xmlns),
+-define(MGMT_FAILED(Condition, Xmlns),
#xmlel{name = <<"failed">>,
attrs = [{<<"xmlns">>, Xmlns}],
children = [#xmlel{name = Condition,
attrs = [{<<"xmlns">>, ?NS_STANZAS}],
children = []}]}).
--define(SM_BAD_REQUEST(Xmlns),
- ?SM_FAILED(<<"bad-request">>, Xmlns)).
+-define(MGMT_BAD_REQUEST(Xmlns),
+ ?MGMT_FAILED(<<"bad-request">>, Xmlns)).
--define(SM_ITEM_NOT_FOUND(Xmlns),
- ?SM_FAILED(<<"item-not-found">>, Xmlns)).
+-define(MGMT_ITEM_NOT_FOUND(Xmlns),
+ ?MGMT_FAILED(<<"item-not-found">>, Xmlns)).
--define(SM_SERVICE_UNAVAILABLE(Xmlns),
- ?SM_FAILED(<<"service-unavailable">>, Xmlns)).
+-define(MGMT_SERVICE_UNAVAILABLE(Xmlns),
+ ?MGMT_FAILED(<<"service-unavailable">>, Xmlns)).
--define(SM_UNEXPECTED_REQUEST(Xmlns),
- ?SM_FAILED(<<"unexpected-request">>, Xmlns)).
+-define(MGMT_UNEXPECTED_REQUEST(Xmlns),
+ ?MGMT_FAILED(<<"unexpected-request">>, Xmlns)).
--define(SM_UNSUPPORTED_VERSION(Xmlns),
- ?SM_FAILED(<<"unsupported-version">>, Xmlns)).
+-define(MGMT_UNSUPPORTED_VERSION(Xmlns),
+ ?MGMT_FAILED(<<"unsupported-version">>, Xmlns)).
%%%----------------------------------------------------------------------
%%% API
tls_enabled = TLSEnabled, tls_options = TLSOpts,
sid = {now(), self()}, streamid = new_id(),
access = Access, shaper = Shaper, ip = IP,
- sm_state = StreamMgmtState,
- max_ack_queue = MaxAckQueue,
- resume_timeout = ResumeTimeout,
- resend_on_timeout = ResendOnTimeout},
+ mgmt_state = StreamMgmtState,
+ mgmt_max_queue = MaxAckQueue,
+ mgmt_timeout = ResumeTimeout,
+ mgmt_resend = ResendOnTimeout},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}
end.
wait_for_auth(closed, StateData) ->
{stop, normal, StateData}.
-wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El},
+ StateData)
when ?IS_STREAM_MGMT_TAG(Name) ->
- fsm_next_state(wait_for_feature_request, dispatch_stream_mgmt(El, StateData));
+ fsm_next_state(wait_for_feature_request,
+ dispatch_stream_mgmt(El, StateData));
wait_for_feature_request({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
send_trailer(StateData),
{stop, normal, StateData};
session_established(closed, StateData)
- when StateData#state.resume_timeout > 0,
- StateData#state.sm_state == active orelse
- StateData#state.sm_state == pending ->
+ when StateData#state.mgmt_timeout > 0,
+ StateData#state.mgmt_state == active orelse
+ StateData#state.mgmt_state == pending ->
log_pending_state(StateData),
- fsm_next_state(wait_for_resume, StateData#state{sm_state = pending});
+ fsm_next_state(wait_for_resume, StateData#state{mgmt_state = pending});
session_established(closed, StateData) ->
{stop, normal, StateData}.
StateData#state.user,
StateData#state.server,
StateData#state.resource),
- {stop, normal, {ok, StateData}, StateData#state{sm_state = resumed}};
+ {stop, normal, {ok, StateData}, StateData#state{mgmt_state = resumed}};
handle_sync_event(_Event, _From, StateName,
StateData) ->
Reply = ok, fsm_reply(Reply, StateName, StateData).
handle_info({'DOWN', Monitor, _Type, _Object, _Info},
_StateName, StateData)
when Monitor == StateData#state.socket_monitor ->
- if StateData#state.resume_timeout > 0,
- StateData#state.sm_state == active orelse
- StateData#state.sm_state == pending ->
+ if StateData#state.mgmt_timeout > 0,
+ StateData#state.mgmt_state == active orelse
+ StateData#state.mgmt_state == pending ->
log_pending_state(StateData),
- fsm_next_state(wait_for_resume, StateData#state{sm_state = pending});
+ fsm_next_state(wait_for_resume,
+ StateData#state{mgmt_state = pending});
true ->
{stop, normal, StateData}
end;
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, StateName, StateData) ->
- case StateData#state.sm_state of
+ case StateData#state.mgmt_state of
resumed ->
?INFO_MSG("Closing former stream of resumed session for ~s",
[jlib:jid_to_string(StateData#state.jid)]);
(StateData#state.sockmod):change_shaper(StateData#state.socket,
Shaper).
-send_text(StateData, Text) when StateData#state.sm_state == pending ->
+send_text(StateData, Text) when StateData#state.mgmt_state == pending ->
?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]);
send_text(StateData, Text) when StateData#state.xml_socket ->
?DEBUG("Send Text on stream = ~p", [Text]),
?DEBUG("Send XML on stream = ~p", [Text]),
(StateData#state.sockmod):send(StateData#state.socket, Text).
-send_element(StateData, El) when StateData#state.sm_state == pending ->
+send_element(StateData, El) when StateData#state.mgmt_state == pending ->
?DEBUG("Cannot send element while waiting for resumption: ~p", [El]);
send_element(StateData, El) when StateData#state.xml_socket ->
(StateData#state.sockmod):send_xml(StateData#state.socket,
send_element(StateData, El) ->
send_text(StateData, xml:element_to_binary(El)).
-send_stanza(StateData, Stanza) when StateData#state.sm_state == pending ->
- ack_queue_add(StateData, Stanza);
-send_stanza(StateData, Stanza) when StateData#state.sm_state == active ->
+send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
+ mgmt_queue_add(StateData, Stanza);
+send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
send_stanza_and_ack_req(StateData, Stanza),
- ack_queue_add(StateData, Stanza);
+ mgmt_queue_add(StateData, Stanza);
send_stanza(StateData, Stanza) ->
send_element(StateData, Stanza),
StateData.
send_text(StateData, iolist_to_binary(Header)).
send_trailer(StateData)
- when StateData#state.sm_state == pending ->
+ when StateData#state.mgmt_state == pending ->
?DEBUG("Cannot send stream trailer while waiting for resumption", []);
send_trailer(StateData)
when StateData#state.xml_socket ->
fsm_next_state(session_established, StateData) ->
{next_state, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
-fsm_next_state(wait_for_resume, #state{pending_since = undefined} =
+fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined} =
StateData) ->
{next_state, wait_for_resume,
- StateData#state{pending_since = os:timestamp()},
- StateData#state.resume_timeout};
+ StateData#state{mgmt_pending_since = os:timestamp()},
+ StateData#state.mgmt_timeout};
fsm_next_state(wait_for_resume, StateData) ->
- Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since),
- Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1),
+ Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
+ Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
{next_state, wait_for_resume, StateData, Timeout};
fsm_next_state(StateName, StateData) ->
{next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
fsm_reply(Reply, session_established, StateData) ->
{reply, Reply, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
-fsm_reply(Reply, wait_for_resume, #state{pending_since = undefined} =
+fsm_reply(Reply, wait_for_resume, #state{mgmt_pending_since = undefined} =
StateData) ->
{reply, Reply, wait_for_resume,
- StateData#state{pending_since = os:timestamp()},
- StateData#state.resume_timeout};
+ StateData#state{mgmt_pending_since = os:timestamp()},
+ StateData#state.mgmt_timeout};
fsm_reply(Reply, wait_for_resume, StateData) ->
- Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since),
- Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1),
+ Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
+ Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
{reply, Reply, wait_for_resume, StateData, Timeout};
fsm_reply(Reply, StateName, StateData) ->
{reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
%%% XEP-0198
%%%----------------------------------------------------------------------
-stream_mgmt_enabled(#state{sm_state = disabled}) ->
+stream_mgmt_enabled(#state{mgmt_state = disabled}) ->
false;
stream_mgmt_enabled(_StateData) ->
true.
-dispatch_stream_mgmt(El, StateData) when StateData#state.sm_state == active;
- StateData#state.sm_state == pending ->
+dispatch_stream_mgmt(El, StateData)
+ when StateData#state.mgmt_state == active;
+ StateData#state.mgmt_state == pending ->
perform_stream_mgmt(El, StateData);
dispatch_stream_mgmt(El, StateData) ->
negotiate_stream_mgmt(El, StateData).
%% Binding unless it is resuming a previous session". However, it also
%% says: "Stream management errors SHOULD be considered recoverable", so we
%% won't bail out.
- send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
+ send_element(StateData, ?MGMT_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
StateData;
negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
case xml:get_attr_s(<<"xmlns">>, Attrs) of
- Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) ->
+ Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) ->
case stream_mgmt_enabled(StateData) of
true ->
case Name of
<<"enable">> ->
- handle_enable(StateData#state{sm_xmlns = Xmlns}, Attrs);
+ handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Attrs);
_ ->
Res = if Name == <<"a">>;
Name == <<"r">>;
Name == <<"resume">> ->
- ?SM_UNEXPECTED_REQUEST(Xmlns);
+ ?MGMT_UNEXPECTED_REQUEST(Xmlns);
true ->
- ?SM_BAD_REQUEST(Xmlns)
+ ?MGMT_BAD_REQUEST(Xmlns)
end,
send_element(StateData, Res),
StateData
end;
false ->
- send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)),
+ send_element(StateData, ?MGMT_SERVICE_UNAVAILABLE(Xmlns)),
StateData
end;
_ ->
- send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)),
+ send_element(StateData, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)),
StateData
end.
perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
case xml:get_attr_s(<<"xmlns">>, Attrs) of
- Xmlns when Xmlns == StateData#state.sm_xmlns ->
+ Xmlns when Xmlns == StateData#state.mgmt_xmlns ->
case Name of
<<"r">> ->
handle_r(StateData);
_ ->
Res = if Name == <<"enable">>;
Name == <<"resume">> ->
- ?SM_UNEXPECTED_REQUEST(Xmlns);
+ ?MGMT_UNEXPECTED_REQUEST(Xmlns);
true ->
- ?SM_BAD_REQUEST(Xmlns)
+ ?MGMT_BAD_REQUEST(Xmlns)
end,
send_element(StateData, Res),
StateData
end;
_ ->
send_element(StateData,
- ?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)),
+ ?MGMT_UNSUPPORTED_VERSION(StateData#state.mgmt_xmlns)),
StateData
end.
-handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) ->
+handle_enable(#state{mgmt_timeout = ConfigTimeout} = StateData, Attrs) ->
Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of
ResumeAttr when ResumeAttr == <<"true">>;
ResumeAttr == <<"1">> ->
_ ->
0
end,
- ResAttrs = [{<<"xmlns">>, StateData#state.sm_xmlns}] ++
+ ResAttrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}] ++
if Timeout > 0 ->
?INFO_MSG("Stream management with resumption enabled for ~s",
[jlib:jid_to_string(StateData#state.jid)]),
attrs = ResAttrs,
children = []},
send_element(StateData, Res),
- StateData#state{sm_state = active,
- ack_queue = queue:new(),
- resume_timeout = Timeout * 1000}.
+ StateData#state{mgmt_state = active,
+ mgmt_queue = queue:new(),
+ mgmt_timeout = Timeout * 1000}.
handle_r(StateData) ->
- H = jlib:integer_to_binary(StateData#state.n_stanzas_in),
+ H = jlib:integer_to_binary(StateData#state.mgmt_stanzas_in),
Res = #xmlel{name = <<"a">>,
- attrs = [{<<"xmlns">>, StateData#state.sm_xmlns},
+ attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns},
{<<"h">>, H}],
children = []},
send_element(StateData, Res),
StateData.
-handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) ->
+handle_a(#state{jid = JID, mgmt_stanzas_out = NumStanzasOut} = StateData,
+ Attrs) ->
case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of
H when is_integer(H), H >= 0 ->
?DEBUG("~s acknowledged ~B of ~B stanzas",
[jlib:jid_to_string(JID), H, NumStanzasOut]),
- ack_queue_drop(StateData, H);
+ mgmt_queue_drop(StateData, H);
_ ->
?WARNING_MSG("Ignoring invalid ACK element from ~s",
[jlib:jid_to_string(JID)]),
handle_resume(StateData, Attrs) ->
R = case xml:get_attr_s(<<"xmlns">>, Attrs) of
- Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) ->
+ Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) ->
case stream_mgmt_enabled(StateData) of
true ->
case {xml:get_attr(<<"previd">>, Attrs),
{ok, InheritedState} ->
{ok, InheritedState, H};
{error, Err} ->
- {error, ?SM_ITEM_NOT_FOUND(Xmlns), Err}
+ {error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err}
end;
_ ->
- {error, ?SM_BAD_REQUEST(Xmlns), <<"Invalid request">>}
+ {error, ?MGMT_BAD_REQUEST(Xmlns),
+ <<"Invalid request">>}
end;
false ->
- {error, ?SM_SERVICE_UNAVAILABLE(Xmlns), <<"XEP-0198 disabled">>}
+ {error, ?MGMT_SERVICE_UNAVAILABLE(Xmlns),
+ <<"XEP-0198 disabled">>}
end;
_ ->
- {error, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), <<"Invalid XMLNS">>}
+ {error, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3),
+ <<"Invalid XMLNS">>}
end,
case R of
{ok, ResumedState, NumHandled} ->
- NewState = ack_queue_drop(ResumedState, NumHandled),
- AttrXmlns = NewState#state.sm_xmlns,
+ NewState = mgmt_queue_drop(ResumedState, NumHandled),
+ AttrXmlns = NewState#state.mgmt_xmlns,
AttrId = make_resume_id(NewState),
- AttrH = jlib:integer_to_binary(NewState#state.n_stanzas_in),
+ AttrH = jlib:integer_to_binary(NewState#state.mgmt_stanzas_in),
send_element(NewState,
#xmlel{name = <<"resumed">>,
attrs = [{<<"xmlns">>, AttrXmlns},
error
end.
-update_num_stanzas_in(#state{sm_state = active} = StateData, El) ->
- NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of
+update_num_stanzas_in(#state{mgmt_state = active} = StateData, El) ->
+ NewNum = case {is_stanza(El), StateData#state.mgmt_stanzas_in} of
{true, 4294967295} ->
0;
{true, Num} ->
{false, Num} ->
Num
end,
- StateData#state{n_stanzas_in = NewNum};
+ StateData#state{mgmt_stanzas_in = NewNum};
update_num_stanzas_in(StateData, _El) ->
StateData.
send_stanza_and_ack_req(StateData, Stanza) ->
AckReq = #xmlel{name = <<"r">>,
- attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
+ attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}],
children = []},
StanzaS = xml:element_to_binary(Stanza),
AckReqS = xml:element_to_binary(AckReq),
send_text(StateData, [StanzaS, AckReqS]).
-ack_queue_add(StateData, El) ->
- NewNum = case StateData#state.n_stanzas_out of
+mgmt_queue_add(StateData, El) ->
+ NewNum = case StateData#state.mgmt_stanzas_out of
4294967295 ->
0;
Num ->
Num + 1
end,
NewState = limit_queue_length(StateData),
- NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue),
- NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}.
+ NewQueue = queue:in({NewNum, El}, NewState#state.mgmt_queue),
+ NewState#state{mgmt_queue = NewQueue, mgmt_stanzas_out = NewNum}.
-ack_queue_drop(StateData, NumHandled) ->
+mgmt_queue_drop(StateData, NumHandled) ->
NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end,
- StateData#state.ack_queue),
- StateData#state{ack_queue = NewQueue}.
+ StateData#state.mgmt_queue),
+ StateData#state{mgmt_queue = NewQueue}.
-limit_queue_length(#state{max_ack_queue = Limit} = StateData)
+limit_queue_length(#state{mgmt_max_queue = Limit} = StateData)
when Limit == infinity;
Limit == unlimited ->
StateData;
limit_queue_length(#state{jid = JID,
- ack_queue = Queue,
- max_ack_queue = Limit} = StateData) ->
+ mgmt_queue = Queue,
+ mgmt_max_queue = Limit} = StateData) ->
case queue:len(Queue) >= Limit of
true ->
?WARNING_MSG("Dropping stanza from too long ACK queue for ~s",
[jlib:jid_to_string(JID)]),
- limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)});
+ limit_queue_length(StateData#state{mgmt_queue = queue:drop(Queue)});
false ->
StateData
end.
-log_pending_state(StateData) when StateData#state.sm_state /= pending ->
+log_pending_state(StateData) when StateData#state.mgmt_state /= pending ->
?INFO_MSG("Waiting for resumption of stream for ~s",
[jlib:jid_to_string(StateData#state.jid)]);
log_pending_state(_StateData) ->
ok.
-handle_unacked_stanzas(StateData, F) when StateData#state.sm_state == active;
- StateData#state.sm_state == pending ->
- Queue = StateData#state.ack_queue,
+handle_unacked_stanzas(StateData, F)
+ when StateData#state.mgmt_state == active;
+ StateData#state.mgmt_state == pending ->
+ Queue = StateData#state.mgmt_queue,
case queue:len(Queue) of
0 ->
ok;
handle_unacked_stanzas(_StateData, _F) ->
ok.
-handle_unacked_stanzas(StateData) when StateData#state.sm_state == active;
- StateData#state.sm_state == pending ->
- ReRoute = case StateData#state.resend_on_timeout of
+handle_unacked_stanzas(StateData)
+ when StateData#state.mgmt_state == active;
+ StateData#state.mgmt_state == pending ->
+ ReRoute = case StateData#state.mgmt_resend of
true ->
fun ejabberd_router:route/3;
false ->
pres_invis = OldStateData#state.pres_invis,
privacy_list = OldStateData#state.privacy_list,
aux_fields = OldStateData#state.aux_fields,
- sm_xmlns = OldStateData#state.sm_xmlns,
- ack_queue = OldStateData#state.ack_queue,
- resume_timeout = OldStateData#state.resume_timeout,
- n_stanzas_in = OldStateData#state.n_stanzas_in,
- n_stanzas_out = OldStateData#state.n_stanzas_out,
- sm_state = active}};
+ mgmt_xmlns = OldStateData#state.mgmt_xmlns,
+ mgmt_queue = OldStateData#state.mgmt_queue,
+ mgmt_timeout = OldStateData#state.mgmt_timeout,
+ mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in,
+ mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out,
+ mgmt_state = active}};
_ ->
{error, <<"Cannot grab session state">>}
end