]> granicus.if.org Git - ejabberd/commitdiff
Use pg2 from R14B in systems with older versions (EJAB-1349)
authorBadlop <badlop@process-one.net>
Mon, 22 Nov 2010 15:01:39 +0000 (16:01 +0100)
committerBadlop <badlop@process-one.net>
Mon, 22 Nov 2010 15:04:07 +0000 (16:04 +0100)
pg2_backport.erl is a copy of pg2.erl from Erlang/OTP R14B.
That module is used in ejabberd installations where an OTP
previous to R14 is installed.

src/ejabberd_node_groups.erl
src/eldap/Makefile.in
src/eldap/eldap_pool.erl
src/pg2_backport.erl [new file with mode: 0644]

index fc1b4ded5bab02b9c7036d4dfd09c1b3a8711c3d..8438c5e1ce7ec2287a8cb72cb2f929c21b1f9c4f 100644 (file)
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
 
+-ifdef(SSL40).
+-define(PG2, pg2).
+-else.
+-define(PG2, pg2_backport).
+-endif.
+
 -record(state, {}).
 
 %%====================================================================
@@ -54,20 +60,20 @@ start_link() ->
 
 join(Name) ->
     PG = {?MODULE, Name},
-    pg2:create(PG),
-    pg2:join(PG, whereis(?MODULE)).
+    ?PG2:create(PG),
+    ?PG2:join(PG, whereis(?MODULE)).
 
 leave(Name) ->
     PG = {?MODULE, Name},
-    pg2:leave(PG, whereis(?MODULE)).
+    ?PG2:leave(PG, whereis(?MODULE)).
 
 get_members(Name) ->
     PG = {?MODULE, Name},
-    [node(P) || P <- pg2:get_members(PG)].
+    [node(P) || P <- ?PG2:get_members(PG)].
 
 get_closest_node(Name) ->
     PG = {?MODULE, Name},
-    node(pg2:get_closest_pid(PG)).
+    node(?PG2:get_closest_pid(PG)).
 
 %%====================================================================
 %% gen_server callbacks
index 2ebbdaf7393cca8ff9f3ab86a006242e701c3ebe..158252670b7580e8720912fe1ad7fcc7063ef062 100644 (file)
@@ -11,6 +11,7 @@ ASN_FLAGS = -bber_bin +optimize +driver
 ERLANG_CFLAGS = @ERLANG_CFLAGS@
 ERLANG_LIBS = @ERLANG_LIBS@
 
+EFLAGS += @ERLANG_SSLVER@
 EFLAGS += -I ..
 EFLAGS += -pz ..
 
index c8f224824198da55fdad7a9f490b7af8419045a6..1b29612437e78dd182c93114a769d49ff89860e0 100644 (file)
 
 -include("ejabberd.hrl").
 
+-ifdef(SSL40).
+-define(PG2, pg2).
+-else.
+-define(PG2, pg2_backport).
+-endif.
+
 %%====================================================================
 %% API
 %%====================================================================
@@ -51,14 +57,14 @@ modify_passwd(PoolName, DN, Passwd) ->
 
 start_link(Name, Hosts, Backups, Port, Rootdn, Passwd, Opts) ->
     PoolName = make_id(Name),
