]> granicus.if.org Git - ejabberd/commitdiff
BOSH module optimization and clean-up (thanks to Aleksey Shchepin and Mickaël Rémond...
authorBadlop <badlop@process-one.net>
Mon, 31 Aug 2009 18:37:52 +0000 (18:37 +0000)
committerBadlop <badlop@process-one.net>
Mon, 31 Aug 2009 18:37:52 +0000 (18:37 +0000)
SVN Revision: 2574

src/ejabberd_c2s.erl
src/ejabberd_receiver.erl
src/ejabberd_socket.erl
src/web/ejabberd_http_bind.erl

index 88d26b1d512b8f87880a6adfb7515ca0d736e244..e9f53b6e0c9a3b29a9916aa5d83f04052904cd73 100644 (file)
@@ -67,6 +67,7 @@
 -record(state, {socket,
                sockmod,
                socket_monitor,
+               xml_socket,
                streamid,
                sasl_state,
                access,
 
 -define(STREAM_TRAILER, "</stream:stream>").
 
--define(INVALID_NS_ERR,
-       xml:element_to_string(?SERR_INVALID_NAMESPACE)).
--define(INVALID_XML_ERR,
-       xml:element_to_string(?SERR_XML_NOT_WELL_FORMED)).
--define(HOST_UNKNOWN_ERR,
-       xml:element_to_string(?SERR_HOST_UNKNOWN)).
+-define(INVALID_NS_ERR, ?SERR_INVALID_NAMESPACE).
+-define(INVALID_XML_ERR, ?SERR_XML_NOT_WELL_FORMED).
+-define(HOST_UNKNOWN_ERR, ?SERR_HOST_UNKNOWN).
 -define(POLICY_VIOLATION_ERR(Lang, Text),
-       xml:element_to_string(?SERRT_POLICY_VIOLATION(Lang, Text))).
--define(INVALID_FROM,
-       xml:element_to_string(?SERR_INVALID_FROM)).
+       ?SERRT_POLICY_VIOLATION(Lang, Text)).
+-define(INVALID_FROM, ?SERR_INVALID_FROM).
 
 
 %%%----------------------------------------------------------------------
@@ -175,6 +172,11 @@ init([{SockMod, Socket}, Opts]) ->
                 {value, {_, S}} -> S;
                 _ -> none
             end,
+    XMLSocket =
+       case lists:keysearch(xml_socket, 1, Opts) of
+           {value, {_, XS}} -> XS;
+           _ -> false
+       end,
     Zlib = lists:member(zlib, Opts),
     StartTLS = lists:member(starttls, Opts),
     StartTLSRequired = lists:member(starttls_required, Opts),
@@ -205,6 +207,7 @@ init([{SockMod, Socket}, Opts]) ->
            {ok, wait_for_stream, #state{socket         = Socket1,
                                         sockmod        = SockMod,
                                         socket_monitor = SocketMonitor,
+                                        xml_socket     = XMLSocket,
                                         zlib           = Zlib,
                                         tls            = TLS,
                                         tls_required   = StartTLSRequired,
@@ -231,9 +234,9 @@ get_subscribed(FsmRef) ->
 wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
     DefaultLang = case ?MYLANG of
                      undefined ->
-                         " xml:lang='en'";
+                         "en";
                      DL ->
-                         " xml:lang='" ++ DL ++ "'"
+                         DL
                  end,
     case xml:get_attr_s("xmlns:stream", Attrs) of
        ?NS_STREAM ->
@@ -244,12 +247,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
                    change_shaper(StateData, jlib:make_jid("", Server, "")),
                    case xml:get_attr_s("version", Attrs) of
                        "1.0" ->
-                           Header = io_lib:format(?STREAM_HEADER,
-                                                  [StateData#state.streamid,
-                                                   Server,
-                                                   " version='1.0'",
-                                                   DefaultLang]),
-                           send_text(StateData, Header),
+                           send_header(StateData, Server, "1.0", DefaultLang),
                            case StateData#state.authenticated of
                                false ->
                                    SASLState =
@@ -351,22 +349,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
                                    end
                            end;
                        _ ->
-                           Header = io_lib:format(
-                                      ?STREAM_HEADER,
-                                      [StateData#state.streamid, Server, "",
-                                       DefaultLang]),
+                           send_header(StateData, Server, "", DefaultLang),
                            if
                                (not StateData#state.tls_enabled) and
                                StateData#state.tls_required ->
-                                   send_text(StateData,
-                                             Header ++
-                                             ?POLICY_VIOLATION_ERR(
-                                                Lang,
-                                                "Use of STARTTLS required") ++
-                                             ?STREAM_TRAILER),
+                                   send_element(
+                                     StateData,
+                                     ?POLICY_VIOLATION_ERR(
+                                        Lang,
+                                        "Use of STARTTLS required")),
+                                   send_trailer(StateData),
                                    {stop, normal, StateData};
                                true ->
-                                   send_text(StateData, Header),
                                    fsm_next_state(wait_for_auth,
                                                   StateData#state{
                                                     server = Server,
@@ -374,20 +368,15 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
                            end
                    end;
                _ ->
-                   Header = io_lib:format(
-                              ?STREAM_HEADER,
-                              [StateData#state.streamid, ?MYNAME, "",
-                               DefaultLang]),
-                   send_text(StateData,
-                             Header ++ ?HOST_UNKNOWN_ERR ++ ?STREAM_TRAILER),
+                   send_header(StateData, ?MYNAME, "", DefaultLang),
+                   send_element(StateData, ?HOST_UNKNOWN_ERR),
+                   send_trailer(StateData),
                    {stop, normal, StateData}
            end;
        _ ->
-           Header = io_lib:format(
-                      ?STREAM_HEADER,
-                      [StateData#state.streamid, ?MYNAME, "", DefaultLang]),
-           send_text(StateData,
-                     Header ++ ?INVALID_NS_ERR ++ ?STREAM_TRAILER),
+           send_header(StateData, ?MYNAME, "", DefaultLang),
+           send_element(StateData, ?INVALID_NS_ERR),
+           send_trailer(StateData),
            {stop, normal, StateData}
     end;
 
@@ -395,18 +384,19 @@ wait_for_stream(timeout, StateData) ->
     {stop, normal, StateData};
 
 wait_for_stream({xmlstreamelement, _}, StateData) ->
-    send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER),
+    send_element(StateData, ?INVALID_XML_ERR),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_stream({xmlstreamend, _}, StateData) ->
-    send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER),
+    send_element(StateData, ?INVALID_XML_ERR),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_stream({xmlstreamerror, _}, StateData) ->
-    Header = io_lib:format(?STREAM_HEADER,
-                          ["none", ?MYNAME, " version='1.0'", ""]),
-    send_text(StateData,
-             Header ++ ?INVALID_XML_ERR ++ ?STREAM_TRAILER),
+    send_header(StateData, ?MYNAME, "1.0", ""),
+    send_element(StateData, ?INVALID_XML_ERR),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_stream(closed, StateData) ->
@@ -538,11 +528,12 @@ wait_for_auth(timeout, StateData) ->
     {stop, normal, StateData};
 
 wait_for_auth({xmlstreamend, _Name}, StateData) ->
-    send_text(StateData, ?STREAM_TRAILER),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_auth({xmlstreamerror, _}, StateData) ->
-    send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER),
+    send_element(StateData, ?INVALID_XML_ERR),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_auth(closed, StateData) ->
@@ -665,10 +656,10 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
            if
                (SockMod == gen_tcp) and TLSRequired ->
                    Lang = StateData#state.lang,
-                   send_text(StateData, ?POLICY_VIOLATION_ERR(
-                                           Lang,
-                                           "Use of STARTTLS required") ++
-                                        ?STREAM_TRAILER),
+                   send_element(StateData, ?POLICY_VIOLATION_ERR(
+                                              Lang,
+                                              "Use of STARTTLS required")),
+                   send_trailer(StateData),
                    {stop, normal, StateData};
                true ->
                    process_unauthenticated_stanza(StateData, El),
@@ -680,11 +671,12 @@ wait_for_feature_request(timeout, StateData) ->
     {stop, normal, StateData};
 
 wait_for_feature_request({xmlstreamend, _Name}, StateData) ->
-    send_text(StateData, ?STREAM_TRAILER),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_feature_request({xmlstreamerror, _}, StateData) ->
-    send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER),
+    send_element(StateData, ?INVALID_XML_ERR),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_feature_request(closed, StateData) ->
@@ -748,11 +740,12 @@ wait_for_sasl_response(timeout, StateData) ->
     {stop, normal, StateData};
 
 wait_for_sasl_response({xmlstreamend, _Name}, StateData) ->
-    send_text(StateData, ?STREAM_TRAILER),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_sasl_response({xmlstreamerror, _}, StateData) ->
-    send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER),
+    send_element(StateData, ?INVALID_XML_ERR),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_sasl_response(closed, StateData) ->
@@ -797,11 +790,12 @@ wait_for_bind(timeout, StateData) ->
     {stop, normal, StateData};
 
 wait_for_bind({xmlstreamend, _Name}, StateData) ->
-    send_text(StateData, ?STREAM_TRAILER),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_bind({xmlstreamerror, _}, StateData) ->
-    send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER),
+    send_element(StateData, ?INVALID_XML_ERR),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_bind(closed, StateData) ->
@@ -868,11 +862,12 @@ wait_for_session(timeout, StateData) ->
     {stop, normal, StateData};
 
 wait_for_session({xmlstreamend, _Name}, StateData) ->
-    send_text(StateData, ?STREAM_TRAILER),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_session({xmlstreamerror, _}, StateData) ->
-    send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER),
+    send_element(StateData, ?INVALID_XML_ERR),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 wait_for_session(closed, StateData) ->
@@ -884,7 +879,8 @@ session_established({xmlstreamelement, El}, StateData) ->
     % Check 'from' attribute in stanza RFC 3920 Section 9.1.2
     case check_from(El, FromJID) of
        'invalid-from' ->
