%% terminate immediatetly. If the fsm trap_exit process flag has been
%% set to true, the FSM terminate function will called.
%% - You can pass the gen_fsm options to control resource usage.
-%% {max_messages, N} will exit the process with priority_shutdown
+%% {max_queue, N} will exit the process with priority_shutdown
%% - You can limit the time processing a message (TODO): If the
%% message processing does not return in a given period of time, the
%% process will be terminated.
sync_send_all_state_event/2, sync_send_all_state_event/3,
reply/2,
start_timer/2,send_event_after/2,cancel_timer/1,
- enter_loop/4, enter_loop/5, enter_loop/6]).
+ enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
-export([behaviour_info/1]).
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = gen:debug_options(Options),
- Limits= limit_options(Options),
- loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits).
+ Limits = limit_options(Options),
+ Queue = queue:new(),
+ QueueLen = 0,
+ loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug,
+ Limits, Queue, QueueLen).
get_proc_name(Pid) when is_pid(Pid) ->
Pid;
%%% ---------------------------------------------------
init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
-init_it(Starter, Parent, Name, Mod, Args, Options) ->
+init_it(Starter, Parent, Name0, Mod, Args, Options) ->
+ Name = name(Name0),
Debug = gen:debug_options(Options),
- Limits= limit_options(Options),
+ Limits = limit_options(Options),
+ Queue = queue:new(),
+ QueueLen = 0,
case catch Mod:init(Args) of
{ok, StateName, StateData} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits);
+ loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits, Queue, QueueLen);
{ok, StateName, StateData, Timeout} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits);
+ loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits, Queue, QueueLen);
{stop, Reason} ->
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
exit(Error)
end.
+name({local,Name}) -> Name;
+name({global,Name}) -> Name;
+name(Pid) when is_pid(Pid) -> Pid.
+
%%-----------------------------------------------------------------
%% The MAIN loop
%%-----------------------------------------------------------------
+loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
+ Limits, Queue, QueueLen)
+ when QueueLen > 0 ->
+ case queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
+ Debug, Limits, Queue1, QueueLen - 1, false);
+ {empty, _} ->
+ Reason = internal_queue_error,
+ error_info(Reason, Name, hibernate, StateName, StateData, Debug),
+ exit(Reason)
+ end;
+loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
+ Limits, _Queue, _QueueLen) ->
+ proc_lib:hibernate(?MODULE,wake_hib,
+ [Parent, Name, StateName, StateData, Mod,
+ Debug, Limits]);
%% First we test if we have reach a defined limit ...
-loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
+loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
+ Limits, Queue, QueueLen) ->
try
- message_queue_len(Limits)
+ message_queue_len(Limits, QueueLen)
%% TODO: We can add more limit checking here...
catch
{process_limit, Limit} ->
terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug)
end,
process_message(Parent, Name, StateName, StateData,
- Mod, Time, Debug, Limits).
+ Mod, Time, Debug, Limits, Queue, QueueLen).
%% ... then we can process a new message:
-process_message(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
+process_message(Parent, Name, StateName, StateData, Mod, Time, Debug,
+ Limits, Queue, QueueLen) ->
+ {Msg, Queue1, QueueLen1} = collect_messages(Queue, QueueLen, Time),
+ decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time,
+ Debug, Limits, Queue1, QueueLen1, false).
+
+collect_messages(Queue, QueueLen, Time) ->
+ receive
+ Input ->
+ case Input of
+ {'EXIT', _Parent, priority_shutdown} ->
+ {Input, Queue, QueueLen};
+ _ ->
+ collect_messages(
+ queue:in(Input, Queue), QueueLen + 1, Time)
+ end
+ after 0 ->
+ case queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ {Msg, Queue1, QueueLen - 1};
+ {empty, _} ->
+ receive
+ Input ->
+ {Input, Queue, QueueLen}
+ after Time ->
+ {{'$gen_event', timeout}, Queue, QueueLen}
+ end
+ end
+ end.
+
+
+wake_hib(Parent, Name, StateName, StateData, Mod, Debug,
+ Limits) ->
Msg = receive
- {'EXIT', Parent, priority_shutdown} ->
- {'EXIT', Parent, priority_shutdown}
- after 0 ->
- receive
- Input ->
- Input
- after Time ->
- {'$gen_event', timeout}
- end
+ Input ->
+ Input
end,
+ Queue = queue:new(),
+ QueueLen = 0,
+ decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
+ Debug, Limits, Queue, QueueLen, true).
+
+decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time, Debug,
+ Limits, Queue, QueueLen, Hib) ->
+ put('$internal_queue_len', QueueLen),
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
[Name, StateName, StateData,
- Mod, Time, Limits]);
+ Mod, Time, Limits, Queue, QueueLen], Hib);
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug);
_Msg when Debug == [] ->
handle_msg(Msg, Parent, Name, StateName, StateData,
- Mod, Time, Limits);
+ Mod, Time, Limits, Queue, QueueLen);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
{Name, StateName}, {in, Msg}),
handle_msg(Msg, Parent, Name, StateName, StateData,
- Mod, Time, Debug1, Limits)
+ Mod, Time, Debug1, Limits, Queue, QueueLen)
end.
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-%% TODO: Fix me
system_continue(Parent, Debug, [Name, StateName, StateData,
- Mod, Time, Limits]) ->
- loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits).
+ Mod, Time, Limits, Queue, QueueLen]) ->
+ loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
+ Limits, Queue, QueueLen).
system_terminate(Reason, _Parent, Debug,
[Name, StateName, StateData, Mod, _Time, _Limits]) ->
terminate(Reason, Name, [], Mod, StateName, StateData, Debug).
-system_code_change([Name, StateName, StateData, Mod, Time, Limits],
+system_code_change([Name, StateName, StateData, Mod, Time,
+ Limits, Queue, QueueLen],
_Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, StateName, StateData, Extra) of
{ok, NewStateName, NewStateData} ->
- {ok, [Name, NewStateName, NewStateData, Mod, Time, Limits]};
+ {ok, [Name, NewStateName, NewStateData, Mod, Time,
+ Limits, Queue, QueueLen]};
Else -> Else
end.
io:format(Dev, "*DBG* ~p switched to state ~w~n",
[Name, StateName]).
-handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No debug here
+handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
+ Limits, Queue, QueueLen) -> %No debug here
From = from(Msg),
case catch dispatch(Msg, Mod, StateName, StateData) of
{next_state, NStateName, NStateData} ->
loop(Parent, Name, NStateName, NStateData,
- Mod, infinity, [], Limits);
+ Mod, infinity, [], Limits, Queue, QueueLen);
{next_state, NStateName, NStateData, Time1} ->
- loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits);
- {reply, Reply, NStateName, NStateData} when From /= undefined ->
+ loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
+ Limits, Queue, QueueLen);
+ {reply, Reply, NStateName, NStateData} when From =/= undefined ->
reply(From, Reply),
loop(Parent, Name, NStateName, NStateData,
- Mod, infinity, [], Limits);
- {reply, Reply, NStateName, NStateData, Time1} when From /= undefined ->
+ Mod, infinity, [], Limits, Queue, QueueLen);
+ {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
reply(From, Reply),
- loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits);
+ loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
+ Limits, Queue, QueueLen);
{stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
- {stop, Reason, Reply, NStateData} when From /= undefined ->
+ {stop, Reason, Reply, NStateData} when From =/= undefined ->
{'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
StateName, NStateData, [])),
reply(From, Reply),
end.
handle_msg(Msg, Parent, Name, StateName, StateData,
- Mod, _Time, Debug, Limits) ->
+ Mod, _Time, Debug, Limits, Queue, QueueLen) ->
From = from(Msg),
case catch dispatch(Msg, Mod, StateName, StateData) of
{next_state, NStateName, NStateData} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
{Name, NStateName}, return),
loop(Parent, Name, NStateName, NStateData,
- Mod, infinity, Debug1, Limits);
+ Mod, infinity, Debug1, Limits, Queue, QueueLen);
{next_state, NStateName, NStateData, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
{Name, NStateName}, return),
loop(Parent, Name, NStateName, NStateData,
- Mod, Time1, Debug1, Limits);
- {reply, Reply, NStateName, NStateData} when From /= undefined ->
+ Mod, Time1, Debug1, Limits, Queue, QueueLen);
+ {reply, Reply, NStateName, NStateData} when From =/= undefined ->
Debug1 = reply(Name, From, Reply, Debug, NStateName),
loop(Parent, Name, NStateName, NStateData,
- Mod, infinity, Debug1, Limits);
- {reply, Reply, NStateName, NStateData, Time1} when From /= undefined ->
+ Mod, infinity, Debug1, Limits, Queue, QueueLen);
+ {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
Debug1 = reply(Name, From, Reply, Debug, NStateName),
loop(Parent, Name, NStateName, NStateData,
- Mod, Time1, Debug1, Limits);
+ Mod, Time1, Debug1, Limits, Queue, QueueLen);
{stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
- {stop, Reason, Reply, NStateData} when From /= undefined ->
+ {stop, Reason, Reply, NStateData} when From =/= undefined ->
{'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
StateName, NStateData, Debug)),
reply(Name, From, Reply, Debug, StateName),
%% Throw max_queue if we have reach the max queue size
%% Returns ok otherwise
-message_queue_len(#limits{max_queue = undefined}) ->
+message_queue_len(#limits{max_queue = undefined}, _QueueLen) ->
ok;
-message_queue_len(#limits{max_queue = MaxQueue}) ->
+message_queue_len(#limits{max_queue = MaxQueue}, QueueLen) ->
Pid = self(),
case process_info(Pid, message_queue_len) of
- {message_queue_len, N} when N > MaxQueue ->
- throw({process_limit, {max_queue, N}});
+ {message_queue_len, N} when N + QueueLen > MaxQueue ->
+ throw({process_limit, {max_queue, N + QueueLen}});
_ ->
ok
end.