]> granicus.if.org Git - ejabberd/commitdiff
* src/ejabberd_hooks.erl: Support distributed hooks (EJAB-829)
authorBadlop <badlop@process-one.net>
Tue, 28 Apr 2009 14:57:16 +0000 (14:57 +0000)
committerBadlop <badlop@process-one.net>
Tue, 28 Apr 2009 14:57:16 +0000 (14:57 +0000)
SVN Revision: 2047

ChangeLog
src/ejabberd_hooks.erl

index bd175f4ad39b4381c11de19d872939e9cd50a5b1..7b0ba705d18bbff3fe63c3c7ceb8f34188cc89e3 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+2009-04-28  Badlop  <badlop@process-one.net>
+
+       * src/ejabberd_hooks.erl: Support distributed hooks (EJAB-829)
+
 2009-04-27  Badlop  <badlop@process-one.net>
 
        * src/xml.erl: More verbose error reporting for
index 9851a241cf9d022d9c5917480020c6fdd238f4fc..bb71f14e164bd8b34465fdb611cdc7ec389f80d7 100644 (file)
 -export([start_link/0,
         add/3,
         add/4,
+        add_dist/5,
         delete/3,
         delete/4,
+        delete_dist/5,
         run/2,
         run_fold/3,
         add/5,
+        add_dist/6,
         delete/5,
+        delete_dist/6,
         run/3,
         run_fold/4]).
 
@@ -52,6 +56,9 @@
 
 -include("ejabberd.hrl").
 
+%% Timeout of 5 seconds in calls to distributed hooks
+-define(TIMEOUT_DISTRIBUTED_HOOK, 5000).
+
 -record(state, {}).
 
 %%%----------------------------------------------------------------------
@@ -77,6 +84,12 @@ add(Hook, Module, Function, Seq) ->
 add(Hook, Host, Module, Function, Seq) ->
     gen_server:call(ejabberd_hooks, {add, Hook, Host, Module, Function, Seq}).
 
+add_dist(Hook, Node, Module, Function, Seq) ->
+    gen_server:call(ejabberd_hooks, {add, Hook, global, Node, Module, Function, Seq}).
+
+add_dist(Hook, Host, Node, Module, Function, Seq) ->
+    gen_server:call(ejabberd_hooks, {add, Hook, Host, Node, Module, Function, Seq}).
+
 %% @spec (Hook::atom(), Function::function(), Seq::integer()) -> ok
 %% @doc See del/4.
 delete(Hook, Function, Seq) when is_function(Function) ->
@@ -94,6 +107,12 @@ delete(Hook, Module, Function, Seq) ->
 delete(Hook, Host, Module, Function, Seq) ->
     gen_server:call(ejabberd_hooks, {delete, Hook, Host, Module, Function, Seq}).
 
+delete_dist(Hook, Node, Module, Function, Seq) ->
+    delete_dist(Hook, global, Node, Module, Function, Seq).
+
+delete_dist(Hook, Host, Node, Module, Function, Seq) ->
+    gen_server:call(ejabberd_hooks, {delete, Hook, Host, Node, Module, Function, Seq}).
+
 %% @spec (Hook::atom(), Args) -> ok
 %% @doc Run the calls of this hook in order, don't care about function results.
 %% If a call returns stop, no more calls are performed.
@@ -167,6 +186,24 @@ handle_call({add, Hook, Host, Module, Function, Seq}, _From, State) ->
                    ok
            end,
     {reply, Reply, State};
+handle_call({add, Hook, Host, Node, Module, Function, Seq}, _From, State) ->
+    Reply = case ets:lookup(hooks, {Hook, Host}) of
+               [{_, Ls}] ->
+                   El = {Seq, Node, Module, Function},
+                   case lists:member(El, Ls) of
+                       true ->
+                           ok;
+                       false ->
+                           NewLs = lists:merge(Ls, [El]),
+                           ets:insert(hooks, {{Hook, Host}, NewLs}),
+                           ok
+                   end;
+               [] ->
+                   NewLs = [{Seq, Node, Module, Function}],
+                   ets:insert(hooks, {{Hook, Host}, NewLs}),
+                   ok
+           end,
+    {reply, Reply, State};
 handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) ->
     Reply = case ets:lookup(hooks, {Hook, Host}) of
                [{_, Ls}] ->
@@ -177,6 +214,16 @@ handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) ->
                    ok
            end,
     {reply, Reply, State};
+handle_call({delete, Hook, Host, Node, Module, Function, Seq}, _From, State) ->
+    Reply = case ets:lookup(hooks, {Hook, Host}) of
+               [{_, Ls}] ->
+                   NewLs = lists:delete({Seq, Node, Module, Function}, Ls),
+                   ets:insert(hooks, {{Hook, Host}, NewLs}),
+                   ok;
+               [] ->
+                   ok
+           end,
+    {reply, Reply, State};
 handle_call(_Request, _From, State) ->
     Reply = ok,
     {reply, Reply, State}.
@@ -217,6 +264,25 @@ code_change(_OldVsn, State, _Extra) ->
 
 run1([], _Hook, _Args) ->
     ok;
+run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) ->
+    case rpc:call(Node, Module, Function, Args, ?TIMEOUT_DISTRIBUTED_HOOK) of
+       timeout ->
+           ?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p",
+                      [Node, {Hook, Args}]),
+           run1(Ls, Hook, Args);
+       {badrpc, Reason} ->
+           ?ERROR_MSG("Bad RPC error to ~p: ~p~nrunning hook: ~p",
+                      [Node, Reason, {Hook, Args}]),
+           run1(Ls, Hook, Args);
+       stop ->
+           ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
+                     "Stop.", [self(), node(), Node]), % debug code
+           ok;
+       Res ->
+           ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
+                     "The response is:~n~s", [self(), node(), Node, Res]), % debug code
+           run1(Ls, Hook, Args)
+    end;
 run1([{_Seq, Module, Function} | Ls], Hook, Args) ->
     Res = if is_function(Function) ->
                  catch apply(Function, Args);
@@ -237,6 +303,27 @@ run1([{_Seq, Module, Function} | Ls], Hook, Args) ->
 
 run_fold1([], _Hook, Val, _Args) ->
     Val;
+run_fold1([{_Seq, Node, Module, Function} | Ls], Hook, Val, Args) ->
+    case rpc:call(Node, Module, Function, [Val | Args], ?TIMEOUT_DISTRIBUTED_HOOK) of
+       {badrpc, Reason} ->
+           ?ERROR_MSG("Bad RPC error to ~p: ~p~nrunning hook: ~p",
+                      [Node, Reason, {Hook, Args}]),
+           run_fold1(Ls, Hook, Val, Args);
+       timeout ->
+           ?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p",
+                      [Node, {Hook, Args}]),
+           run_fold1(Ls, Hook, Val, Args);
+       stop ->
+           stopped;
+       {stop, NewVal} ->
+           ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
+                     "Stop, and the NewVal is:~n~p", [self(), node(), Node, NewVal]), % debug code
+           NewVal;
+       NewVal ->
+           ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
+                     "The NewVal is:~n~p", [self(), node(), Node, NewVal]), % debug code
+           run_fold1(Ls, Hook, NewVal, Args)
+    end;
 run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) ->
     Res = if is_function(Function) ->
                  catch apply(Function, [Val | Args]);