-           send_text(StateData, ?INVALID_FROM ++ ?STREAM_TRAILER),
+           send_element(StateData, ?INVALID_FROM),
+           send_trailer(StateData),
            {stop, normal, StateData};
        _NewEl ->
            session_established2(El, StateData)
@@ -900,16 +896,17 @@ session_established(timeout, StateData) ->
     fsm_next_state(session_established, StateData);
 
 session_established({xmlstreamend, _Name}, StateData) ->
-    send_text(StateData, ?STREAM_TRAILER),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 session_established({xmlstreamerror, "XML stanza is too big" = E}, StateData) ->
-    Text = ?POLICY_VIOLATION_ERR(StateData#state.lang, E) ++ ?STREAM_TRAILER,
-    send_text(StateData, Text),
+    send_element(StateData, ?POLICY_VIOLATION_ERR(StateData#state.lang, E)),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 session_established({xmlstreamerror, _}, StateData) ->
-    send_text(StateData, ?INVALID_XML_ERR ++ ?STREAM_TRAILER),
+    send_element(StateData, ?INVALID_XML_ERR),
+    send_trailer(StateData),
     {stop, normal, StateData};
 
 session_established(closed, StateData) ->
@@ -1070,10 +1067,9 @@ handle_info({send_text, Text}, StateName, StateData) ->
     fsm_next_state(StateName, StateData);
 handle_info(replaced, _StateName, StateData) ->
     Lang = StateData#state.lang,
-    send_text(StateData,
-             xml:element_to_string(
-               ?SERRT_CONFLICT(Lang, "Replaced by new connection"))
-             ++ ?STREAM_TRAILER),
+    send_element(StateData,
+                ?SERRT_CONFLICT(Lang, "Replaced by new connection")),
+    send_trailer(StateData),
     {stop, normal, StateData#state{authenticated = replaced}};
 %% Process Packets that are to be send to the user
 handle_info({route, From, To, Packet}, StateName, StateData) ->
@@ -1273,18 +1269,15 @@ handle_info({route, From, To, Packet}, StateName, StateData) ->
        Pass == exit ->
            %% When Pass==exit, NewState contains a string instead of a #state{}
            Lang = StateData#state.lang,
-            catch send_text(StateData,
-                           xml:element_to_string(
-                             ?SERRT_CONFLICT(Lang, NewState))
-                           ++ ?STREAM_TRAILER),
+           send_element(StateData, ?SERRT_CONFLICT(Lang, NewState)),
+           send_trailer(StateData),
            {stop, normal, StateData};
        Pass ->
            Attrs2 = jlib:replace_from_to_attrs(jlib:jid_to_string(From),
                                                jlib:jid_to_string(To),
                                                NewAttrs),
            FixedPacket = {xmlelement, Name, Attrs2, Els},
-           Text = xml:element_to_string(FixedPacket),
-           send_text(StateData, Text),
+           send_element(StateData, FixedPacket),
            ejabberd_hooks:run(user_receive_packet,
                               StateData#state.server,
                               [StateData#state.jid, From, To, FixedPacket]),
@@ -1379,9 +1372,60 @@ send_text(StateData, Text) ->
     ?DEBUG("Send XML on stream = ~p", [lists:flatten(Text)]),
     (StateData#state.sockmod):send(StateData#state.socket, Text).
 
+send_element(StateData, El) when StateData#state.xml_socket ->
+    (StateData#state.sockmod):send_xml(StateData#state.socket,
+                                      {xmlstreamelement, El});
 send_element(StateData, El) ->
     send_text(StateData, xml:element_to_string(El)).
 
+send_header(StateData, Server, Version, Lang)
+  when StateData#state.xml_socket ->
+    VersionAttr =
+       case Version of
+           "" -> [];
+           _ -> [{"version", Version}]
+       end,
+    LangAttr =
+       case Lang of
+           "" -> [];
+           _ -> [{"xml:lang", Lang}]
+       end,
+    Header =
+       {xmlstreamstart,
+        "stream:stream",
+        VersionAttr ++
+        LangAttr ++
+        [{"xmlns", "jabber:client"},
+         {"xmlns:stream", "http://etherx.jabber.org/streams"},
+         {"id", StateData#state.streamid},
+         {"from", Server}]},
+    (StateData#state.sockmod):send_xml(
+      StateData#state.socket, Header);
+send_header(StateData, Server, Version, Lang) ->
+    VersionStr =
+       case Version of
+           "" -> "";
+           _ -> [" version='", Version, "'"]
+       end,
+    LangStr =
+       case Lang of
+           "" -> "";
+           _ -> [" xml:lang='", Lang, "'"]
+       end,
+    Header = io_lib:format(?STREAM_HEADER,
+                          [StateData#state.streamid,
+                           Server,
+                           VersionStr,
+                           LangStr]),
+    send_text(StateData, Header).
+
+send_trailer(StateData) when StateData#state.xml_socket ->
+    (StateData#state.sockmod):send_xml(
+      StateData#state.socket,
+      {xmlstreamend, "stream:stream"});
+send_trailer(StateData) ->
+    send_text(StateData, ?STREAM_TRAILER).
+
 
 new_id() ->
     randoms:get_string().
index 424f67e55ec12e445f945af1ae77101bf2f9015e..5a7a3bf73e77a245c8899d49f17592a03d6bb811 100644 (file)
@@ -205,7 +205,7 @@ handle_cast(_Msg, State) ->
 handle_info({Tag, _TCPSocket, Data},
            #state{socket = Socket,
                   sock_mod = SockMod} = State)
-  when (Tag == tcp) or (Tag == ssl) ->
+  when (Tag == tcp) or (Tag == ssl) or (Tag == ejabberd_xml) ->
     case SockMod of
        tls ->
            case tls:recv_data(Socket, Data) of
@@ -294,6 +294,25 @@ activate_socket(#state{socket = Socket,
            ok
     end.
 
+%% Data processing for connectors directly generating xmlelement in
+%% Erlang data structure.
+%% WARNING: Shaper does not work with Erlang data structure.
+process_data([], State) ->
+    activate_socket(State),
+    State;
+process_data([Element|Els], #state{c2s_pid = C2SPid} = State)
+  when element(1, Element) == xmlelement;
+       element(1, Element) == xmlstreamstart;
+      element(1, Element) == xmlstreamelement;
+       element(1, Element) == xmlstreamend ->
+    if
+       C2SPid == undefined ->
+           State;
+       true ->
+           catch gen_fsm:send_event(C2SPid, element_wrapper(Element)),
+           process_data(Els, State)
+    end;
+%% Data processing for connectors receivind data as string.
 process_data(Data,
             #state{xml_stream_state = XMLStreamState,
                    shaper_state = ShaperState,
@@ -312,6 +331,16 @@ process_data(Data,
     State#state{xml_stream_state = XMLStreamState1,
                shaper_state = NewShaperState}.
 
+%% Element coming from XML parser are wrapped inside xmlstreamelement
+%% When we receive directly xmlelement tuple (from a socket module
+%% speaking directly Erlang XML), we wrap it inside the same
+%% xmlstreamelement coming from the XML parser.
+element_wrapper(XMLElement)
+  when element(1, XMLElement) == xmlelement ->
+    {xmlstreamelement, XMLElement};
+element_wrapper(Element) ->
+    Element.
+
 close_stream(undefined) ->
     ok;
 close_stream(XMLStreamState) ->
index 566ee1e435c655f2ee5ed5ba511c72fa9d180722..d629a77d9aff83d7d7c4c8177e41a15ffef29977 100644 (file)
@@ -37,6 +37,7 @@
         compress/2,
         reset_stream/1,
         send/2,
+        send_xml/2,
         change_shaper/2,
         monitor/1,
         get_sockmod/1,
@@ -62,10 +63,18 @@ start(Module, SockMod, Socket, Opts) ->
                    {value, {_, Size}} -> Size;
                    _ -> infinity
                end,
-           Receiver = ejabberd_receiver:start(Socket, SockMod, none, MaxStanzaSize),
+           {ReceiverMod, Receiver, RecRef} =
+               case catch SockMod:custom_receiver(Socket) of
+                   {receiver, RecMod, RecPid} ->
+                       {RecMod, RecPid, RecMod};
+                   _ -> 
+                       RecPid = ejabberd_receiver:start(
+                                  Socket, SockMod, none, MaxStanzaSize),
+                       {ejabberd_receiver, RecPid, RecPid}
+               end,
            SocketData = #socket_state{sockmod = SockMod,
                                       socket = Socket,
-                                      receiver = Receiver},
+                                      receiver = RecRef},
            case Module:start({?MODULE, SocketData}, Opts) of
                {ok, Pid} ->
                    case SockMod:controlling_process(Socket, Receiver) of
@@ -74,7 +83,7 @@ start(Module, SockMod, Socket, Opts) ->
                        {error, _Reason} ->
                            SockMod:close(Socket)
                    end,
-                   ejabberd_receiver:become_controller(Receiver, Pid);
+                   ReceiverMod:become_controller(Receiver, Pid);
                {error, _Reason} ->
                    SockMod:close(Socket)
            end;
@@ -143,18 +152,33 @@ compress(SocketData, Data) ->
     send(SocketData, Data),
     SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}.
 
-reset_stream(SocketData) ->
-    ejabberd_receiver:reset_stream(SocketData#socket_state.receiver).
+reset_stream(SocketData) when is_pid(SocketData#socket_state.receiver) ->
+    ejabberd_receiver:reset_stream(SocketData#socket_state.receiver);
+reset_stream(SocketData) when is_atom(SocketData#socket_state.receiver) ->
+    (SocketData#socket_state.receiver):reset_stream(
+      SocketData#socket_state.socket).
 
 send(SocketData, Data) ->
     catch (SocketData#socket_state.sockmod):send(
            SocketData#socket_state.socket, Data).
 
-change_shaper(SocketData, Shaper) ->
-    ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, Shaper).
+send_xml(SocketData, Data) ->
+    catch (SocketData#socket_state.sockmod):send_xml(
+           SocketData#socket_state.socket, Data).
 
-monitor(SocketData) ->
-    erlang:monitor(process, SocketData#socket_state.receiver).
+change_shaper(SocketData, Shaper)
+  when is_pid(SocketData#socket_state.receiver) ->
+    ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, Shaper);
+change_shaper(SocketData, Shaper)
+  when is_atom(SocketData#socket_state.receiver) ->
+    (SocketData#socket_state.receiver):change_shaper(
+      SocketData#socket_state.socket, Shaper).
+
+monitor(SocketData) when is_pid(SocketData#socket_state.receiver) ->
+    erlang:monitor(process, SocketData#socket_state.receiver);
+monitor(SocketData) when is_atom(SocketData#socket_state.receiver) ->
+    (SocketData#socket_state.receiver):monitor(
+      SocketData#socket_state.socket).
 
 get_sockmod(SocketData) ->
     SocketData#socket_state.sockmod.
index 68bb8e383f8e946b0dc7bceaa16c78caf4646c15..336d4171ee27a0cbafb0b22b7ac2cdac39898329 100644 (file)
@@ -4,29 +4,11 @@
 %%% Purpose : Implements XMPP over BOSH (XEP-0205) (formerly known as
 %%%           HTTP Binding)
 %%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de>
-%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2009   ProcessOne
-%%%
-%%% This program is free software; you can redistribute it and/or
-%%% modify it under the terms of the GNU General Public License as
-%%% published by the Free Software Foundation; either version 2 of the
-%%% License, or (at your option) any later version.
-%%%
-%%% This program is distributed in the hope that it will be useful,
-%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
-%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-%%% General Public License for more details.
-%%%
-%%% You should have received a copy of the GNU General Public License
-%%% along with this program; if not, write to the Free Software
-%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
-%%% 02111-1307 USA
-%%%
+%%% Modified: may 2009 by Mickael Remond, Alexey Schepin
+%%% Id      : $Id: ejabberd_http_bind.erl 953 2009-05-07 10:40:40Z alexey $
 %%%----------------------------------------------------------------------
 
 -module(ejabberd_http_bind).
--author('steve@zeank.in-berlin.de').
 
 -behaviour(gen_fsm).
 
         handle_info/3,
         terminate/3,
         send/2,
-        setopts/2,
+        send_xml/2,
         sockname/1,
         peername/1,
+        setopts/2,
         controlling_process/2,
+        become_controller/2,
+        custom_receiver/1,
+        reset_stream/1,
+        change_shaper/2,
+        monitor/1,
         close/1,
         process_request/2]).
 
--define(ejabberd_debug, true).
-
 -include("ejabberd.hrl").
 -include("jlib.hrl").
 -include("ejabberd_http.hrl").
@@ -59,7 +45,6 @@
 %% http binding request
 -record(hbr, {rid,
              key,
-             in,
              out}).
 
 -record(state, {id,
                key,
                socket,
                output = "",
-               input = "",
+               input = queue:new(),
                waiting_input = false,
+               shaper_state,
+               shaper_timer,
                last_receiver,
                last_poll,
                http_receiver,
                ctime = 0,
                timer,
                pause=0,
-               unprocessed_req_list = [], % list of request that have been delayed for proper reordering
+               unprocessed_req_list = [], % list of request that have been delayed for proper reordering: {Request, PID}
                req_list = [], % list of requests (cache)
                max_inactivity,
+               max_pause,
                ip = ?NULL_PEER
               }).
 
+%% Internal request format:
+-record(http_put, {rid,
+                  attrs,
+                  payload,
+                  payload_size,
+                  hold,
+                  stream,
+                  ip}).
 
 %%-define(DBGFSM, true).
-
 -ifdef(DBGFSM).
 -define(FSMOPTS, [{debug, [trace]}]).
 -else.
 -define(FSMOPTS, []).
 -endif.
 
--define(BOSH_VERSION, "1.6").
+-define(BOSH_VERSION, "1.8").
 -define(NS_CLIENT, "jabber:client").
 -define(NS_BOSH, "urn:xmpp:xbosh").
 -define(NS_HTTP_BIND, "http://jabber.org/protocol/httpbind").
@@ -128,6 +123,9 @@ start_link(Sid, Key, IP) ->
 send({http_bind, FsmRef, _IP}, Packet) ->
     gen_fsm:sync_send_all_state_event(FsmRef, {send, Packet}).
 
+send_xml({http_bind, FsmRef, _IP}, Packet) ->
+    gen_fsm:sync_send_all_state_event(FsmRef, {send_xml, Packet}).
+
 setopts({http_bind, FsmRef, _IP}, Opts) ->
     case lists:member({active, once}, Opts) of
        true ->
@@ -139,6 +137,21 @@ setopts({http_bind, FsmRef, _IP}, Opts) ->
 controlling_process(_Socket, _Pid) ->
     ok.
 
+custom_receiver({http_bind, FsmRef, _IP}) ->
+    {receiver, ?MODULE, FsmRef}.
+
+become_controller(FsmRef, C2SPid) ->
+    gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}).
+
+reset_stream({http_bind, _FsmRef, _IP}) ->
+    ok.
+
+change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
+    gen_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).
+
+monitor({http_bind, FsmRef, _IP}) ->
+    erlang:monitor(process, FsmRef).
+
 close({http_bind, FsmRef, _IP}) ->
     catch gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}).
 
@@ -148,8 +161,19 @@ sockname(_Socket) ->
 peername({http_bind, _FsmRef, IP}) ->
     {ok, IP}.
 
+
+%% Entry point for data coming from client through ejabberd HTTP server:
 process_request(Data, IP) ->
-    case catch parse_request(Data) of
+    Opts1 = ejabberd_c2s_config:get_c2s_limits(),
+    Opts = [{xml_socket, true} | Opts1],
+    MaxStanzaSize =
+       case lists:keysearch(max_stanza_size, 1, Opts) of
+           {value, {_, Size}} -> Size;
+           _ -> infinity
+       end,
+    PayloadSize = iolist_size(Data),
+    case catch parse_request(Data, PayloadSize, MaxStanzaSize) of
+       %% No existing session:
        {ok, {"", Rid, Attrs, Payload}} ->
            case xml:get_attr_s("to",Attrs) of
                 "" ->
@@ -166,11 +190,13 @@ process_request(Data, IP) ->
                             "condition='internal-server-error' "
                             "xmlns='" ++ ?NS_HTTP_BIND ++ "'>BOSH module not started</body"};
                        {ok, Pid} ->
-                           handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP)
+                           handle_session_start(
+                             Pid, XmppDomain, Sid, Rid, Attrs,
+                             Payload, PayloadSize, IP)
                    end
             end;
+       %% Existing session
         {ok, {Sid, Rid, Attrs, Payload1}} ->
-            %% old session
             StreamStart =
                 case xml:get_attr_s("xmpp:restart",Attrs) of
                     "true" ->
@@ -181,17 +207,21 @@ process_request(Data, IP) ->
             Payload2 = case xml:get_attr_s("type",Attrs) of
                            "terminate" ->
                                %% close stream
-                               Payload1 ++ "</stream:stream>";
+                               Payload1 ++ [{xmlstreamend, "stream:stream"}];
                            _ ->
                                Payload1
                        end,
-            handle_http_put(Sid, Rid, Attrs, Payload2, StreamStart, IP);
+            handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
+                           StreamStart, IP);
+        {error, size_limit} ->
+            {413, ?HEADER, "Request Too Large"};
         _ ->
-           ?ERROR_MSG("Received bad request: ~p", [Data]),
+           ?DEBUG("Received bad request: ~p", [Data]),
             {400, ?HEADER, ""}
     end.
 
-handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) ->
+handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
+                    Payload, PayloadSize, IP) ->
     ?DEBUG("got pid: ~p", [Pid]),
     Wait = case string:to_integer(xml:get_attr_s("wait",Attrs)) of
               {error, _} ->
@@ -235,7 +265,7 @@ handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) ->
                           version = Version
                          })
       end),
-    handle_http_put(Sid, Rid, Attrs, Payload, true, IP).
+    handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP).
 
 %%%----------------------------------------------------------------------
 %%% Callback functions from gen_fsm
@@ -257,58 +287,46 @@ init([Sid, Key, IP]) ->
     %% each connector. The default behaviour should be however to use
     %% the default c2s restrictions if not defined for the current
     %% connector.
-    Opts = ejabberd_c2s_config:get_c2s_limits(),
+    Opts1 = ejabberd_c2s_config:get_c2s_limits(),
+    Opts = [{xml_socket, true} | Opts1],
 
+    Shaper = none,
+    ShaperState = shaper:new(Shaper),
     Socket = {http_bind, self(), IP},
     ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, Opts),
     Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []),
     {ok, loop, #state{id = Sid,
                      key = Key,
                      socket = Socket,
+                     shaper_state = ShaperState,
                      max_inactivity = ?MAX_INACTIVITY,
+                     max_pause = ?MAX_PAUSE,
                      timer = Timer}}.
 
-%%----------------------------------------------------------------------
-%% Func: StateName/2
-%% Returns: {next_state, NextStateName, NextStateData}          |
-%%          {next_state, NextStateName, NextStateData, Timeout} |
-%%          {stop, Reason, NewStateData}
-%%----------------------------------------------------------------------
-
-
-%%----------------------------------------------------------------------
-%% Func: StateName/3
-%% Returns: {next_state, NextStateName, NextStateData}            |
-%%          {next_state, NextStateName, NextStateData, Timeout}   |
-%%          {reply, Reply, NextStateName, NextStateData}          |
-%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
-%%          {stop, Reason, NewStateData}                          |
-%%          {stop, Reason, Reply, NewStateData}
-%%----------------------------------------------------------------------
-%state_name(Event, From, StateData) ->
-%    Reply = ok,
-%    {reply, Reply, state_name, StateData}.
-
 %%----------------------------------------------------------------------
 %% Func: handle_event/3
 %% Returns: {next_state, NextStateName, NextStateData}          |
 %%          {next_state, NextStateName, NextStateData, Timeout} |
 %%          {stop, Reason, NewStateData}
 %%----------------------------------------------------------------------
-handle_event({activate, From}, StateName, StateData) ->
+handle_event({become_controller, C2SPid}, StateName, StateData) ->
     case StateData#state.input of
-       "" ->
+       cancel ->
            {next_state, StateName, StateData#state{
-                                    waiting_input = {From, ok}}};
+                                     waiting_input = C2SPid}};
        Input ->
