From: Badlop Date: Mon, 31 Aug 2009 18:37:52 +0000 (+0000) Subject: BOSH module optimization and clean-up (thanks to Aleksey Shchepin and Mickaël Rémond... X-Git-Tag: v2.1.0~18^2~64 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=a033b0615072010929379a64e7e285d1a27506ba;p=ejabberd BOSH module optimization and clean-up (thanks to Aleksey Shchepin and Mickaël Rémond)(EJAB-936) SVN Revision: 2574 --- diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 88d26b1d5..e9f53b6e0 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -67,6 +67,7 @@ -record(state, {socket, sockmod, socket_monitor, + xml_socket, streamid, sasl_state, access, @@ -124,16 +125,12 @@ -define(STREAM_TRAILER, ""). --define(INVALID_NS_ERR, - xml:element_to_string(?SERR_INVALID_NAMESPACE)). --define(INVALID_XML_ERR, - xml:element_to_string(?SERR_XML_NOT_WELL_FORMED)). --define(HOST_UNKNOWN_ERR, - xml:element_to_string(?SERR_HOST_UNKNOWN)). +-define(INVALID_NS_ERR, ?SERR_INVALID_NAMESPACE). +-define(INVALID_XML_ERR, ?SERR_XML_NOT_WELL_FORMED). +-define(HOST_UNKNOWN_ERR, ?SERR_HOST_UNKNOWN). -define(POLICY_VIOLATION_ERR(Lang, Text), - xml:element_to_string(?SERRT_POLICY_VIOLATION(Lang, Text))). --define(INVALID_FROM, - xml:element_to_string(?SERR_INVALID_FROM)). + ?SERRT_POLICY_VIOLATION(Lang, Text)). +-define(INVALID_FROM, ?SERR_INVALID_FROM). %%%---------------------------------------------------------------------- @@ -175,6 +172,11 @@ init([{SockMod, Socket}, Opts]) -> {value, {_, S}} -> S; _ -> none end, + XMLSocket = + case lists:keysearch(xml_socket, 1, Opts) of + {value, {_, XS}} -> XS; + _ -> false + end, Zlib = lists:member(zlib, Opts), StartTLS = lists:member(starttls, Opts), StartTLSRequired = lists:member(starttls_required, Opts), @@ -205,6 +207,7 @@ init([{SockMod, Socket}, Opts]) -> {ok, wait_for_stream, #state{socket = Socket1, sockmod = SockMod, socket_monitor = SocketMonitor, + xml_socket = XMLSocket, zlib = Zlib, tls = TLS, tls_required = StartTLSRequired, @@ -231,9 +234,9 @@ get_subscribed(FsmRef) -> wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> DefaultLang = case ?MYLANG of undefined -> - " xml:lang='en'"; + "en"; DL -> - " xml:lang='" ++ DL ++ "'" + DL end, case xml:get_attr_s("xmlns:stream", Attrs) of ?NS_STREAM -> @@ -244,12 +247,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> change_shaper(StateData, jlib:make_jid("", Server, "")), case xml:get_attr_s("version", Attrs) of "1.0" -> - Header = io_lib:format(?STREAM_HEADER, - [StateData#state.streamid, - Server, - " version='1.0'", - DefaultLang]), - send_text(StateData, Header), + send_header(StateData, Server, "1.0", DefaultLang), case StateData#state.authenticated of false -> SASLState = @@ -351,22 +349,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> end end; _ -> - Header = io_lib:format( - ?STREAM_HEADER, - [StateData#state.streamid, Server, "", - DefaultLang]), + send_header(StateData, Server, "", DefaultLang), if (not StateData#state.tls_enabled) and StateData#state.tls_required -> - send_text(StateData, - Header ++ - ?POLICY_VIOLATION_ERR( - Lang, - "Use of STARTTLS required") ++ - ?STREAM_TRAILER), + send_element( + StateData, + ?POLICY_VIOLATION_ERR( + Lang, + "Use of STARTTLS required")), + send_trailer(StateData), {stop, normal, StateData}; true -> - send_text(StateData, Header), fsm_next_state(wait_for_auth, StateData#state{ server = Server, @@ -374,20 +368,15 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> end end; _ -> - Header = io_lib:format( - ?STREAM_HEADER, - [StateData#state.streamid, ?MYNAME, "", - DefaultLang]), - send_text(StateData, - Header ++ ?HOST_UNKNOWN_ERR ++ ?STREAM_TRAILER), + send_header(StateData, ?MYNAME, "", DefaultLang), + send_element(StateData, ?HOST_UNKNOWN_ERR), + send_trailer(StateData), {stop, normal, StateData} end; _ -> - Header = io_lib:format( - ?STREAM_HEADER, - [StateData#state.streamid, ?MYNAME, "", DefaultLang]), - send_text(StateData, - Header ++ ?INVALID_NS_ERR ++ ?STREAM_TRAILER), + send_header(StateData, ?MYNAME, "", DefaultLang), + send_element(StateData, ?INVALID_NS_ERR), + send_trailer(StateData), {stop, normal, StateData} end; @@ -395,18 +384,19 @@ wait_for_stream(timeout, StateData) -> {stop, normal, StateData}; wait_for_stream({xmlstreamelement, _}, StateData) -> - send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_stream({xmlstreamend, _}, StateData) -> - send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_stream({xmlstreamerror, _}, StateData) -> - Header = io_lib:format(?STREAM_HEADER, - ["none", ?MYNAME, " version='1.0'", ""]), - send_text(StateData, - Header ++ ?INVALID_XML_ERR ++ ?STREAM_TRAILER), + send_header(StateData, ?MYNAME, "1.0", ""), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_stream(closed, StateData) -> @@ -538,11 +528,12 @@ wait_for_auth(timeout, StateData) -> {stop, normal, StateData}; wait_for_auth({xmlstreamend, _Name}, StateData) -> - send_text(StateData, ?STREAM_TRAILER), + send_trailer(StateData), {stop, normal, StateData}; wait_for_auth({xmlstreamerror, _}, StateData) -> - send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_auth(closed, StateData) -> @@ -665,10 +656,10 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) -> if (SockMod == gen_tcp) and TLSRequired -> Lang = StateData#state.lang, - send_text(StateData, ?POLICY_VIOLATION_ERR( - Lang, - "Use of STARTTLS required") ++ - ?STREAM_TRAILER), + send_element(StateData, ?POLICY_VIOLATION_ERR( + Lang, + "Use of STARTTLS required")), + send_trailer(StateData), {stop, normal, StateData}; true -> process_unauthenticated_stanza(StateData, El), @@ -680,11 +671,12 @@ wait_for_feature_request(timeout, StateData) -> {stop, normal, StateData}; wait_for_feature_request({xmlstreamend, _Name}, StateData) -> - send_text(StateData, ?STREAM_TRAILER), + send_trailer(StateData), {stop, normal, StateData}; wait_for_feature_request({xmlstreamerror, _}, StateData) -> - send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_feature_request(closed, StateData) -> @@ -748,11 +740,12 @@ wait_for_sasl_response(timeout, StateData) -> {stop, normal, StateData}; wait_for_sasl_response({xmlstreamend, _Name}, StateData) -> - send_text(StateData, ?STREAM_TRAILER), + send_trailer(StateData), {stop, normal, StateData}; wait_for_sasl_response({xmlstreamerror, _}, StateData) -> - send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_sasl_response(closed, StateData) -> @@ -797,11 +790,12 @@ wait_for_bind(timeout, StateData) -> {stop, normal, StateData}; wait_for_bind({xmlstreamend, _Name}, StateData) -> - send_text(StateData, ?STREAM_TRAILER), + send_trailer(StateData), {stop, normal, StateData}; wait_for_bind({xmlstreamerror, _}, StateData) -> - send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_bind(closed, StateData) -> @@ -868,11 +862,12 @@ wait_for_session(timeout, StateData) -> {stop, normal, StateData}; wait_for_session({xmlstreamend, _Name}, StateData) -> - send_text(StateData, ?STREAM_TRAILER), + send_trailer(StateData), {stop, normal, StateData}; wait_for_session({xmlstreamerror, _}, StateData) -> - send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_session(closed, StateData) -> @@ -884,7 +879,8 @@ session_established({xmlstreamelement, El}, StateData) -> % Check 'from' attribute in stanza RFC 3920 Section 9.1.2 case check_from(El, FromJID) of 'invalid-from' -> - send_text(StateData, ?INVALID_FROM ++ ?STREAM_TRAILER), + send_element(StateData, ?INVALID_FROM), + send_trailer(StateData), {stop, normal, StateData}; _NewEl -> session_established2(El, StateData) @@ -900,16 +896,17 @@ session_established(timeout, StateData) -> fsm_next_state(session_established, StateData); session_established({xmlstreamend, _Name}, StateData) -> - send_text(StateData, ?STREAM_TRAILER), + send_trailer(StateData), {stop, normal, StateData}; session_established({xmlstreamerror, "XML stanza is too big" = E}, StateData) -> - Text = ?POLICY_VIOLATION_ERR(StateData#state.lang, E) ++ ?STREAM_TRAILER, - send_text(StateData, Text), + send_element(StateData, ?POLICY_VIOLATION_ERR(StateData#state.lang, E)), + send_trailer(StateData), {stop, normal, StateData}; session_established({xmlstreamerror, _}, StateData) -> - send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; session_established(closed, StateData) -> @@ -1070,10 +1067,9 @@ handle_info({send_text, Text}, StateName, StateData) -> fsm_next_state(StateName, StateData); handle_info(replaced, _StateName, StateData) -> Lang = StateData#state.lang, - send_text(StateData, - xml:element_to_string( - ?SERRT_CONFLICT(Lang, "Replaced by new connection")) - ++ ?STREAM_TRAILER), + send_element(StateData, + ?SERRT_CONFLICT(Lang, "Replaced by new connection")), + send_trailer(StateData), {stop, normal, StateData#state{authenticated = replaced}}; %% Process Packets that are to be send to the user handle_info({route, From, To, Packet}, StateName, StateData) -> @@ -1273,18 +1269,15 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> Pass == exit -> %% When Pass==exit, NewState contains a string instead of a #state{} Lang = StateData#state.lang, - catch send_text(StateData, - xml:element_to_string( - ?SERRT_CONFLICT(Lang, NewState)) - ++ ?STREAM_TRAILER), + send_element(StateData, ?SERRT_CONFLICT(Lang, NewState)), + send_trailer(StateData), {stop, normal, StateData}; Pass -> Attrs2 = jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = {xmlelement, Name, Attrs2, Els}, - Text = xml:element_to_string(FixedPacket), - send_text(StateData, Text), + send_element(StateData, FixedPacket), ejabberd_hooks:run(user_receive_packet, StateData#state.server, [StateData#state.jid, From, To, FixedPacket]), @@ -1379,9 +1372,60 @@ send_text(StateData, Text) -> ?DEBUG("Send XML on stream = ~p", [lists:flatten(Text)]), (StateData#state.sockmod):send(StateData#state.socket, Text). +send_element(StateData, El) when StateData#state.xml_socket -> + (StateData#state.sockmod):send_xml(StateData#state.socket, + {xmlstreamelement, El}); send_element(StateData, El) -> send_text(StateData, xml:element_to_string(El)). +send_header(StateData, Server, Version, Lang) + when StateData#state.xml_socket -> + VersionAttr = + case Version of + "" -> []; + _ -> [{"version", Version}] + end, + LangAttr = + case Lang of + "" -> []; + _ -> [{"xml:lang", Lang}] + end, + Header = + {xmlstreamstart, + "stream:stream", + VersionAttr ++ + LangAttr ++ + [{"xmlns", "jabber:client"}, + {"xmlns:stream", "http://etherx.jabber.org/streams"}, + {"id", StateData#state.streamid}, + {"from", Server}]}, + (StateData#state.sockmod):send_xml( + StateData#state.socket, Header); +send_header(StateData, Server, Version, Lang) -> + VersionStr = + case Version of + "" -> ""; + _ -> [" version='", Version, "'"] + end, + LangStr = + case Lang of + "" -> ""; + _ -> [" xml:lang='", Lang, "'"] + end, + Header = io_lib:format(?STREAM_HEADER, + [StateData#state.streamid, + Server, + VersionStr, + LangStr]), + send_text(StateData, Header). + +send_trailer(StateData) when StateData#state.xml_socket -> + (StateData#state.sockmod):send_xml( + StateData#state.socket, + {xmlstreamend, "stream:stream"}); +send_trailer(StateData) -> + send_text(StateData, ?STREAM_TRAILER). + new_id() -> randoms:get_string(). diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl index 424f67e55..5a7a3bf73 100644 --- a/src/ejabberd_receiver.erl +++ b/src/ejabberd_receiver.erl @@ -205,7 +205,7 @@ handle_cast(_Msg, State) -> handle_info({Tag, _TCPSocket, Data}, #state{socket = Socket, sock_mod = SockMod} = State) - when (Tag == tcp) or (Tag == ssl) -> + when (Tag == tcp) or (Tag == ssl) or (Tag == ejabberd_xml) -> case SockMod of tls -> case tls:recv_data(Socket, Data) of @@ -294,6 +294,25 @@ activate_socket(#state{socket = Socket, ok end. +%% Data processing for connectors directly generating xmlelement in +%% Erlang data structure. +%% WARNING: Shaper does not work with Erlang data structure. +process_data([], State) -> + activate_socket(State), + State; +process_data([Element|Els], #state{c2s_pid = C2SPid} = State) + when element(1, Element) == xmlelement; + element(1, Element) == xmlstreamstart; + element(1, Element) == xmlstreamelement; + element(1, Element) == xmlstreamend -> + if + C2SPid == undefined -> + State; + true -> + catch gen_fsm:send_event(C2SPid, element_wrapper(Element)), + process_data(Els, State) + end; +%% Data processing for connectors receivind data as string. process_data(Data, #state{xml_stream_state = XMLStreamState, shaper_state = ShaperState, @@ -312,6 +331,16 @@ process_data(Data, State#state{xml_stream_state = XMLStreamState1, shaper_state = NewShaperState}. +%% Element coming from XML parser are wrapped inside xmlstreamelement +%% When we receive directly xmlelement tuple (from a socket module +%% speaking directly Erlang XML), we wrap it inside the same +%% xmlstreamelement coming from the XML parser. +element_wrapper(XMLElement) + when element(1, XMLElement) == xmlelement -> + {xmlstreamelement, XMLElement}; +element_wrapper(Element) -> + Element. + close_stream(undefined) -> ok; close_stream(XMLStreamState) -> diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl index 566ee1e43..d629a77d9 100644 --- a/src/ejabberd_socket.erl +++ b/src/ejabberd_socket.erl @@ -37,6 +37,7 @@ compress/2, reset_stream/1, send/2, + send_xml/2, change_shaper/2, monitor/1, get_sockmod/1, @@ -62,10 +63,18 @@ start(Module, SockMod, Socket, Opts) -> {value, {_, Size}} -> Size; _ -> infinity end, - Receiver = ejabberd_receiver:start(Socket, SockMod, none, MaxStanzaSize), + {ReceiverMod, Receiver, RecRef} = + case catch SockMod:custom_receiver(Socket) of + {receiver, RecMod, RecPid} -> + {RecMod, RecPid, RecMod}; + _ -> + RecPid = ejabberd_receiver:start( + Socket, SockMod, none, MaxStanzaSize), + {ejabberd_receiver, RecPid, RecPid} + end, SocketData = #socket_state{sockmod = SockMod, socket = Socket, - receiver = Receiver}, + receiver = RecRef}, case Module:start({?MODULE, SocketData}, Opts) of {ok, Pid} -> case SockMod:controlling_process(Socket, Receiver) of @@ -74,7 +83,7 @@ start(Module, SockMod, Socket, Opts) -> {error, _Reason} -> SockMod:close(Socket) end, - ejabberd_receiver:become_controller(Receiver, Pid); + ReceiverMod:become_controller(Receiver, Pid); {error, _Reason} -> SockMod:close(Socket) end; @@ -143,18 +152,33 @@ compress(SocketData, Data) -> send(SocketData, Data), SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}. -reset_stream(SocketData) -> - ejabberd_receiver:reset_stream(SocketData#socket_state.receiver). +reset_stream(SocketData) when is_pid(SocketData#socket_state.receiver) -> + ejabberd_receiver:reset_stream(SocketData#socket_state.receiver); +reset_stream(SocketData) when is_atom(SocketData#socket_state.receiver) -> + (SocketData#socket_state.receiver):reset_stream( + SocketData#socket_state.socket). send(SocketData, Data) -> catch (SocketData#socket_state.sockmod):send( SocketData#socket_state.socket, Data). -change_shaper(SocketData, Shaper) -> - ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, Shaper). +send_xml(SocketData, Data) -> + catch (SocketData#socket_state.sockmod):send_xml( + SocketData#socket_state.socket, Data). -monitor(SocketData) -> - erlang:monitor(process, SocketData#socket_state.receiver). +change_shaper(SocketData, Shaper) + when is_pid(SocketData#socket_state.receiver) -> + ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, Shaper); +change_shaper(SocketData, Shaper) + when is_atom(SocketData#socket_state.receiver) -> + (SocketData#socket_state.receiver):change_shaper( + SocketData#socket_state.socket, Shaper). + +monitor(SocketData) when is_pid(SocketData#socket_state.receiver) -> + erlang:monitor(process, SocketData#socket_state.receiver); +monitor(SocketData) when is_atom(SocketData#socket_state.receiver) -> + (SocketData#socket_state.receiver):monitor( + SocketData#socket_state.socket). get_sockmod(SocketData) -> SocketData#socket_state.sockmod. diff --git a/src/web/ejabberd_http_bind.erl b/src/web/ejabberd_http_bind.erl index 68bb8e383..336d4171e 100644 --- a/src/web/ejabberd_http_bind.erl +++ b/src/web/ejabberd_http_bind.erl @@ -4,29 +4,11 @@ %%% Purpose : Implements XMPP over BOSH (XEP-0205) (formerly known as %%% HTTP Binding) %%% Created : 21 Sep 2005 by Stefan Strigler -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2009 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License -%%% along with this program; if not, write to the Free Software -%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA -%%% 02111-1307 USA -%%% +%%% Modified: may 2009 by Mickael Remond, Alexey Schepin +%%% Id : $Id: ejabberd_http_bind.erl 953 2009-05-07 10:40:40Z alexey $ %%%---------------------------------------------------------------------- -module(ejabberd_http_bind). --author('steve@zeank.in-berlin.de'). -behaviour(gen_fsm). @@ -39,15 +21,19 @@ handle_info/3, terminate/3, send/2, - setopts/2, + send_xml/2, sockname/1, peername/1, + setopts/2, controlling_process/2, + become_controller/2, + custom_receiver/1, + reset_stream/1, + change_shaper/2, + monitor/1, close/1, process_request/2]). --define(ejabberd_debug, true). - -include("ejabberd.hrl"). -include("jlib.hrl"). -include("ejabberd_http.hrl"). @@ -59,7 +45,6 @@ %% http binding request -record(hbr, {rid, key, - in, out}). -record(state, {id, @@ -67,8 +52,10 @@ key, socket, output = "", - input = "", + input = queue:new(), waiting_input = false, + shaper_state, + shaper_timer, last_receiver, last_poll, http_receiver, @@ -76,22 +63,30 @@ ctime = 0, timer, pause=0, - unprocessed_req_list = [], % list of request that have been delayed for proper reordering + unprocessed_req_list = [], % list of request that have been delayed for proper reordering: {Request, PID} req_list = [], % list of requests (cache) max_inactivity, + max_pause, ip = ?NULL_PEER }). +%% Internal request format: +-record(http_put, {rid, + attrs, + payload, + payload_size, + hold, + stream, + ip}). %%-define(DBGFSM, true). - -ifdef(DBGFSM). -define(FSMOPTS, [{debug, [trace]}]). -else. -define(FSMOPTS, []). -endif. --define(BOSH_VERSION, "1.6"). +-define(BOSH_VERSION, "1.8"). -define(NS_CLIENT, "jabber:client"). -define(NS_BOSH, "urn:xmpp:xbosh"). -define(NS_HTTP_BIND, "http://jabber.org/protocol/httpbind"). @@ -128,6 +123,9 @@ start_link(Sid, Key, IP) -> send({http_bind, FsmRef, _IP}, Packet) -> gen_fsm:sync_send_all_state_event(FsmRef, {send, Packet}). +send_xml({http_bind, FsmRef, _IP}, Packet) -> + gen_fsm:sync_send_all_state_event(FsmRef, {send_xml, Packet}). + setopts({http_bind, FsmRef, _IP}, Opts) -> case lists:member({active, once}, Opts) of true -> @@ -139,6 +137,21 @@ setopts({http_bind, FsmRef, _IP}, Opts) -> controlling_process(_Socket, _Pid) -> ok. +custom_receiver({http_bind, FsmRef, _IP}) -> + {receiver, ?MODULE, FsmRef}. + +become_controller(FsmRef, C2SPid) -> + gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}). + +reset_stream({http_bind, _FsmRef, _IP}) -> + ok. + +change_shaper({http_bind, FsmRef, _IP}, Shaper) -> + gen_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). + +monitor({http_bind, FsmRef, _IP}) -> + erlang:monitor(process, FsmRef). + close({http_bind, FsmRef, _IP}) -> catch gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}). @@ -148,8 +161,19 @@ sockname(_Socket) -> peername({http_bind, _FsmRef, IP}) -> {ok, IP}. + +%% Entry point for data coming from client through ejabberd HTTP server: process_request(Data, IP) -> - case catch parse_request(Data) of + Opts1 = ejabberd_c2s_config:get_c2s_limits(), + Opts = [{xml_socket, true} | Opts1], + MaxStanzaSize = + case lists:keysearch(max_stanza_size, 1, Opts) of + {value, {_, Size}} -> Size; + _ -> infinity + end, + PayloadSize = iolist_size(Data), + case catch parse_request(Data, PayloadSize, MaxStanzaSize) of + %% No existing session: {ok, {"", Rid, Attrs, Payload}} -> case xml:get_attr_s("to",Attrs) of "" -> @@ -166,11 +190,13 @@ process_request(Data, IP) -> "condition='internal-server-error' " "xmlns='" ++ ?NS_HTTP_BIND ++ "'>BOSH module not started - handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) + handle_session_start( + Pid, XmppDomain, Sid, Rid, Attrs, + Payload, PayloadSize, IP) end end; + %% Existing session {ok, {Sid, Rid, Attrs, Payload1}} -> - %% old session StreamStart = case xml:get_attr_s("xmpp:restart",Attrs) of "true" -> @@ -181,17 +207,21 @@ process_request(Data, IP) -> Payload2 = case xml:get_attr_s("type",Attrs) of "terminate" -> %% close stream - Payload1 ++ ""; + Payload1 ++ [{xmlstreamend, "stream:stream"}]; _ -> Payload1 end, - handle_http_put(Sid, Rid, Attrs, Payload2, StreamStart, IP); + handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize, + StreamStart, IP); + {error, size_limit} -> + {413, ?HEADER, "Request Too Large"}; _ -> - ?ERROR_MSG("Received bad request: ~p", [Data]), + ?DEBUG("Received bad request: ~p", [Data]), {400, ?HEADER, ""} end. -handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) -> +handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, + Payload, PayloadSize, IP) -> ?DEBUG("got pid: ~p", [Pid]), Wait = case string:to_integer(xml:get_attr_s("wait",Attrs)) of {error, _} -> @@ -235,7 +265,7 @@ handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) -> version = Version }) end), - handle_http_put(Sid, Rid, Attrs, Payload, true, IP). + handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP). %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm @@ -257,58 +287,46 @@ init([Sid, Key, IP]) -> %% each connector. The default behaviour should be however to use %% the default c2s restrictions if not defined for the current %% connector. - Opts = ejabberd_c2s_config:get_c2s_limits(), + Opts1 = ejabberd_c2s_config:get_c2s_limits(), + Opts = [{xml_socket, true} | Opts1], + Shaper = none, + ShaperState = shaper:new(Shaper), Socket = {http_bind, self(), IP}, ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, Opts), Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []), {ok, loop, #state{id = Sid, key = Key, socket = Socket, + shaper_state = ShaperState, max_inactivity = ?MAX_INACTIVITY, + max_pause = ?MAX_PAUSE, timer = Timer}}. -%%---------------------------------------------------------------------- -%% Func: StateName/2 -%% Returns: {next_state, NextStateName, NextStateData} | -%% {next_state, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} -%%---------------------------------------------------------------------- - - -%%---------------------------------------------------------------------- -%% Func: StateName/3 -%% Returns: {next_state, NextStateName, NextStateData} | -%% {next_state, NextStateName, NextStateData, Timeout} | -%% {reply, Reply, NextStateName, NextStateData} | -%% {reply, Reply, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} | -%% {stop, Reason, Reply, NewStateData} -%%---------------------------------------------------------------------- -%state_name(Event, From, StateData) -> -% Reply = ok, -% {reply, Reply, state_name, StateData}. - %%---------------------------------------------------------------------- %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- -handle_event({activate, From}, StateName, StateData) -> +handle_event({become_controller, C2SPid}, StateName, StateData) -> case StateData#state.input of - "" -> + cancel -> {next_state, StateName, StateData#state{ - waiting_input = {From, ok}}}; + waiting_input = C2SPid}}; Input -> - Receiver = From, - Receiver ! {tcp, StateData#state.socket, list_to_binary(Input)}, + lists:foreach( + fun(Event) -> + C2SPid ! Event + end, queue:to_list(Input)), {next_state, StateName, StateData#state{ - input = "", - waiting_input = false, - last_receiver = Receiver}} + input = queue:new(), + waiting_input = C2SPid}} end; +handle_event({change_shaper, Shaper}, StateName, StateData) -> + NewShaperState = shaper:new(Shaper), + {next_state, StateName, StateData#state{shaper_state = NewShaperState}}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -321,37 +339,34 @@ handle_event(_Event, StateName, StateData) -> %% {stop, Reason, NewStateData} | %% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- -handle_sync_event({send, Packet}, _From, StateName, StateData) -> - Output = [StateData#state.output | Packet], - if - StateData#state.http_receiver /= undefined -> - cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, - HTTPReply = case Output of - [[]| OutPacket] -> - {ok, OutPacket}; - _ -> - {ok, Output} - end, - gen_fsm:reply(StateData#state.http_receiver, HTTPReply), - cancel_timer(StateData#state.wait_timer), - Reply = ok, - {reply, Reply, StateName, - StateData#state{output = [], - http_receiver = undefined, - wait_timer = undefined, - timer = Timer}}; - true -> - Reply = ok, - {reply, Reply, StateName, StateData#state{output = Output}} - end; +handle_sync_event({send_xml, Packet}, _From, StateName, + #state{http_receiver = undefined} = StateData) -> + Output = [Packet | StateData#state.output], + Reply = ok, + {reply, Reply, StateName, StateData#state{output = Output}}; +handle_sync_event({send_xml, Packet}, _From, StateName, StateData) -> + Output = [Packet | StateData#state.output], + cancel_timer(StateData#state.timer), + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), + HTTPReply = {ok, Output}, + gen_fsm:reply(StateData#state.http_receiver, HTTPReply), + cancel_timer(StateData#state.wait_timer), + Rid = StateData#state.rid, + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = Output + } | + [El || El <- StateData#state.req_list, + El#hbr.rid /= Rid ] + ], + Reply = ok, + {reply, Reply, StateName, + StateData#state{output = [], + http_receiver = undefined, + req_list = ReqList, + wait_timer = undefined, + timer = Timer}}; handle_sync_event({stop,close}, _From, _StateName, StateData) -> Reply = ok, @@ -360,130 +375,82 @@ handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) -> Reply = ok, {stop, normal, Reply, StateData}; handle_sync_event({stop,Reason}, _From, _StateName, StateData) -> - ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), + ?ERROR_MSG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), Reply = ok, {stop, normal, Reply, StateData}; - %% HTTP PUT: Receive packets from the client -handle_sync_event({http_put, Rid, Attrs, _Payload, Hold, _StreamTo, _IP}=Request, - _From, StateName, StateData) -> - %% Check if Rid valid - RidAllow = - case StateData#state.rid of - none -> - %% First request - nothing saved so far - {true, 0}; - OldRid -> - ?DEBUG("state.rid/cur rid: ~p/~p", [OldRid, Rid]), - if - %% We did not miss any packet, we can process it immediately: - Rid == OldRid + 1 -> - case catch list_to_integer( - xml:get_attr_s("pause", Attrs)) of - {'EXIT', _} -> - {true, 0}; - Pause1 when Pause1 =< ?MAX_PAUSE -> - ?DEBUG("got pause: ~p", [Pause1]), - {true, Pause1}; - _ -> - {true, 0} - end; - %% We have missed packets, we need to cached it to process it later on: - (OldRid < Rid) and - (Rid =< (OldRid + Hold + 1)) -> - buffer; - (Rid =< OldRid) and - (Rid > OldRid - Hold - 1) -> - repeat; - true -> - false - end +handle_sync_event(#http_put{rid = Rid}, + _From, StateName, StateData) + when StateData#state.shaper_timer /= undefined -> + Pause = + case erlang:read_timer(StateData#state.shaper_timer) of + false -> + 0; + P -> P end, + Reply = {wait, Pause}, + ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]), + {reply, Reply, StateName, StateData}; - %% Check if Rid is in sequence or out of sequence: - case RidAllow of - buffer -> - ?DEBUG("Buffered request: ~p", [Request]), - %% Request is out of sequence: - PendingRequests = StateData#state.unprocessed_req_list, - %% In case an existing RID was already buffered: - Requests = lists:keydelete(Rid, 2, PendingRequests), - {reply, ok, StateName, StateData#state{unprocessed_req_list=[Request|Requests]}}; - _ -> - %% Request is in sequence: - process_http_put(Request, StateName, StateData, RidAllow) - end; +handle_sync_event(#http_put{rid = _Rid, attrs = _Attrs, + payload_size = PayloadSize, + hold = _Hold} = Request, + _From, StateName, StateData) -> + ?DEBUG("New request: ~p",[Request]), + %% Updating trafic shaper + {NewShaperState, NewShaperTimer} = + update_shaper(StateData#state.shaper_state, PayloadSize), + + handle_http_put_event(Request, StateName, + StateData#state{shaper_state = NewShaperState, + shaper_timer = NewShaperTimer}); %% HTTP GET: send packets to the client handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) -> %% setup timer - if - StateData#state.http_receiver /= undefined -> - gen_fsm:reply(StateData#state.http_receiver, {ok, empty}); - true -> - ok - end, + send_receiver_reply(StateData#state.http_receiver, {ok, empty}), cancel_timer(StateData#state.wait_timer), - {TMegSec, TSec, TMSec} = now(), - TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec, + TNow = tnow(), if (Hold > 0) and - (StateData#state.output == "") and + (StateData#state.output == []) and ((TNow - StateData#state.ctime) < (Wait*1000*1000)) and (StateData#state.rid == Rid) and - (StateData#state.input /= "cancel") and + (StateData#state.input /= cancel) and (StateData#state.pause == 0) -> WaitTimer = erlang:start_timer(Wait * 1000, self(), []), + %% MR: Not sure we should cancel the state timer here. cancel_timer(StateData#state.timer), {next_state, StateName, StateData#state{ http_receiver = From, wait_timer = WaitTimer, timer = undefined}}; - (StateData#state.input == "cancel") -> + (StateData#state.input == cancel) -> cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), Reply = {ok, cancel}, {reply, Reply, StateName, StateData#state{ - input = "", + input = queue:new(), http_receiver = undefined, wait_timer = undefined, timer = Timer}}; true -> cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, - case StateData#state.output of - [[]| OutPacket] -> - Reply = {ok, OutPacket}; - _ -> - Reply = {ok, StateData#state.output} - end, + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), + Reply = {ok, StateData#state.output}, %% save request - ReqList = [#hbr{rid=Rid, - key=StateData#state.key, - in=StateData#state.input, - out=StateData#state.output + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = StateData#state.output } | [El || El <- StateData#state.req_list, El#hbr.rid /= Rid ] ], {reply, Reply, StateName, StateData#state{ - input = "", - output = "", + output = [], http_receiver = undefined, wait_timer = undefined, timer = Timer, @@ -507,6 +474,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- +%% We reached the max_inactivity timeout: handle_info({timeout, Timer, _}, _StateName, #state{id=SID, timer = Timer} = StateData) -> ?WARNING_MSG("Session timeout. Closing the HTTP bind session: ~p", [SID]), @@ -517,23 +485,30 @@ handle_info({timeout, WaitTimer, _}, StateName, if StateData#state.http_receiver /= undefined -> cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), gen_fsm:reply(StateData#state.http_receiver, {ok, empty}), + Rid = StateData#state.rid, + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = [] + } | + [El || El <- StateData#state.req_list, + El#hbr.rid /= Rid ] + ], {next_state, StateName, StateData#state{http_receiver = undefined, + req_list = ReqList, wait_timer = undefined, timer = Timer}}; true -> {next_state, StateName, StateData} end; +handle_info({timeout, ShaperTimer, _}, StateName, + #state{shaper_timer = ShaperTimer} = StateData) -> + {next_state, StateName, StateData#state{shaper_timer = undefined}}; + handle_info(_, StateName, StateData) -> {next_state, StateName, StateData}. @@ -548,19 +523,12 @@ terminate(_Reason, _StateName, StateData) -> fun() -> mnesia:delete({http_bind, StateData#state.id}) end), - if - StateData#state.http_receiver /= undefined -> - gen_fsm:reply(StateData#state.http_receiver, {ok, terminate}); - true -> - ok - end, + send_receiver_reply(StateData#state.http_receiver, {ok, terminate}), case StateData#state.waiting_input of false -> - case StateData#state.last_receiver of - undefined -> ok; - Receiver -> Receiver ! {tcp_closed, StateData#state.socket} - end; - {Receiver, _Tag} -> Receiver ! {tcp_closed, StateData#state.socket} + ok; + C2SPid -> + gen_fsm:send_event(C2SPid, closed) end, ok. @@ -569,8 +537,47 @@ terminate(_Reason, _StateName, StateData) -> %%%---------------------------------------------------------------------- %% PUT / Get processing: -process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, +handle_http_put_event(#http_put{rid = Rid, attrs = Attrs, + hold = Hold} = Request, + StateName, StateData) -> + ?DEBUG("New request: ~p",[Request]), + %% Check if Rid valid + RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, Hold, + StateData#state.max_pause), + + %% Check if Rid is in sequence or out of sequence: + case RidAllow of + buffer -> + ?DEBUG("Buffered request: ~p", [Request]), + %% Request is out of sequence: + PendingRequests = StateData#state.unprocessed_req_list, + %% In case an existing RID was already buffered: + Requests = lists:keydelete(Rid, 2, PendingRequests), + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = [] + } | + [El || El <- StateData#state.req_list, + El#hbr.rid > (Rid - 1 - Hold)] + ], + ?DEBUG("reqlist: ~p", [ReqList]), + UnprocessedReqList = [Request | Requests], + cancel_timer(StateData#state.timer), + Timer = set_inactivity_timer(0, StateData#state.max_inactivity), + {reply, buffered, StateName, + StateData#state{unprocessed_req_list = UnprocessedReqList, + req_list = ReqList, + timer = Timer}}; + _ -> + %% Request is in sequence: + process_http_put(Request, StateName, StateData, RidAllow) + end. + +process_http_put(#http_put{rid = Rid, attrs = Attrs, payload = Payload, + hold = Hold, stream = StreamTo, + ip = IP} = Request, StateName, StateData, RidAllow) -> + ?DEBUG("Actually processing request: ~p", [Request]), %% Check if key valid Key = xml:get_attr_s("key", Attrs), NewKey = xml:get_attr_s("newkey", Attrs), @@ -596,16 +603,15 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, end end end, - {TMegSec, TSec, TMSec} = now(), - TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec, + TNow = tnow(), LastPoll = if - Payload == "" -> + Payload == [] -> TNow; true -> 0 end, if - (Payload == "") and + (Payload == []) and (Hold == 0) and (TNow - StateData#state.last_poll < ?MIN_POLLING) -> Reply = {error, polling_too_frequently}, @@ -618,17 +624,15 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, repeat -> ?DEBUG("REPEATING ~p", [Rid]), Reply = case [El#hbr.out || - El <- StateData#state.req_list, + El <- StateData#state.req_list, El#hbr.rid == Rid] of [] -> {error, not_exists}; - [ [[] | Out] | _XS] -> - {repeat, Out}; [Out | _XS] -> - {repeat, Out} - end, - {reply, Reply, StateName, - StateData#state{input = "cancel", last_poll = LastPoll}}; + {repeat, lists:reverse(Out)} + end, + {reply, Reply, StateName, StateData#state{input = cancel, + last_poll = LastPoll}}; {true, Pause} -> SaveKey = if NewKey == "" -> @@ -639,30 +643,33 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]), %% save request - ReqList = [#hbr{rid=Rid, - key=StateData#state.key, - in=StateData#state.input, - out=StateData#state.output - } | - [El || El <- StateData#state.req_list, - El#hbr.rid < Rid, - El#hbr.rid > (Rid - 1 - Hold)] - ], + ReqList1 = + [El || El <- StateData#state.req_list, + El#hbr.rid > (Rid - 1 - Hold)], + ReqList = + case lists:keymember(Rid, #hbr.rid, ReqList1) of + true -> + ReqList1; + false -> + [#hbr{rid = Rid, + key = StateData#state.key, + out = [] + } | + ReqList1 + ] + end, ?DEBUG("reqlist: ~p", [ReqList]), %% setup next timer cancel_timer(StateData#state.timer), - Timer = if - Pause > 0 -> - erlang:start_timer( - Pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, + Timer = set_inactivity_timer(Pause, + StateData#state.max_inactivity), case StateData#state.waiting_input of false -> - Input = Payload ++ [StateData#state.input], + Input = + lists:foldl( + fun queue:in/2, + StateData#state.input, Payload), Reply = ok, process_buffered_request(Reply, StateName, StateData#state{input = Input, @@ -675,32 +682,42 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, req_list = ReqList, ip = IP }); - {Receiver, _Tag} -> - SendPacket = - case StreamTo of - {To, ""} -> - [""] - ++ Payload; - {To, Version} -> - [""] - ++ Payload; - _ -> - Payload - end, + C2SPid -> + case StreamTo of + {To, ""} -> + gen_fsm:send_event( + C2SPid, + {xmlstreamstart, "stream:stream", + [{"to", To}, + {"xmlns", ?NS_CLIENT}, + {"xmlns:stream", ?NS_STREAM}]}); + {To, Version} -> + gen_fsm:send_event( + C2SPid, + {xmlstreamstart, "stream:stream", + [{"to", To}, + {"xmlns", ?NS_CLIENT}, + {"version", Version}, + {"xmlns:stream", ?NS_STREAM}]}); + _ -> + ok + end, + MaxInactivity = get_max_inactivity(StreamTo, StateData#state.max_inactivity), - ?DEBUG("really sending now: ~s", [SendPacket]), - Receiver ! {tcp, StateData#state.socket, - list_to_binary(SendPacket)}, + MaxPause = get_max_inactivity(StreamTo, StateData#state.max_pause), + + ?DEBUG("really sending now: ~p", [Payload]), + lists:foreach( + fun({xmlstreamend, End}) -> + gen_fsm:send_event( + C2SPid, {xmlstreamend, End}); + (El) -> + gen_fsm:send_event( + C2SPid, {xmlstreamelement, El}) + end, Payload), Reply = ok, process_buffered_request(Reply, StateName, - StateData#state{waiting_input = false, - last_receiver = Receiver, - input = "", + StateData#state{input = queue:new(), rid = Rid, key = SaveKey, ctime = TNow, @@ -709,6 +726,7 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, last_poll = LastPoll, req_list = ReqList, max_inactivity = MaxInactivity, + max_pause = MaxPause, ip = IP }) end @@ -724,29 +742,42 @@ process_buffered_request(Reply, StateName, StateData) -> case lists:keysearch(Rid+1, 2, Requests) of {value, Request} -> ?DEBUG("Processing buffered request: ~p", [Request]), - NewRequests = Requests -- [Request], - handle_sync_event(Request, undefined, StateName, - StateData#state{unprocessed_req_list=NewRequests}); + NewRequests = lists:keydelete(Rid+1, 2, Requests), + handle_http_put_event( + Request, StateName, + StateData#state{unprocessed_req_list = NewRequests}); _ -> {reply, Reply, StateName, StateData} end. -handle_http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> - case http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) of +handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> + case http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) of {error, not_exists} -> - ?DEBUG("no session associated with sid: ~p", [Sid]), + ?ERROR_MSG("no session associated with sid: ~p", [Sid]), {404, ?HEADER, ""}; {{error, Reason}, Sess} -> - ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]), + ?ERROR_MSG("Error on HTTP put. Reason: ~p", [Reason]), handle_http_put_error(Reason, Sess); {{repeat, OutPacket}, Sess} -> ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", [OutPacket]), send_outpacket(Sess, OutPacket); + {{wait, Pause}, _Sess} -> + ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]), + timer:sleep(Pause), + %{200, ?HEADER, + % xml:element_to_string( + % {xmlelement, "body", + % [{"xmlns", ?NS_HTTP_BIND}, + % {"type", "error"}], []})}; + handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, + StreamStart, IP); + {buffered, _Sess} -> + {200, ?HEADER, ""}; {ok, Sess} -> prepare_response(Sess, Rid, Attrs, StreamStart) end. -http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> +http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> ?DEBUG("Looking for session: ~p", [Sid]), case mnesia:dirty_read({http_bind, Sid}) of [] -> @@ -760,7 +791,9 @@ http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> "" end, {gen_fsm:sync_send_all_state_event( - FsmRef, {http_put, Rid, Attrs, Payload, Hold, NewStream, IP}), Sess} + FsmRef, #http_put{rid = Rid, attrs = Attrs, payload = Payload, + payload_size = PayloadSize, hold = Hold, + stream = NewStream, ip = IP}, 30000), Sess} end. handle_http_put_error(Reason, #http_bind{pid=FsmRef, version=Version}) @@ -803,6 +836,46 @@ handle_http_put_error(Reason, #http_bind{pid=FsmRef}) -> {403, ?HEADER, ""} end. +%% Control RID ordering +rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) -> + %% First request - nothing saved so far + {true, 0}; +rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) -> + ?DEBUG("Previous rid / New rid: ~p/~p", [OldRid, NewRid]), + if + %% We did not miss any packet, we can process it immediately: + NewRid == OldRid + 1 -> + case catch list_to_integer( + xml:get_attr_s("pause", Attrs)) of + {'EXIT', _} -> + {true, 0}; + Pause1 when Pause1 =< MaxPause -> + ?DEBUG("got pause: ~p", [Pause1]), + {true, Pause1}; + _ -> + {true, 0} + end; + %% We have missed packets, we need to cached it to process it later on: + (OldRid < NewRid) and + (NewRid =< (OldRid + Hold + 1)) -> + buffer; + (NewRid =< OldRid) and + (NewRid > OldRid - Hold - 1) -> + repeat; + true -> + false + end. + +update_shaper(ShaperState, PayloadSize) -> + {NewShaperState, Pause} = shaper:update(ShaperState, PayloadSize), + if + Pause > 0 -> + ShaperTimer = erlang:start_timer(Pause, self(), activate), %% MR: Seems timer is not needed. Activate is not handled + {NewShaperState, ShaperTimer}; + true -> + {NewShaperState, undefined} + end. + prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, Rid, _, StreamStart) -> receive after 100 -> ok end, %% TODO: Why is this needed. Argh. Bad programming practice. @@ -816,62 +889,52 @@ prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, {200, ?HEADER, ""}; {ok, terminate} -> {200, ?HEADER, ""}; - {ok, OutPacket} -> - ?DEBUG("OutPacket: ~s", [OutPacket]), + {ok, ROutPacket} -> + OutPacket = lists:reverse(ROutPacket), + ?DEBUG("OutPacket: ~p", [OutPacket]), case StreamStart of false -> send_outpacket(Sess, OutPacket); true -> - OutEls = - case xml_stream:parse_element( - OutPacket++"") of - El when element(1, El) == xmlelement -> - ?DEBUG("~p", [El]), - {xmlelement, _, OutAttrs, Els} = El, - AuthID = xml:get_attr_s("id", OutAttrs), - From = xml:get_attr_s("from", OutAttrs), - Version = xml:get_attr_s("version", OutAttrs), - StreamError = false, - case Els of - [] -> - []; - [{xmlelement, "stream:features", - StreamAttribs, StreamEls} - | StreamTail] -> - [{xmlelement, "stream:features", - [{"xmlns:stream", - ?NS_STREAM} - ] - ++ StreamAttribs, - StreamEls - }] ++ StreamTail; - Xml -> - Xml - end; - {error, _} -> - AuthID = "", - From = "", - Version = "", - StreamError = true, - [] - end, - if - StreamError == true -> - {200, ?HEADER, ""}; - true -> - BOSH_attribs = - [{"authid", AuthID}, - {"xmlns:xmpp", ?NS_BOSH}, - {"xmlns:stream", ?NS_STREAM}] ++ - case OutEls of - [] -> - []; - _ -> - [{"xmpp:version", Version}] - end, - MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), + case OutPacket of + [{xmlstreamstart, _, OutAttrs} | Els] -> + AuthID = xml:get_attr_s("id", OutAttrs), + From = xml:get_attr_s("from", OutAttrs), + Version = xml:get_attr_s("version", OutAttrs), + OutEls = + case Els of + [] -> + []; + [{xmlstreamelement, + {xmlelement, "stream:features", + StreamAttribs, StreamEls}} + | StreamTail] -> + TypedTail = + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail], + [{xmlelement, "stream:features", + [{"xmlns:stream", + ?NS_STREAM}] ++ + StreamAttribs, StreamEls}] ++ + TypedTail; + StreamTail -> + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail] + end, + BOSH_attribs = + [{"authid", AuthID}, + {"xmlns:xmpp", ?NS_BOSH}, + {"xmlns:stream", ?NS_STREAM}] ++ + case OutEls of + [] -> + []; + _ -> + [{"xmpp:version", Version}] + end, + MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), + MaxPause = get_max_pause(To), {200, ?HEADER, xml:element_to_string( {xmlelement,"body", @@ -882,16 +945,20 @@ prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, {"requests", integer_to_list(Hold+1)}, {"inactivity", integer_to_list( - trunc(MaxInactivity/1000))}, - {"maxpause", - integer_to_list(?MAX_PAUSE)}, + trunc(MaxInactivity/1000))}, + {"maxpause", + integer_to_list(MaxPause)}, {"polling", - integer_to_list( - trunc(?MIN_POLLING/1000000))}, - {"ver", ?BOSH_VERSION}, - {"from", From}, - {"secure", "true"} %% we're always being secure - ] ++ BOSH_attribs,OutEls})} + integer_to_list( + trunc(?MIN_POLLING/1000000))}, + {"ver", ?BOSH_VERSION}, + {"from", From}, + {"secure", "true"} %% we're always being secure + ] ++ BOSH_attribs,OutEls})}; + {error, _} -> + {200, ?HEADER, ""} end end; {'EXIT', {shutdown, _}} -> @@ -906,77 +973,85 @@ http_get(#http_bind{pid = FsmRef, wait = Wait, hold = Hold}, Rid) -> send_outpacket(#http_bind{pid = FsmRef}, OutPacket) -> case OutPacket of - "" -> + [] -> {200, ?HEADER, ""}; - "" -> + [{xmlstreamend, _}] -> gen_fsm:sync_send_all_state_event(FsmRef,{stop,stream_closed}), {200, ?HEADER, ""}; _ -> - case xml_stream:parse_element("" - ++ OutPacket - ++ "") - of - El when element(1, El) == xmlelement -> - {xmlelement, _, _, OEls} = El, + %% TODO: We parse to add a default namespace to packet, + %% The spec says adding the jabber:client namespace if + %% mandatory, even if some implementation do not do that + %% change on packets. + %% I think this should be an option to avoid modifying + %% packet in most case. + AllElements = + lists:all(fun({xmlstreamelement, + {xmlelement, "stream:error", _, _}}) -> false; + ({xmlstreamelement, _}) -> true; + (_) -> false + end, OutPacket), + case AllElements of + true -> TypedEls = [check_default_xmlns(OEl) || - OEl <- OEls], + {xmlstreamelement, OEl} <- OutPacket], + Body = xml:element_to_string( + {xmlelement,"body", + [{"xmlns", + ?NS_HTTP_BIND}], + TypedEls}), ?DEBUG(" --- outgoing data --- ~n~s~n --- END --- ~n", - [xml:element_to_string( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - TypedEls})] - ), - {200, ?HEADER, - xml:element_to_string( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - TypedEls})}; - {error, _E} -> - OutEls = case xml_stream:parse_element( - OutPacket++"") of - SEl when element(1, SEl) == xmlelement -> - {xmlelement, _, _OutAttrs, SEls} = SEl, - StreamError = false, - case SEls of - [] -> - []; - [{xmlelement, - "stream:features", - StreamAttribs, StreamEls} | - StreamTail] -> - TypedTail = - [check_default_xmlns(OEl) || - OEl <- StreamTail], - [{xmlelement, - "stream:features", - [{"xmlns:stream", - ?NS_STREAM}] ++ - StreamAttribs, StreamEls}] ++ - TypedTail; - Xml -> - Xml - end; - {error, _} -> - StreamError = true, - [] - end, - if - StreamError -> + [Body]), + {200, ?HEADER, Body}; + false -> + case OutPacket of + [{xmlstreamstart, _, _} | SEls] -> + OutEls = + case SEls of + [{xmlstreamelement, + {xmlelement, + "stream:features", + StreamAttribs, StreamEls}} | + StreamTail] -> + TypedTail = + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail], + [{xmlelement, + "stream:features", + [{"xmlns:stream", + ?NS_STREAM}] ++ + StreamAttribs, StreamEls}] ++ + TypedTail; + StreamTail -> + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail] + end, + {200, ?HEADER, + xml:element_to_string( + {xmlelement,"body", + [{"xmlns", + ?NS_HTTP_BIND}], + OutEls})}; + _ -> + SErrCond = + lists:filter( + fun({xmlstreamelement, + {xmlelement, "stream:error", + _, _}}) -> + true; + (_) -> false + end, OutPacket), StreamErrCond = - case xml_stream:parse_element( - "" ++ OutPacket) of - El when element(1, El) == xmlelement -> - case xml:get_subtag(El, "stream:error") of - false -> - null; - {xmlelement, _, _, _Cond} = StreamErrorTag -> - [StreamErrorTag] - end; - {error, _E} -> - null - end, + case SErrCond of + [] -> + null; + [{xmlstreamelement, + {xmlelement, _, _, _Cond} = + StreamErrorTag} | _] -> + [StreamErrorTag] + end, gen_fsm:sync_send_all_state_event(FsmRef, {stop, {stream_error,OutPacket}}), case StreamErrCond of @@ -993,27 +1068,22 @@ send_outpacket(#http_bind{pid = FsmRef}, OutPacket) -> "xmlns:stream='"++?NS_STREAM++"'>" ++ elements_to_string(StreamErrCond) ++ ""} - end; - true -> - {200, ?HEADER, - xml:element_to_string( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - OutEls})} - end + end + end end end. -parse_request(Data) -> +parse_request(_Data, PayloadSize, MaxStanzaSize) + when PayloadSize > MaxStanzaSize -> + {error, size_limit}; +parse_request(Data, _PayloadSize, _MaxStanzaSize) -> ?DEBUG("--- incoming data --- ~n~s~n --- END --- ", [Data]), + %% MR: I do not think it works if put put several elements in the + %% same body: case xml_stream:parse_element(Data) of - El when element(1, El) == xmlelement -> - {xmlelement, Name, Attrs, Els} = El, + {xmlelement, "body", Attrs, Els} -> Xmlns = xml:get_attr_s("xmlns",Attrs), if - Name /= "body" -> - {error, bad_request}; Xmlns /= ?NS_HTTP_BIND -> {error, bad_request}; true -> @@ -1021,7 +1091,7 @@ parse_request(Data) -> {'EXIT', _} -> {error, bad_request}; Rid -> - %% I guess this is to remove XMLCDATA: + %% I guess this is to remove XMLCDATA: Is it really needed ? FixedEls = lists:filter( fun(I) -> @@ -1032,28 +1102,22 @@ parse_request(Data) -> false end end, Els), - %% MR: I commented this code, because it is not used. -%% lists:map( -%% fun(E) -> -%% EXmlns = xml:get_tag_attr_s("xmlns",E), -%% if -%% EXmlns == ?NS_CLIENT -> -%% remove_tag_attr("xmlns",E); -%% true -> -%% ok -%% end -%% end, FixedEls), - Payload = [xml:element_to_string(E) || E <- FixedEls], Sid = xml:get_attr_s("sid",Attrs), - %% MR: I do not think we need to extract - %% Sid. We should have it somewhere else: - {ok, {Sid, Rid, Attrs, Payload}} + {ok, {Sid, Rid, Attrs, FixedEls}} end end; + {xmlelement, _Name, _Attrs, _Els} -> + {error, bad_request}; {error, _Reason} -> {error, bad_request} end. +send_receiver_reply(undefined, _Reply) -> + ok; +send_receiver_reply(Receiver, Reply) -> + gen_fsm:reply(Receiver, Reply). + + %% Cancel timer and empty message queue. cancel_timer(undefined) -> ok; @@ -1066,6 +1130,15 @@ cancel_timer(Timer) -> ok end. +%% If client asked for a pause (pause > 0), we apply the pause value +%% as inactivity timer: +set_inactivity_timer(Pause, _MaxInactivity) when Pause > 0 -> + erlang:start_timer(Pause*1000, self(), []); +%% Otherwise, we apply the max_inactivity value as inactivity timer: +set_inactivity_timer(_Pause, MaxInactivity) -> + erlang:start_timer(MaxInactivity, self(), []). + + %% TODO: Use tail recursion and list reverse ? elements_to_string([]) -> []; @@ -1084,11 +1157,15 @@ get_max_inactivity({Host, _}, Default) -> get_max_inactivity(_, Default) -> Default. -%% remove_tag_attr(Attr, {xmlelement, Name, Attrs, Els}) -> -%% Attrs1 = lists:keydelete(Attr, 1, Attrs), -%% {xmlelement, Name, Attrs1, Els}; -%% remove_tag_attr(Attr, El) -> -%% El. +get_max_pause({Host, _}) -> + gen_mod:get_module_opt(Host, mod_http_bind, max_pause, ?MAX_PAUSE); +get_max_pause(_) -> + ?MAX_PAUSE. + +%% Current time as integer +tnow() -> + {TMegSec, TSec, TMSec} = now(), + (TMegSec * 1000000 + TSec) * 1000000 + TMSec. check_default_xmlns({xmlelement, Name, Attrs, Els} = El) -> case xml:get_tag_attr_s("xmlns", El) of @@ -1101,6 +1178,6 @@ check_default_xmlns({xmlelement, Name, Attrs, Els} = El) -> check_bind_module(XmppDomain) -> case gen_mod:is_loaded(XmppDomain, mod_http_bind) of true -> ok; - false -> ?ERROR_MSG("You are trying to use HTTP Bind (BOSH), but the module mod_http_bind is not started.~n" + false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind), but the module mod_http_bind is not started.~n" "Check your 'modules' section in your ejabberd configuration file.",[]) end.