-    pg2:create(PoolName),
+    ?PG2:create(PoolName),
     lists:foreach(
       fun(Host) ->
              ID = erlang:ref_to_list(make_ref()),
              case catch eldap:start_link(ID, [Host|Backups], Port,
                                          Rootdn, Passwd, Opts) of
                  {ok, Pid} ->
-                     pg2:join(PoolName, Pid);
+                     ?PG2:join(PoolName, Pid);
                  _ ->
                      error
              end
@@ -68,7 +74,7 @@ start_link(Name, Hosts, Backups, Port, Rootdn, Passwd, Opts) ->
 %% Internal functions
 %%====================================================================
 do_request(Name, {F, Args}) ->
-    case pg2:get_closest_pid(make_id(Name)) of
+    case ?PG2:get_closest_pid(make_id(Name)) of
        Pid when is_pid(Pid) ->
            case catch apply(eldap, F, [Pid | Args]) of
                {'EXIT', {timeout, _}} ->
diff --git a/src/pg2_backport.erl b/src/pg2_backport.erl
new file mode 100644 (file)
index 0000000..9c9f5d6
--- /dev/null
@@ -0,0 +1,381 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1997-2010. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(pg2_backport).
+
+-export([create/1, delete/1, join/2, leave/2]).
+-export([get_members/1, get_local_members/1]).
+-export([get_closest_pid/1, which_groups/0]).
+-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
+         terminate/2]).
+
+%%% As of R13B03 monitors are used instead of links.
+
+%%%
+%%% Exported functions
+%%%
+
+-spec start_link() -> {'ok', pid()} | {'error', term()}.
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec start() -> {'ok', pid()} | {'error', term()}.
+
+start() ->
+    ensure_started().
+
+-spec create(term()) -> 'ok'.
+
+create(Name) ->
+    ensure_started(),
+    case ets:member(pg2_table, {group, Name}) of
+        false ->
+            global:trans({{?MODULE, Name}, self()},
+                         fun() ->
+                                 gen_server:multi_call(?MODULE, {create, Name})
+                         end),
+            ok;
+        true ->
+            ok
+    end.
+
+-type name() :: term().
+
+-spec delete(name()) -> 'ok'.
+
+delete(Name) ->
+    ensure_started(),
+    global:trans({{?MODULE, Name}, self()},
+                 fun() ->
+                         gen_server:multi_call(?MODULE, {delete, Name})
+                 end),
+    ok.
+
+-spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}.
+
+join(Name, Pid) when is_pid(Pid) ->
+    ensure_started(),
+    case ets:member(pg2_table, {group, Name}) of
+        false ->
+            {error, {no_such_group, Name}};
+        true ->
+            global:trans({{?MODULE, Name}, self()},
+                         fun() ->
+                                 gen_server:multi_call(?MODULE,
+                                                       {join, Name, Pid})
+                         end),
+            ok
+    end.
+
+-spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}.
+
+leave(Name, Pid) when is_pid(Pid) ->
+    ensure_started(),
+    case ets:member(pg2_table, {group, Name}) of
+        false ->
+            {error, {no_such_group, Name}};
+        true ->
+            global:trans({{?MODULE, Name}, self()},
+                         fun() ->
+                                 gen_server:multi_call(?MODULE,
+                                                       {leave, Name, Pid})
+                         end),
+            ok
+    end.
+
+-type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}.
+
+-spec get_members(name()) -> get_members_ret().
+   
+get_members(Name) ->
+    ensure_started(),
+    case ets:member(pg2_table, {group, Name}) of
+        true ->
+            group_members(Name);
+        false ->
+            {error, {no_such_group, Name}}
+    end.
+
+-spec get_local_members(name()) -> get_members_ret().
+
+get_local_members(Name) ->
+    ensure_started(),
+    case ets:member(pg2_table, {group, Name}) of
+        true ->
+            local_group_members(Name);
+        false ->
+            {error, {no_such_group, Name}}
+    end.
+
+-spec which_groups() -> [name()].
+
+which_groups() ->
+    ensure_started(),
+    all_groups().
+
+-type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}.
+
+-spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}.
+
+get_closest_pid(Name) ->
+    case get_local_members(Name) of
+        [Pid] ->
+            Pid;
+        [] ->
+            {_,_,X} = erlang:now(),
+            case get_members(Name) of
+                [] -> {error, {no_process, Name}};
+                Members ->
+                    lists:nth((X rem length(Members))+1, Members)
+            end;
+        Members when is_list(Members) ->
+            {_,_,X} = erlang:now(),
+            lists:nth((X rem length(Members))+1, Members);
+        Else ->
+            Else
+    end.
+
+%%%
+%%% Callback functions from gen_server
+%%%
+
+-record(state, {}).
+
+-spec init([]) -> {'ok', #state{}}.
+
+init([]) ->
+    Ns = nodes(),
+    net_kernel:monitor_nodes(true),
+    lists:foreach(fun(N) ->
+                          {?MODULE, N} ! {new_pg2, node()},
+                          self() ! {nodeup, N}
+                  end, Ns),
+    pg2_table = ets:new(pg2_table, [ordered_set, protected, named_table]),
+    {ok, #state{}}.
+
+-type call() :: {'create', name()}
+              | {'delete', name()}
+              | {'join', name(), pid()}
+              | {'leave', name(), pid()}.
+
+-spec handle_call(call(), _, #state{}) -> 
+        {'reply', 'ok', #state{}}.
+
+handle_call({create, Name}, _From, S) ->
+    assure_group(Name),
+    {reply, ok, S};
+handle_call({join, Name, Pid}, _From, S) ->
+    ets:member(pg2_table, {group, Name}) andalso join_group(Name, Pid),
+    {reply, ok, S};
+handle_call({leave, Name, Pid}, _From, S) ->
+    ets:member(pg2_table, {group, Name}) andalso leave_group(Name, Pid),
+    {reply, ok, S};
+handle_call({delete, Name}, _From, S) ->
+    delete_group(Name),
+    {reply, ok, S};
+handle_call(Request, From, S) ->
+    error_logger:warning_msg("The pg2 server received an unexpected message:\n"
+                             "handle_call(~p, ~p, _)\n", 
+                             [Request, From]),
+    {noreply, S}.
+
+-type all_members() :: [[name(),...]].
+-type cast() :: {'exchange', node(), all_members()}
+              | {'del_member', name(), pid()}.
+
+-spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}.
+
+handle_cast({exchange, _Node, List}, S) ->
+    store(List),
+    {noreply, S};
+handle_cast(_, S) ->
+    %% Ignore {del_member, Name, Pid}.
+    {noreply, S}.
+
+-spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}.
+
+handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
+    member_died(MonitorRef),
+    {noreply, S};
+handle_info({nodeup, Node}, S) ->
+    gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
+    {noreply, S};
+handle_info({new_pg2, Node}, S) ->
+    gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
+    {noreply, S};
+handle_info(_, S) ->
+    {noreply, S}.
+
+-spec terminate(term(), #state{}) -> 'ok'.
+
+terminate(_Reason, _S) ->
+    true = ets:delete(pg2_table),
+    ok.
+
+%%%
+%%% Local functions
+%%%
+
+%%% One ETS table, pg2_table, is used for bookkeeping. The type of the
+%%% table is ordered_set, and the fast matching of partially
+%%% instantiated keys is used extensively.
+%%%
+%%% {{group, Name}}
+%%%    Process group Name.
+%%% {{ref, Pid}, RPid, MonitorRef, Counter}
+%%% {{ref, MonitorRef}, Pid}
+%%%    Each process has one monitor. Sometimes a process is spawned to
+%%%    monitor the pid (RPid). Counter is incremented when the Pid joins
+%%%    some group.
+%%% {{member, Name, Pid}, GroupCounter}
+%%% {{local_member, Name, Pid}}
+%%%    Pid is a member of group Name, GroupCounter is incremented when the
+%%%    Pid joins the group Name.
+%%% {{pid, Pid, Name}}
+%%%    Pid is a member of group Name.
+
+store(List) ->
+    _ = [(assure_group(Name)
+          andalso
+          [join_group(Name, P) || P <- Members -- group_members(Name)]) ||
+            [Name, Members] <- List],
+    ok.
+
+assure_group(Name) ->
+    Key = {group, Name},
+    ets:member(pg2_table, Key) orelse true =:= ets:insert(pg2_table, {Key}).
+
+delete_group(Name) ->
+    _ = [leave_group(Name, Pid) || Pid <- group_members(Name)],
+    true = ets:delete(pg2_table, {group, Name}),
+    ok.
+
+member_died(Ref) ->
+    [{{ref, Ref}, Pid}] = ets:lookup(pg2_table, {ref, Ref}),
+    Names = member_groups(Pid),
+    _ = [leave_group(Name, P) || 
+            Name <- Names,
+            P <- member_in_group(Pid, Name)],
+    %% Kept for backward compatibility with links. Can be removed, eventually.
+    _ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) ||
+            Name <- Names],
+    ok.
+
+join_group(Name, Pid) ->
+    Ref_Pid = {ref, Pid}, 
+    try _ = ets:update_counter(pg2_table, Ref_Pid, {4, +1})
+    catch _:_ ->
+            {RPid, Ref} = do_monitor(Pid),
+            true = ets:insert(pg2_table, {Ref_Pid, RPid, Ref, 1}),
+            true = ets:insert(pg2_table, {{ref, Ref}, Pid})
+    end,
+    Member_Name_Pid = {member, Name, Pid},
+    try _ = ets:update_counter(pg2_table, Member_Name_Pid, {2, +1})
+    catch _:_ ->
+            true = ets:insert(pg2_table, {Member_Name_Pid, 1}),
+            _ = [ets:insert(pg2_table, {{local_member, Name, Pid}}) ||
+                    node(Pid) =:= node()],
+            true = ets:insert(pg2_table, {{pid, Pid, Name}})
+    end.
+
+leave_group(Name, Pid) ->
+    Member_Name_Pid = {member, Name, Pid},
+    try ets:update_counter(pg2_table, Member_Name_Pid, {2, -1}) of
+        N ->
+            if 
+                N =:= 0 ->
+                    true = ets:delete(pg2_table, {pid, Pid, Name}),
+                    _ = [ets:delete(pg2_table, {local_member, Name, Pid}) ||
+                            node(Pid) =:= node()],
+                    true = ets:delete(pg2_table, Member_Name_Pid);
+                true ->
+                    ok
+            end,
+            Ref_Pid = {ref, Pid}, 
+            case ets:update_counter(pg2_table, Ref_Pid, {4, -1}) of
+                0 ->
+                    [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_table, Ref_Pid),
+                    true = ets:delete(pg2_table, {ref, Ref}),
+                    true = ets:delete(pg2_table, Ref_Pid),
+                    true = erlang:demonitor(Ref, [flush]),
+                    kill_monitor_proc(RPid, Pid);
+                _ ->
+                    ok
+            end
+    catch _:_ ->
+            ok
+    end.
+
+all_members() ->
+    [[G, group_members(G)] || G <- all_groups()].
+
+group_members(Name) ->
+    [P || 
+        [P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
+        _ <- lists:seq(1, N)].
+
+local_group_members(Name) ->
+    [P || 
+        [Pid] <- ets:match(pg2_table, {{local_member, Name, '$1'}}),
+        P <- member_in_group(Pid, Name)].
+
+member_in_group(Pid, Name) ->
+    case ets:lookup(pg2_table, {member, Name, Pid}) of
+        [] -> [];
+        [{{member, Name, Pid}, N}] ->
+            lists:duplicate(N, Pid)
+    end.
+
+member_groups(Pid) ->
+    [Name || [Name] <- ets:match(pg2_table, {{pid, Pid, '$1'}})].
+
+all_groups() ->
+    [N || [N] <- ets:match(pg2_table, {{group,'$1'}})].
+
+ensure_started() ->
+    case whereis(?MODULE) of
+        undefined ->
+            C = {pg2, {?MODULE, start_link, []}, permanent,
+                 1000, worker, [?MODULE]},
+            supervisor:start_child(kernel_safe_sup, C);
+        Pg2Pid ->
+            {ok, Pg2Pid}
+    end.
+
+
+kill_monitor_proc(RPid, Pid) ->
+    RPid =:= Pid orelse exit(RPid, kill).
+
+%% When/if erlang:monitor() returns before trying to connect to the
+%% other node this function can be removed.
+do_monitor(Pid) ->
+    case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of
+        true ->
+            %% Assume the node is still up
+            {Pid, erlang:monitor(process, Pid)};
+        false ->
+            F = fun() -> 
+                        Ref = erlang:monitor(process, Pid),
+                        receive 
+                            {'DOWN', Ref, process, Pid, _Info} ->
+                                exit(normal)
+                        end
+                end,
+            erlang:spawn_monitor(F)
+    end.