]> granicus.if.org Git - ejabberd/commitdiff
Make ejabberd_cluster modular
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Thu, 6 Jul 2017 12:47:00 +0000 (15:47 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Thu, 6 Jul 2017 12:47:35 +0000 (15:47 +0300)
For setting the cluster backend new global option 'cluster_backend' is
introduced. The default and only available value at the moment is 'mnesia'

src/ejabberd_app.erl
src/ejabberd_cluster.erl
src/ejabberd_cluster_mnesia.erl [new file with mode: 0644]
src/ejabberd_listener.erl
src/ejabberd_mnesia.erl
src/ejabberd_sup.erl

index b52450d24a5b039a0cbd52bb81a0a594799462a6..64edf508c3a62208c68e35395962d5237ad3af1e 100644 (file)
 
 -module(ejabberd_app).
 
--behaviour(ejabberd_config).
 -author('alexey@process-one.net').
 
 -behaviour(application).
 
--export([start/2, prep_stop/1, stop/1, opt_type/1]).
+-export([start/2, prep_stop/1, stop/1]).
 
 -include("ejabberd.hrl").
 -include("logger.hrl").
@@ -49,13 +48,12 @@ start(normal, _Args) ->
     setup_if_elixir_conf_used(),
     ejabberd_config:start(),
     ejabberd_mnesia:start(),
-    set_settings_from_config(),
     file_queue_init(),
     maybe_add_nameservers(),
-    connect_nodes(),
     case ejabberd_sup:start_link() of
        {ok, SupPid} ->
            register_elixir_config_hooks(),
+           ejabberd_cluster:wait_for_sync(infinity),
            {T2, _} = statistics(wall_clock),
            ?INFO_MSG("ejabberd ~s is started in the node ~p in ~.2fs",
                      [?VERSION, node(), (T2-T1)/1000]),
@@ -88,12 +86,6 @@ stop(_State) ->
 %%% Internal functions
 %%%
 
-connect_nodes() ->
-    Nodes = ejabberd_config:get_option(cluster_nodes, []),
-    lists:foreach(fun(Node) ->
-                          net_kernel:connect_node(Node)
-                  end, Nodes).
-
 %% If ejabberd is running on some Windows machine, get nameservers and add to Erlang
 maybe_add_nameservers() ->
     case os:type() of
@@ -136,10 +128,6 @@ delete_pid_file() ->
            file:delete(PidFilename)
     end.
 
-set_settings_from_config() ->
-    Ticktime = ejabberd_config:get_option(net_ticktime, 60),
-    net_kernel:set_net_ticktime(Ticktime).
-
 file_queue_init() ->
     QueueDir = case ejabberd_config:queue_dir() of
                   undefined ->
@@ -160,15 +148,6 @@ start_apps() ->
     ejabberd:start_app(xmpp),
     ejabberd:start_app(cache_tab).
 
--spec opt_type(net_ticktime) -> fun((pos_integer()) -> pos_integer());
-             (cluster_nodes) -> fun(([node()]) -> [node()]);
-             (atom()) -> atom().
-opt_type(net_ticktime) ->
-    fun (P) when is_integer(P), P > 0 -> P end;
-opt_type(cluster_nodes) ->
-    fun (Ns) -> true = lists:all(fun is_atom/1, Ns), Ns end;
-opt_type(_) -> [cluster_nodes, net_ticktime].
-
 setup_if_elixir_conf_used() ->
   case ejabberd_config:is_using_elixir_config() of
     true -> 'Elixir.Ejabberd.Config.Store':start_link();
index aeae294b0e9566cffdeff1174bfd8c105f5d79e9..c04216ebcc7dfc9a7db90829cdd889350d0c04ad 100644 (file)
-%%%----------------------------------------------------------------------
-%%% File    : ejabberd_cluster.erl
-%%% Author  : Christophe Romain <christophe.romain@process-one.net>
-%%% Purpose : Ejabberd clustering management
-%%% Created : 7 Oct 2015 by Christophe Romain <christophe.romain@process-one.net>
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2017, Evgeny Khramtsov
+%%% @doc
 %%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2017   ProcessOne
-%%%
-%%% This program is free software; you can redistribute it and/or
-%%% modify it under the terms of the GNU General Public License as
-%%% published by the Free Software Foundation; either version 2 of the
-%%% License, or (at your option) any later version.
-%%%
-%%% This program is distributed in the hope that it will be useful,
-%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
-%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-%%% General Public License for more details.
-%%%
-%%% You should have received a copy of the GNU General Public License along
-%%% with this program; if not, write to the Free Software Foundation, Inc.,
-%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-%%%
-%%%----------------------------------------------------------------------
-
+%%% @end
+%%% Created :  5 Jul 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
 -module(ejabberd_cluster).
+-behaviour(ejabberd_config).
+-behaviour(gen_server).
 
 %% API
--export([get_nodes/0, call/4, multicall/3, multicall/4,
-        eval_everywhere/3, eval_everywhere/4]).
--export([join/1, leave/1, get_known_nodes/0]).
--export([node_id/0, get_node_by_id/1]).
+-export([start_link/0, call/4, multicall/3, multicall/4, eval_everywhere/3,
+        eval_everywhere/4]).
+%% Backend dependent API
+-export([get_nodes/0, get_known_nodes/0, join/1, leave/1, subscribe/0,
+        subscribe/1, node_id/0, get_node_by_id/1, send/2, wait_for_sync/1]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+        terminate/2, code_change/3]).
+-export([opt_type/1]).
 
