From de385591d01deec5a498feef33cd4eb3f8a12b77 Mon Sep 17 00:00:00 2001 From: Evgeny Khramtsov Date: Mon, 17 Sep 2018 11:21:02 +0300 Subject: [PATCH] Refactor ejabberd listener API --- rebar.config | 2 +- src/ejabberd_bosh.erl | 109 +++++++++++++------------------------ src/ejabberd_c2s.erl | 26 +++------ src/ejabberd_http.erl | 11 ++-- src/ejabberd_http_ws.erl | 94 +++++++++++++++----------------- src/ejabberd_listener.erl | 86 +++++++++++++++++++---------- src/ejabberd_s2s_in.erl | 22 ++------ src/ejabberd_service.erl | 11 ++-- src/ejabberd_sip.erl | 19 +++++-- src/ejabberd_stun.erl | 20 ++++--- src/ejabberd_xmlrpc.erl | 9 ++- src/mod_proxy65.erl | 10 +--- src/mod_proxy65_stream.erl | 31 +++++++---- 13 files changed, 218 insertions(+), 232 deletions(-) diff --git a/rebar.config b/rebar.config index 29a6b0e11..12c1c2def 100644 --- a/rebar.config +++ b/rebar.config @@ -25,7 +25,7 @@ {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "f36ea5b74526c2c1c9c38f8d473168d95804f59d"}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.12"}}}, {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.32"}}}, - {xmpp, ".*", {git, "https://github.com/processone/xmpp", "8d85c4b"}}, + {xmpp, ".*", {git, "https://github.com/processone/xmpp", "2ce0626"}}, {fast_yaml, ".*", {git, "https://github.com/processone/fast_yaml", {tag, "1.0.15"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy", {tag, "0.14.8"}}}, {p1_oauth2, ".*", {git, "https://github.com/processone/p1_oauth2", {tag, "0.6.3"}}}, diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl index 1a650803e..e39a67132 100644 --- a/src/ejabberd_bosh.erl +++ b/src/ejabberd_bosh.erl @@ -23,20 +23,18 @@ %%% %%%------------------------------------------------------------------- -module(ejabberd_bosh). - +-behaviour(xmpp_socket). +-behaviour(p1_fsm). -protocol({xep, 124, '1.11'}). -protocol({xep, 206, '1.4'}). --behaviour(p1_fsm). - %% API -export([start/2, start/3, start_link/3]). -export([send_xml/2, setopts/2, controlling_process/2, - migrate/3, become_controller/2, - reset_stream/1, change_shaper/2, monitor/1, close/1, + reset_stream/1, change_shaper/2, close/1, sockname/1, peername/1, process_request/3, send/2, - change_controller/2]). + get_transport/1, get_owner/1]). %% gen_fsm callbacks -export([init/1, wait_for_session/2, wait_for_session/3, @@ -167,22 +165,12 @@ setopts({http_bind, FsmRef, _IP}, Opts) -> controlling_process(_Socket, _Pid) -> ok. -become_controller(FsmRef, C2SPid) -> - p1_fsm:send_all_state_event(FsmRef, - {become_controller, C2SPid}). - -change_controller({http_bind, FsmRef, _IP}, C2SPid) -> - become_controller(FsmRef, C2SPid). - reset_stream({http_bind, _FsmRef, _IP} = Socket) -> Socket. change_shaper({http_bind, FsmRef, _IP}, Shaper) -> p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). -monitor({http_bind, FsmRef, _IP}) -> - erlang:monitor(process, FsmRef). - close({http_bind, FsmRef, _IP}) -> catch p1_fsm:sync_send_all_state_event(FsmRef, close). @@ -191,10 +179,11 @@ sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}. peername({http_bind, _FsmRef, IP}) -> {ok, IP}. -migrate(FsmRef, Node, After) when node(FsmRef) == node() -> - catch erlang:send_after(After, FsmRef, {migrate, Node}); -migrate(_FsmRef, _Node, _After) -> - ok. +get_transport(_Socket) -> + http_bind. + +get_owner({http_bind, FsmRef, _IP}) -> + FsmRef. process_request(Data, IP, Type) -> Opts1 = ejabberd_c2s_config:get_c2s_limits(), @@ -295,30 +284,26 @@ init([#body{attrs = Attrs}, IP, SID]) -> buf_new(XMPPDomain)), Opts2} end, - xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket, - [{receiver, self()}|Opts]), - Inactivity = gen_mod:get_module_opt(XMPPDomain, - mod_bosh, max_inactivity), - MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat), - ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN), - State = #state{host = XMPPDomain, sid = SID, ip = IP, - xmpp_ver = XMPPVer, el_ibuf = InBuf, - max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain), - inactivity_timeout = Inactivity, - shaped_receivers = ShapedReceivers, - shaper_state = ShaperState}, - NewState = restart_inactivity_timer(State), - mod_bosh:open_session(SID, self()), - {ok, wait_for_session, NewState}; -init([StateName, State]) -> - mod_bosh:open_session(State#state.sid, self()), - case State#state.c2s_pid of - C2SPid when is_pid(C2SPid) -> - NewSocket = make_socket(self(), State#state.ip), - C2SPid ! {change_socket, NewSocket}, - NewState = restart_inactivity_timer(State), - {ok, StateName, NewState}; - _ -> {stop, normal} + case ejabberd_c2s:start({?MODULE, Socket}, [{receiver, self()}|Opts]) of + {ok, C2SPid} -> + ejabberd_c2s:accept(C2SPid), + Inactivity = gen_mod:get_module_opt(XMPPDomain, + mod_bosh, max_inactivity), + MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat), + ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN), + State = #state{host = XMPPDomain, sid = SID, ip = IP, + xmpp_ver = XMPPVer, el_ibuf = InBuf, + max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain), + inactivity_timeout = Inactivity, + shaped_receivers = ShapedReceivers, + shaper_state = ShaperState}, + NewState = restart_inactivity_timer(State), + mod_bosh:open_session(SID, self()), + {ok, wait_for_session, NewState}; + {error, Reason} -> + {stop, Reason}; + ignore -> + ignore end. wait_for_session(_Event, State) -> @@ -525,7 +510,7 @@ active1(#body{attrs = Attrs} = Req, From, State) -> end end. -handle_event({become_controller, C2SPid}, StateName, +handle_event({activate, C2SPid}, StateName, State) -> State1 = route_els(State#state{c2s_pid = C2SPid}), {next_state, StateName, State1}; @@ -598,24 +583,11 @@ handle_info({timeout, TRef, shaper_timeout}, StateName, {stop, normal, State}; _ -> {next_state, StateName, State} end; -handle_info({migrate, Node}, StateName, State) -> - if Node /= node() -> - NewState = bounce_receivers(State, migrated), - {migrate, NewState, - {Node, ?MODULE, start, [StateName, NewState]}, 0}; - true -> {next_state, StateName, State} - end; handle_info(_Info, StateName, State) -> ?ERROR_MSG("unexpected info:~n** Msg: ~p~n** StateName: ~p", [_Info, StateName]), {next_state, StateName, State}. -terminate({migrated, ClonePid}, _StateName, State) -> - ?INFO_MSG("Migrating session \"~s\" (c2s_pid = " - "~p) to ~p on node ~p", - [State#state.sid, State#state.c2s_pid, ClonePid, - node(ClonePid)]), - mod_bosh:close_session(State#state.sid); terminate(_Reason, _StateName, State) -> mod_bosh:close_session(State#state.sid), case State#state.c2s_pid of @@ -718,7 +690,7 @@ do_reply(State, From, Body, RID) -> Responses2 = gb_trees:insert(RID, Body, Responses1), State#state{responses = Responses2}. -bounce_receivers(State, Reason) -> +bounce_receivers(State, _Reason) -> Receivers = gb_trees:to_list(State#state.receivers), ShapedReceivers = lists:map(fun ({_, From, #body{attrs = Attrs} = Body}) -> @@ -726,18 +698,13 @@ bounce_receivers(State, Reason) -> {RID, {From, Body}} end, p1_queue:to_list(State#state.shaped_receivers)), - lists:foldl(fun ({RID, {From, Body}}, AccState) -> - NewBody = if Reason == closed -> - #body{http_reason = - <<"Session closed">>, - attrs = - [{type, <<"terminate">>}, - {condition, - <<"other-request">>}]}; - Reason == migrated -> - Body#body{http_reason = - <<"Session migrated">>} - end, + lists:foldl(fun ({RID, {From, _Body}}, AccState) -> + NewBody = #body{http_reason = + <<"Session closed">>, + attrs = + [{type, <<"terminate">>}, + {condition, + <<"other-request">>}]}, do_reply(AccState, From, NewBody, RID) end, State, Receivers ++ ShapedReceivers). diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 76166db9e..a6434c974 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -22,14 +22,14 @@ -module(ejabberd_c2s). -behaviour(xmpp_stream_in). -behaviour(ejabberd_config). --behaviour(xmpp_socket). +-behaviour(ejabberd_listener). -protocol({rfc, 6121}). -%% xmpp_socket callbacks --export([start/2, start_link/2, socket_type/0]). +%% ejabberd_listener callbacks +-export([start/2, start_link/2, accept/1, listen_opt_type/1]). %% ejabberd_config callbacks --export([opt_type/1, listen_opt_type/1, transform_listen_option/2]). +-export([opt_type/1, transform_listen_option/2]). %% xmpp_stream_in callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -61,26 +61,18 @@ -export_type([state/0]). %%%=================================================================== -%%% xmpp_socket API +%%% ejabberd_listener API %%%=================================================================== start(SockData, Opts) -> - case proplists:get_value(supervisor, Opts, true) of - true -> - case supervisor:start_child(ejabberd_c2s_sup, [SockData, Opts]) of - {ok, undefined} -> ignore; - Res -> Res - end; - _ -> - xmpp_stream_in:start(?MODULE, [SockData, Opts], - ejabberd_config:fsm_limit_opts(Opts)) - end. + xmpp_stream_in:start(?MODULE, [SockData, Opts], + ejabberd_config:fsm_limit_opts(Opts)). start_link(SockData, Opts) -> xmpp_stream_in:start_link(?MODULE, [SockData, Opts], ejabberd_config:fsm_limit_opts(Opts)). -socket_type() -> - xml_stream. +accept(Ref) -> + xmpp_stream_in:accept(Ref). %%%=================================================================== %%% Common API diff --git a/src/ejabberd_http.erl b/src/ejabberd_http.erl index db9182cd8..4212c519b 100644 --- a/src/ejabberd_http.erl +++ b/src/ejabberd_http.erl @@ -24,14 +24,14 @@ %%%---------------------------------------------------------------------- -module(ejabberd_http). - +-behaviour(ejabberd_listener). -behaviour(ejabberd_config). -author('alexey@process-one.net'). %% External exports --export([start/2, start_link/2, become_controller/1, - socket_type/0, receive_headers/1, recv_file/2, +-export([start/2, start_link/2, + accept/1, receive_headers/1, recv_file/2, transform_listen_option/2, listen_opt_type/1]). -export([init/2, opt_type/1]). @@ -164,12 +164,9 @@ init({SockMod, Socket}, Opts) -> {error, _} -> State end. -become_controller(_Pid) -> +accept(_Pid) -> ok. -socket_type() -> - raw. - send_text(_State, none) -> ok; send_text(State, Text) -> diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl index a9d98b882..d10dbd108 100644 --- a/src/ejabberd_http_ws.erl +++ b/src/ejabberd_http_ws.erl @@ -23,19 +23,17 @@ %%% %%%---------------------------------------------------------------------- -module(ejabberd_http_ws). - --behaviour(ejabberd_config). - -author('ecestari@process-one.net'). - +-behaviour(ejabberd_config). +-behaviour(xmpp_socket). -behaviour(p1_fsm). -export([start/1, start_link/1, init/1, handle_event/3, handle_sync_event/4, code_change/4, handle_info/3, terminate/3, send_xml/2, setopts/2, sockname/1, - peername/1, controlling_process/2, become_controller/2, - monitor/1, reset_stream/1, close/1, change_shaper/2, - socket_handoff/3, opt_type/1]). + peername/1, controlling_process/2, get_owner/1, + reset_stream/1, close/1, change_shaper/2, + socket_handoff/3, get_transport/1, opt_type/1]). -include("logger.hrl"). @@ -54,8 +52,8 @@ timeout = ?WEBSOCKET_TIMEOUT :: non_neg_integer(), timer = make_ref() :: reference(), input = [] :: list(), - waiting_input = false :: false | pid(), - last_receiver = self() :: pid(), + active = false :: boolean(), + c2s_pid :: pid(), ws :: {#ws{}, pid()}, rfc_compilant = undefined :: boolean() | undefined}). @@ -104,15 +102,9 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}. controlling_process(_Socket, _Pid) -> ok. -become_controller(FsmRef, C2SPid) -> - p1_fsm:send_all_state_event(FsmRef, {activate, C2SPid}). - close({http_ws, FsmRef, _IP}) -> catch p1_fsm:sync_send_all_state_event(FsmRef, close). -monitor({http_ws, FsmRef, _IP}) -> - erlang:monitor(process, FsmRef). - reset_stream({http_ws, _FsmRef, _IP} = Socket) -> Socket. @@ -120,6 +112,12 @@ change_shaper({http_ws, _FsmRef, _IP}, _Shaper) -> %% TODO??? ok. +get_transport(_Socket) -> + websocket. + +get_owner({http_ws, FsmRef, _IP}) -> + FsmRef. + socket_handoff(LocalPath, Request, Opts) -> ejabberd_websocket:socket_handoff(LocalPath, Request, Opts, ?MODULE, fun get_human_html_xmlel/0). @@ -145,31 +143,34 @@ init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) -> Socket = {http_ws, self(), IP}, ?DEBUG("Client connected through websocket ~p", [Socket]), - xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket, - [{receiver, self()}|Opts]), - Timer = erlang:start_timer(WSTimeout, self(), []), - {ok, loop, - #state{socket = Socket, timeout = WSTimeout, - timer = Timer, ws = WS, - ping_interval = PingInterval}}. - -handle_event({activate, From}, StateName, StateData) -> - case StateData#state.input of - [] -> - {next_state, StateName, - StateData#state{waiting_input = From}}; - Input -> - Receiver = From, - lists:foreach(fun(I) when is_binary(I)-> - Receiver ! {tcp, StateData#state.socket, I}; - (I2) -> - Receiver ! {tcp, StateData#state.socket, [I2]} - end, Input), - {next_state, StateName, - StateData#state{input = [], waiting_input = false, - last_receiver = Receiver}} + case ejabberd_c2s:start({?MODULE, Socket}, [{receiver, self()}|Opts]) of + {ok, C2SPid} -> + ejabberd_c2s:accept(C2SPid), + Timer = erlang:start_timer(WSTimeout, self(), []), + {ok, loop, + #state{socket = Socket, timeout = WSTimeout, + timer = Timer, ws = WS, c2s_pid = C2SPid, + ping_interval = PingInterval}}; + {error, Reason} -> + {stop, Reason}; + ignore -> + ignore end. +handle_event({activate, From}, StateName, State) -> + State1 = case State#state.input of + [] -> State#state{active = true}; + Input -> + lists:foreach( + fun(I) when is_binary(I)-> + From ! {tcp, State#state.socket, I}; + (I2) -> + From ! {tcp, State#state.socket, [I2]} + end, Input), + State#state{active = false, input = []} + end, + {next_state, StateName, State1#state{c2s_pid = From}}. + handle_sync_event({send_xml, Packet}, _From, StateName, #state{ws = {_, WsPid}, rfc_compilant = R} = StateData) -> Packet2 = case {case R of undefined -> true; V -> V end, Packet} of @@ -233,14 +234,13 @@ handle_info(closed, _StateName, StateData) -> {stop, normal, StateData}; handle_info({received, Packet}, StateName, StateDataI) -> {StateData, Parsed} = parse(StateDataI, Packet), - SD = case StateData#state.waiting_input of + SD = case StateData#state.active of false -> Input = StateData#state.input ++ if is_binary(Parsed) -> [Parsed]; true -> Parsed end, StateData#state{input = Input}; - Receiver -> - Receiver ! {tcp, StateData#state.socket, Parsed}, - setup_timers(StateData#state{waiting_input = false, - last_receiver = Receiver}) + true -> + StateData#state.c2s_pid ! {tcp, StateData#state.socket, Parsed}, + setup_timers(StateData#state{active = false}) end, {next_state, StateName, SD}; handle_info(PingPong, StateName, StateData) when PingPong == ping orelse @@ -273,13 +273,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. terminate(_Reason, _StateName, StateData) -> - case StateData#state.waiting_input of - false -> ok; - Receiver -> - ?DEBUG("C2S Pid : ~p", [Receiver]), - Receiver ! {tcp_closed, StateData#state.socket} - end, - ok. + StateData#state.c2s_pid ! {tcp_closed, StateData#state.socket}. setup_timers(StateData) -> misc:cancel_timer(StateData#state.timer), diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl index 54bc877cc..3c5192c4a 100644 --- a/src/ejabberd_listener.erl +++ b/src/ejabberd_listener.erl @@ -36,6 +36,13 @@ -include("logger.hrl"). +-callback start({gen_tcp, inet:socket()}, [proplists:property()]) -> + {ok, pid()} | {error, any()} | ignore. +-callback start_link({gen_tcp, inet:socket()}, [proplists:property()]) -> + {ok, pid()} | {error, any()} | ignore. +-callback accept(pid()) -> any(). +-callback listen_opt_type(atom()) -> fun((atom()) -> term()) | [atom()]. + %% We do not block on send anymore. -define(TCP_SEND_TIMEOUT, 15000). @@ -81,11 +88,7 @@ report_duplicated_portips(L) -> start(Port, Module, Opts) -> NewOpts = validate_module_options(Module, Opts), - %% Check if the module is an ejabberd listener or an independent listener - case Module:socket_type() of - independent -> Module:start_listener(Port, NewOpts); - _ -> start_dependent(Port, Module, NewOpts) - end. + start_dependent(Port, Module, NewOpts). %% @spec(Port, Module, Opts) -> {ok, Pid} | {error, ErrorMessage} start_dependent(Port, Module, Opts) -> @@ -109,7 +112,6 @@ init_udp(PortIP, Module, Opts, SockOpts, Port) -> %% Inform my parent that this port was opened successfully proc_lib:init_ack({ok, self()}), application:ensure_started(ejabberd), - start_module_sup(Port, Module), ?INFO_MSG("Start accepting UDP connections at ~s for ~p", [format_portip(PortIP), Module]), case erlang:function_exported(Module, udp_init, 2) of @@ -136,21 +138,21 @@ init_tcp(PortIP, Module, Opts, SockOpts, Port) -> {ok, ListenSocket} -> proc_lib:init_ack({ok, self()}), application:ensure_started(ejabberd), - start_module_sup(Port, Module), + Sup = start_module_sup(Module, Opts), ?INFO_MSG("Start accepting TCP connections at ~s for ~p", [format_portip(PortIP), Module]), case erlang:function_exported(Module, tcp_init, 2) of false -> - accept(ListenSocket, Module, Opts); + accept(ListenSocket, Module, Opts, Sup); true -> case catch Module:tcp_init(ListenSocket, Opts) of {'EXIT', _} = Err -> ?ERROR_MSG("failed to process callback function " "~p:~s(~p, ~p): ~p", [Module, tcp_init, ListenSocket, Opts, Err]), - accept(ListenSocket, Module, Opts); + accept(ListenSocket, Module, Opts, Sup); NewOpts -> - accept(ListenSocket, Module, NewOpts) + accept(ListenSocket, Module, NewOpts, Sup) end end; {error, _} = Err -> @@ -240,20 +242,22 @@ get_ip_tuple(no_ip_option, inet6) -> get_ip_tuple(IPOpt, _IPVOpt) -> IPOpt. -accept(ListenSocket, Module, Opts) -> +accept(ListenSocket, Module, Opts, Sup) -> Interval = proplists:get_value(accept_interval, Opts, 0), - accept(ListenSocket, Module, Opts, Interval). + accept(ListenSocket, Module, Opts, Sup, Interval). -accept(ListenSocket, Module, Opts, Interval) -> +accept(ListenSocket, Module, Opts, Sup, Interval) -> NewInterval = check_rate_limit(Interval), case gen_tcp:accept(ListenSocket) of {ok, Socket} -> case {inet:sockname(Socket), inet:peername(Socket)} of {{ok, {Addr, Port}}, {ok, {PAddr, PPort}}} -> - Receiver = case xmpp_socket:start(Module, - gen_tcp, Socket, Opts) of - {ok, RecvPid} -> RecvPid; - _ -> none + Receiver = case start_connection(Module, Socket, Opts, Sup) of + {ok, RecvPid} -> + RecvPid; + _ -> + gen_tcp:close(Socket), + none end, ?INFO_MSG("(~p) Accepted connection ~s:~p -> ~s:~p", [Receiver, @@ -262,11 +266,11 @@ accept(ListenSocket, Module, Opts, Interval) -> _ -> gen_tcp:close(Socket) end, - accept(ListenSocket, Module, Opts, NewInterval); + accept(ListenSocket, Module, Opts, Sup, NewInterval); {error, Reason} -> ?ERROR_MSG("(~w) Failed TCP accept: ~s", [ListenSocket, inet:format_error(Reason)]), - accept(ListenSocket, Module, Opts, NewInterval) + accept(ListenSocket, Module, Opts, Sup, NewInterval) end. udp_recv(Socket, Module, Opts) -> @@ -287,6 +291,25 @@ udp_recv(Socket, Module, Opts) -> throw({error, Reason}) end. +start_connection(Module, Socket, Opts, Sup) -> + Res = case Sup of + undefined -> Module:start({gen_tcp, Socket}, Opts); + _ -> supervisor:start_child(Sup, [{gen_tcp, Socket}, Opts]) + end, + case Res of + {ok, Pid} -> + case gen_tcp:controlling_process(Socket, Pid) of + ok -> + Module:accept(Pid), + {ok, Pid}; + Err -> + exit(Pid, kill), + Err + end; + Err -> + Err + end. + %% @spec (Port, Module, Opts) -> {ok, Pid} | {error, Error} start_listener(Port, Module, Opts) -> case start_listener2(Port, Module, Opts) of @@ -309,16 +332,21 @@ start_listener2(Port, Module, Opts) -> %% So, it's normal (and harmless) that in most cases this call returns: {error, {already_started, pid()}} start_listener_sup(Port, Module, Opts). -start_module_sup(_Port, Module) -> - Proc1 = gen_mod:get_module_proc(<<"sup">>, Module), - ChildSpec1 = - {Proc1, - {ejabberd_tmp_sup, start_link, [Proc1, Module]}, - permanent, - infinity, - supervisor, - [ejabberd_tmp_sup]}, - supervisor:start_child(ejabberd_sup, ChildSpec1). +-spec start_module_sup(module(), [proplists:property()]) -> atom(). +start_module_sup(Module, Opts) -> + case proplists:get_value(supervisor, Opts, true) of + true -> + Proc = list_to_atom(atom_to_list(Module) ++ "_sup"), + ChildSpec = {Proc, {ejabberd_tmp_sup, start_link, [Proc, Module]}, + permanent, + infinity, + supervisor, + [ejabberd_tmp_sup]}, + supervisor:start_child(ejabberd_sup, ChildSpec), + Proc; + false -> + undefined + end. start_listener_sup(Port, Module, Opts) -> ChildSpec = {Port, diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index 62227984e..5166004b8 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -21,12 +21,10 @@ %%%------------------------------------------------------------------- -module(ejabberd_s2s_in). -behaviour(xmpp_stream_in). --behaviour(xmpp_socket). +-behaviour(ejabberd_listener). -%% xmpp_socket callbacks --export([start/2, start_link/2, socket_type/0]). %% ejabberd_listener callbacks --export([listen_opt_type/1]). +-export([start/2, start_link/2, accept/1, listen_opt_type/1]). %% xmpp_stream_in callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -53,16 +51,8 @@ %%% API %%%=================================================================== start(SockData, Opts) -> - case proplists:get_value(supervisor, Opts, true) of - true -> - case supervisor:start_child(ejabberd_s2s_in_sup, [SockData, Opts]) of - {ok, undefined} -> ignore; - Res -> Res - end; - _ -> - xmpp_stream_in:start(?MODULE, [SockData, Opts], - ejabberd_config:fsm_limit_opts(Opts)) - end. + xmpp_stream_in:start(?MODULE, [SockData, Opts], + ejabberd_config:fsm_limit_opts(Opts)). start_link(SockData, Opts) -> xmpp_stream_in:start_link(?MODULE, [SockData, Opts], @@ -77,8 +67,8 @@ close(Ref, Reason) -> stop(Ref) -> xmpp_stream_in:stop(Ref). -socket_type() -> - xml_stream. +accept(Ref) -> + xmpp_stream_in:accept(Ref). -spec send(pid(), xmpp_element()) -> ok; (state(), xmpp_element()) -> state(). diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl index c39a023ab..0655a5f54 100644 --- a/src/ejabberd_service.erl +++ b/src/ejabberd_service.erl @@ -21,20 +21,19 @@ %%%------------------------------------------------------------------- -module(ejabberd_service). -behaviour(xmpp_stream_in). --behaviour(xmpp_socket). +-behaviour(ejabberd_listener). -protocol({xep, 114, '1.6'}). -%% xmpp_socket callbacks --export([start/2, start_link/2, socket_type/0, close/1, close/2]). %% ejabberd_listener callbacks +-export([start/2, start_link/2, accept/1]). -export([listen_opt_type/1, transform_listen_option/2]). %% xmpp_stream_in callbacks -export([init/1, handle_info/2, terminate/2, code_change/3]). -export([handle_stream_start/2, handle_auth_success/4, handle_auth_failure/4, handle_authenticated_packet/2, get_password_fun/1, tls_options/1]). %% API --export([send/2]). +-export([send/2, close/1, close/2]). -include("xmpp.hrl"). -include("logger.hrl"). @@ -53,8 +52,8 @@ start_link(SockData, Opts) -> xmpp_stream_in:start_link(?MODULE, [SockData, Opts], ejabberd_config:fsm_limit_opts(Opts)). -socket_type() -> - xml_stream. +accept(Ref) -> + xmpp_stream_in:accept(Ref). -spec send(pid(), xmpp_element()) -> ok; (state(), xmpp_element()) -> state(). diff --git a/src/ejabberd_sip.erl b/src/ejabberd_sip.erl index e49fb4841..effcb8a04 100644 --- a/src/ejabberd_sip.erl +++ b/src/ejabberd_sip.erl @@ -24,25 +24,29 @@ %%%------------------------------------------------------------------- -module(ejabberd_sip). +-behaviour(ejabberd_listener). -ifndef(SIP). -include("logger.hrl"). --export([socket_type/0, start/2, listen_opt_type/1]). +-export([accept/1, start/2, start_link/2, listen_opt_type/1]). log_error() -> ?CRITICAL_MSG("ejabberd is not compiled with SIP support", []). -socket_type() -> +accept(_) -> log_error(), - raw. + ok. listen_opt_type(_) -> log_error(), []. start(_, _) -> log_error(), {error, sip_not_compiled}. +start_link(_, _) -> + log_error(), + {error, sip_not_compiled}. -else. %% API -export([tcp_init/2, udp_init/2, udp_recv/5, start/2, - socket_type/0, listen_opt_type/1]). + start_link/2, accept/1, listen_opt_type/1]). %%%=================================================================== @@ -62,8 +66,11 @@ udp_recv(Sock, Addr, Port, Data, Opts) -> start(Opaque, Opts) -> esip_socket:start(Opaque, Opts). -socket_type() -> - raw. +start_link({gen_tcp, Sock}, Opts) -> + esip_socket:start_link(Sock, Opts). + +accept(_) -> + ok. set_certfile(Opts) -> case lists:keymember(certfile, 1, Opts) of diff --git a/src/ejabberd_stun.erl b/src/ejabberd_stun.erl index 53ecd5cc1..e2f9f8f8f 100644 --- a/src/ejabberd_stun.erl +++ b/src/ejabberd_stun.erl @@ -24,27 +24,30 @@ %%%------------------------------------------------------------------- -module(ejabberd_stun). - +-behaviour(ejabberd_listener). -protocol({rfc, 5766}). -protocol({xep, 176, '1.0'}). -ifndef(STUN). -include("logger.hrl"). --export([socket_type/0, start/2, listen_opt_type/1]). +-export([accept/1, start/2, start_link/2, listen_opt_type/1]). log_error() -> ?CRITICAL_MSG("ejabberd is not compiled with STUN/TURN support", []). -socket_type() -> +accept(_) -> log_error(), - raw. + ok. listen_opt_type(_) -> log_error(), []. start(_, _) -> log_error(), {error, sip_not_compiled}. +start_link(_, _) -> + log_error(), + {error, sip_not_compiled}. -else. -export([tcp_init/2, udp_init/2, udp_recv/5, start/2, - socket_type/0, listen_opt_type/1]). + start_link/2, accept/1, listen_opt_type/1]). -include("logger.hrl"). @@ -65,8 +68,11 @@ udp_recv(Socket, Addr, Port, Packet, Opts) -> start(Opaque, Opts) -> stun:start(Opaque, Opts). -socket_type() -> - raw. +start_link({gen_tcp, Sock}, Opts) -> + stun:start_link(Sock, Opts). + +accept(_Pid) -> + ok. %%%=================================================================== %%% Internal functions diff --git a/src/ejabberd_xmlrpc.erl b/src/ejabberd_xmlrpc.erl index 68a774d4b..a6afe585c 100644 --- a/src/ejabberd_xmlrpc.erl +++ b/src/ejabberd_xmlrpc.erl @@ -31,10 +31,11 @@ %%% TODO: commands strings should be strings without ~n -module(ejabberd_xmlrpc). +-behaviour(ejabberd_listener). -author('badlop@process-one.net'). --export([start/2, handler/2, process/2, socket_type/0, +-export([start/2, start_link/2, handler/2, process/2, accept/1, transform_listen_option/2, listen_opt_type/1]). -include("logger.hrl"). @@ -190,7 +191,11 @@ start({gen_tcp = _SockMod, Socket}, Opts) -> ejabberd_http:start({gen_tcp, Socket}, [{xmlrpc, true}|Opts]). -socket_type() -> raw. +start_link({gen_tcp = _SockMod, Socket}, Opts) -> + ejabberd_http:start_link({gen_tcp, Socket}, [{xmlrpc, true}|Opts]). + +accept(Pid) -> + ejabberd_http:accept(Pid). %% ----------------------------- %% HTTP interface diff --git a/src/mod_proxy65.erl b/src/mod_proxy65.erl index cc3546cf2..c911dd7aa 100644 --- a/src/mod_proxy65.erl +++ b/src/mod_proxy65.erl @@ -99,15 +99,7 @@ init([Host, Opts]) -> Service = {mod_proxy65_service, {mod_proxy65_service, start_link, [Host, Opts]}, transient, 5000, worker, [mod_proxy65_service]}, - StreamSupervisor = {ejabberd_mod_proxy65_sup, - {ejabberd_tmp_sup, start_link, - [gen_mod:get_module_proc(Host, - ejabberd_mod_proxy65_sup), - mod_proxy65_stream]}, - transient, infinity, supervisor, [ejabberd_tmp_sup]}, - {ok, - {{one_for_one, 10, 1}, - [StreamSupervisor, Service]}}. + {ok, {{one_for_one, 10, 1}, [Service]}}. depends(_Host, _Opts) -> []. diff --git a/src/mod_proxy65_stream.erl b/src/mod_proxy65_stream.erl index 4e7aa0334..668817868 100644 --- a/src/mod_proxy65_stream.erl +++ b/src/mod_proxy65_stream.erl @@ -27,18 +27,19 @@ -author('xram@jabber.ru'). -behaviour(p1_fsm). +-behaviour(ejabberd_listener). %% gen_fsm callbacks. -export([init/1, handle_event/3, handle_sync_event/4, code_change/4, handle_info/3, terminate/3]). %% gen_fsm states. --export([wait_for_init/2, wait_for_auth/2, +-export([accepting/2, wait_for_init/2, wait_for_auth/2, wait_for_request/2, wait_for_activation/2, stream_established/2]). --export([start/2, stop/1, start_link/3, activate/2, - relay/3, socket_type/0, listen_opt_type/1, +-export([start/2, stop/1, start_link/2, start_link/3, activate/2, + relay/3, accept/1, listen_opt_type/1, listen_options/0]). -include("mod_proxy65.hrl"). @@ -69,10 +70,14 @@ start({gen_tcp, Socket}, Opts1) -> fun({server_host, _}) -> true; (_) -> false end, Opts1), - Supervisor = gen_mod:get_module_proc(Host, - ejabberd_mod_proxy65_sup), - supervisor:start_child(Supervisor, - [Socket, Host, Opts]). + p1_fsm:start(?MODULE, [Socket, Host, Opts], []). + +start_link({gen_tcp, Socket}, Opts1) -> + {[{server_host, Host}], Opts} = lists:partition( + fun({server_host, _}) -> true; + (_) -> false + end, Opts1), + start_link(Socket, Host, Opts). start_link(Socket, Host, Opts) -> p1_fsm:start_link(?MODULE, [Socket, Host, Opts], []). @@ -84,9 +89,8 @@ init([Socket, Host, Opts]) -> RecvBuf = gen_mod:get_opt(recbuf, Opts), SendBuf = gen_mod:get_opt(sndbuf, Opts), TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop), - inet:setopts(Socket, - [{active, true}, {recbuf, RecvBuf}, {sndbuf, SendBuf}]), - {ok, wait_for_init, + inet:setopts(Socket, [{recbuf, RecvBuf}, {sndbuf, SendBuf}]), + {ok, accepting, #state{host = Host, auth_type = AuthType, socket = Socket, shaper = Shaper, timer = TRef}}. @@ -101,7 +105,8 @@ terminate(_Reason, StateName, #state{sha1 = SHA1}) -> %%%------------------------------ %%% API. %%%------------------------------ -socket_type() -> raw. +accept(StreamPid) -> + p1_fsm:send_event(StreamPid, accept). stop(StreamPid) -> StreamPid ! stop. @@ -125,6 +130,10 @@ activate({P1, J1}, {P2, J2}) -> %%%----------------------- %%% States %%%----------------------- +accepting(accept, State) -> + inet:setopts(State#state.socket, [{active, true}]), + {next_state, wait_for_init, State}. + wait_for_init(Packet, #state{socket = Socket, auth_type = AuthType} = StateData) -> -- 2.40.0