]> granicus.if.org Git - ejabberd/commitdiff
open up to 3 s2s outgoing connection per domain pair
authorMickaël Rémond <mickael.remond@process-one.net>
Fri, 14 Sep 2007 14:15:44 +0000 (14:15 +0000)
committerMickaël Rémond <mickael.remond@process-one.net>
Fri, 14 Sep 2007 14:15:44 +0000 (14:15 +0000)
SVN Revision: 928

src/ejabberd_s2s.erl
src/ejabberd_s2s_in.erl
src/ejabberd_s2s_out.erl

index c9e40efbcec3a7c69ed6f4265c19eecff905ae58..e208702729e574bfa5b00ff918b452daec62b60b 100644 (file)
@@ -3,12 +3,12 @@
 %%% Author  : Alexey Shchepin <alexey@sevcom.net>
 %%% Purpose : S2S connections manager
 %%% Created :  7 Dec 2002 by Alexey Shchepin <alexey@sevcom.net>
-%%% Id      : $Id$
+%%% Id      : $Id: ejabberd_s2s.erl 820 2007-07-19 21:17:13Z mremond $
 %%%----------------------------------------------------------------------
 
 -module(ejabberd_s2s).
 -author('alexey@sevcom.net').
--vsn('$Revision$ ').
+-vsn('$Revision: 820 $ ').
 
 -behaviour(gen_server).
 