--include("ejabberd.hrl").
 -include("logger.hrl").
 
--spec get_nodes() -> [node()].
+-type dst() :: pid() | atom() | {atom(), node()}.
 
-get_nodes() ->
-    mnesia:system_info(running_db_nodes).
+-callback init() -> ok | {error, any()}.
+-callback get_nodes() -> [node()].
+-callback get_known_nodes() -> [node()].
+-callback join(node()) -> ok | {error, any()}.
+-callback leave(node()) -> ok | {error, any()}.
+-callback node_id() -> binary().
+-callback get_node_by_id(binary()) -> node().
+-callback send({atom(), node()}, term()) -> boolean().
+-callback wait_for_sync(timeout()) -> ok | {error, any()}.
+-callback subscribe(dst()) -> ok.
 
--spec get_known_nodes() -> [node()].
+-record(state, {}).
 
-get_known_nodes() ->
-    lists:usort(mnesia:system_info(db_nodes)
-               ++ mnesia:system_info(extra_db_nodes)).
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 -spec call(node(), module(), atom(), [any()]) -> any().
-
 call(Node, Module, Function, Args) ->
-    rpc:call(Node, Module, Function, Args, 5000).
+    rpc:call(Node, Module, Function, Args, rpc_timeout()).
 
 -spec multicall(module(), atom(), [any()]) -> {list(), [node()]}.
-
 multicall(Module, Function, Args) ->
     multicall(get_nodes(), Module, Function, Args).
 
 -spec multicall([node()], module(), atom(), list()) -> {list(), [node()]}.
-
 multicall(Nodes, Module, Function, Args) ->
-    rpc:multicall(Nodes, Module, Function, Args, 5000).
+    rpc:multicall(Nodes, Module, Function, Args, rpc_timeout()).
 
 -spec eval_everywhere(module(), atom(), [any()]) -> ok.
-
 eval_everywhere(Module, Function, Args) ->
     eval_everywhere(get_nodes(), Module, Function, Args),
     ok.
 
 -spec eval_everywhere([node()], module(), atom(), [any()]) -> ok.
-
 eval_everywhere(Nodes, Module, Function, Args) ->
     rpc:eval_everywhere(Nodes, Module, Function, Args),
     ok.
 
--spec join(node()) -> ok | {error, any()}.
+%%%===================================================================
+%%% Backend dependent API
+%%%===================================================================
+-spec get_nodes() -> [node()].
+get_nodes() ->
+    Mod = get_mod(),
+    Mod:get_nodes().
 
+-spec get_known_nodes() -> [node()].
+get_known_nodes() ->
+    Mod = get_mod(),
+    Mod:get_known_nodes().
+
+-spec join(node()) -> ok | {error, any()}.
 join(Node) ->
