]> granicus.if.org Git - ejabberd/commitdiff
Refactor ejabberd listener API
authorEvgeny Khramtsov <ekhramtsov@process-one.net>
Mon, 17 Sep 2018 08:21:02 +0000 (11:21 +0300)
committerEvgeny Khramtsov <ekhramtsov@process-one.net>
Mon, 17 Sep 2018 08:21:02 +0000 (11:21 +0300)
13 files changed:
rebar.config
src/ejabberd_bosh.erl
src/ejabberd_c2s.erl
src/ejabberd_http.erl
src/ejabberd_http_ws.erl
src/ejabberd_listener.erl
src/ejabberd_s2s_in.erl
src/ejabberd_service.erl
src/ejabberd_sip.erl
src/ejabberd_stun.erl
src/ejabberd_xmlrpc.erl
src/mod_proxy65.erl
src/mod_proxy65_stream.erl

index 29a6b0e1150e6a38d8e4b21d0dca1067781fed64..12c1c2def635248e260bade03212b886664e1620 100644 (file)
@@ -25,7 +25,7 @@
         {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "f36ea5b74526c2c1c9c38f8d473168d95804f59d"}},
         {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.12"}}},
         {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.32"}}},
-        {xmpp, ".*", {git, "https://github.com/processone/xmpp", "8d85c4b"}},
+        {xmpp, ".*", {git, "https://github.com/processone/xmpp", "2ce0626"}},
         {fast_yaml, ".*", {git, "https://github.com/processone/fast_yaml", {tag, "1.0.15"}}},
         {jiffy, ".*", {git, "https://github.com/davisp/jiffy", {tag, "0.14.8"}}},
         {p1_oauth2, ".*", {git, "https://github.com/processone/p1_oauth2", {tag, "0.6.3"}}},
index 1a650803ebb179a19dc2da180d9f02f36296d2de..e39a67132aacb48481fcef5f8cf943686fa5ef56 100644 (file)
 %%%
 %%%-------------------------------------------------------------------
 -module(ejabberd_bosh).
-
+-behaviour(xmpp_socket).
+-behaviour(p1_fsm).
 -protocol({xep, 124, '1.11'}).
 -protocol({xep, 206, '1.4'}).
 
--behaviour(p1_fsm).
-
 %% API
 -export([start/2, start/3, start_link/3]).
 
 -export([send_xml/2, setopts/2, controlling_process/2,
-        migrate/3, become_controller/2,
-        reset_stream/1, change_shaper/2, monitor/1, close/1,
+        reset_stream/1, change_shaper/2, close/1,
         sockname/1, peername/1, process_request/3, send/2,
-        change_controller/2]).
+        get_transport/1, get_owner/1]).
 
 %% gen_fsm callbacks
 -export([init/1, wait_for_session/2, wait_for_session/3,
@@ -167,22 +165,12 @@ setopts({http_bind, FsmRef, _IP}, Opts) ->
 
 controlling_process(_Socket, _Pid) -> ok.
 
-become_controller(FsmRef, C2SPid) ->
-    p1_fsm:send_all_state_event(FsmRef,
-                                   {become_controller, C2SPid}).
-
-change_controller({http_bind, FsmRef, _IP}, C2SPid) ->
-    become_controller(FsmRef, C2SPid).
-
 reset_stream({http_bind, _FsmRef, _IP} = Socket) ->
     Socket.
 
 change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
     p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).
 
-monitor({http_bind, FsmRef, _IP}) ->
-    erlang:monitor(process, FsmRef).
-
 close({http_bind, FsmRef, _IP}) ->
     catch p1_fsm:sync_send_all_state_event(FsmRef,
                                               close).
@@ -191,10 +179,11 @@ sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}.
 
 peername({http_bind, _FsmRef, IP}) -> {ok, IP}.
 
-migrate(FsmRef, Node, After) when node(FsmRef) == node() ->
-    catch erlang:send_after(After, FsmRef, {migrate, Node});
-migrate(_FsmRef, _Node, _After) ->
-    ok.
+get_transport(_Socket) ->
+    http_bind.
+
+get_owner({http_bind, FsmRef, _IP}) ->
+    FsmRef.
 
 process_request(Data, IP, Type) ->
     Opts1 = ejabberd_c2s_config:get_c2s_limits(),
@@ -295,30 +284,26 @@ init([#body{attrs = Attrs}, IP, SID]) ->
                                     buf_new(XMPPDomain)),
                              Opts2}
                    end,
