]> granicus.if.org Git - ejabberd/commitdiff
use queue to reduced quadratic time effort on selective receive (thanks to Alexey...
authorEvgeniy Khramtsov <xramtsov@gmail.com>
Wed, 7 Oct 2009 13:41:36 +0000 (13:41 +0000)
committerEvgeniy Khramtsov <xramtsov@gmail.com>
Wed, 7 Oct 2009 13:41:36 +0000 (13:41 +0000)
SVN Revision: 2644

src/ejabberd_service.erl
src/p1_fsm.erl

index 34d62f43311a41807e69fa75f8d474740de6ee8c..2305407b08fc3f2a3dabaa0a5aa0811086436c0b 100644 (file)
@@ -27,7 +27,9 @@
 -module(ejabberd_service).
 -author('alexey@process-one.net').
 
--behaviour(gen_fsm).
+-define(GEN_FSM, p1_fsm).
+
+-behaviour(?GEN_FSM).
 
 %% External exports
 -export([start/2,
 -define(FSMOPTS, []).
 -endif.
 
+%% Only change this value if you now what your are doing:
+-define(FSMLIMITS,[]).
+%% -define(FSMLIMITS, [{max_queue, 2000}]).
+
 -define(STREAM_HEADER,
        "<?xml version='1.0'?>"
        "<stream:stream "
@@ -100,7 +106,8 @@ start(SockData, Opts) ->
     supervisor:start_child(ejabberd_service_sup, [SockData, Opts]).
 
 start_link(SockData, Opts) ->
-    gen_fsm:start_link(ejabberd_service, [SockData, Opts], ?FSMOPTS).
+    ?GEN_FSM:start_link(
+       ejabberd_service, [SockData, Opts], ?FSMLIMITS ++ ?FSMOPTS).
 
 socket_type() ->
     xml_stream.
index c4de9faa4eac2431b420deb4adf7334d2d8c1cf8..dec999266c06e7321bc2aa92ce854465bccabedc 100644 (file)
@@ -21,7 +21,7 @@
 %%   terminate immediatetly. If the fsm trap_exit process flag has been
 %%   set to true, the FSM terminate function will called.
 %%   - You can pass the gen_fsm options to control resource usage.
-%%   {max_messages, N} will exit the process with priority_shutdown
+%%   {max_queue, N} will exit the process with priority_shutdown
 %%   - You can limit the time processing a message (TODO): If the
 %%   message processing does not return in a given period of time, the
 %%   process will be terminated.
         sync_send_all_state_event/2, sync_send_all_state_event/3,
         reply/2,
         start_timer/2,send_event_after/2,cancel_timer/1,
-        enter_loop/4, enter_loop/5, enter_loop/6]).
+        enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
 
 -export([behaviour_info/1]).
 
@@ -273,8 +273,11 @@ enter_loop(Mod, Options, StateName, StateData, ServerName, Timeout) ->
     Name = get_proc_name(ServerName),
     Parent = get_parent(),
     Debug = gen:debug_options(Options),
-    Limits= limit_options(Options),
-    loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits).
+    Limits = limit_options(Options),
+    Queue = queue:new(),
+    QueueLen = 0,
+    loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug,
+        Limits, Queue, QueueLen).
 
 get_proc_name(Pid) when is_pid(Pid) ->
     Pid;
@@ -329,16 +332,19 @@ name_to_pid(Name) ->
 %%% ---------------------------------------------------
 init_it(Starter, self, Name, Mod, Args, Options) ->
     init_it(Starter, self(), Name, Mod, Args, Options);
-init_it(Starter, Parent, Name, Mod, Args, Options) ->
+init_it(Starter, Parent, Name0, Mod, Args, Options) ->
+    Name = name(Name0),
     Debug = gen:debug_options(Options),
-    Limits= limit_options(Options),
+    Limits = limit_options(Options),
+    Queue = queue:new(),
+    QueueLen = 0,
     case catch Mod:init(Args) of
        {ok, StateName, StateData} ->
            proc_lib:init_ack(Starter, {ok, self()}),       
-           loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits);
+           loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits, Queue, QueueLen);
        {ok, StateName, StateData, Timeout} ->
            proc_lib:init_ack(Starter, {ok, self()}),       