-    case {node(), net_adm:ping(Node)} of
-        {Node, _} ->
-            {error, {not_master, Node}};
-        {_, pong} ->
-            application:stop(ejabberd),
-            application:stop(mnesia),
-            mnesia:delete_schema([node()]),
-            application:start(mnesia),
-            mnesia:change_config(extra_db_nodes, [Node]),
-            mnesia:change_table_copy_type(schema, node(), disc_copies),
-            spawn(fun()  ->
-                lists:foreach(fun(Table) ->
-                            Type = call(Node, mnesia, table_info, [Table, storage_type]),
-                            mnesia:add_table_copy(Table, node(), Type)
-                    end, mnesia:system_info(tables)--[schema])
-                end),
-            application:start(ejabberd);
-        _ ->
-            {error, {no_ping, Node}}
-    end.
+    Mod = get_mod(),
+    Mod:join(Node).
 
 -spec leave(node()) -> ok | {error, any()}.
-
 leave(Node) ->
-    case {node(), net_adm:ping(Node)} of
-        {Node, _} ->
-            Cluster = get_nodes()--[Node],
-            leave(Cluster, Node);
-        {_, pong} ->
-            rpc:call(Node, ?MODULE, leave, [Node], 10000);
-        {_, pang} ->
-            case mnesia:del_table_copy(schema, Node) of
-                {atomic, ok} -> ok;
-                {aborted, Reason} -> {error, Reason}
-            end
-    end.
-leave([], Node) ->
-    {error, {no_cluster, Node}};
-leave([Master|_], Node) ->
-    application:stop(ejabberd),
-    application:stop(mnesia),
-    call(Master, mnesia, del_table_copy, [schema, Node]),
-    spawn(fun() ->
-                mnesia:delete_schema([node()]),
-                erlang:halt(0)
-        end),
-    ok.
+    Mod = get_mod(),
+    Mod:leave(Node).
 
 -spec node_id() -> binary().
 node_id() ->
-    integer_to_binary(erlang:phash2(node())).
+    Mod = get_mod(),
+    Mod:node_id().
 
 -spec get_node_by_id(binary()) -> node().
-get_node_by_id(Hash) ->
-    try binary_to_integer(Hash) of
-       I -> match_node_id(I)
-    catch _:_ ->
-           node()
+get_node_by_id(ID) ->
+    Mod = get_mod(),
+    Mod:get_node_by_id(ID).
+
+-spec send(dst(), term()) -> boolean().
+send(Dst, Msg) ->
+    IsLocal = case Dst of
+                 {_, Node} -> Node == node();
+                 Pid when is_pid(Pid) -> node(Pid) == node();
+                 Name when is_atom(Name) -> true;
+                 _ -> false
+             end,
+    if IsLocal ->
+           erlang:send(Dst, Msg),
+           true;
+       true ->
+           Mod = get_mod(),
+           Mod:send(Dst, Msg)
     end.
 