-    xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket,
-                     [{receiver, self()}|Opts]),
-    Inactivity = gen_mod:get_module_opt(XMPPDomain,
-                                       mod_bosh, max_inactivity),
-    MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat),
-    ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN),
-    State = #state{host = XMPPDomain, sid = SID, ip = IP,
-                  xmpp_ver = XMPPVer, el_ibuf = InBuf,
-                  max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain),
-                  inactivity_timeout = Inactivity,
-                  shaped_receivers = ShapedReceivers,
-                  shaper_state = ShaperState},
-    NewState = restart_inactivity_timer(State),
-    mod_bosh:open_session(SID, self()),
-    {ok, wait_for_session, NewState};
-init([StateName, State]) ->
-    mod_bosh:open_session(State#state.sid, self()),
-    case State#state.c2s_pid of
-      C2SPid when is_pid(C2SPid) ->
-         NewSocket = make_socket(self(), State#state.ip),
-         C2SPid ! {change_socket, NewSocket},
-         NewState = restart_inactivity_timer(State),
-         {ok, StateName, NewState};
-      _ -> {stop, normal}
+    case ejabberd_c2s:start({?MODULE, Socket}, [{receiver, self()}|Opts]) of
+       {ok, C2SPid} ->
+           ejabberd_c2s:accept(C2SPid),
+           Inactivity = gen_mod:get_module_opt(XMPPDomain,
+                                               mod_bosh, max_inactivity),
+           MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat),
+           ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN),
+           State = #state{host = XMPPDomain, sid = SID, ip = IP,
+                          xmpp_ver = XMPPVer, el_ibuf = InBuf,
+                          max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain),
+                          inactivity_timeout = Inactivity,
+                          shaped_receivers = ShapedReceivers,
+                          shaper_state = ShaperState},
+           NewState = restart_inactivity_timer(State),
+           mod_bosh:open_session(SID, self()),
+           {ok, wait_for_session, NewState};
+       {error, Reason} ->
+           {stop, Reason};
+       ignore ->
+           ignore
     end.
 
 wait_for_session(_Event, State) ->
