From: Alexey Shchepin Date: Tue, 7 Nov 2006 02:08:51 +0000 (+0000) Subject: * src/ejabberd_node_groups.erl: Support for node tagging X-Git-Tag: v2.0.0~469 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=091d2bcb00cd90b04c57e40638863ea5069e9df3;p=ejabberd * src/ejabberd_node_groups.erl: Support for node tagging * src/ejabberd_sup.erl: Likewise * src/ejabberd_frontend_socket.erl: Use node tags to determine backend nodes * src/ejabberd_config.erl: Added node_type and cluster_nodes options * src/ejabberd_app.erl: Establish connections to the nodes from the cluster_nodes option * src/ejabberd_router.erl: Added balancing method option * src/ejabberd_config.erl: Likewise SVN Revision: 676 --- diff --git a/ChangeLog b/ChangeLog index c823aad78..217511e30 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,17 @@ +2006-11-07 Alexey Shchepin + + * src/ejabberd_node_groups.erl: Support for node tagging + * src/ejabberd_sup.erl: Likewise + * src/ejabberd_frontend_socket.erl: Use node tags to determine + backend nodes + * src/ejabberd_config.erl: Added node_type and cluster_nodes + options + * src/ejabberd_app.erl: Establish connections to the nodes from + the cluster_nodes option + + * src/ejabberd_router.erl: Added balancing method option + * src/ejabberd_config.erl: Likewise + 2006-11-05 Mickael Remond * src/mod_private_odbc.erl: Support for MySQL and MSSQL. @@ -8,13 +22,13 @@ 2006-11-04 Mickael Remond * src/eldap_utils.erl: Fixed missing export. - + * src/odbc/pg.sql: Database scripts consistency. * src/odbc/mysql.sql: Likewise. * src/odbc/mssql.sql: Likewise. - * src/odbc/mysql: Database creation script should now be compliant with - MySQL 4.0.x. + * src/odbc/mysql: Database creation script should now be compliant + with MySQL 4.0.x. 2006-10-29 Mickael Remond @@ -23,8 +37,8 @@ 2006-10-28 Mickael Remond - * src/ejabberd.cfg.example: Changed the anonymous example a bit to work - in most cases. + * src/ejabberd.cfg.example: Changed the anonymous example a bit to + work in most cases. * doc/guide.tex: Likewise. 2006-10-28 Alexey Shchepin diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index 7c44c8fab..028db9afa 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -27,6 +27,8 @@ start(normal, _Args) -> ejabberd_ctl:init(), gen_mod:start(), ejabberd_config:start(), + start(), + connect_nodes(), Sup = ejabberd_sup:start_link(), ejabberd_rdbms:start(), ejabberd_auth:start(), @@ -35,7 +37,6 @@ start(normal, _Args) -> %eprof:start(), %eprof:profile([self()]), %fprof:trace(start, "/tmp/fprof"), - start(), load_modules(), Sup; start(_, _) -> @@ -103,3 +104,15 @@ load_modules() -> end end, ?MYHOSTS). +connect_nodes() -> + case ejabberd_config:get_local_option(cluster_nodes) of + undefined -> + ok; + Nodes when is_list(Nodes) -> + lists:foreach(fun(Node) -> + net_kernel:connect_node(Node) + end, Nodes) + end. + + + diff --git a/src/ejabberd_config.erl b/src/ejabberd_config.erl index cc1d84877..38cf1c0b0 100644 --- a/src/ejabberd_config.erl +++ b/src/ejabberd_config.erl @@ -115,6 +115,12 @@ process_term(Term, State) -> add_option(s2s_certfile, CertFile, State); {domain_certfile, Domain, CertFile} -> add_option({domain_certfile, Domain}, CertFile, State); + {node_type, NodeType} -> + add_option(node_type, NodeType, State); + {cluster_nodes, Nodes} -> + add_option(cluster_nodes, Nodes, State); + {domain_balancing, Domain, Balancing} -> + add_option({domain_balancing, Domain}, Balancing, State); {Opt, Val} -> lists:foldl(fun(Host, S) -> process_host_term(Term, Host, S) end, State, State#state.hosts) diff --git a/src/ejabberd_frontend_socket.erl b/src/ejabberd_frontend_socket.erl index bafb96e41..a906f1912 100644 --- a/src/ejabberd_frontend_socket.erl +++ b/src/ejabberd_frontend_socket.erl @@ -13,7 +13,7 @@ %% API -export([start/4, - start_link/4, + start_link/5, %connect/3, starttls/2, starttls/3, @@ -41,9 +41,9 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- -start_link(Module, SockMod, Socket, Opts) -> +start_link(Module, SockMod, Socket, Opts, Receiver) -> gen_server:start_link(?MODULE, - [Module, SockMod, Socket, Opts], []). + [Module, SockMod, Socket, Opts, Receiver], []). start(Module, SockMod, Socket, Opts) -> case Module:socket_type() of @@ -60,8 +60,8 @@ start(Module, SockMod, Socket, Opts) -> {error, _Reason} -> SockMod:close(Socket) end, - gen_server:start(?MODULE, - [Module, SockMod, Socket, Opts, Receiver], []); + supervisor:start_child(ejabberd_frontend_socket_sup, + [Module, SockMod, Socket, Opts, Receiver]); raw -> %{ok, Pid} = Module:start({SockMod, Socket}, Opts), %case SockMod:controlling_process(Socket, Pid) of @@ -129,8 +129,7 @@ peername(FsmRef) -> %% Description: Initiates the server %%-------------------------------------------------------------------- init([Module, SockMod, Socket, Opts, Receiver]) -> - Nodes = mnesia:table_info(schema, disc_copies), - Node = lists:nth(erlang:phash(now(), length(Nodes)), Nodes), + Node = ejabberd_node_groups:get_closest_node(backend), {ok, Pid} = rpc:call(Node, Module, start, [{?MODULE, self()}, Opts]), ejabberd_receiver:become_controller(Receiver, Pid), diff --git a/src/ejabberd_node_groups.erl b/src/ejabberd_node_groups.erl new file mode 100644 index 000000000..5f5d755b2 --- /dev/null +++ b/src/ejabberd_node_groups.erl @@ -0,0 +1,142 @@ +%%%---------------------------------------------------------------------- +%%% File : ejabberd_node_groups.erl +%%% Author : Alexey Shchepin +%%% Purpose : Distributed named node groups based on pg2 module +%%% Created : 1 Nov 2006 by Alexey Shchepin +%%% Id : $Id$ +%%%---------------------------------------------------------------------- + +-module(ejabberd_node_groups). +-author('alexey@process-one.net'). +-vsn('$Revision$ '). + +-behaviour(gen_server). + +%% API +-export([start_link/0, + join/1, + leave/1, + get_members/1, + get_closest_node/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +join(Name) -> + PG = {?MODULE, Name}, + pg2:create(PG), + pg2:join(PG, whereis(?MODULE)). + +leave(Name) -> + PG = {?MODULE, Name}, + pg2:leave(PG, whereis(?MODULE)). + +get_members(Name) -> + PG = {?MODULE, Name}, + [node(P) || P <- pg2:get_members(PG)]. + +get_closest_node(Name) -> + PG = {?MODULE, Name}, + node(pg2:get_closest_pid(PG)). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([]) -> + {FE, BE} = + case ejabberd_config:get_local_option(node_type) of + frontend -> + {true, false}; + backend -> + {false, true}; + generic -> + {true, true}; + undefined -> + {true, true} + end, + if + FE -> + join(frontend); + true -> + ok + end, + if + BE -> + join(backend); + true -> + ok + end, + {ok, #state{}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- diff --git a/src/ejabberd_router.erl b/src/ejabberd_router.erl index 5e3127f9f..35c8d621c 100644 --- a/src/ejabberd_router.erl +++ b/src/ejabberd_router.erl @@ -220,7 +220,6 @@ code_change(_OldVsn, State, _Extra) -> do_route(OrigFrom, OrigTo, OrigPacket) -> ?DEBUG("route~n\tfrom ~p~n\tto ~p~n\tpacket ~p~n", [OrigFrom, OrigTo, OrigPacket]), - LOrigDstDomain = OrigTo#jid.lserver, case ejabberd_hooks:run_fold(filter_packet, {OrigFrom, OrigTo, OrigPacket}, []) of {From, To, Packet} -> @@ -244,12 +243,25 @@ do_route(OrigFrom, OrigTo, OrigPacket) -> Rs -> case [R || R <- Rs, node(R#route.pid) == node()] of [] -> - R = lists:nth(erlang:phash(now(), length(Rs)), Rs), + Value = case ejabberd_config:get_local_option( + {domain_balancing, LDstDomain}) of + source -> jlib:jid_tolower(From); + destination -> jlib:jid_tolower(To); + random -> now(); + undefined -> now() + end, + R = lists:nth(erlang:phash(Value, length(Rs)), Rs), Pid = R#route.pid, Pid ! {route, From, To, Packet}; LRs -> - LRs, - R = lists:nth(erlang:phash(now(), length(LRs)), LRs), + Value = case ejabberd_config:get_local_option( + {domain_balancing, LDstDomain}) of + source -> jlib:jid_tolower(From); + destination -> jlib:jid_tolower(To); + random -> now(); + undefined -> now() + end, + R = lists:nth(erlang:phash(Value, length(LRs)), LRs), Pid = R#route.pid, case R#route.local_hint of {apply, Module, Function} -> diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index 1e6851dc2..e093b6a2b 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -33,6 +33,13 @@ init([]) -> brutal_kill, worker, [stringprep]}, + NodeGroups = + {ejabberd_node_groups, + {ejabberd_node_groups, start_link, []}, + permanent, + brutal_kill, + worker, + [ejabberd_node_groups]}, Router = {ejabberd_router, {ejabberd_router, start_link, []}, @@ -123,6 +130,14 @@ init([]) -> infinity, supervisor, [ejabberd_tmp_sup]}, + FrontendSocketSupervisor = + {ejabberd_frontend_socket_sup, + {ejabberd_tmp_sup, start_link, + [ejabberd_frontend_socket_sup, ejabberd_frontend_socket]}, + permanent, + infinity, + supervisor, + [ejabberd_tmp_sup]}, IQSupervisor = {ejabberd_iq_sup, {ejabberd_tmp_sup, start_link, @@ -134,6 +149,7 @@ init([]) -> {ok, {{one_for_one, 10, 1}, [Hooks, StringPrep, + NodeGroups, Router, SM, S2S, @@ -146,6 +162,7 @@ init([]) -> HTTPSupervisor, HTTPPollSupervisor, IQSupervisor, + FrontendSocketSupervisor, Listener]}}.