-           loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits);
+           loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits, Queue, QueueLen);
        {stop, Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
@@ -354,13 +360,35 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
            exit(Error)
     end.
 
+name({local,Name}) -> Name;
+name({global,Name}) -> Name;
+name(Pid) when is_pid(Pid) -> Pid.
+
 %%-----------------------------------------------------------------
 %% The MAIN loop
 %%-----------------------------------------------------------------
+loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
+     Limits, Queue, QueueLen)
+  when QueueLen > 0 ->
+    case queue:out(Queue) of
+       {{value, Msg}, Queue1} ->
+           decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
+                      Debug, Limits, Queue1, QueueLen - 1, false);
+       {empty, _} ->
+           Reason = internal_queue_error,
+           error_info(Reason, Name, hibernate, StateName, StateData, Debug),
+           exit(Reason)
+    end;
+loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
+     Limits, _Queue, _QueueLen) ->
+    proc_lib:hibernate(?MODULE,wake_hib,
+                      [Parent, Name, StateName, StateData, Mod,
+                       Debug, Limits]);
 %% First we test if we have reach a defined limit ...
-loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
+loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
+     Limits, Queue, QueueLen) ->
     try        
-       message_queue_len(Limits)
+       message_queue_len(Limits, QueueLen)
        %% TODO: We can add more limit checking here...
     catch
        {process_limit, Limit} ->
@@ -369,54 +397,89 @@ loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
            terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug)
     end,
     process_message(Parent, Name, StateName, StateData,
-                   Mod, Time, Debug, Limits).
+                   Mod, Time, Debug, Limits, Queue, QueueLen).
 %% ... then we can process a new message:
-process_message(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
+process_message(Parent, Name, StateName, StateData, Mod, Time, Debug,
+               Limits, Queue, QueueLen) ->
+    {Msg, Queue1, QueueLen1} = collect_messages(Queue, QueueLen, Time),
+    decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time,
+              Debug, Limits, Queue1, QueueLen1, false).
+
+collect_messages(Queue, QueueLen, Time) ->
+    receive
+       Input ->
+           case Input of
+               {'EXIT', _Parent, priority_shutdown} ->
+                   {Input, Queue, QueueLen};
+               _ ->
+                   collect_messages(
+                     queue:in(Input, Queue), QueueLen + 1, Time)
+           end
+    after 0 ->
+           case queue:out(Queue) of
+               {{value, Msg}, Queue1} ->
+                   {Msg, Queue1, QueueLen - 1};
+               {empty, _} ->
+                   receive
+                       Input ->
+                           {Input, Queue, QueueLen}
+                   after Time ->
+                           {{'$gen_event', timeout}, Queue, QueueLen}
+                   end
+           end
+    end.
+
+
+wake_hib(Parent, Name, StateName, StateData, Mod, Debug,
+        Limits) ->
     Msg = receive
-             {'EXIT', Parent, priority_shutdown} ->
-                 {'EXIT', Parent, priority_shutdown}
-         after 0 ->
-                 receive
-                     Input ->
-                         Input
-                 after Time ->
-                         {'$gen_event', timeout}
-                 end
+             Input ->
+                 Input
          end,
+    Queue = queue:new(),
+    QueueLen = 0,
+    decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
+              Debug, Limits, Queue, QueueLen, true).
+
+decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time, Debug,
+          Limits, Queue, QueueLen, Hib) ->
+    put('$internal_queue_len', QueueLen),
     case Msg of
         {system, From, Req} ->
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                                  [Name, StateName, StateData,
-                                  Mod, Time, Limits]);
+                                  Mod, Time, Limits, Queue, QueueLen], Hib);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug);
        _Msg when Debug == [] ->
            handle_msg(Msg, Parent, Name, StateName, StateData,
-                      Mod, Time, Limits);
+                      Mod, Time, Limits, Queue, QueueLen);
        _Msg ->
            Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
                                      {Name, StateName}, {in, Msg}),
            handle_msg(Msg, Parent, Name, StateName, StateData,
-                      Mod, Time, Debug1, Limits)
+                      Mod, Time, Debug1, Limits, Queue, QueueLen)
     end.
 
 %%-----------------------------------------------------------------
 %% Callback functions for system messages handling.
 %%-----------------------------------------------------------------
-%% TODO: Fix me
 system_continue(Parent, Debug, [Name, StateName, StateData,
-                               Mod, Time, Limits]) ->
-    loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits).
+                               Mod, Time, Limits, Queue, QueueLen]) ->
+    loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
+        Limits, Queue, QueueLen).
 
 system_terminate(Reason, _Parent, Debug,
                 [Name, StateName, StateData, Mod, _Time, _Limits]) ->
     terminate(Reason, Name, [], Mod, StateName, StateData, Debug).
 