+-spec wait_for_sync(timeout()) -> ok | {error, any()}.
+wait_for_sync(Timeout) ->
+    Mod = get_mod(),
+    Mod:wait_for_sync(Timeout).
+
+-spec subscribe() -> ok.
+subscribe() ->
+    subscribe(self()).
+
+-spec subscribe(dst()) -> ok.
+subscribe(Proc) ->
+    Mod = get_mod(),
+    Mod:subscribe(Proc).
+
+%%%===================================================================
+%%% gen_server API
+%%%===================================================================
+init([]) ->
+    Ticktime = ejabberd_config:get_option(net_ticktime, 60),
+    Nodes = ejabberd_config:get_option(cluster_nodes, []),
+    net_kernel:set_net_ticktime(Ticktime),
+    lists:foreach(fun(Node) ->
+                          net_kernel:connect_node(Node)
+                  end, Nodes),
+    Mod = get_mod(),
+    case Mod:init() of
+       ok ->
+           Mod:subscribe(?MODULE),
+           {ok, #state{}};
+       {error, Reason} ->
+           {stop, Reason}
+    end.
+
+handle_call(_Request, _From, State) ->
+    Reply = ok,
+    {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({node_up, Node}, State) ->
+    ?INFO_MSG("Node ~s has joined", [Node]),
+    {noreply, State};
+handle_info({node_down, Node}, State) ->
+    ?INFO_MSG("Node ~s has left", [Node]),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
--spec match_node_id(integer()) -> node().
-match_node_id(I) ->
-    match_node_id(I, get_nodes()).
-
--spec match_node_id(integer(), [node()]) -> node().
-match_node_id(I, [Node|Nodes]) ->
-    case erlang:phash2(Node) of
-       I -> Node;
-       _ -> match_node_id(I, Nodes)
-    end;
-match_node_id(_I, []) ->
-    node().
+get_mod() ->
+    Backend = ejabberd_config:get_option(cluster_backend, mnesia),
+    list_to_atom("ejabberd_cluster_" ++ atom_to_list(Backend)).
+
+rpc_timeout() ->
+    timer:seconds(ejabberd_config:get_option(rpc_timeout, 5)).
+
+opt_type(net_ticktime) ->
+    fun (P) when is_integer(P), P > 0 -> P end;
+opt_type(cluster_nodes) ->
+    fun (Ns) -> true = lists:all(fun is_atom/1, Ns), Ns end;
+opt_type(rpc_timeout) ->
+    fun (T) when is_integer(T), T > 0 -> T end;
+opt_type(cluster_backend) ->
+    fun (T) -> ejabberd_config:v_db(?MODULE, T) end;
+opt_type(_) ->
+    [rpc_timeout, cluster_backend, cluster_nodes, net_ticktime].
diff --git a/src/ejabberd_cluster_mnesia.erl b/src/ejabberd_cluster_mnesia.erl
new file mode 100644 (file)
index 0000000..100bdaf
--- /dev/null
@@ -0,0 +1,144 @@
+%%%----------------------------------------------------------------------
+%%% File    : ejabberd_cluster.erl
+%%% Author  : Christophe Romain <christophe.romain@process-one.net>
+%%% Purpose : Ejabberd clustering management
+%%% Created : 7 Oct 2015 by Christophe Romain <christophe.romain@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2017   ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+
+-module(ejabberd_cluster_mnesia).
+-behaviour(ejabberd_cluster).
+
+%% API
+-export([init/0, get_nodes/0, join/1, leave/1,
+        get_known_nodes/0, node_id/0, get_node_by_id/1,
+        send/2, wait_for_sync/1, subscribe/1]).
+
+-include("ejabberd.hrl").
+-include("logger.hrl").
+
+-spec init() -> ok.
+init() ->
+    ok.
+
+-spec get_nodes() -> [node()].
+
+get_nodes() ->
+    mnesia:system_info(running_db_nodes).
+
+-spec get_known_nodes() -> [node()].
+
+get_known_nodes() ->
+    lists:usort(mnesia:system_info(db_nodes)
+               ++ mnesia:system_info(extra_db_nodes)).
+
+-spec join(node()) -> ok | {error, any()}.
+
+join(Node) ->
+    case {node(), net_adm:ping(Node)} of
+        {Node, _} ->
+            {error, {not_master, Node}};
+        {_, pong} ->
+            application:stop(ejabberd),
+            application:stop(mnesia),
+            mnesia:delete_schema([node()]),
+            application:start(mnesia),
+            mnesia:change_config(extra_db_nodes, [Node]),
+            mnesia:change_table_copy_type(schema, node(), disc_copies),
+            spawn(fun()  ->
+                lists:foreach(fun(Table) ->
+                            Type = ejabberd_cluster:call(
+                                    Node, mnesia, table_info, [Table, storage_type]),
+                            mnesia:add_table_copy(Table, node(), Type)
+                    end, mnesia:system_info(tables)--[schema])
+                end),
+            application:start(ejabberd);
+        _ ->
+            {error, {no_ping, Node}}
+    end.
+
+-spec leave(node()) -> ok | {error, any()}.
+
+leave(Node) ->
+    case {node(), net_adm:ping(Node)} of
+        {Node, _} ->
+            Cluster = get_nodes()--[Node],
+            leave(Cluster, Node);
+        {_, pong} ->
+            rpc:call(Node, ?MODULE, leave, [Node], 10000);
+        {_, pang} ->
+            case mnesia:del_table_copy(schema, Node) of
+                {atomic, ok} -> ok;
+                {aborted, Reason} -> {error, Reason}
+            end
+    end.
+leave([], Node) ->
+    {error, {no_cluster, Node}};
+leave([Master|_], Node) ->
+    application:stop(ejabberd),
+    application:stop(mnesia),
+    ejabberd_cluster:call(Master, mnesia, del_table_copy, [schema, Node]),
+    spawn(fun() ->
+                mnesia:delete_schema([node()]),
+                erlang:halt(0)
+        end),
+    ok.
+
+-spec node_id() -> binary().
+node_id() ->
+    integer_to_binary(erlang:phash2(node())).
+
+-spec get_node_by_id(binary()) -> node().
+get_node_by_id(Hash) ->
+    try binary_to_integer(Hash) of
+       I -> match_node_id(I)
+    catch _:_ ->
+           node()
+    end.
+
+-spec send({atom(), node()}, term()) -> boolean().
+send(Dst, Msg) ->
+    erlang:send(Dst, Msg).
+
+-spec wait_for_sync(timeout()) -> ok.
+wait_for_sync(Timeout) ->
+    ?INFO_MSG("Waiting for Mnesia synchronization to complete", []),
+    mnesia:wait_for_tables(mnesia:system_info(local_tables), Timeout),
+    ok.
+
+-spec subscribe(_) -> ok.
+subscribe(_) ->
+    ok.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec match_node_id(integer()) -> node().
+match_node_id(I) ->
+    match_node_id(I, get_nodes()).
+
+-spec match_node_id(integer(), [node()]) -> node().
+match_node_id(I, [Node|Nodes]) ->
+    case erlang:phash2(Node) of
+       I -> Node;
+       _ -> match_node_id(I, Nodes)
+    end;
+match_node_id(_I, []) ->
+    node().
index 3ea23d61b3dfc436633dd086a6b0328f2aa28803..e9b4306e54d7f23a9f4a4e0d7602c7a0108d3c47 100644 (file)
@@ -109,6 +109,7 @@ init_udp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
        {ok, Socket} ->
            %% Inform my parent that this port was opened succesfully
            proc_lib:init_ack({ok, self()}),
+           application:ensure_started(ejabberd),
            start_module_sup(Port, Module),
            ?INFO_MSG("Start accepting UDP connections at ~s for ~p",
                      [format_portip(PortIP), Module]),
@@ -134,6 +135,7 @@ init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
     ListenSocket = listen_tcp(PortIP, Module, SockOpts, Port, IPS),
     %% Inform my parent that this port was opened succesfully
     proc_lib:init_ack({ok, self()}),
+    application:ensure_started(ejabberd),
     start_module_sup(Port, Module),
     ?INFO_MSG("Start accepting TCP connections at ~s for ~p",
              [format_portip(PortIP), Module]),
index 34691545ae5a7713aa8b4a62f9278c53fd17e28a..16e38501161349ea203fcfa78783491bef6f8989 100644 (file)
@@ -68,8 +68,6 @@ init([]) ->
                _ -> ok
            end,
            ejabberd:start_app(mnesia, permanent),
-           ?DEBUG("Waiting for Mnesia tables synchronization...", []),
-           mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity),
            Schema = read_schema_file(),
            {ok, #state{schema = Schema}};
        false ->
index 224ed16c10ef917d2d8b57f202ae755989971add..35527ebd71973a1018c5dd9ba862ed3ce1464dea 100644 (file)
@@ -41,6 +41,12 @@ init([]) ->
         brutal_kill,
         worker,
         [ejabberd_hooks]},
+    Cluster = {ejabberd_cluster,
+              {ejabberd_cluster, start_link, []},
+              permanent,
+              5000,
+              worker,
+              [ejabberd_cluster]},
     SystemMonitor =
        {ejabberd_system_monitor,
         {ejabberd_system_monitor, start_link, []},
@@ -152,6 +158,7 @@ init([]) ->
            permanent, 5000, worker, [ejabberd_pkix]},
     {ok, {{one_for_one, 10, 1},
          [Hooks,
+          Cluster,
           CyrSASL,
           Translation,
           AccessPerms,