@@ -525,7 +510,7 @@ active1(#body{attrs = Attrs} = Req, From, State) ->
           end
     end.
 
-handle_event({become_controller, C2SPid}, StateName,
+handle_event({activate, C2SPid}, StateName,
             State) ->
     State1 = route_els(State#state{c2s_pid = C2SPid}),
     {next_state, StateName, State1};
@@ -598,24 +583,11 @@ handle_info({timeout, TRef, shaper_timeout}, StateName,
          {stop, normal, State};
       _ -> {next_state, StateName, State}
     end;
-handle_info({migrate, Node}, StateName, State) ->
-    if Node /= node() ->
-          NewState = bounce_receivers(State, migrated),
-          {migrate, NewState,
-           {Node, ?MODULE, start, [StateName, NewState]}, 0};
-       true -> {next_state, StateName, State}
-    end;
 handle_info(_Info, StateName, State) ->
     ?ERROR_MSG("unexpected info:~n** Msg: ~p~n** StateName: ~p",
               [_Info, StateName]),
     {next_state, StateName, State}.
 
-terminate({migrated, ClonePid}, _StateName, State) ->
-    ?INFO_MSG("Migrating session \"~s\" (c2s_pid = "
-             "~p) to ~p on node ~p",
-             [State#state.sid, State#state.c2s_pid, ClonePid,
-              node(ClonePid)]),
-    mod_bosh:close_session(State#state.sid);
 terminate(_Reason, _StateName, State) ->
     mod_bosh:close_session(State#state.sid),
     case State#state.c2s_pid of
@@ -718,7 +690,7 @@ do_reply(State, From, Body, RID) ->
     Responses2 = gb_trees:insert(RID, Body, Responses1),
     State#state{responses = Responses2}.
 
-bounce_receivers(State, Reason) ->
+bounce_receivers(State, _Reason) ->
     Receivers = gb_trees:to_list(State#state.receivers),
     ShapedReceivers = lists:map(fun ({_, From,
                                      #body{attrs = Attrs} = Body}) ->
@@ -726,18 +698,13 @@ bounce_receivers(State, Reason) ->
                                        {RID, {From, Body}}
                                end,
                                p1_queue:to_list(State#state.shaped_receivers)),
-    lists:foldl(fun ({RID, {From, Body}}, AccState) ->
-                       NewBody = if Reason == closed ->
-                                        #body{http_reason =
-                                                  <<"Session closed">>,
-                                              attrs =
-                                                  [{type, <<"terminate">>},
-                                                   {condition,
-                                                    <<"other-request">>}]};
-                                    Reason == migrated ->
-                                        Body#body{http_reason =
-                                                      <<"Session migrated">>}
-                                 end,
+    lists:foldl(fun ({RID, {From, _Body}}, AccState) ->
+                       NewBody = #body{http_reason =
+                                           <<"Session closed">>,
+                                       attrs =
+                                           [{type, <<"terminate">>},
+                                            {condition,
+                                             <<"other-request">>}]},
                        do_reply(AccState, From, NewBody, RID)
                end,
                State, Receivers ++ ShapedReceivers).
index 76166db9e7cbc66062d482b19beb6d4acb208917..a6434c9746dccd079e1af5045c4aaf8bd427c544 100644 (file)
 -module(ejabberd_c2s).
 -behaviour(xmpp_stream_in).
 -behaviour(ejabberd_config).
--behaviour(xmpp_socket).
+-behaviour(ejabberd_listener).
 
 -protocol({rfc, 6121}).
 
-%% xmpp_socket callbacks
--export([start/2, start_link/2, socket_type/0]).
+%% ejabberd_listener callbacks
+-export([start/2, start_link/2, accept/1, listen_opt_type/1]).
 %% ejabberd_config callbacks
--export([opt_type/1, listen_opt_type/1, transform_listen_option/2]).
+-export([opt_type/1, transform_listen_option/2]).
 %% xmpp_stream_in callbacks
 -export([init/1, handle_call/3, handle_cast/2,
         handle_info/2, terminate/2, code_change/3]).
 -export_type([state/0]).
 
 %%%===================================================================
-%%% xmpp_socket API
+%%% ejabberd_listener API
 %%%===================================================================
 start(SockData, Opts) ->
-    case proplists:get_value(supervisor, Opts, true) of
-       true ->
-           case supervisor:start_child(ejabberd_c2s_sup, [SockData, Opts]) of
-               {ok, undefined} -> ignore;
-               Res -> Res
-           end;
-       _ ->
-           xmpp_stream_in:start(?MODULE, [SockData, Opts],
-                                ejabberd_config:fsm_limit_opts(Opts))
-    end.
+    xmpp_stream_in:start(?MODULE, [SockData, Opts],
+                        ejabberd_config:fsm_limit_opts(Opts)).
 
 start_link(SockData, Opts) ->
     xmpp_stream_in:start_link(?MODULE, [SockData, Opts],
                              ejabberd_config:fsm_limit_opts(Opts)).
 
-socket_type() ->
-    xml_stream.
+accept(Ref) ->
+    xmpp_stream_in:accept(Ref).
 
 %%%===================================================================
 %%% Common API
index db9182cd8856414feede9017ba46eef2e398b54d..4212c519b9918f45c8237df8179075335f3e737a 100644 (file)
 %%%----------------------------------------------------------------------
 
 -module(ejabberd_http).
-
+-behaviour(ejabberd_listener).
 -behaviour(ejabberd_config).
 
 -author('alexey@process-one.net').
 
 %% External exports
--export([start/2, start_link/2, become_controller/1,
-        socket_type/0, receive_headers/1, recv_file/2,
+-export([start/2, start_link/2,
+        accept/1, receive_headers/1, recv_file/2,
          transform_listen_option/2, listen_opt_type/1]).
 
 -export([init/2, opt_type/1]).
@@ -164,12 +164,9 @@ init({SockMod, Socket}, Opts) ->
         {error, _} -> State
     end.
 
-become_controller(_Pid) ->
+accept(_Pid) ->
     ok.
 
-socket_type() ->
-    raw.
-
 send_text(_State, none) ->
     ok;
 send_text(State, Text) ->
index a9d98b882fd7a69e6eacbf7fbf36242ccc2f4a92..d10dbd108a887b69f78f11204312b7ce07e8b51f 100644 (file)
 %%%
 %%%----------------------------------------------------------------------
 -module(ejabberd_http_ws).
-
--behaviour(ejabberd_config).
-
 -author('ecestari@process-one.net').
-
+-behaviour(ejabberd_config).
+-behaviour(xmpp_socket).
 -behaviour(p1_fsm).
 
 -export([start/1, start_link/1, init/1, handle_event/3,
         handle_sync_event/4, code_change/4, handle_info/3,
         terminate/3, send_xml/2, setopts/2, sockname/1,
-        peername/1, controlling_process/2, become_controller/2,
-        monitor/1, reset_stream/1, close/1, change_shaper/2,
-        socket_handoff/3, opt_type/1]).
+        peername/1, controlling_process/2, get_owner/1,
+        reset_stream/1, close/1, change_shaper/2,
+        socket_handoff/3, get_transport/1, opt_type/1]).
 
 -include("logger.hrl").
 
@@ -54,8 +52,8 @@
          timeout = ?WEBSOCKET_TIMEOUT :: non_neg_integer(),
          timer = make_ref()           :: reference(),
          input = []                   :: list(),
-         waiting_input = false        :: false | pid(),
-         last_receiver = self()       :: pid(),
+        active = false               :: boolean(),
+        c2s_pid                      :: pid(),
          ws                           :: {#ws{}, pid()},
          rfc_compilant = undefined    :: boolean() | undefined}).
 
@@ -104,15 +102,9 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}.
 
 controlling_process(_Socket, _Pid) -> ok.
 
-become_controller(FsmRef, C2SPid) ->
-    p1_fsm:send_all_state_event(FsmRef, {activate, C2SPid}).
-
 close({http_ws, FsmRef, _IP}) ->
     catch p1_fsm:sync_send_all_state_event(FsmRef, close).
 
-monitor({http_ws, FsmRef, _IP}) ->
-    erlang:monitor(process, FsmRef).
-
 reset_stream({http_ws, _FsmRef, _IP} = Socket) ->
     Socket.
 
@@ -120,6 +112,12 @@ change_shaper({http_ws, _FsmRef, _IP}, _Shaper) ->
     %% TODO???
     ok.
 
+get_transport(_Socket) ->
+    websocket.
+
+get_owner({http_ws, FsmRef, _IP}) ->
+    FsmRef.
+
 socket_handoff(LocalPath, Request, Opts) ->
     ejabberd_websocket:socket_handoff(LocalPath, Request, Opts, ?MODULE, fun get_human_html_xmlel/0).
 
@@ -145,31 +143,34 @@ init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) ->
     Socket = {http_ws, self(), IP},
     ?DEBUG("Client connected through websocket ~p",
           [Socket]),
-    xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket,
-                     [{receiver, self()}|Opts]),
-    Timer = erlang:start_timer(WSTimeout, self(), []),
-    {ok, loop,
-     #state{socket = Socket, timeout = WSTimeout,
-            timer = Timer, ws = WS,
-            ping_interval = PingInterval}}.
-
-handle_event({activate, From}, StateName, StateData) ->
-    case StateData#state.input of
-      [] ->
-            {next_state, StateName,
-             StateData#state{waiting_input = From}};
-      Input ->
-            Receiver = From,
-            lists:foreach(fun(I) when is_binary(I)->
-                                  Receiver ! {tcp, StateData#state.socket, I};
-                             (I2) ->
-                                  Receiver ! {tcp, StateData#state.socket, [I2]}
-                          end, Input),
-            {next_state, StateName,
-             StateData#state{input = [], waiting_input = false,
-                             last_receiver = Receiver}}
+    case ejabberd_c2s:start({?MODULE, Socket}, [{receiver, self()}|Opts]) of
+       {ok, C2SPid} ->
+           ejabberd_c2s:accept(C2SPid),
+           Timer = erlang:start_timer(WSTimeout, self(), []),
+           {ok, loop,
+            #state{socket = Socket, timeout = WSTimeout,
+                   timer = Timer, ws = WS, c2s_pid = C2SPid,
+                   ping_interval = PingInterval}};
+       {error, Reason} ->
+           {stop, Reason};
+       ignore ->
+           ignore
     end.
 
+handle_event({activate, From}, StateName, State) ->
+    State1 = case State#state.input of
+                [] -> State#state{active = true};
+                Input ->
+                    lists:foreach(
+                      fun(I) when is_binary(I)->
+                              From ! {tcp, State#state.socket, I};
+                         (I2) ->
+                              From ! {tcp, State#state.socket, [I2]}
+                      end, Input),
+                    State#state{active = false, input = []}
+            end,
+    {next_state, StateName, State1#state{c2s_pid = From}}.
+
 handle_sync_event({send_xml, Packet}, _From, StateName,
                  #state{ws = {_, WsPid}, rfc_compilant = R} = StateData) ->
     Packet2 = case {case R of undefined -> true; V -> V end, Packet} of
@@ -233,14 +234,13 @@ handle_info(closed, _StateName, StateData) ->
     {stop, normal, StateData};
 handle_info({received, Packet}, StateName, StateDataI) ->
     {StateData, Parsed} = parse(StateDataI, Packet),
-    SD = case StateData#state.waiting_input of
+    SD = case StateData#state.active of
              false ->
                  Input = StateData#state.input ++ if is_binary(Parsed) -> [Parsed]; true -> Parsed end,
                  StateData#state{input = Input};
-             Receiver ->
-                 Receiver ! {tcp, StateData#state.socket, Parsed},
-                 setup_timers(StateData#state{waiting_input = false,
-                                              last_receiver = Receiver})
+             true ->
+                 StateData#state.c2s_pid ! {tcp, StateData#state.socket, Parsed},
+                 setup_timers(StateData#state{active = false})
          end,
     {next_state, StateName, SD};
 handle_info(PingPong, StateName, StateData) when PingPong == ping orelse
@@ -273,13 +273,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
     {ok, StateName, StateData}.
 
 terminate(_Reason, _StateName, StateData) ->
-    case StateData#state.waiting_input of
-      false -> ok;
-      Receiver ->
-         ?DEBUG("C2S Pid : ~p", [Receiver]),
-         Receiver ! {tcp_closed, StateData#state.socket}
-    end,
-    ok.
+    StateData#state.c2s_pid ! {tcp_closed, StateData#state.socket}.
 
 setup_timers(StateData) ->
     misc:cancel_timer(StateData#state.timer),
index 54bc877ccf7eb8580773c7e6d0800d2ea0854d9e..3c5192c4a583d19ff067931cc17e3e9c9110bd30 100644 (file)
 
 -include("logger.hrl").
 
+-callback start({gen_tcp, inet:socket()}, [proplists:property()]) ->
+    {ok, pid()} | {error, any()} | ignore.
+-callback start_link({gen_tcp, inet:socket()}, [proplists:property()]) ->
+    {ok, pid()} | {error, any()} | ignore.
+-callback accept(pid()) -> any().
+-callback listen_opt_type(atom()) -> fun((atom()) -> term()) | [atom()].
+
 %% We do not block on send anymore.
 -define(TCP_SEND_TIMEOUT, 15000).
 
@@ -81,11 +88,7 @@ report_duplicated_portips(L) ->
 
 start(Port, Module, Opts) ->
     NewOpts = validate_module_options(Module, Opts),
-    %% Check if the module is an ejabberd listener or an independent listener
-    case Module:socket_type() of
-       independent -> Module:start_listener(Port, NewOpts);
-       _ -> start_dependent(Port, Module, NewOpts)
-    end.
+    start_dependent(Port, Module, NewOpts).
 
 %% @spec(Port, Module, Opts) -> {ok, Pid} | {error, ErrorMessage}
 start_dependent(Port, Module, Opts) ->
@@ -109,7 +112,6 @@ init_udp(PortIP, Module, Opts, SockOpts, Port) ->
            %% Inform my parent that this port was opened successfully
            proc_lib:init_ack({ok, self()}),
            application:ensure_started(ejabberd),
-           start_module_sup(Port, Module),
            ?INFO_MSG("Start accepting UDP connections at ~s for ~p",
                      [format_portip(PortIP), Module]),
            case erlang:function_exported(Module, udp_init, 2) of
@@ -136,21 +138,21 @@ init_tcp(PortIP, Module, Opts, SockOpts, Port) ->
        {ok, ListenSocket} ->
            proc_lib:init_ack({ok, self()}),
            application:ensure_started(ejabberd),
-           start_module_sup(Port, Module),
+           Sup = start_module_sup(Module, Opts),
            ?INFO_MSG("Start accepting TCP connections at ~s for ~p",
                      [format_portip(PortIP), Module]),
            case erlang:function_exported(Module, tcp_init, 2) of
                false ->
-                   accept(ListenSocket, Module, Opts);
+                   accept(ListenSocket, Module, Opts, Sup);
                true ->
                    case catch Module:tcp_init(ListenSocket, Opts) of
                        {'EXIT', _} = Err ->
                            ?ERROR_MSG("failed to process callback function "
                                       "~p:~s(~p, ~p): ~p",
                                       [Module, tcp_init, ListenSocket, Opts, Err]),
-                           accept(ListenSocket, Module, Opts);
+                           accept(ListenSocket, Module, Opts, Sup);
                        NewOpts ->
-                           accept(ListenSocket, Module, NewOpts)
+                           accept(ListenSocket, Module, NewOpts, Sup)
                    end
            end;
        {error, _} = Err ->
@@ -240,20 +242,22 @@ get_ip_tuple(no_ip_option, inet6) ->
 get_ip_tuple(IPOpt, _IPVOpt) ->
     IPOpt.
 
-accept(ListenSocket, Module, Opts) ->
+accept(ListenSocket, Module, Opts, Sup) ->
     Interval = proplists:get_value(accept_interval, Opts, 0),
-    accept(ListenSocket, Module, Opts, Interval).
+    accept(ListenSocket, Module, Opts, Sup, Interval).
 
-accept(ListenSocket, Module, Opts, Interval) ->
+accept(ListenSocket, Module, Opts, Sup, Interval) ->
     NewInterval = check_rate_limit(Interval),
     case gen_tcp:accept(ListenSocket) of
        {ok, Socket} ->
            case {inet:sockname(Socket), inet:peername(Socket)} of
                {{ok, {Addr, Port}}, {ok, {PAddr, PPort}}} ->
-                   Receiver = case xmpp_socket:start(Module,
-                                                         gen_tcp, Socket, Opts) of
-                                  {ok, RecvPid} -> RecvPid;
-                                  _ -> none
+                   Receiver = case start_connection(Module, Socket, Opts, Sup) of
+                                  {ok, RecvPid} ->
+                                      RecvPid;
+                                  _ ->
+                                      gen_tcp:close(Socket),
+                                      none
                               end,
                    ?INFO_MSG("(~p) Accepted connection ~s:~p -> ~s:~p",
                              [Receiver,
@@ -262,11 +266,11 @@ accept(ListenSocket, Module, Opts, Interval) ->
                _ ->
                    gen_tcp:close(Socket)
            end,
-           accept(ListenSocket, Module, Opts, NewInterval);
+           accept(ListenSocket, Module, Opts, Sup, NewInterval);
        {error, Reason} ->
            ?ERROR_MSG("(~w) Failed TCP accept: ~s",
                        [ListenSocket, inet:format_error(Reason)]),
-           accept(ListenSocket, Module, Opts, NewInterval)
+           accept(ListenSocket, Module, Opts, Sup, NewInterval)
     end.
 
 udp_recv(Socket, Module, Opts) ->
@@ -287,6 +291,25 @@ udp_recv(Socket, Module, Opts) ->
            throw({error, Reason})
     end.
 
+start_connection(Module, Socket, Opts, Sup) ->
+    Res = case Sup of
+             undefined -> Module:start({gen_tcp, Socket}, Opts);
+             _ -> supervisor:start_child(Sup, [{gen_tcp, Socket}, Opts])
+         end,
+    case Res of
+       {ok, Pid} ->
+           case gen_tcp:controlling_process(Socket, Pid) of
+               ok ->
+                   Module:accept(Pid),
+                   {ok, Pid};
+               Err ->
+                   exit(Pid, kill),
+                   Err
+           end;
+       Err ->
+           Err
+    end.
+
 %% @spec (Port, Module, Opts) -> {ok, Pid} | {error, Error}
 start_listener(Port, Module, Opts) ->
     case start_listener2(Port, Module, Opts) of
@@ -309,16 +332,21 @@ start_listener2(Port, Module, Opts) ->
     %% So, it's normal (and harmless) that in most cases this call returns: {error, {already_started, pid()}}
     start_listener_sup(Port, Module, Opts).
 
-start_module_sup(_Port, Module) ->
-    Proc1 = gen_mod:get_module_proc(<<"sup">>, Module),
-    ChildSpec1 =
-       {Proc1,
-        {ejabberd_tmp_sup, start_link, [Proc1, Module]},
-        permanent,
-        infinity,
-        supervisor,
-        [ejabberd_tmp_sup]},
-    supervisor:start_child(ejabberd_sup, ChildSpec1).
+-spec start_module_sup(module(), [proplists:property()]) -> atom().
+start_module_sup(Module, Opts) ->
+    case proplists:get_value(supervisor, Opts, true) of
+       true ->
+           Proc = list_to_atom(atom_to_list(Module) ++ "_sup"),
+           ChildSpec = {Proc, {ejabberd_tmp_sup, start_link, [Proc, Module]},
+                        permanent,
+                        infinity,
+                        supervisor,
+                        [ejabberd_tmp_sup]},
+           supervisor:start_child(ejabberd_sup, ChildSpec),
+           Proc;
+       false ->
+           undefined
+    end.
 
 start_listener_sup(Port, Module, Opts) ->
     ChildSpec = {Port,
index 62227984e8414c34f8140b5ad76ce2513d64a49c..5166004b8d5e054aed28c3a1efb112fd1d7287cf 100644 (file)
 %%%-------------------------------------------------------------------
 -module(ejabberd_s2s_in).
 -behaviour(xmpp_stream_in).
--behaviour(xmpp_socket).
+-behaviour(ejabberd_listener).
 
-%% xmpp_socket callbacks
--export([start/2, start_link/2, socket_type/0]).
 %% ejabberd_listener callbacks
--export([listen_opt_type/1]).
+-export([start/2, start_link/2, accept/1, listen_opt_type/1]).
 %% xmpp_stream_in callbacks
 -export([init/1, handle_call/3, handle_cast/2,
         handle_info/2, terminate/2, code_change/3]).
 %%% API
 %%%===================================================================
 start(SockData, Opts) ->
-    case proplists:get_value(supervisor, Opts, true) of
-       true ->
-           case supervisor:start_child(ejabberd_s2s_in_sup, [SockData, Opts]) of
-               {ok, undefined} -> ignore;
-               Res -> Res
-           end;
-       _ ->
-           xmpp_stream_in:start(?MODULE, [SockData, Opts],
-                                ejabberd_config:fsm_limit_opts(Opts))
-    end.
+    xmpp_stream_in:start(?MODULE, [SockData, Opts],
+                        ejabberd_config:fsm_limit_opts(Opts)).
 
 start_link(SockData, Opts) ->
     xmpp_stream_in:start_link(?MODULE, [SockData, Opts],
@@ -77,8 +67,8 @@ close(Ref, Reason) ->
 stop(Ref) ->
     xmpp_stream_in:stop(Ref).
 
-socket_type() ->
-    xml_stream.
+accept(Ref) ->
+    xmpp_stream_in:accept(Ref).
 
 -spec send(pid(), xmpp_element()) -> ok;
          (state(), xmpp_element()) -> state().
index c39a023ab185a5e468488dcf333baaa17b38b279..0655a5f54f930d58c735f97141a918946e9008ca 100644 (file)
 %%%-------------------------------------------------------------------
 -module(ejabberd_service).
 -behaviour(xmpp_stream_in).
--behaviour(xmpp_socket).
+-behaviour(ejabberd_listener).
 
 -protocol({xep, 114, '1.6'}).
 
-%% xmpp_socket callbacks
--export([start/2, start_link/2, socket_type/0, close/1, close/2]).
 %% ejabberd_listener callbacks
+-export([start/2, start_link/2, accept/1]).
 -export([listen_opt_type/1, transform_listen_option/2]).
 %% xmpp_stream_in callbacks
 -export([init/1, handle_info/2, terminate/2, code_change/3]).
 -export([handle_stream_start/2, handle_auth_success/4, handle_auth_failure/4,
         handle_authenticated_packet/2, get_password_fun/1, tls_options/1]).
 %% API
--export([send/2]).
+-export([send/2, close/1, close/2]).
 
 -include("xmpp.hrl").
 -include("logger.hrl").
@@ -53,8 +52,8 @@ start_link(SockData, Opts) ->
     xmpp_stream_in:start_link(?MODULE, [SockData, Opts],
                              ejabberd_config:fsm_limit_opts(Opts)).
 
-socket_type() ->
-    xml_stream.
+accept(Ref) ->
+    xmpp_stream_in:accept(Ref).
 
 -spec send(pid(), xmpp_element()) -> ok;
          (state(), xmpp_element()) -> state().
index e49fb4841fbc40faa9fd047aee98bc2deb0d5121..effcb8a04c87664816bd26ff2c407aaead842316 100644 (file)
 %%%-------------------------------------------------------------------
 
 -module(ejabberd_sip).
+-behaviour(ejabberd_listener).
 
 -ifndef(SIP).
 -include("logger.hrl").
--export([socket_type/0, start/2, listen_opt_type/1]).
+-export([accept/1, start/2, start_link/2, listen_opt_type/1]).
 log_error() ->
     ?CRITICAL_MSG("ejabberd is not compiled with SIP support", []).
-socket_type() ->
+accept(_) ->
     log_error(),
-    raw.
+    ok.
 listen_opt_type(_) ->
     log_error(),
     [].
 start(_, _) ->
     log_error(),
     {error, sip_not_compiled}.
+start_link(_, _) ->
+    log_error(),
+    {error, sip_not_compiled}.
 -else.
 %% API
 -export([tcp_init/2, udp_init/2, udp_recv/5, start/2,
-        socket_type/0, listen_opt_type/1]).
+        start_link/2, accept/1, listen_opt_type/1]).
 
 
 %%%===================================================================
@@ -62,8 +66,11 @@ udp_recv(Sock, Addr, Port, Data, Opts) ->
 start(Opaque, Opts) ->
     esip_socket:start(Opaque, Opts).
 
-socket_type() ->
-    raw.
+start_link({gen_tcp, Sock}, Opts) ->
+    esip_socket:start_link(Sock, Opts).
+
+accept(_) ->
+    ok.
 
 set_certfile(Opts) ->
     case lists:keymember(certfile, 1, Opts) of
index 53ecd5cc180e8b09705add8bd34ee1f46f499e58..e2f9f8f8f7aedfd9ea209d0af19aa5cfa8af18c3 100644 (file)
 %%%-------------------------------------------------------------------
 
 -module(ejabberd_stun).
-
+-behaviour(ejabberd_listener).
 -protocol({rfc, 5766}).
 -protocol({xep, 176, '1.0'}).
 
 -ifndef(STUN).
 -include("logger.hrl").
--export([socket_type/0, start/2, listen_opt_type/1]).
+-export([accept/1, start/2, start_link/2, listen_opt_type/1]).
 log_error() ->
     ?CRITICAL_MSG("ejabberd is not compiled with STUN/TURN support", []).
-socket_type() ->
+accept(_) ->
     log_error(),
-    raw.
+    ok.
 listen_opt_type(_) ->
     log_error(),
     [].
 start(_, _) ->
     log_error(),
     {error, sip_not_compiled}.
+start_link(_, _) ->
+    log_error(),
+    {error, sip_not_compiled}.
 -else.
 -export([tcp_init/2, udp_init/2, udp_recv/5, start/2,
-        socket_type/0, listen_opt_type/1]).
+        start_link/2, accept/1, listen_opt_type/1]).
 
 -include("logger.hrl").
 
@@ -65,8 +68,11 @@ udp_recv(Socket, Addr, Port, Packet, Opts) ->
 start(Opaque, Opts) ->
     stun:start(Opaque, Opts).
 
-socket_type() ->
-    raw.
+start_link({gen_tcp, Sock}, Opts) ->
+    stun:start_link(Sock, Opts).
+
+accept(_Pid) ->
+    ok.
 
 %%%===================================================================
 %%% Internal functions
index 68a774d4b308af516c8328ae795c1e0f810bd065..a6afe585c8e7bd6a4531c3f699256296e5f5cda9 100644 (file)
 %%% TODO: commands strings should be strings without ~n
 
 -module(ejabberd_xmlrpc).
+-behaviour(ejabberd_listener).
 
 -author('badlop@process-one.net').
 
--export([start/2, handler/2, process/2, socket_type/0,
+-export([start/2, start_link/2, handler/2, process/2, accept/1,
         transform_listen_option/2, listen_opt_type/1]).
 
 -include("logger.hrl").
 start({gen_tcp = _SockMod, Socket}, Opts) ->
     ejabberd_http:start({gen_tcp, Socket}, [{xmlrpc, true}|Opts]).
 
-socket_type() -> raw.
+start_link({gen_tcp = _SockMod, Socket}, Opts) ->
+    ejabberd_http:start_link({gen_tcp, Socket}, [{xmlrpc, true}|Opts]).
+
+accept(Pid) ->
+    ejabberd_http:accept(Pid).
 
 %% -----------------------------
 %% HTTP interface
index cc3546cf20e3eaa2863dd0cc507e888b158d9927..c911dd7aa8726164972b210f81ac102aa8a6a7f3 100644 (file)
@@ -99,15 +99,7 @@ init([Host, Opts]) ->
     Service = {mod_proxy65_service,
               {mod_proxy65_service, start_link, [Host, Opts]},
               transient, 5000, worker, [mod_proxy65_service]},
-    StreamSupervisor = {ejabberd_mod_proxy65_sup,
-                       {ejabberd_tmp_sup, start_link,
-                        [gen_mod:get_module_proc(Host,
-                                                 ejabberd_mod_proxy65_sup),
-                         mod_proxy65_stream]},
-                       transient, infinity, supervisor, [ejabberd_tmp_sup]},
-    {ok,
-     {{one_for_one, 10, 1},
-      [StreamSupervisor, Service]}}.
+    {ok, {{one_for_one, 10, 1}, [Service]}}.
 
 depends(_Host, _Opts) ->
     [].
index 4e7aa0334c9fc99b84e7e7ccd4b3b6c28e8d75f6..6688178687a0b153b254da9f487658c7c695d06f 100644 (file)
 -author('xram@jabber.ru').
 
 -behaviour(p1_fsm).
+-behaviour(ejabberd_listener).
 
 %% gen_fsm callbacks.
 -export([init/1, handle_event/3, handle_sync_event/4,
         code_change/4, handle_info/3, terminate/3]).
 
 %% gen_fsm states.
--export([wait_for_init/2, wait_for_auth/2,
+-export([accepting/2, wait_for_init/2, wait_for_auth/2,
         wait_for_request/2, wait_for_activation/2,
         stream_established/2]).
 
--export([start/2, stop/1, start_link/3, activate/2,
-        relay/3, socket_type/0, listen_opt_type/1,
+-export([start/2, stop/1, start_link/2, start_link/3, activate/2,
+        relay/3, accept/1, listen_opt_type/1,
         listen_options/0]).
 
 -include("mod_proxy65.hrl").
@@ -69,10 +70,14 @@ start({gen_tcp, Socket}, Opts1) ->
                                      fun({server_host, _}) -> true;
                                         (_) -> false
                                      end, Opts1),
-    Supervisor = gen_mod:get_module_proc(Host,
-                                        ejabberd_mod_proxy65_sup),
-    supervisor:start_child(Supervisor,
-                          [Socket, Host, Opts]).
+    p1_fsm:start(?MODULE, [Socket, Host, Opts], []).
+
+start_link({gen_tcp, Socket}, Opts1) ->
+    {[{server_host, Host}], Opts} = lists:partition(
+                                     fun({server_host, _}) -> true;
+                                        (_) -> false
+                                     end, Opts1),
+    start_link(Socket, Host, Opts).
 
 start_link(Socket, Host, Opts) ->
     p1_fsm:start_link(?MODULE, [Socket, Host, Opts], []).
@@ -84,9 +89,8 @@ init([Socket, Host, Opts]) ->
     RecvBuf = gen_mod:get_opt(recbuf, Opts),
     SendBuf = gen_mod:get_opt(sndbuf, Opts),
     TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
-    inet:setopts(Socket,
-                [{active, true}, {recbuf, RecvBuf}, {sndbuf, SendBuf}]),
-    {ok, wait_for_init,
+    inet:setopts(Socket, [{recbuf, RecvBuf}, {sndbuf, SendBuf}]),
+    {ok, accepting,
      #state{host = Host, auth_type = AuthType,
            socket = Socket, shaper = Shaper, timer = TRef}}.
 
@@ -101,7 +105,8 @@ terminate(_Reason, StateName, #state{sha1 = SHA1}) ->
 %%%------------------------------
 %%% API.
 %%%------------------------------
-socket_type() -> raw.
+accept(StreamPid) ->
+    p1_fsm:send_event(StreamPid, accept).
 
 stop(StreamPid) -> StreamPid ! stop.
 
@@ -125,6 +130,10 @@ activate({P1, J1}, {P2, J2}) ->
 %%%-----------------------
 %%% States
 %%%-----------------------
+accepting(accept, State) ->
+    inet:setopts(State#state.socket, [{active, true}]),
+    {next_state, wait_for_init, State}.
+
 wait_for_init(Packet,
              #state{socket = Socket, auth_type = AuthType} =
                  StateData) ->