-%%%----------------------------------------------------------------------
-%%% File : ejabberd_cluster.erl
-%%% Author : Christophe Romain <christophe.romain@process-one.net>
-%%% Purpose : Ejabberd clustering management
-%%% Created : 7 Oct 2015 by Christophe Romain <christophe.romain@process-one.net>
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2017, Evgeny Khramtsov
+%%% @doc
%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
-%%%
-%%% This program is free software; you can redistribute it and/or
-%%% modify it under the terms of the GNU General Public License as
-%%% published by the Free Software Foundation; either version 2 of the
-%%% License, or (at your option) any later version.
-%%%
-%%% This program is distributed in the hope that it will be useful,
-%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
-%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-%%% General Public License for more details.
-%%%
-%%% You should have received a copy of the GNU General Public License along
-%%% with this program; if not, write to the Free Software Foundation, Inc.,
-%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-%%%
-%%%----------------------------------------------------------------------
-
+%%% @end
+%%% Created : 5 Jul 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
-module(ejabberd_cluster).
+-behaviour(ejabberd_config).
+-behaviour(gen_server).
%% API
--export([get_nodes/0, call/4, multicall/3, multicall/4,
- eval_everywhere/3, eval_everywhere/4]).
--export([join/1, leave/1, get_known_nodes/0]).
--export([node_id/0, get_node_by_id/1]).
+-export([start_link/0, call/4, multicall/3, multicall/4, eval_everywhere/3,
+ eval_everywhere/4]).
+%% Backend dependent API
+-export([get_nodes/0, get_known_nodes/0, join/1, leave/1, subscribe/0,
+ subscribe/1, node_id/0, get_node_by_id/1, send/2, wait_for_sync/1]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export([opt_type/1]).
--include("ejabberd.hrl").
-include("logger.hrl").
--spec get_nodes() -> [node()].
+-type dst() :: pid() | atom() | {atom(), node()}.
-get_nodes() ->
- mnesia:system_info(running_db_nodes).
+-callback init() -> ok | {error, any()}.
+-callback get_nodes() -> [node()].
+-callback get_known_nodes() -> [node()].
+-callback join(node()) -> ok | {error, any()}.
+-callback leave(node()) -> ok | {error, any()}.
+-callback node_id() -> binary().
+-callback get_node_by_id(binary()) -> node().
+-callback send({atom(), node()}, term()) -> boolean().
+-callback wait_for_sync(timeout()) -> ok | {error, any()}.
+-callback subscribe(dst()) -> ok.
--spec get_known_nodes() -> [node()].
+-record(state, {}).
-get_known_nodes() ->
- lists:usort(mnesia:system_info(db_nodes)
- ++ mnesia:system_info(extra_db_nodes)).
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec call(node(), module(), atom(), [any()]) -> any().
-
call(Node, Module, Function, Args) ->
- rpc:call(Node, Module, Function, Args, 5000).
+ rpc:call(Node, Module, Function, Args, rpc_timeout()).
-spec multicall(module(), atom(), [any()]) -> {list(), [node()]}.
-
multicall(Module, Function, Args) ->
multicall(get_nodes(), Module, Function, Args).
-spec multicall([node()], module(), atom(), list()) -> {list(), [node()]}.
-
multicall(Nodes, Module, Function, Args) ->
- rpc:multicall(Nodes, Module, Function, Args, 5000).
+ rpc:multicall(Nodes, Module, Function, Args, rpc_timeout()).
-spec eval_everywhere(module(), atom(), [any()]) -> ok.
-
eval_everywhere(Module, Function, Args) ->
eval_everywhere(get_nodes(), Module, Function, Args),
ok.
-spec eval_everywhere([node()], module(), atom(), [any()]) -> ok.
-
eval_everywhere(Nodes, Module, Function, Args) ->
rpc:eval_everywhere(Nodes, Module, Function, Args),
ok.
--spec join(node()) -> ok | {error, any()}.
+%%%===================================================================
+%%% Backend dependent API
+%%%===================================================================
+-spec get_nodes() -> [node()].
+get_nodes() ->
+ Mod = get_mod(),
+ Mod:get_nodes().
+-spec get_known_nodes() -> [node()].
+get_known_nodes() ->
+ Mod = get_mod(),
+ Mod:get_known_nodes().
+
+-spec join(node()) -> ok | {error, any()}.
join(Node) ->
- case {node(), net_adm:ping(Node)} of
- {Node, _} ->
- {error, {not_master, Node}};
- {_, pong} ->
- application:stop(ejabberd),
- application:stop(mnesia),
- mnesia:delete_schema([node()]),
- application:start(mnesia),
- mnesia:change_config(extra_db_nodes, [Node]),
- mnesia:change_table_copy_type(schema, node(), disc_copies),
- spawn(fun() ->
- lists:foreach(fun(Table) ->
- Type = call(Node, mnesia, table_info, [Table, storage_type]),
- mnesia:add_table_copy(Table, node(), Type)
- end, mnesia:system_info(tables)--[schema])
- end),
- application:start(ejabberd);
- _ ->
- {error, {no_ping, Node}}
- end.
+ Mod = get_mod(),
+ Mod:join(Node).
-spec leave(node()) -> ok | {error, any()}.
-
leave(Node) ->
- case {node(), net_adm:ping(Node)} of
- {Node, _} ->
- Cluster = get_nodes()--[Node],
- leave(Cluster, Node);
- {_, pong} ->
- rpc:call(Node, ?MODULE, leave, [Node], 10000);
- {_, pang} ->
- case mnesia:del_table_copy(schema, Node) of
- {atomic, ok} -> ok;
- {aborted, Reason} -> {error, Reason}
- end
- end.
-leave([], Node) ->
- {error, {no_cluster, Node}};
-leave([Master|_], Node) ->
- application:stop(ejabberd),
- application:stop(mnesia),
- call(Master, mnesia, del_table_copy, [schema, Node]),
- spawn(fun() ->
- mnesia:delete_schema([node()]),
- erlang:halt(0)
- end),
- ok.
+ Mod = get_mod(),
+ Mod:leave(Node).
-spec node_id() -> binary().
node_id() ->
- integer_to_binary(erlang:phash2(node())).
+ Mod = get_mod(),
+ Mod:node_id().
-spec get_node_by_id(binary()) -> node().
-get_node_by_id(Hash) ->
- try binary_to_integer(Hash) of
- I -> match_node_id(I)
- catch _:_ ->
- node()
+get_node_by_id(ID) ->
+ Mod = get_mod(),
+ Mod:get_node_by_id(ID).
+
+-spec send(dst(), term()) -> boolean().
+send(Dst, Msg) ->
+ IsLocal = case Dst of
+ {_, Node} -> Node == node();
+ Pid when is_pid(Pid) -> node(Pid) == node();
+ Name when is_atom(Name) -> true;
+ _ -> false
+ end,
+ if IsLocal ->
+ erlang:send(Dst, Msg),
+ true;
+ true ->
+ Mod = get_mod(),
+ Mod:send(Dst, Msg)
end.
+-spec wait_for_sync(timeout()) -> ok | {error, any()}.
+wait_for_sync(Timeout) ->
+ Mod = get_mod(),
+ Mod:wait_for_sync(Timeout).
+
+-spec subscribe() -> ok.
+subscribe() ->
+ subscribe(self()).
+
+-spec subscribe(dst()) -> ok.
+subscribe(Proc) ->
+ Mod = get_mod(),
+ Mod:subscribe(Proc).
+
+%%%===================================================================
+%%% gen_server API
+%%%===================================================================
+init([]) ->
+ Ticktime = ejabberd_config:get_option(net_ticktime, 60),
+ Nodes = ejabberd_config:get_option(cluster_nodes, []),
+ net_kernel:set_net_ticktime(Ticktime),
+ lists:foreach(fun(Node) ->
+ net_kernel:connect_node(Node)
+ end, Nodes),
+ Mod = get_mod(),
+ case Mod:init() of
+ ok ->
+ Mod:subscribe(?MODULE),
+ {ok, #state{}};
+ {error, Reason} ->
+ {stop, Reason}
+ end.
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({node_up, Node}, State) ->
+ ?INFO_MSG("Node ~s has joined", [Node]),
+ {noreply, State};
+handle_info({node_down, Node}, State) ->
+ ?INFO_MSG("Node ~s has left", [Node]),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
%%%===================================================================
%%% Internal functions
%%%===================================================================
--spec match_node_id(integer()) -> node().
-match_node_id(I) ->
- match_node_id(I, get_nodes()).
-
--spec match_node_id(integer(), [node()]) -> node().
-match_node_id(I, [Node|Nodes]) ->
- case erlang:phash2(Node) of
- I -> Node;
- _ -> match_node_id(I, Nodes)
- end;
-match_node_id(_I, []) ->
- node().
+get_mod() ->
+ Backend = ejabberd_config:get_option(cluster_backend, mnesia),
+ list_to_atom("ejabberd_cluster_" ++ atom_to_list(Backend)).
+
+rpc_timeout() ->
+ timer:seconds(ejabberd_config:get_option(rpc_timeout, 5)).
+
+opt_type(net_ticktime) ->
+ fun (P) when is_integer(P), P > 0 -> P end;
+opt_type(cluster_nodes) ->
+ fun (Ns) -> true = lists:all(fun is_atom/1, Ns), Ns end;
+opt_type(rpc_timeout) ->
+ fun (T) when is_integer(T), T > 0 -> T end;
+opt_type(cluster_backend) ->
+ fun (T) -> ejabberd_config:v_db(?MODULE, T) end;
+opt_type(_) ->
+ [rpc_timeout, cluster_backend, cluster_nodes, net_ticktime].
--- /dev/null
+%%%----------------------------------------------------------------------
+%%% File : ejabberd_cluster.erl
+%%% Author : Christophe Romain <christophe.romain@process-one.net>
+%%% Purpose : Ejabberd clustering management
+%%% Created : 7 Oct 2015 by Christophe Romain <christophe.romain@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+
+-module(ejabberd_cluster_mnesia).
+-behaviour(ejabberd_cluster).
+
+%% API
+-export([init/0, get_nodes/0, join/1, leave/1,
+ get_known_nodes/0, node_id/0, get_node_by_id/1,
+ send/2, wait_for_sync/1, subscribe/1]).
+
+-include("ejabberd.hrl").
+-include("logger.hrl").
+
+-spec init() -> ok.
+init() ->
+ ok.
+
+-spec get_nodes() -> [node()].
+
+get_nodes() ->
+ mnesia:system_info(running_db_nodes).
+
+-spec get_known_nodes() -> [node()].
+
+get_known_nodes() ->
+ lists:usort(mnesia:system_info(db_nodes)
+ ++ mnesia:system_info(extra_db_nodes)).
+
+-spec join(node()) -> ok | {error, any()}.
+
+join(Node) ->
+ case {node(), net_adm:ping(Node)} of
+ {Node, _} ->
+ {error, {not_master, Node}};
+ {_, pong} ->
+ application:stop(ejabberd),
+ application:stop(mnesia),
+ mnesia:delete_schema([node()]),
+ application:start(mnesia),
+ mnesia:change_config(extra_db_nodes, [Node]),
+ mnesia:change_table_copy_type(schema, node(), disc_copies),
+ spawn(fun() ->
+ lists:foreach(fun(Table) ->
+ Type = ejabberd_cluster:call(
+ Node, mnesia, table_info, [Table, storage_type]),
+ mnesia:add_table_copy(Table, node(), Type)
+ end, mnesia:system_info(tables)--[schema])
+ end),
+ application:start(ejabberd);
+ _ ->
+ {error, {no_ping, Node}}
+ end.
+
+-spec leave(node()) -> ok | {error, any()}.
+
+leave(Node) ->
+ case {node(), net_adm:ping(Node)} of
+ {Node, _} ->
+ Cluster = get_nodes()--[Node],
+ leave(Cluster, Node);
+ {_, pong} ->
+ rpc:call(Node, ?MODULE, leave, [Node], 10000);
+ {_, pang} ->
+ case mnesia:del_table_copy(schema, Node) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} -> {error, Reason}
+ end
+ end.
+leave([], Node) ->
+ {error, {no_cluster, Node}};
+leave([Master|_], Node) ->
+ application:stop(ejabberd),
+ application:stop(mnesia),
+ ejabberd_cluster:call(Master, mnesia, del_table_copy, [schema, Node]),
+ spawn(fun() ->
+ mnesia:delete_schema([node()]),
+ erlang:halt(0)
+ end),
+ ok.
+
+-spec node_id() -> binary().
+node_id() ->
+ integer_to_binary(erlang:phash2(node())).
+
+-spec get_node_by_id(binary()) -> node().
+get_node_by_id(Hash) ->
+ try binary_to_integer(Hash) of
+ I -> match_node_id(I)
+ catch _:_ ->
+ node()
+ end.
+
+-spec send({atom(), node()}, term()) -> boolean().
+send(Dst, Msg) ->
+ erlang:send(Dst, Msg).
+
+-spec wait_for_sync(timeout()) -> ok.
+wait_for_sync(Timeout) ->
+ ?INFO_MSG("Waiting for Mnesia synchronization to complete", []),
+ mnesia:wait_for_tables(mnesia:system_info(local_tables), Timeout),
+ ok.
+
+-spec subscribe(_) -> ok.
+subscribe(_) ->
+ ok.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec match_node_id(integer()) -> node().
+match_node_id(I) ->
+ match_node_id(I, get_nodes()).
+
+-spec match_node_id(integer(), [node()]) -> node().
+match_node_id(I, [Node|Nodes]) ->
+ case erlang:phash2(Node) of
+ I -> Node;
+ _ -> match_node_id(I, Nodes)
+ end;
+match_node_id(_I, []) ->
+ node().