-            Receiver = From,
-           Receiver ! {tcp, StateData#state.socket, list_to_binary(Input)},
+           lists:foreach(
+             fun(Event) ->
+                     C2SPid ! Event
+             end, queue:to_list(Input)),
            {next_state, StateName, StateData#state{
-                                    input = "",
-                                    waiting_input = false,
-                                    last_receiver = Receiver}}
+                                     input = queue:new(),
+                                     waiting_input = C2SPid}}
     end;
 
+handle_event({change_shaper, Shaper}, StateName, StateData) ->
+    NewShaperState = shaper:new(Shaper),
+    {next_state, StateName, StateData#state{shaper_state = NewShaperState}};
 handle_event(_Event, StateName, StateData) ->
     {next_state, StateName, StateData}.
 
@@ -321,37 +339,34 @@ handle_event(_Event, StateName, StateData) ->
 %%          {stop, Reason, NewStateData}                          |
 %%          {stop, Reason, Reply, NewStateData}
 %%----------------------------------------------------------------------
-handle_sync_event({send, Packet}, _From, StateName, StateData) ->
-    Output = [StateData#state.output | Packet],
-    if
-       StateData#state.http_receiver /= undefined ->
-           cancel_timer(StateData#state.timer),
-           Timer = if
-                       StateData#state.pause > 0 ->
-                           erlang:start_timer(
-                             StateData#state.pause*1000, self(), []);
-                       true ->
-                           erlang:start_timer(
-                             StateData#state.max_inactivity, self(), [])
-                   end,
-           HTTPReply = case Output of
-                           [[]| OutPacket] ->
-                               {ok, OutPacket};
-                           _ ->
-                               {ok, Output}
-                       end,
-           gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
-           cancel_timer(StateData#state.wait_timer),
-           Reply = ok,
-           {reply, Reply, StateName,
-            StateData#state{output = [],
-                            http_receiver = undefined,
-                            wait_timer = undefined,
-                            timer = Timer}};
-       true ->
-           Reply = ok,
-           {reply, Reply, StateName, StateData#state{output = Output}}
-    end;
+handle_sync_event({send_xml, Packet}, _From, StateName,
+                 #state{http_receiver = undefined} = StateData) ->
+    Output = [Packet | StateData#state.output],
+    Reply = ok,
+    {reply, Reply, StateName, StateData#state{output = Output}};
+handle_sync_event({send_xml, Packet}, _From, StateName, StateData) ->
+    Output = [Packet | StateData#state.output],
+    cancel_timer(StateData#state.timer),
+    Timer = set_inactivity_timer(StateData#state.pause,
+                                StateData#state.max_inactivity),
+    HTTPReply = {ok, Output},
+    gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
+    cancel_timer(StateData#state.wait_timer),
+    Rid = StateData#state.rid,
+    ReqList = [#hbr{rid = Rid,
+                   key = StateData#state.key,
+                   out = Output
+                  } |
+              [El || El <- StateData#state.req_list,
+                     El#hbr.rid /= Rid ]
+             ],
+    Reply = ok,
+    {reply, Reply, StateName,
+     StateData#state{output = [],
+                    http_receiver = undefined,
+                    req_list = ReqList,
+                    wait_timer = undefined,
+                    timer = Timer}};
 
 handle_sync_event({stop,close}, _From, _StateName, StateData) ->
     Reply = ok,
@@ -360,130 +375,82 @@ handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) ->
     Reply = ok,
     {stop, normal, Reply, StateData};
 handle_sync_event({stop,Reason}, _From, _StateName, StateData) ->
-    ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]),
+    ?ERROR_MSG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]),
     Reply = ok,
     {stop, normal, Reply, StateData};
 
-
 %% HTTP PUT: Receive packets from the client
-handle_sync_event({http_put, Rid, Attrs, _Payload, Hold, _StreamTo, _IP}=Request,
-                 _From, StateName, StateData) ->
-    %% Check if Rid valid
-    RidAllow =
-       case StateData#state.rid of
-           none ->
-               %% First request - nothing saved so far
-               {true, 0};
-           OldRid ->
-               ?DEBUG("state.rid/cur rid: ~p/~p", [OldRid, Rid]),
-               if
-                   %% We did not miss any packet, we can process it immediately:
-                   Rid == OldRid + 1 ->
-                   case catch list_to_integer(
-                                xml:get_attr_s("pause", Attrs)) of
-                       {'EXIT', _} ->
-                           {true, 0};
-                       Pause1 when Pause1 =< ?MAX_PAUSE ->
-                           ?DEBUG("got pause: ~p", [Pause1]),
-                           {true, Pause1};
-                       _ ->
-                           {true, 0}
-                   end;
-                   %% We have missed packets, we need to cached it to process it later on:
-                   (OldRid < Rid) and
-                   (Rid =< (OldRid + Hold + 1)) ->
-                       buffer;
-                   (Rid =< OldRid) and
-                   (Rid > OldRid - Hold - 1) ->
-                       repeat;
-                   true ->
-                       false
-               end
+handle_sync_event(#http_put{rid = Rid},
+                 _From, StateName, StateData)
+  when StateData#state.shaper_timer /= undefined ->
+    Pause =
+       case erlang:read_timer(StateData#state.shaper_timer) of
+           false ->
+               0;
+           P -> P
        end,
+    Reply = {wait, Pause},
+    ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]),
+    {reply, Reply, StateName, StateData};
 
-    %% Check if Rid is in sequence or out of sequence:
-    case RidAllow of
-       buffer ->
-           ?DEBUG("Buffered request: ~p", [Request]),
-           %% Request is out of sequence:
-           PendingRequests = StateData#state.unprocessed_req_list,
-           %% In case an existing RID was already buffered:
-           Requests = lists:keydelete(Rid, 2, PendingRequests),
-           {reply, ok, StateName, StateData#state{unprocessed_req_list=[Request|Requests]}};
-       _ ->
-           %% Request is in sequence:
-           process_http_put(Request, StateName, StateData, RidAllow)
-    end;
+handle_sync_event(#http_put{rid = _Rid, attrs = _Attrs,
+                           payload_size = PayloadSize,
+                           hold = _Hold} = Request,
+                 _From, StateName, StateData) ->
+    ?DEBUG("New request: ~p",[Request]),
+    %% Updating trafic shaper
+    {NewShaperState, NewShaperTimer} =
+       update_shaper(StateData#state.shaper_state, PayloadSize),
+
+    handle_http_put_event(Request, StateName,
+                         StateData#state{shaper_state = NewShaperState,
+                                         shaper_timer = NewShaperTimer});
 
 %% HTTP GET: send packets to the client
 handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) ->
     %% setup timer
-    if
-       StateData#state.http_receiver /= undefined ->
-           gen_fsm:reply(StateData#state.http_receiver, {ok, empty});
-       true ->
-           ok
-    end,
+    send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
     cancel_timer(StateData#state.wait_timer),
-    {TMegSec, TSec, TMSec} = now(),
-    TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec,
+    TNow = tnow(),
     if
        (Hold > 0) and
-       (StateData#state.output == "") and
+       (StateData#state.output == []) and
        ((TNow - StateData#state.ctime) < (Wait*1000*1000)) and
        (StateData#state.rid == Rid) and
-       (StateData#state.input /= "cancel") and
+       (StateData#state.input /= cancel) and
         (StateData#state.pause == 0) ->
            WaitTimer = erlang:start_timer(Wait * 1000, self(), []),
+           %% MR: Not sure we should cancel the state timer here.
            cancel_timer(StateData#state.timer),
            {next_state, StateName, StateData#state{
                                      http_receiver = From,
                                      wait_timer = WaitTimer,
                                      timer = undefined}};
-       (StateData#state.input == "cancel") ->
+       (StateData#state.input == cancel) ->
            cancel_timer(StateData#state.timer),
-           Timer = if
-                       StateData#state.pause > 0 ->
-                           erlang:start_timer(
-                             StateData#state.pause*1000, self(), []);
-                       true ->
-                           erlang:start_timer(
-                             StateData#state.max_inactivity, self(), [])
-                   end,
+           Timer = set_inactivity_timer(StateData#state.pause,
+                                        StateData#state.max_inactivity),
            Reply = {ok, cancel},
            {reply, Reply, StateName, StateData#state{
-                                       input = "",
+                                       input = queue:new(),
                                        http_receiver = undefined,
                                        wait_timer = undefined,
                                        timer = Timer}};
        true ->
            cancel_timer(StateData#state.timer),
-           Timer = if
-                       StateData#state.pause > 0 ->
-                           erlang:start_timer(
-                             StateData#state.pause*1000, self(), []);
-                       true ->
-                           erlang:start_timer(
-                             StateData#state.max_inactivity, self(), [])
-                   end,
-           case StateData#state.output of
-               [[]| OutPacket] ->
-                   Reply = {ok, OutPacket};
-               _ ->
-                   Reply = {ok, StateData#state.output}
-           end,
+           Timer = set_inactivity_timer(StateData#state.pause,
+                                        StateData#state.max_inactivity),
+           Reply = {ok, StateData#state.output},
            %% save request
-           ReqList = [#hbr{rid=Rid,
-                           key=StateData#state.key,
-                           in=StateData#state.input,
-                           out=StateData#state.output
+           ReqList = [#hbr{rid = Rid,
+                           key = StateData#state.key,
+                           out = StateData#state.output
                           } |
                       [El || El <- StateData#state.req_list,
                              El#hbr.rid /= Rid ]
                      ],
            {reply, Reply, StateName, StateData#state{
-                                       input = "",
-                                       output = "",
+                                       output = [],
                                        http_receiver = undefined,
                                        wait_timer = undefined,
                                        timer = Timer,
@@ -507,6 +474,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
 %%          {next_state, NextStateName, NextStateData, Timeout} |
 %%          {stop, Reason, NewStateData}
 %%----------------------------------------------------------------------
+%% We reached the max_inactivity timeout:
 handle_info({timeout, Timer, _}, _StateName,
            #state{id=SID, timer = Timer} = StateData) ->
     ?WARNING_MSG("Session timeout. Closing the HTTP bind session: ~p", [SID]),
@@ -517,23 +485,30 @@ handle_info({timeout, WaitTimer, _}, StateName,
     if
        StateData#state.http_receiver /= undefined ->
            cancel_timer(StateData#state.timer),
-           Timer = if
-                       StateData#state.pause > 0 ->
-                           erlang:start_timer(
-                             StateData#state.pause*1000, self(), []);
-                       true ->
-                           erlang:start_timer(
-                             StateData#state.max_inactivity, self(), [])
-                   end,
+           Timer = set_inactivity_timer(StateData#state.pause,
+                                        StateData#state.max_inactivity),
            gen_fsm:reply(StateData#state.http_receiver, {ok, empty}),
+           Rid = StateData#state.rid,
+           ReqList = [#hbr{rid = Rid,
+                           key = StateData#state.key,
+                           out = []
+                          } |
+                      [El || El <- StateData#state.req_list,
+                             El#hbr.rid /= Rid ]
+                     ],
            {next_state, StateName,
             StateData#state{http_receiver = undefined,
+                            req_list = ReqList,
                             wait_timer = undefined,
                             timer = Timer}};
        true ->
            {next_state, StateName, StateData}
     end;
 
+handle_info({timeout, ShaperTimer, _}, StateName,
+           #state{shaper_timer = ShaperTimer} = StateData) ->
+    {next_state, StateName, StateData#state{shaper_timer = undefined}};
+
 handle_info(_, StateName, StateData) ->
     {next_state, StateName, StateData}.
 
@@ -548,19 +523,12 @@ terminate(_Reason, _StateName, StateData) ->
       fun() ->
              mnesia:delete({http_bind, StateData#state.id})
       end),
-    if
-       StateData#state.http_receiver /= undefined ->
-           gen_fsm:reply(StateData#state.http_receiver, {ok, terminate});
-       true ->
-           ok
-    end,
+    send_receiver_reply(StateData#state.http_receiver, {ok, terminate}),
     case StateData#state.waiting_input of
        false ->
-           case StateData#state.last_receiver of
-               undefined -> ok;
-               Receiver -> Receiver ! {tcp_closed, StateData#state.socket}
-           end;
-       {Receiver, _Tag} -> Receiver ! {tcp_closed, StateData#state.socket}
+           ok;
+       C2SPid ->
+           gen_fsm:send_event(C2SPid, closed)
     end,
     ok.
 
@@ -569,8 +537,47 @@ terminate(_Reason, _StateName, StateData) ->
 %%%----------------------------------------------------------------------
 
 %% PUT / Get processing:
-process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
+handle_http_put_event(#http_put{rid = Rid, attrs = Attrs,
+                               hold = Hold} = Request,
+                     StateName, StateData) ->
+    ?DEBUG("New request: ~p",[Request]),
+    %% Check if Rid valid
+    RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, Hold,
+                        StateData#state.max_pause),
+
+    %% Check if Rid is in sequence or out of sequence:
+    case RidAllow of
+       buffer ->
+           ?DEBUG("Buffered request: ~p", [Request]),
+           %% Request is out of sequence:
+           PendingRequests = StateData#state.unprocessed_req_list,
+           %% In case an existing RID was already buffered:
+           Requests = lists:keydelete(Rid, 2, PendingRequests),
+           ReqList = [#hbr{rid = Rid,
+                           key = StateData#state.key,
+                           out = []
+                          } |
+                      [El || El <- StateData#state.req_list,
+                             El#hbr.rid > (Rid - 1 - Hold)]
+                     ],
+           ?DEBUG("reqlist: ~p", [ReqList]),
+           UnprocessedReqList = [Request | Requests],
+           cancel_timer(StateData#state.timer),
+           Timer = set_inactivity_timer(0, StateData#state.max_inactivity),
+           {reply, buffered, StateName,
+            StateData#state{unprocessed_req_list = UnprocessedReqList,
+                            req_list = ReqList,
+                            timer = Timer}};
+       _ ->
+           %% Request is in sequence:
+           process_http_put(Request, StateName, StateData, RidAllow)
+    end.
+
+process_http_put(#http_put{rid = Rid, attrs = Attrs, payload = Payload,
+                          hold = Hold,  stream = StreamTo,
+                          ip = IP} = Request,
                 StateName, StateData, RidAllow) ->
+    ?DEBUG("Actually processing request: ~p", [Request]),
     %% Check if key valid
     Key = xml:get_attr_s("key", Attrs),
     NewKey = xml:get_attr_s("newkey", Attrs),
@@ -596,16 +603,15 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
                        end
                end
        end,
-    {TMegSec, TSec, TMSec} = now(),
-    TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec,
+    TNow = tnow(),
     LastPoll = if
-                  Payload == "" ->
+                  Payload == [] ->
                       TNow;
                   true ->
                       0
               end,
     if
-       (Payload == "") and
+       (Payload == []) and
         (Hold == 0) and
        (TNow - StateData#state.last_poll < ?MIN_POLLING) ->
            Reply = {error, polling_too_frequently},
@@ -618,17 +624,15 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
                repeat ->
                    ?DEBUG("REPEATING ~p", [Rid]),
                    Reply = case [El#hbr.out ||
-                                     El <- StateData#state.req_list,
+                                    El <- StateData#state.req_list,
                                     El#hbr.rid == Rid] of
                                [] ->
                                    {error, not_exists};
-                               [ [[] | Out] | _XS] ->
-                                   {repeat, Out};
                                [Out | _XS] ->
-                                   {repeat, Out}
-                   end,
-                   {reply, Reply, StateName,
-                    StateData#state{input = "cancel", last_poll = LastPoll}};
+                                   {repeat, lists:reverse(Out)}
+                           end,
+                   {reply, Reply, StateName, StateData#state{input = cancel,
+                                                             last_poll = LastPoll}};
                {true, Pause} ->
                    SaveKey = if
                                  NewKey == "" ->
@@ -639,30 +643,33 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
                    ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
 
                    %% save request
-                   ReqList = [#hbr{rid=Rid,
-                                   key=StateData#state.key,
-                                   in=StateData#state.input,
-                                   out=StateData#state.output
-                                  } |
-                              [El || El <- StateData#state.req_list,
-                                     El#hbr.rid < Rid,
-                                     El#hbr.rid > (Rid - 1 - Hold)]
-                             ],
+                   ReqList1 =
+                       [El || El <- StateData#state.req_list,
+                              El#hbr.rid > (Rid - 1 - Hold)],
+                   ReqList =
+                       case lists:keymember(Rid, #hbr.rid, ReqList1) of
+                           true ->
+                               ReqList1;
+                           false ->
+                               [#hbr{rid = Rid,
+                                     key = StateData#state.key,
+                                     out = []
+                                    } |
+                                ReqList1
+                               ]
+                       end,
                    ?DEBUG("reqlist: ~p", [ReqList]),
 
                     %% setup next timer
                    cancel_timer(StateData#state.timer),
-                   Timer = if
-                               Pause > 0 ->
-                                   erlang:start_timer(
-                                     Pause*1000, self(), []);
-                               true ->
-                                   erlang:start_timer(
-                                     StateData#state.max_inactivity, self(), [])
-                           end,
+                   Timer = set_inactivity_timer(Pause,
+                                                StateData#state.max_inactivity),
                    case StateData#state.waiting_input of
                        false ->
-                           Input = Payload ++ [StateData#state.input],
+                           Input =
+                               lists:foldl(
+                                 fun queue:in/2,
+                                 StateData#state.input, Payload),
                            Reply = ok,
                            process_buffered_request(Reply, StateName,
                                                     StateData#state{input = Input,
@@ -675,32 +682,42 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
                                                                     req_list = ReqList,
                                                                     ip = IP
                                                                    });
-                       {Receiver, _Tag} ->
-                            SendPacket =
-                                case StreamTo of
-                                    {To, ""} ->
-                                        ["<stream:stream to='", To, "' "
-                                         "xmlns='"++?NS_CLIENT++"' "
-                                         "xmlns:stream='"++?NS_STREAM++"'>"]
-                                            ++ Payload;
-                                    {To, Version} ->
-                                        ["<stream:stream to='", To, "' "
-                                         "xmlns='"++?NS_CLIENT++"' "
-                                         "version='", Version, "' "
-                                         "xmlns:stream='"++?NS_STREAM++"'>"]
-                                            ++ Payload;
-                                    _ ->
-                                        Payload
-                                end,
+                       C2SPid ->
+                           case StreamTo of
+                               {To, ""} ->
+                                   gen_fsm:send_event(
+                                     C2SPid,
+                                     {xmlstreamstart, "stream:stream",
+                                      [{"to", To},
+                                       {"xmlns", ?NS_CLIENT},
+                                       {"xmlns:stream", ?NS_STREAM}]});
+                               {To, Version} ->
+                                   gen_fsm:send_event(
+                                     C2SPid,
+                                     {xmlstreamstart, "stream:stream",
+                                      [{"to", To},
+                                       {"xmlns", ?NS_CLIENT},
+                                       {"version", Version},
+                                       {"xmlns:stream", ?NS_STREAM}]});
+                               _ ->
+                                   ok
+                           end,
+
                             MaxInactivity = get_max_inactivity(StreamTo, StateData#state.max_inactivity),
-                            ?DEBUG("really sending now: ~s", [SendPacket]),
-                           Receiver ! {tcp, StateData#state.socket,
-                                       list_to_binary(SendPacket)},
+                            MaxPause = get_max_inactivity(StreamTo, StateData#state.max_pause),
+
+                           ?DEBUG("really sending now: ~p", [Payload]),
+                           lists:foreach(
+                             fun({xmlstreamend, End}) ->
+                                     gen_fsm:send_event(
+                                       C2SPid, {xmlstreamend, End});
+                                (El) ->
+                                     gen_fsm:send_event(
+                                       C2SPid, {xmlstreamelement, El})
+                             end, Payload),
                            Reply = ok,
                            process_buffered_request(Reply, StateName,
-                                                    StateData#state{waiting_input = false,
-                                                                    last_receiver = Receiver,
-                                                                    input = "",
+                                                    StateData#state{input = queue:new(),
                                                                     rid = Rid,
                                                                     key = SaveKey,
                                                                     ctime = TNow,
@@ -709,6 +726,7 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
                                                                     last_poll = LastPoll,
                                                                     req_list = ReqList,
                                                                     max_inactivity = MaxInactivity,
+                                                                    max_pause = MaxPause,
                                                                     ip = IP
                                                                    })
                    end
@@ -724,29 +742,42 @@ process_buffered_request(Reply, StateName, StateData) ->
     case lists:keysearch(Rid+1, 2, Requests) of
        {value, Request} ->
            ?DEBUG("Processing buffered request: ~p", [Request]),
-           NewRequests = Requests -- [Request],
-           handle_sync_event(Request, undefined, StateName,
-                             StateData#state{unprocessed_req_list=NewRequests});
+           NewRequests = lists:keydelete(Rid+1, 2, Requests),
+           handle_http_put_event(
+             Request, StateName,
+             StateData#state{unprocessed_req_list = NewRequests});
        _ ->
            {reply, Reply, StateName, StateData}
     end.
 
-handle_http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) ->
-    case http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) of
+handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
+    case http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) of
         {error, not_exists} ->
-            ?DEBUG("no session associated with sid: ~p", [Sid]),
+            ?ERROR_MSG("no session associated with sid: ~p", [Sid]),
             {404, ?HEADER, ""};
         {{error, Reason}, Sess} ->
-            ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]),
+            ?ERROR_MSG("Error on HTTP put. Reason: ~p", [Reason]),
             handle_http_put_error(Reason, Sess);
         {{repeat, OutPacket}, Sess} ->
             ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", [OutPacket]),
             send_outpacket(Sess, OutPacket);
+        {{wait, Pause}, _Sess} ->
+           ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]),
+           timer:sleep(Pause),
+            %{200, ?HEADER,
+            % xml:element_to_string(
+            %   {xmlelement, "body",
+            %    [{"xmlns", ?NS_HTTP_BIND},
+            %     {"type", "error"}], []})};
+            handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
+                           StreamStart, IP);
+        {buffered, _Sess} ->
+            {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
         {ok, Sess} ->
             prepare_response(Sess, Rid, Attrs, StreamStart)
     end.
 
-http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) ->
+http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
     ?DEBUG("Looking for session: ~p", [Sid]),
     case mnesia:dirty_read({http_bind, Sid}) of
        [] ->
@@ -760,7 +791,9 @@ http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) ->
                         ""
                 end,
             {gen_fsm:sync_send_all_state_event(
-               FsmRef, {http_put, Rid, Attrs, Payload, Hold, NewStream, IP}), Sess}
+               FsmRef, #http_put{rid = Rid, attrs = Attrs, payload = Payload,
+                                payload_size = PayloadSize, hold = Hold,
+                                stream = NewStream, ip = IP}, 30000), Sess}
     end.
 
 handle_http_put_error(Reason, #http_bind{pid=FsmRef, version=Version})
@@ -803,6 +836,46 @@ handle_http_put_error(Reason, #http_bind{pid=FsmRef}) ->
             {403, ?HEADER, ""}
     end.
 
+%% Control RID ordering
+rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) ->
+    %% First request - nothing saved so far
+    {true, 0};
+rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) ->
+    ?DEBUG("Previous rid / New rid: ~p/~p", [OldRid, NewRid]),
+    if
+       %% We did not miss any packet, we can process it immediately:
+       NewRid == OldRid + 1 ->
+           case catch list_to_integer(
+                        xml:get_attr_s("pause", Attrs)) of
+               {'EXIT', _} ->
+                   {true, 0};
+               Pause1 when Pause1 =< MaxPause ->
+                   ?DEBUG("got pause: ~p", [Pause1]),
+                   {true, Pause1};
+               _ ->
+                   {true, 0}
+           end;
+       %% We have missed packets, we need to cached it to process it later on:
+       (OldRid < NewRid) and
+       (NewRid =< (OldRid + Hold + 1)) ->
+           buffer;
+       (NewRid =< OldRid) and
+       (NewRid > OldRid - Hold - 1) ->
+           repeat;
+       true ->
+           false
+    end.
+
+update_shaper(ShaperState, PayloadSize) ->
+    {NewShaperState, Pause} = shaper:update(ShaperState, PayloadSize),
+    if
+       Pause > 0 ->
+           ShaperTimer = erlang:start_timer(Pause, self(), activate), %% MR: Seems timer is not needed. Activate is not handled
+           {NewShaperState, ShaperTimer};
+       true ->
+           {NewShaperState, undefined}
+    end.
+
 prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess,
                  Rid, _, StreamStart) ->
     receive after 100 -> ok end, %% TODO: Why is this needed. Argh. Bad programming practice.
@@ -816,62 +889,52 @@ prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess,
             {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
        {ok, terminate} ->
             {200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"};
-       {ok, OutPacket} ->
-            ?DEBUG("OutPacket: ~s", [OutPacket]),
+       {ok, ROutPacket} ->
+           OutPacket = lists:reverse(ROutPacket),
+            ?DEBUG("OutPacket: ~p", [OutPacket]),
            case StreamStart of
                 false ->
                    send_outpacket(Sess, OutPacket);
                true ->
-                   OutEls =
-                        case xml_stream:parse_element(
-                               OutPacket++"</stream:stream>") of
-                            El when element(1, El) == xmlelement ->
-                                ?DEBUG("~p", [El]),
-                                {xmlelement, _, OutAttrs, Els} = El,
-                                AuthID = xml:get_attr_s("id", OutAttrs),
-                                From = xml:get_attr_s("from", OutAttrs),
-                                Version = xml:get_attr_s("version", OutAttrs),
-                                StreamError = false,
-                                case Els of
-                                    [] ->
-                                        [];
-                                    [{xmlelement, "stream:features",
-                                      StreamAttribs, StreamEls}
-                                     | StreamTail] ->
-                                        [{xmlelement, "stream:features",
-                                          [{"xmlns:stream",
-                                            ?NS_STREAM}
-                                          ]
-                                          ++ StreamAttribs,
-                                          StreamEls
-                                         }] ++ StreamTail;
-                                    Xml ->
-                                        Xml
-                                end;
-                            {error, _} ->
-                                AuthID = "",
-                                From = "",
-                                Version = "",
-                                StreamError = true,
-                                []
-                        end,
-                   if
-                       StreamError == true ->
-                           {200, ?HEADER, "<body type='terminate' "
-                            "condition='host-unknown' "
-                            "xmlns='"++?NS_HTTP_BIND++"'/>"};
-                       true ->
-                            BOSH_attribs =
-                                [{"authid", AuthID},
-                                 {"xmlns:xmpp", ?NS_BOSH},
-                                 {"xmlns:stream", ?NS_STREAM}] ++
-                                case OutEls of
-                                    [] ->
-                                        [];
-                                    _ ->
-                                        [{"xmpp:version", Version}]
-                                end,
-                            MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY),
+                   case OutPacket of
+                       [{xmlstreamstart, _, OutAttrs} | Els] ->
+                           AuthID = xml:get_attr_s("id", OutAttrs),
+                           From = xml:get_attr_s("from", OutAttrs),
+                           Version = xml:get_attr_s("version", OutAttrs),
+                           OutEls =
+                               case Els of
+                                   [] ->
+                                       [];
+                                   [{xmlstreamelement,
+                                     {xmlelement, "stream:features",
+                                      StreamAttribs, StreamEls}}
+                                    | StreamTail] ->
+                                       TypedTail =
+                                           [check_default_xmlns(OEl) ||
+                                               {xmlstreamelement, OEl} <-
+                                                   StreamTail],
+                                       [{xmlelement, "stream:features",
+                                         [{"xmlns:stream",
+                                           ?NS_STREAM}] ++
+                                         StreamAttribs, StreamEls}] ++
+                                           TypedTail;
+                                   StreamTail ->
+                                       [check_default_xmlns(OEl) ||
+                                           {xmlstreamelement, OEl} <-
+                                               StreamTail]
+                               end,
+                           BOSH_attribs =
+                               [{"authid", AuthID},
+                                {"xmlns:xmpp", ?NS_BOSH},
+                                {"xmlns:stream", ?NS_STREAM}] ++
+                               case OutEls of
+                                   [] ->
+                                       [];
+                                   _ ->
+                                       [{"xmpp:version", Version}]
+                               end,
+                           MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY),
+                           MaxPause = get_max_pause(To),
                            {200, ?HEADER,
                             xml:element_to_string(
                               {xmlelement,"body",
@@ -882,16 +945,20 @@ prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess,
                                 {"requests", integer_to_list(Hold+1)},
                                 {"inactivity",
                                  integer_to_list(
-                                    trunc(MaxInactivity/1000))},
-                                 {"maxpause",
-                                  integer_to_list(?MAX_PAUSE)},
+                                   trunc(MaxInactivity/1000))},
+                                {"maxpause",
+                                 integer_to_list(MaxPause)},
                                 {"polling",
-                                  integer_to_list(
-                                    trunc(?MIN_POLLING/1000000))},
-                                 {"ver", ?BOSH_VERSION},
-                                 {"from", From},
-                                 {"secure", "true"} %% we're always being secure
-                               ] ++ BOSH_attribs,OutEls})}
+                                 integer_to_list(
+                                   trunc(?MIN_POLLING/1000000))},
+                                {"ver", ?BOSH_VERSION},
+                                {"from", From},
+                                {"secure", "true"} %% we're always being secure
+                               ] ++ BOSH_attribs,OutEls})};
+                       {error, _} ->
+                           {200, ?HEADER, "<body type='terminate' "
+                            "condition='host-unknown' "
+                            "xmlns='"++?NS_HTTP_BIND++"'/>"}
                    end
            end;
        {'EXIT', {shutdown, _}} ->
@@ -906,77 +973,85 @@ http_get(#http_bind{pid = FsmRef, wait = Wait, hold = Hold}, Rid) ->
 
 send_outpacket(#http_bind{pid = FsmRef}, OutPacket) ->
     case OutPacket of
-       "" ->
+       [] ->
            {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
-       "</stream:stream>" ->
+       [{xmlstreamend, _}] ->
             gen_fsm:sync_send_all_state_event(FsmRef,{stop,stream_closed}),
            {200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
        _ ->
-           case xml_stream:parse_element("<body>"
-                                         ++ OutPacket
-                                         ++ "</body>")
-               of
-               El when element(1, El) == xmlelement ->
-                   {xmlelement, _, _, OEls} = El,
+           %% TODO: We parse to add a default namespace to packet,
+           %% The spec says adding the jabber:client namespace if
+           %% mandatory, even if some implementation do not do that
+           %% change on packets.
+           %% I think this should be an option to avoid modifying
+           %% packet in most case.
+           AllElements =
+               lists:all(fun({xmlstreamelement,
+                              {xmlelement, "stream:error", _, _}}) -> false;
+                            ({xmlstreamelement, _}) -> true;
+                            (_) -> false
+                         end, OutPacket),
+           case AllElements of
+               true ->
                    TypedEls = [check_default_xmlns(OEl) ||
-                                  OEl <- OEls],
+                                  {xmlstreamelement, OEl} <- OutPacket],
+                   Body = xml:element_to_string(
+                            {xmlelement,"body",
+                             [{"xmlns",
+                               ?NS_HTTP_BIND}],
+                             TypedEls}),
                    ?DEBUG(" --- outgoing data --- ~n~s~n --- END --- ~n",
-                          [xml:element_to_string(
-                             {xmlelement,"body",
-                              [{"xmlns",
-                                ?NS_HTTP_BIND}],
-                              TypedEls})]
-                         ),
-                   {200, ?HEADER,
-                    xml:element_to_string(
-                      {xmlelement,"body",
-                       [{"xmlns",
-                         ?NS_HTTP_BIND}],
-                       TypedEls})};
-               {error, _E} ->
-                   OutEls = case xml_stream:parse_element(
-                                    OutPacket++"</stream:stream>") of
-                                 SEl when element(1, SEl) == xmlelement ->
-                                     {xmlelement, _, _OutAttrs, SEls} = SEl,
-                                     StreamError = false,
-                                     case SEls of
-                                         [] ->
-                                             [];
-                                         [{xmlelement,
-                                           "stream:features",
-                                           StreamAttribs, StreamEls} |
-                                          StreamTail] ->
-                                             TypedTail =
-                                                 [check_default_xmlns(OEl) ||
-                                                    OEl <- StreamTail],
-                                             [{xmlelement,
-                                               "stream:features",
-                                               [{"xmlns:stream",
-                                                 ?NS_STREAM}] ++
-                                               StreamAttribs, StreamEls}] ++
-                                                 TypedTail;
-                                         Xml ->
-                                             Xml
-                                     end;
-                                 {error, _} ->
-                                     StreamError = true,
-                                     []
-                             end,
-                    if
-                        StreamError ->
+                          [Body]),
+                   {200, ?HEADER, Body};
+               false ->
+                   case OutPacket of
+                       [{xmlstreamstart, _, _} | SEls] ->
+                           OutEls =
+                               case SEls of
+                                   [{xmlstreamelement,
+                                     {xmlelement,
+                                      "stream:features",
+                                      StreamAttribs, StreamEls}} |
+                                    StreamTail] ->
+                                       TypedTail =
+                                           [check_default_xmlns(OEl) ||
+                                               {xmlstreamelement, OEl} <-
+                                                   StreamTail],
+                                       [{xmlelement,
+                                         "stream:features",
+                                         [{"xmlns:stream",
+                                           ?NS_STREAM}] ++
+                                         StreamAttribs, StreamEls}] ++
+                                           TypedTail;
+                                   StreamTail ->
+                                       [check_default_xmlns(OEl) ||
+                                           {xmlstreamelement, OEl} <-
+                                               StreamTail]
+                               end,
+                            {200, ?HEADER,
+                             xml:element_to_string(
+                               {xmlelement,"body",
+                                [{"xmlns",
+                                  ?NS_HTTP_BIND}],
+                                OutEls})};
+                       _ ->
+                           SErrCond =
+                               lists:filter(
+                                 fun({xmlstreamelement,
+                                      {xmlelement, "stream:error",
+                                       _, _}}) ->
+                                         true;
+                                    (_) -> false
+                                 end, OutPacket),
                             StreamErrCond =
-                                case xml_stream:parse_element(
-                                       "<stream:stream>" ++ OutPacket) of
-                                    El when element(1, El) == xmlelement ->
-                                       case xml:get_subtag(El, "stream:error") of
-                                           false ->
-                                               null;
-                                           {xmlelement, _, _, _Cond} = StreamErrorTag ->
-                                               [StreamErrorTag]
-                                       end;
-                                    {error, _E} ->
-                                        null
-                                end,
+                               case SErrCond of
+                                   [] ->
+                                       null;
+                                   [{xmlstreamelement,
+                                     {xmlelement, _, _, _Cond} =
+                                     StreamErrorTag} | _] ->
+                                       [StreamErrorTag]
+                               end,
                             gen_fsm:sync_send_all_state_event(FsmRef,
                                                               {stop, {stream_error,OutPacket}}),
                             case StreamErrCond of
@@ -993,27 +1068,22 @@ send_outpacket(#http_bind{pid = FsmRef}, OutPacket) ->
                                      "xmlns:stream='"++?NS_STREAM++"'>" ++
                                      elements_to_string(StreamErrCond) ++
                                      "</body>"}
-                            end;
-                        true ->
-                            {200, ?HEADER,
-                             xml:element_to_string(
-                               {xmlelement,"body",
-                                [{"xmlns",
-                                  ?NS_HTTP_BIND}],
-                                OutEls})}
-                    end
+                            end
+                   end
            end
     end.
 
-parse_request(Data) ->
+parse_request(_Data, PayloadSize, MaxStanzaSize)
+  when PayloadSize > MaxStanzaSize ->
+    {error, size_limit};
+parse_request(Data, _PayloadSize, _MaxStanzaSize) ->
     ?DEBUG("--- incoming data --- ~n~s~n --- END --- ", [Data]),
+    %% MR: I do not think it works if put put several elements in the
+    %% same body:
     case xml_stream:parse_element(Data) of
-       El when element(1, El) == xmlelement ->
-           {xmlelement, Name, Attrs, Els} = El,
+       {xmlelement, "body", Attrs, Els} ->
            Xmlns = xml:get_attr_s("xmlns",Attrs),
            if
-               Name /= "body" ->
-                   {error, bad_request};
                Xmlns /= ?NS_HTTP_BIND ->
                    {error, bad_request};
                true ->
@@ -1021,7 +1091,7 @@ parse_request(Data) ->
                         {'EXIT', _} ->
                             {error, bad_request};
                         Rid ->
-                           %% I guess this is to remove XMLCDATA:
+                           %% I guess this is to remove XMLCDATA: Is it really needed ?
                             FixedEls =
                                 lists:filter(
                                   fun(I) ->
@@ -1032,28 +1102,22 @@ parse_request(Data) ->
                                                   false
                                           end
                                   end, Els),
-                           %% MR: I commented this code, because it is not used.
-%%                             lists:map(
-%%                               fun(E) ->
-%%                                       EXmlns = xml:get_tag_attr_s("xmlns",E),
-%%                                       if
-%%                                           EXmlns == ?NS_CLIENT ->
-%%                                               remove_tag_attr("xmlns",E);
-%%                                           true ->
-%%                                               ok
-%%                                       end
-%%                               end, FixedEls),
-                            Payload = [xml:element_to_string(E) || E <- FixedEls],
                             Sid = xml:get_attr_s("sid",Attrs),
-                           %% MR: I do not think we need to extract
-                           %% Sid. We should have it somewhere else:
-                            {ok, {Sid, Rid, Attrs, Payload}}
+                           {ok, {Sid, Rid, Attrs, FixedEls}}
                     end
            end;
+       {xmlelement, _Name, _Attrs, _Els} ->
+           {error, bad_request};
        {error, _Reason} ->
            {error, bad_request}
     end.
 
+send_receiver_reply(undefined, _Reply) ->
+    ok;
+send_receiver_reply(Receiver, Reply) ->
+    gen_fsm:reply(Receiver, Reply).
+
+
 %% Cancel timer and empty message queue.
 cancel_timer(undefined) ->
     ok;
@@ -1066,6 +1130,15 @@ cancel_timer(Timer) ->
            ok
     end.
 
+%% If client asked for a pause (pause > 0), we apply the pause value
+%% as inactivity timer:
+set_inactivity_timer(Pause, _MaxInactivity) when Pause > 0 ->
+    erlang:start_timer(Pause*1000, self(), []);
+%% Otherwise, we apply the max_inactivity value as inactivity timer:
+set_inactivity_timer(_Pause, MaxInactivity) ->
+    erlang:start_timer(MaxInactivity, self(), []).
+
+
 %% TODO: Use tail recursion and list reverse ?
 elements_to_string([]) ->
     [];
@@ -1084,11 +1157,15 @@ get_max_inactivity({Host, _}, Default) ->
 get_max_inactivity(_, Default) ->
     Default.
 
-%% remove_tag_attr(Attr, {xmlelement, Name, Attrs, Els}) ->
-%%     Attrs1 = lists:keydelete(Attr, 1, Attrs),
-%%     {xmlelement, Name, Attrs1, Els};
-%% remove_tag_attr(Attr, El) ->
-%%     El.
+get_max_pause({Host, _}) ->
+    gen_mod:get_module_opt(Host, mod_http_bind, max_pause, ?MAX_PAUSE);
+get_max_pause(_) ->
+    ?MAX_PAUSE.
+
+%% Current time as integer
+tnow() ->
+    {TMegSec, TSec, TMSec} = now(),
+    (TMegSec * 1000000 + TSec) * 1000000 + TMSec.
 
 check_default_xmlns({xmlelement, Name, Attrs, Els} = El) ->
     case xml:get_tag_attr_s("xmlns", El) of
@@ -1101,6 +1178,6 @@ check_default_xmlns({xmlelement, Name, Attrs, Els} = El) ->
 check_bind_module(XmppDomain) ->
     case gen_mod:is_loaded(XmppDomain, mod_http_bind) of
        true -> ok;
-       false -> ?ERROR_MSG("You are trying to use HTTP Bind (BOSH), but the module mod_http_bind is not started.~n"
+       false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind), but the module mod_http_bind is not started.~n"
                            "Check your 'modules' section in your ejabberd configuration file.",[])
     end.