]> granicus.if.org Git - ejabberd/commitdiff
Improve overloaded S2S queue processing new_queue
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Fri, 10 Mar 2017 17:21:04 +0000 (20:21 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Fri, 10 Mar 2017 17:21:04 +0000 (20:21 +0300)
rebar.config
src/ejabberd_s2s_out.erl

index 7088cfefee34acef68f564e5690bd8c3ca8038d4..ca41b6ee63054a138ac9bc3593c13efb022ebcd8 100644 (file)
@@ -19,7 +19,7 @@
 %%%----------------------------------------------------------------------
 
 {deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}},
-        {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "13b03e1c8c7a5777de728f759809142f997f8af3"}},
+        {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "f677e61"}},
         {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.6"}}},
         {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "afdd07811e0e6eff444c035ffeb2aa9efb4dbe6d"}},
         {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.7"}}},
index 8c9f9d631837b50eb6ba3b6aabfdeb77dcfb4a6e..60c19b082f5aa14dd408e98488915ed7f5580960 100644 (file)
@@ -145,14 +145,14 @@ process_closed(#{server := LServer, remote_server := RServer,
                 on_route := send} = State,
               Reason) ->
     ?INFO_MSG("Closing outbound s2s connection ~s -> ~s: ~s",
-             [LServer, RServer, xmpp_stream_out:format_error(Reason)]),
+             [LServer, RServer, format_error(Reason)]),
     stop(State);
 process_closed(#{server := LServer, remote_server := RServer} = State,
               Reason) ->
     Delay = get_delay(),
     ?INFO_MSG("Failed to establish outbound s2s connection ~s -> ~s: ~s; "
              "bouncing for ~p seconds",
-             [LServer, RServer, xmpp_stream_out:format_error(Reason), Delay]),
+             [LServer, RServer, format_error(Reason), Delay]),
     State1 = State#{on_route => bounce},
     State2 = bounce_queue(State1),
     xmpp_stream_out:set_timeout(State2, timer:seconds(Delay)).
@@ -309,11 +309,9 @@ handle_info({route, Pkt}, #{queue := Q, on_route := Action} = State) ->
        queue ->
            try State#{queue => p1_queue:in(Pkt, Q)}
            catch error:full ->
-                   #{server := LServer, remote_server := RServer} = State,
-                   ?INFO_MSG("Failed to establish outbound s2s connection "
-                             "~s -> ~s: message queue is overloaded",
-                             [LServer, RServer]),
-                   stop(State#{stop_reason => queue_full})
+                   Q1 = p1_queue:set_limit(Q, unlimited),
+                   Q2 = p1_queue:in(Pkt, Q1),
+                   handle_stream_end(queue_full, State#{queue => Q2})
            end;
        bounce -> bounce_packet(Pkt, State);
        send -> set_idle_timeout(send(State, Pkt))
@@ -371,12 +369,12 @@ bounce_packet(_, State) ->
 
 -spec mk_bounce_error(binary(), state()) -> stanza_error().
 mk_bounce_error(Lang, #{stop_reason := Why}) ->
-    Reason = xmpp_stream_out:format_error(Why),
+    Reason = format_error(Why),
     case Why of
        internal_failure ->
-           xmpp:err_internal_server_error();
+           xmpp:err_internal_server_error(Reason, Lang);
        queue_full ->
-           xmpp:err_resource_constraint();
+           xmpp:err_resource_constraint(Reason, Lang);
        {dns, _} ->
            xmpp:err_remote_server_not_found(Reason, Lang);
                                             _ ->
@@ -401,6 +399,7 @@ set_idle_timeout(#{on_route := send, server := LServer} = State) ->
 set_idle_timeout(State) ->
     State.
 
+-spec queue_fold(fun((xmpp_element(), state()) -> state()), state()) -> state().
 queue_fold(F, #{queue := Q} = State) ->
     case p1_queue:out(Q) of
        {{value, Pkt}, Q1} ->
@@ -410,6 +409,13 @@ queue_fold(F, #{queue := Q} = State) ->
            State#{queue => Q1}
     end.
 
+format_error(internal_failure) ->
+    <<"Internal server error">>;
+format_error(queue_full) ->
+    <<"Stream queue is overloaded">>;
+format_error(Reason) ->
+    xmpp_stream_out:format_error(Reason).
+
 transform_options(Opts) ->
     lists:foldl(fun transform_options/2, [], Opts).