-system_code_change([Name, StateName, StateData, Mod, Time, Limits],
+system_code_change([Name, StateName, StateData, Mod, Time,
+                   Limits, Queue, QueueLen],
                   _Module, OldVsn, Extra) ->
     case catch Mod:code_change(OldVsn, StateName, StateData, Extra) of
        {ok, NewStateName, NewStateData} ->
-           {ok, [Name, NewStateName, NewStateData, Mod, Time, Limits]};
+           {ok, [Name, NewStateName, NewStateData, Mod, Time,
+                 Limits, Queue, QueueLen]};
        Else -> Else
     end.
 
@@ -453,24 +516,27 @@ print_event(Dev, return, {Name, StateName}) ->
     io:format(Dev, "*DBG* ~p switched to state ~w~n",
              [Name, StateName]).
 
-handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No debug here
+handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
+          Limits, Queue, QueueLen) -> %No debug here
     From = from(Msg),
     case catch dispatch(Msg, Mod, StateName, StateData) of
        {next_state, NStateName, NStateData} ->     
            loop(Parent, Name, NStateName, NStateData,
-                Mod, infinity, [], Limits);
+                Mod, infinity, [], Limits, Queue, QueueLen);
        {next_state, NStateName, NStateData, Time1} ->
-           loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits);
-        {reply, Reply, NStateName, NStateData} when From /= undefined ->
+           loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
+                Limits, Queue, QueueLen);
+        {reply, Reply, NStateName, NStateData} when From =/= undefined ->
            reply(From, Reply),
            loop(Parent, Name, NStateName, NStateData,
-                Mod, infinity, [], Limits);
-        {reply, Reply, NStateName, NStateData, Time1} when From /= undefined ->
+                Mod, infinity, [], Limits, Queue, QueueLen);
+        {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
            reply(From, Reply),
-           loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits);
+           loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
+                Limits, Queue, QueueLen);
        {stop, Reason, NStateData} ->
            terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
-       {stop, Reason, Reply, NStateData} when From /= undefined ->
+       {stop, Reason, Reply, NStateData} when From =/= undefined ->
            {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
                                           StateName, NStateData, [])),
            reply(From, Reply),
@@ -483,30 +549,30 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No d
     end.
 
 handle_msg(Msg, Parent, Name, StateName, StateData,
-          Mod, _Time, Debug, Limits) ->
+          Mod, _Time, Debug, Limits, Queue, QueueLen) ->
     From = from(Msg),
     case catch dispatch(Msg, Mod, StateName, StateData) of
        {next_state, NStateName, NStateData} ->
            Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
                                      {Name, NStateName}, return),
            loop(Parent, Name, NStateName, NStateData,
-                Mod, infinity, Debug1, Limits);
+                Mod, infinity, Debug1, Limits, Queue, QueueLen);
        {next_state, NStateName, NStateData, Time1} ->
            Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, 
                                      {Name, NStateName}, return),
            loop(Parent, Name, NStateName, NStateData,
-                Mod, Time1, Debug1, Limits);
-        {reply, Reply, NStateName, NStateData} when From /= undefined ->
+                Mod, Time1, Debug1, Limits, Queue, QueueLen);
+        {reply, Reply, NStateName, NStateData} when From =/= undefined ->
            Debug1 = reply(Name, From, Reply, Debug, NStateName),
            loop(Parent, Name, NStateName, NStateData,
-                Mod, infinity, Debug1, Limits);
-        {reply, Reply, NStateName, NStateData, Time1} when From /= undefined ->
+                Mod, infinity, Debug1, Limits, Queue, QueueLen);
+        {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
            Debug1 = reply(Name, From, Reply, Debug, NStateName),
            loop(Parent, Name, NStateName, NStateData,
-                Mod, Time1, Debug1, Limits);
+                Mod, Time1, Debug1, Limits, Queue, QueueLen);
        {stop, Reason, NStateData} ->
            terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
-       {stop, Reason, Reply, NStateData} when From /= undefined ->
+       {stop, Reason, Reply, NStateData} when From =/= undefined ->
            {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
                                           StateName, NStateData, Debug)),
            reply(Name, From, Reply, Debug, StateName),
@@ -671,13 +737,13 @@ limit_options([_|Options], Limits) ->
 
 %% Throw max_queue if we have reach the max queue size
 %% Returns ok otherwise
-message_queue_len(#limits{max_queue = undefined}) ->
+message_queue_len(#limits{max_queue = undefined}, _QueueLen) ->
     ok;
-message_queue_len(#limits{max_queue = MaxQueue}) ->
+message_queue_len(#limits{max_queue = MaxQueue}, QueueLen) ->
     Pid = self(),
     case process_info(Pid, message_queue_len) of
-        {message_queue_len, N} when N > MaxQueue ->
-           throw({process_limit, {max_queue, N}});
+        {message_queue_len, N} when N + QueueLen > MaxQueue ->
+           throw({process_limit, {max_queue, N + QueueLen}});
        _ ->
            ok
     end.