]> granicus.if.org Git - ejabberd/commitdiff
New stream management option: ack_timeout
authorHolger Weiss <holger@zedat.fu-berlin.de>
Wed, 7 Sep 2016 21:16:54 +0000 (23:16 +0200)
committerHolger Weiss <holger@zedat.fu-berlin.de>
Wed, 7 Sep 2016 21:16:54 +0000 (23:16 +0200)
Close the connection if a stream management client fails to respond to
an acknowledgement request within 60 seconds.  This number of seconds
can be changed with the new "ack_timeout" option, and the mechanism can
be disabled by specifying 'infinity'.

As a side effect of this change, a new acknowledgement is no longer
requested before the response to the previous request is received.

src/ejabberd_c2s.erl
src/ejabberd_http_bind.erl
src/ejabberd_http_ws.erl

index cf7602441b7dec6b4c5ef8baeab34f62f40feae9..09df739b473355bded57aa9efa7ccba3405bf033 100644 (file)
                mgmt_pending_since,
                mgmt_timeout,
                mgmt_max_timeout,
+               mgmt_ack_timeout,
+               mgmt_ack_timer,
                mgmt_resend,
                mgmt_stanzas_in = 0,
                mgmt_stanzas_out = 0,
+               mgmt_stanzas_req = 0,
                ask_offline = true,
                lang = <<"">>}).
 
@@ -308,13 +311,18 @@ init([{SockMod, Socket}, Opts]) ->
                    _ -> 1000
                  end,
     ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
-                     Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout;
+                     RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo;
                      _ -> 300
                    end,
     MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of
                         Max when is_integer(Max), Max >= ResumeTimeout -> Max;
                         _ -> ResumeTimeout
                       end,
+    AckTimeout = case proplists:get_value(ack_timeout, Opts) of
+                  ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000;
+                  infinity -> undefined;
+                  _ -> 60000
+                end,
     ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of
                        Resend when is_boolean(Resend) -> Resend;
                        if_offline -> if_offline;
@@ -338,6 +346,7 @@ init([{SockMod, Socket}, Opts]) ->
                       mgmt_max_queue = MaxAckQueue,
                       mgmt_timeout = ResumeTimeout,
                       mgmt_max_timeout = MaxResumeTimeout,
+                      mgmt_ack_timeout = AckTimeout,
                       mgmt_resend = ResendOnTimeout},
     {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}.
 
@@ -1759,6 +1768,11 @@ handle_info({broadcast, Type, From, Packet}, StateName, StateData) ->
     fsm_next_state(StateName, StateData);
 handle_info(dont_ask_offline, StateName, StateData) ->
     fsm_next_state(StateName, StateData#state{ask_offline = false});
+handle_info(close, StateName, StateData) ->
+    ?DEBUG("Timeout waiting for stream management acknowledgement of ~s",
+          [jid:to_string(StateData#state.jid)]),
+    close(self()),
+    fsm_next_state(StateName, StateData);
 handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) ->
     %% This happens if the resume_session/1 request timed out; the new session
     %% now receives the late response.
@@ -1894,8 +1908,8 @@ send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive ->
 send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
     mgmt_queue_add(StateData, Stanza);
 send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
-    NewStateData = send_stanza_and_ack_req(StateData, Stanza),
-    mgmt_queue_add(NewStateData, Stanza);
+    NewStateData = mgmt_queue_add(StateData, Stanza),
+    mgmt_send_stanza(NewStateData, Stanza);
 send_stanza(StateData, Stanza) ->
     send_element(StateData, Stanza),
     StateData.
@@ -2757,7 +2771,8 @@ handle_r(StateData) ->
 handle_a(StateData, Attrs) ->
     case catch jlib:binary_to_integer(fxml:get_attr_s(<<"h">>, Attrs)) of
       H when is_integer(H), H >= 0 ->
-         check_h_attribute(StateData, H);
+         NewStateData = check_h_attribute(StateData, H),
+         maybe_renew_ack_request(NewStateData);
       _ ->
          ?DEBUG("Ignoring invalid ACK element from ~s",
                 [jid:to_string(StateData#state.jid)]),
@@ -2850,16 +2865,45 @@ update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El)
 update_num_stanzas_in(StateData, _El) ->
     StateData.
 
-send_stanza_and_ack_req(StateData, Stanza) ->
-    AckReq = #xmlel{name = <<"r">>,
-                   attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}],
-                   children = []},
-    case send_element(StateData, Stanza) == ok andalso
-        send_element(StateData, AckReq) == ok of
+mgmt_send_stanza(StateData, Stanza) ->
+    case send_element(StateData, Stanza) of
+      ok ->
+         maybe_request_ack(StateData);
+      _ ->
+         StateData#state{mgmt_state = pending}
+    end.
+
+maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) ->
+    request_ack(StateData);
+maybe_request_ack(StateData) ->
+    StateData.
+
+request_ack(#state{mgmt_xmlns = Xmlns,
+                  mgmt_ack_timeout = AckTimeout} = StateData) ->
+    AckReq = #xmlel{name = <<"r">>, attrs = [{<<"xmlns">>, Xmlns}]},
+    case {send_element(StateData, AckReq), AckTimeout} of
+      {ok, undefined} ->
+         ok;
+      {ok, Timeout} ->
+         Timer = erlang:send_after(Timeout, self(), close),
+         StateData#state{mgmt_ack_timer = Timer,
+                         mgmt_stanzas_req = StateData#state.mgmt_stanzas_out};
+      _ ->
+         StateData#state{mgmt_state = pending}
+    end.
+
+maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) ->
+    StateData;
+maybe_renew_ack_request(#state{mgmt_ack_timer = Timer,
+                              mgmt_queue = Queue,
+                              mgmt_stanzas_out = NumStanzasOut,
+                              mgmt_stanzas_req = NumStanzasReq} = StateData) ->
+    erlang:cancel_timer(Timer),
+    case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of
       true ->
-         StateData;
+         request_ack(StateData#state{mgmt_ack_timer = undefined});
       false ->
-         StateData#state{mgmt_state = pending}
+         StateData#state{mgmt_ack_timer = undefined}
     end.
 
 mgmt_queue_add(StateData, El) ->
index 758c1cee521faeaa791e16eb7d6dc271cc409277..628119e6ff7f721dcd7aaebe0ac81d88d6c65242 100644 (file)
@@ -340,6 +340,7 @@ init([Sid, Key, IP, HOpts]) ->
     Opts1 = ejabberd_c2s_config:get_c2s_limits(),
     SOpts = lists:filtermap(fun({stream_management, _}) -> true;
                                ({max_ack_queue, _}) -> true;
+                               ({ack_timeout, _}) -> true;
                                ({resume_timeout, _}) -> true;
                                ({max_resume_timeout, _}) -> true;
                                ({resend_on_timeout, _}) -> true;
index 24554a8cc5e4fa095e55de67169dbf3dac4a4550..e76e8689aeee4bdb01e00f8c23e05f36eb80212f 100644 (file)
@@ -114,6 +114,7 @@ socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) ->
 init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) ->
     SOpts = lists:filtermap(fun({stream_management, _}) -> true;
                                ({max_ack_queue, _}) -> true;
+                               ({ack_timeout, _}) -> true;
                                ({resume_timeout, _}) -> true;
                                ({max_resume_timeout, _}) -> true;
                                ({resend_on_timeout, _}) -> true;