@@ -16,9 +16,8 @@
 -export([start_link/0,
         route/3,
         have_connection/1,
-        get_key/1,
+        has_key/2,
         try_register/1,
-        remove_connection/1,
         remove_connection/3,
         dirty_get_connections/0,
         allow_host/2,
@@ -55,14 +54,9 @@ route(From, To, Packet) ->
            ok
     end.
 
-remove_connection(FromTo) ->
-    F = fun() ->
-               mnesia:delete({s2s, FromTo})
-       end,
-    mnesia:transaction(F).
-
 remove_connection(FromTo, Pid, Key) ->
-    case catch mnesia:dirty_read(s2s, FromTo) of
+    ?ERROR_MSG("XXXXXXXXXXX ~p~n", [{FromTo, Pid, Key}]),
+    case catch mnesia:dirty_match_object(s2s, {s2s, FromTo, Pid, '_'}) of
        [#s2s{pid = Pid, key = Key}] ->
            F = fun() ->
                        mnesia:delete_object(#s2s{fromto = FromTo,
@@ -82,23 +76,27 @@ have_connection(FromTo) ->
            false
     end.
 
-get_key(FromTo) ->
-    case catch mnesia:dirty_read(s2s, FromTo) of
-       [E] ->
-           E#s2s.key;
+has_key(FromTo, Key) ->
+    case mnesia:dirty_select(s2s,
+                            [{#s2s{fromto = FromTo, key = Key, _ = '_'},
+                              [],
+                              ['$_']}]) of
+       [] ->
+           false;
        _ ->
-           error
+           true
     end.
 
 try_register(FromTo) ->
     Key = randoms:get_string(),
+    Max_S2S_Connexions_Number = 3,
     F = fun() ->
                case mnesia:read({s2s, FromTo}) of
-                   [] ->
-                       mnesia:write(#s2s{fromto = FromTo,
-                                         pid = self(),
-                                         key = Key}),
-                       {key, Key};
+                   L when length(L) < Max_S2S_Connexions_Number ->
+                          mnesia:write(#s2s{fromto = FromTo,
+                                            pid = self(),
+                                            key = Key}),
+                          {key, Key};
                    _ ->
                        false
                end
@@ -126,9 +124,10 @@ dirty_get_connections() ->
 %%--------------------------------------------------------------------
 init([]) ->
     update_tables(),
-    mnesia:create_table(s2s, [{ram_copies, [node()]},
+    mnesia:create_table(s2s, [{ram_copies, [node()]}, {type, bag},
                              {attributes, record_info(fields, s2s)}]),
     mnesia:add_table_copy(s2s, node(), ram_copies),
+    mnesia:add_table_index(s2s, key),
     mnesia:subscribe(system),
     ejabberd_ctl:register_commands(
       [{"incoming-s2s-number", "print number of incoming s2s connections on the node"},
@@ -224,7 +223,7 @@ do_route(From, To, Packet) ->
                                                  Attrs),
            send_element(Pid, {xmlelement, Name, NewAttrs, Els}),
            ok;
-       {aborted, Reason} ->
+       {aborted, _Reason} ->
            case xml:get_tag_attr_s("type", Packet) of
                "error" -> ok;
                "result" -> ok;
@@ -240,6 +239,8 @@ find_connection(From, To) ->
     #jid{lserver = MyServer} = From,
     #jid{lserver = Server} = To,
     FromTo = {MyServer, Server},
+    Max_S2S_Connexions_Number = 3,
+    ?ERROR_MSG("XXX Finding connection for ~p~n", [FromTo]),
     case catch mnesia:dirty_read(s2s, FromTo) of
        {'EXIT', Reason} ->
            {aborted, Reason};
@@ -250,36 +251,49 @@ find_connection(From, To) ->
            case {is_service(From, To),
                  allow_host(MyServer, Server)} of
                {false, true} ->
-                   ?DEBUG("starting new s2s connection~n", []),
-                   Key = randoms:get_string(),
-                   {ok, Pid} = ejabberd_s2s_out:start(
-                                 MyServer, Server, {new, Key}),
-                   F = fun() ->
-                               case mnesia:read({s2s, FromTo}) of
-                                   [El] ->
-                                       El#s2s.pid;
-                                   [] ->
-                                       mnesia:write(#s2s{fromto = FromTo,
-                                                         pid = Pid,
-                                                         key = Key}),
-                                       Pid
-                               end
-                       end,
-                   TRes = mnesia:transaction(F),
-                   case TRes of
-                       {atomic, Pid} ->
-                           ejabberd_s2s_out:start_connection(Pid);
-                       _ ->
-                           ejabberd_s2s_out:stop_connection(Pid)
-                   end,
-                   TRes;
+                   new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number);
                _ ->
                    {aborted, error}
            end;
-       [El] ->
-           {atomic, El#s2s.pid}
+       L when is_list(L) , length(L) < Max_S2S_Connexions_Number ->
+           %% We establish another connection for this pair.
+           new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number);
+       L when is_list(L) ->
+           %% We choose a connexion from the pool of opened ones.
+           {atomic, choose_connection(From, L)}
     end.
 
+choose_connection(From, Connections) ->
+    El = lists:nth(erlang:phash(From, length(Connections)), Connections),
+    %El = lists:nth(random:uniform(length(Connections)), Connections),
+    ?ERROR_MSG("XXX using ejabberd_s2s_out ~p~n", [El#s2s.pid]),
+    El#s2s.pid.
+
+new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number) ->
+    Key = randoms:get_string(),
+    {ok, Pid} = ejabberd_s2s_out:start(
+                 MyServer, Server, {new, Key}),
+    F = fun() ->
+               case mnesia:read({s2s, FromTo}) of
+                   L when length(L) < Max_S2S_Connexions_Number ->
+                       mnesia:write(#s2s{fromto = FromTo,
+                                         pid = Pid,
+                                         key = Key}),
+                       ?ERROR_MSG("XXX new s2s connection started ~p~n", [Pid]),
+                       Pid;
+                   L ->
+                       choose_connection(From, L)
+               end
+       end,
+    TRes = mnesia:transaction(F),
+    case TRes of
+       {atomic, Pid} ->
+           ejabberd_s2s_out:start_connection(Pid);
+       _ ->
+           ejabberd_s2s_out:stop_connection(Pid)
+    end,
+    TRes.
+
 
 %%--------------------------------------------------------------------
 %% Function: is_service(From, To) -> true | false
@@ -321,6 +335,15 @@ ctl_process(Val, _Args) ->
     Val.
 
 update_tables() ->
+    case catch mnesia:table_info(s2s, type) of
+       bag ->
+           ok;
+       {'EXIT', _} ->
+           ok;
+       _ ->
+           % XXX TODO convert it ?
+           mnesia:delete_table(s2s)
+    end,
     case catch mnesia:table_info(s2s, attributes) of
        [fromto, node, key] ->
            mnesia:transform_table(s2s, ignore, [fromto, pid, key]),
@@ -349,5 +372,3 @@ allow_host(MyServer, S2SHost) ->
                _ -> true %% The default s2s policy is allow
            end
     end.
-    
-    
index 969be69d93e487b18cc5ceea392c22071ef8a573..81cdf665f2949425fab629f12309b217eb9264ad 100644 (file)
@@ -3,7 +3,7 @@
 %%% Author  : Alexey Shchepin <alexey@sevcom.net>
 %%% Purpose : Serve incoming s2s connection
 %%% Created :  6 Dec 2002 by Alexey Shchepin <alexey@sevcom.net>
-%%% Id      : $Id$
+%%% Id      : $Id: ejabberd_s2s_in.erl 820 2007-07-19 21:17:13Z mremond $
 %%%----------------------------------------------------------------------
 
 -module(ejabberd_s2s_in).
@@ -74,7 +74,7 @@
 -define(HOST_UNKNOWN_ERR,
        xml:element_to_string(?SERR_HOST_UNKNOWN)).
 
--define(INVALID_FROM_ERR,                             
+-define(INVALID_FROM_ERR,
         xml:element_to_string(?SERR_INVALID_FROM)).
 
 -define(INVALID_XML_ERR,
@@ -101,7 +101,7 @@ socket_type() ->
 %% Returns: {ok, StateName, StateData}          |
 %%          {ok, StateName, StateData, Timeout} |
 %%          ignore                              |
-%%          {stop, StopReason}                   
+%%          {stop, StopReason}
 %%----------------------------------------------------------------------
 init([{SockMod, Socket}, Opts]) ->
     ?INFO_MSG("started: ~p", [Socket]),
@@ -136,7 +136,7 @@ init([{SockMod, Socket}, Opts]) ->
 %% Func: StateName/2
 %% Returns: {next_state, NextStateName, NextStateData}          |
 %%          {next_state, NextStateName, NextStateData, Timeout} |
-%%          {stop, Reason, NewStateData}                         
+%%          {stop, Reason, NewStateData}
 %%----------------------------------------------------------------------
 
 wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
@@ -312,8 +312,8 @@ stream_established({xmlstreamelement, El}, StateData) ->
            ?INFO_MSG("GET KEY: ~p", [{To, From, Id, Key}]),
            LTo = jlib:nameprep(To),
            LFrom = jlib:nameprep(From),
-           %% Checks if the from domain is allowed and if the to               
-            %% domain is handled by this server:                                
+           %% Checks if the from domain is allowed and if the to
+            %% domain is handled by this server:
             case {ejabberd_s2s:allow_host(To, From),
                   lists:member(LTo, ejabberd_router:dirty_get_all_domains())} of
                 {true, true} ->
@@ -338,10 +338,13 @@ stream_established({xmlstreamelement, El}, StateData) ->
            ?INFO_MSG("VERIFY KEY: ~p", [{To, From, Id, Key}]),
            LTo = jlib:nameprep(To),
            LFrom = jlib:nameprep(From),
-           Key1 = ejabberd_s2s:get_key({LTo, LFrom}),
-           Type = if Key == Key1 -> "valid";
-                     true -> "invalid"
+           Type = case ejabberd_s2s:has_key({LTo, LFrom}, Key) of
+                      true -> "valid";
+                      _ -> "invalid"
                   end,
+           %Type = if Key == Key1 -> "valid";
+           % true -> "invalid"
+           % end,
            send_element(StateData,
                         {xmlelement,
                          "db:verify",
@@ -456,7 +459,7 @@ stream_established(closed, StateData) ->
 %%          {reply, Reply, NextStateName, NextStateData}          |
 %%          {reply, Reply, NextStateName, NextStateData, Timeout} |
 %%          {stop, Reason, NewStateData}                          |
-%%          {stop, Reason, Reply, NewStateData}                    
+%%          {stop, Reason, Reply, NewStateData}
 %%----------------------------------------------------------------------
 %state_name(Event, From, StateData) ->
 %    Reply = ok,
@@ -466,7 +469,7 @@ stream_established(closed, StateData) ->
 %% Func: handle_event/3
 %% Returns: {next_state, NextStateName, NextStateData}          |
 %%          {next_state, NextStateName, NextStateData, Timeout} |
-%%          {stop, Reason, NewStateData}                         
+%%          {stop, Reason, NewStateData}
 %%----------------------------------------------------------------------
 handle_event(_Event, StateName, StateData) ->
     {next_state, StateName, StateData}.
@@ -478,7 +481,7 @@ handle_event(_Event, StateName, StateData) ->
 %%          {reply, Reply, NextStateName, NextStateData}          |
 %%          {reply, Reply, NextStateName, NextStateData, Timeout} |
 %%          {stop, Reason, NewStateData}                          |
-%%          {stop, Reason, Reply, NewStateData}                    
+%%          {stop, Reason, Reply, NewStateData}
 %%----------------------------------------------------------------------
 handle_sync_event(_Event, _From, StateName, StateData) ->
     Reply = ok,
@@ -491,7 +494,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
 %% Func: handle_info/3
 %% Returns: {next_state, NextStateName, NextStateData}          |
 %%          {next_state, NextStateName, NextStateData, Timeout} |
-%%          {stop, Reason, NewStateData}                         
+%%          {stop, Reason, NewStateData}
 %%----------------------------------------------------------------------
 handle_info({send_text, Text}, StateName, StateData) ->
     send_text(StateData, Text),
@@ -677,5 +680,3 @@ match_labels([DL | DLabels], [PL | PLabels]) ->
        false ->
            false
     end.
-
-
index 363f03120f318f68984574997ac56a8202d06030..8bf756a168101ab148c25045fe75f4898b292e0c 100644 (file)
@@ -107,7 +107,7 @@ stop_connection(Pid) ->
 %% Returns: {ok, StateName, StateData}          |
 %%          {ok, StateName, StateData, Timeout} |
 %%          ignore                              |
-%%          {stop, StopReason}                   
+%%          {stop, StopReason}
 %%----------------------------------------------------------------------
 init([From, Server, Type]) ->
     process_flag(trap_exit, true),
@@ -146,7 +146,7 @@ init([From, Server, Type]) ->
 %% Func: StateName/2
 %% Returns: {next_state, NextStateName, NextStateData}          |
 %%          {next_state, NextStateName, NextStateData, Timeout} |
-%%          {stop, Reason, NewStateData}                         
+%%          {stop, Reason, NewStateData}
 %%----------------------------------------------------------------------
 open_socket(init, StateData) ->
     ?INFO_MSG("open_socket: ~p", [{StateData#state.myname,
@@ -583,7 +583,7 @@ stream_established(closed, StateData) ->
 %%          {reply, Reply, NextStateName, NextStateData}          |
 %%          {reply, Reply, NextStateName, NextStateData, Timeout} |
 %%          {stop, Reason, NewStateData}                          |
-%%          {stop, Reason, Reply, NewStateData}                    
+%%          {stop, Reason, Reply, NewStateData}
 %%----------------------------------------------------------------------
 %state_name(Event, From, StateData) ->
 %    Reply = ok,
@@ -593,7 +593,7 @@ stream_established(closed, StateData) ->
 %% Func: handle_event/3
 %% Returns: {next_state, NextStateName, NextStateData}          |
 %%          {next_state, NextStateName, NextStateData, Timeout} |
-%%          {stop, Reason, NewStateData}                         
+%%          {stop, Reason, NewStateData}
 %%----------------------------------------------------------------------
 handle_event(Event, StateName, StateData) ->
     {next_state, StateName, StateData}.
@@ -605,7 +605,7 @@ handle_event(Event, StateName, StateData) ->
 %%          {reply, Reply, NextStateName, NextStateData}          |
 %%          {reply, Reply, NextStateName, NextStateData, Timeout} |
 %%          {stop, Reason, NewStateData}                          |
-%%          {stop, Reason, Reply, NewStateData}                    
+%%          {stop, Reason, Reply, NewStateData}
 %%----------------------------------------------------------------------
 handle_sync_event(Event, From, StateName, StateData) ->
     Reply = ok,
@@ -618,7 +618,7 @@ code_change(OldVsn, StateName, StateData, Extra) ->
 %% Func: handle_info/3
 %% Returns: {next_state, NextStateName, NextStateData}          |
 %%          {next_state, NextStateName, NextStateData, Timeout} |
-%%          {stop, Reason, NewStateData}                         
+%%          {stop, Reason, NewStateData}
 %%----------------------------------------------------------------------
 handle_info({send_text, Text}, StateName, StateData) ->
     send_text(StateData, Text),
@@ -848,4 +848,3 @@ test_get_addr_port(Server) ->
                        lists:keyreplace(HostPort, 1, Acc, {HostPort, Num + 1})
                end
          end, [], lists:seq(1, 100000)).
-