]> granicus.if.org Git - ejabberd/commitdiff
Add support for file-based queues
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Fri, 10 Mar 2017 12:12:43 +0000 (15:12 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Fri, 10 Mar 2017 12:12:43 +0000 (15:12 +0300)
It's now possible to use files as internal packet queues.
The following options are introduced:

* queue_type: the option can be set to `ram` (default) or `file`.
  The option can be set per virtual host.
* queue_dir: path to the directory where queues will be allocated.
  The default is 'queue' directory inside Mnesia directory.
  This is a global option and cannot be set per virtual host.

17 files changed:
include/mod_muc_room.hrl
rebar.config
src/ejabberd_app.erl
src/ejabberd_bosh.erl
src/ejabberd_config.erl
src/ejabberd_s2s.erl
src/ejabberd_s2s_out.erl
src/ejabberd_sql.erl
src/jlib.erl
src/mod_bosh.erl
src/mod_irc_connection.erl
src/mod_mam.erl
src/mod_muc.erl
src/mod_muc_admin.erl
src/mod_muc_room.erl
src/mod_stream_mgmt.erl
test/ejabberd_SUITE_data/ejabberd.yml

index c0d8f50bdec57a644debb9bb805ce30f5ceb4bb7..cf30153ac03ba32efa5db87af89cc056317c7730 100644 (file)
@@ -28,9 +28,8 @@
 
 -record(lqueue,
 {
-    queue = queue:new() :: ?TQUEUE,
-    len = 0             :: integer(),
-    max = 0             :: integer()
+    queue   :: p1_queue:queue(),
+    max = 0 :: integer()
 }).
 
 -type lqueue() :: #lqueue{}.
     robots                  = (?DICT):new() :: ?TDICT,
     nicks                   = (?DICT):new() :: ?TDICT,
     affiliations            = (?DICT):new() :: ?TDICT,
-    history                 = #lqueue{} :: lqueue(),
+    history                 :: lqueue(),
     subject                 = <<"">> :: binary(),
     subject_author          = <<"">> :: binary(),
     just_created            = false :: boolean(),
     activity                = treap:empty() :: treap:treap(),
     room_shaper             = none :: shaper:shaper(),
-    room_queue              = queue:new() :: ?TQUEUE
+    room_queue              :: p1_queue:queue() | undefined
 }).
index f3859195580119a88c150e9f297b8c395e65e06d..7088cfefee34acef68f564e5690bd8c3ca8038d4 100644 (file)
@@ -19,7 +19,7 @@
 %%%----------------------------------------------------------------------
 
 {deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}},
-        {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.7"}}},
+        {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "13b03e1c8c7a5777de728f759809142f997f8af3"}},
         {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.6"}}},
         {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "afdd07811e0e6eff444c035ffeb2aa9efb4dbe6d"}},
         {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.7"}}},
@@ -81,6 +81,7 @@
             {i, "include"},
            {i, "deps/fast_xml/include"},
            {i, "deps/xmpp/include"},
+           {i, "deps/p1_utils/include"},
             {if_var_false, debug, no_debug_info},
             {if_var_true, debug, debug_info},
             {if_var_true, roster_gateway_workaround, {d, 'ROSTER_GATWAY_WORKAROUND'}},
 
 {eunit_compile_opts, [{i, "tools"},
                      {i, "include"},
+                     {i, "deps/p1_utils/include"},
                      {i, "deps/fast_xml/include"},
                      {i, "deps/xmpp/include"}]}.
 
index f4d10e5a583c1fd0a8463f513e897f8f0146bab3..01f0c4c9964f792dce0d0e358f9a11708b734b7a 100644 (file)
@@ -49,6 +49,7 @@ start(normal, _Args) ->
     setup_if_elixir_conf_used(),
     ejabberd_config:start(),
     set_settings_from_config(),
+    file_queue_init(),
     maybe_add_nameservers(),
     connect_nodes(),
     case ejabberd_sup:start_link() of
@@ -167,6 +168,16 @@ set_settings_from_config() ->
                  60),
     net_kernel:set_net_ticktime(Ticktime).
 
+file_queue_init() ->
+    QueueDir = case ejabberd_config:queue_dir() of
+                  undefined ->
+                      {ok, MnesiaDir} = application:get_env(mnesia, dir),
+                      filename:join(MnesiaDir, "queue");
+                  Path ->
+                      Path
+              end,
+    p1_queue:start(QueueDir).
+
 start_apps() ->
     crypto:start(),
     ejabberd:start_app(sasl),
index 204c7b6e58e519da0112cf18cac68bf887873b7d..d34736a4406c584a08899c22a5101fcc36bffe19 100644 (file)
@@ -96,8 +96,8 @@
 -record(state,
        {host = <<"">>                            :: binary(),
          sid = <<"">>                             :: binary(),
-         el_ibuf = buf_new()                      :: ?TQUEUE,
-         el_obuf = buf_new()                      :: ?TQUEUE,
+         el_ibuf                                  :: p1_queue:queue(),
+         el_obuf                                  :: p1_queue:queue(),
          shaper_state = none                      :: shaper:shaper(),
          c2s_pid                                  :: pid() | undefined,
         xmpp_ver = <<"">>                        :: binary(),
          max_concat = unlimited                   :: unlimited | non_neg_integer(),
         responses = gb_trees:empty()             :: ?TGB_TREE,
         receivers = gb_trees:empty()             :: ?TGB_TREE,
-        shaped_receivers = queue:new()           :: ?TQUEUE,
+        shaped_receivers                         :: p1_queue:queue(),
          ip                                       :: inet:ip_address(),
          max_requests = 1                         :: non_neg_integer()}).
 
@@ -305,10 +305,10 @@ init([#body{attrs = Attrs}, IP, SID]) ->
                            false) of
                         true ->
                             JID = make_random_jid(XMPPDomain),
-                            {buf_new(), [{jid, JID} | Opts2]};
+                            {buf_new(XMPPDomain), [{jid, JID} | Opts2]};
                         false ->
                             {buf_in([make_xmlstreamstart(XMPPDomain, XMPPVer)],
-                                    buf_new()),
+                                    buf_new(XMPPDomain)),
                              Opts2}
                    end,
     ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket,
@@ -321,10 +321,12 @@ init([#body{attrs = Attrs}, IP, SID]) ->
                                        fun(unlimited) -> unlimited;
                                           (N) when is_integer(N), N>0 -> N
                                        end, unlimited),
+    ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN),
     State = #state{host = XMPPDomain, sid = SID, ip = IP,
                   xmpp_ver = XMPPVer, el_ibuf = InBuf,
-                  max_concat = MaxConcat, el_obuf = buf_new(),
+                  max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain),
                   inactivity_timeout = Inactivity,
+                  shaped_receivers = ShapedReceivers,
                   shaper_state = ShaperState},
     NewState = restart_inactivity_timer(State),
     mod_bosh:open_session(SID, self()),
@@ -417,15 +419,15 @@ active(#body{attrs = Attrs, size = Size} = Req, From,
        shaper:update(State#state.shaper_state, Size),
     State1 = State#state{shaper_state = ShaperState},
     if Pause > 0 ->
-          QLen = queue:len(State1#state.shaped_receivers),
-          if QLen < (?MAX_SHAPED_REQUESTS_QUEUE_LEN) ->
-                 TRef = start_shaper_timer(Pause),
-                 Q = queue:in({TRef, From, Req},
-                              State1#state.shaped_receivers),
-                 State2 = stop_inactivity_timer(State1),
-                 {next_state, active,
-                  State2#state{shaped_receivers = Q}};
-             true ->
+           TRef = start_shaper_timer(Pause),
+           try p1_queue:in({TRef, From, Req},
+                           State1#state.shaped_receivers) of
+               Q ->
+                   State2 = stop_inactivity_timer(State1),
+                   {next_state, active,
+                    State2#state{shaped_receivers = Q}}
+           catch error:full ->
+                 cancel_timer(TRef),
                  RID = get_attr(rid, Attrs),
                  reply_stop(State1,
                             #body{http_reason = <<"Too many requests">>,
@@ -572,7 +574,7 @@ handle_sync_event({send_xml, El}, _From, StateName,
           reply(State2, Body#body{els = Els},
                 State2#state.prev_rid, From)};
       none ->
-         State2 = case queue:out(State1#state.shaped_receivers)
+         State2 = case p1_queue:out(State1#state.shaped_receivers)
                       of
                     {{value, {TRef, From, Body}}, Q} ->
                         cancel_timer(TRef),
@@ -601,7 +603,7 @@ handle_info({timeout, TRef, inactive}, _StateName,
     {stop, normal, State};
 handle_info({timeout, TRef, shaper_timeout}, StateName,
            State) ->
-    case queue:out(State#state.shaped_receivers) of
+    case p1_queue:out(State#state.shaped_receivers) of
       {{value, {TRef, From, Req}}, Q} ->
          (?GEN_FSM):send_event(self(), {Req, From}),
          {next_state, StateName,
@@ -646,9 +648,13 @@ code_change(_OldVsn, StateName, State, _Extra) ->
 
 print_state(State) -> State.
 
-route_els(#state{el_ibuf = Buf} = State) ->
-    route_els(State#state{el_ibuf = buf_new()},
-             buf_to_list(Buf)).
+route_els(#state{el_ibuf = Buf, c2s_pid = C2SPid} = State) ->
+    NewBuf = p1_queue:dropwhile(
+              fun(El) ->
+                      ?GEN_FSM:send_event(C2SPid, El),
+                      true
+              end, Buf),
+    State#state{el_ibuf = NewBuf}.
 
 route_els(State, Els) ->
     case State#state.c2s_pid of
@@ -734,7 +740,7 @@ bounce_receivers(State, Reason) ->
                                        RID = get_attr(rid, Attrs),
                                        {RID, {From, Body}}
                                end,
-                               queue:to_list(State#state.shaped_receivers)),
+                               p1_queue:to_list(State#state.shaped_receivers)),
     lists:foldl(fun ({RID, {From, Body}}, AccState) ->
                        NewBody = if Reason == closed ->
                                         #body{http_reason =
@@ -752,7 +758,7 @@ bounce_receivers(State, Reason) ->
                State, Receivers ++ ShapedReceivers).
 
 bounce_els_from_obuf(State) ->
-    lists:foreach(
+    p1_queue:foreach(
       fun({xmlstreamelement, El}) ->
              try xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of
                  Pkt when ?is_stanza(Pkt) ->
@@ -769,7 +775,7 @@ bounce_els_from_obuf(State) ->
              end;
         (_) ->
              ok
-      end, buf_to_list(State#state.el_obuf)).
+      end, State#state.el_obuf).
 
 is_valid_key(<<"">>, <<"">>) -> true;
 is_valid_key(PrevKey, Key) ->
@@ -1029,26 +1035,33 @@ get_attr(Attr, Attrs, Default) ->
       _ -> Default
     end.
 
-buf_new() -> queue:new().
+buf_new(Host) ->
+    buf_new(Host, unlimited).
+
+buf_new(Host, Limit) ->
+    QueueType = case gen_mod:get_module_opt(
+                      Host, mod_bosh, queue_type,
+                      mod_bosh:mod_opt_type(queue_type)) of
+                   undefined -> ejabberd_config:default_queue_type(Host);
+                   T -> T
+               end,
+    p1_queue:new(QueueType, Limit).
 
 buf_in(Xs, Buf) ->
-    lists:foldl(fun (X, Acc) -> queue:in(X, Acc) end, Buf,
-               Xs).
+    lists:foldl(fun p1_queue:in/2, Buf, Xs).
 
 buf_out(Buf, Num) when is_integer(Num), Num > 0 ->
     buf_out(Buf, Num, []);
-buf_out(Buf, _) -> {queue:to_list(Buf), buf_new()}.
+buf_out(Buf, _) -> {p1_queue:to_list(Buf), p1_queue:clear(Buf)}.
 
 buf_out(Buf, 0, Els) -> {lists:reverse(Els), Buf};
 buf_out(Buf, I, Els) ->
-    case queue:out(Buf) of
+    case p1_queue:out(Buf) of
       {{value, El}, NewBuf} ->
          buf_out(NewBuf, I - 1, [El | Els]);
       {empty, _} -> buf_out(Buf, 0, Els)
     end.
 
-buf_to_list(Buf) -> queue:to_list(Buf).
-
 cancel_timer(TRef) when is_reference(TRef) ->
     (?GEN_FSM):cancel_timer(TRef);
 cancel_timer(_) -> false.
index 720e4cafa8b6ce7a71ba4dec2dc122467fbbf44f..5856546ce9d1270fe1c1006d98f81efb154488e8 100644 (file)
@@ -37,7 +37,7 @@
         env_binary_to_list/2, opt_type/1, may_hide_data/1,
         is_elixir_enabled/0, v_dbs/1, v_dbs_mods/1,
         default_db/1, default_db/2, default_ram_db/1, default_ram_db/2,
-        fsm_limit_opts/1]).
+        default_queue_type/1, queue_dir/0, fsm_limit_opts/1]).
 
 -export([start/2]).
 
@@ -1455,9 +1455,13 @@ opt_type(default_ram_db) ->
     fun(T) when is_atom(T) -> T end;
 opt_type(loglevel) ->
     fun (P) when P >= 0, P =< 5 -> P end;
+opt_type(queue_dir) ->
+    fun iolist_to_binary/1;
+opt_type(queue_type) ->
+    fun(ram) -> ram; (file) -> file end;
 opt_type(_) ->
-    [hide_sensitive_log_data, hosts, language,
-     default_db, default_ram_db, loglevel].
+    [hide_sensitive_log_data, hosts, language, max_fsm_queue,
+     default_db, default_ram_db, queue_type, queue_dir, loglevel].
 
 -spec may_hide_data(any()) -> any().
 may_hide_data(Data) ->
@@ -1486,3 +1490,11 @@ fsm_limit_opts(Opts) ->
                N -> [{max_queue, N}]
            end
     end.
+
+-spec queue_dir() -> binary() | undefined.
+queue_dir() ->
+    get_option(queue_dir, opt_type(queue_dir)).
+
+-spec default_queue_type(binary()) -> ram | file.
+default_queue_type(Host) ->
+    get_option({queue_type, Host}, opt_type(queue_type), ram).
index 99ee6de9f914772ab28c3841ce27131b79c298ab..40492d8da7b58b006cfa9cc9fbf59bf290d7b4b9 100644 (file)
@@ -45,7 +45,7 @@
         external_host_overloaded/1, is_temporarly_blocked/1,
         get_commands_spec/0, zlib_enabled/1, get_idle_timeout/1,
         tls_required/1, tls_verify/1, tls_enabled/1, tls_options/2,
-        host_up/1, host_down/1]).
+        host_up/1, host_down/1, queue_type/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2,
@@ -285,6 +285,14 @@ get_idle_timeout(LServer) ->
         (infinity) -> infinity
       end, timer:minutes(10)).
 
+-spec queue_type(binary()) -> ram | file.
+queue_type(LServer) ->
+    case ejabberd_config:get_option(
+          {s2s_queue_type, LServer}, opt_type(s2s_queue_type)) of
+       undefined -> ejabberd_config:default_queue_type(LServer);
+       Type -> Type
+    end.
+
 %%====================================================================
 %% gen_server callbacks
 %%====================================================================
@@ -739,7 +747,9 @@ opt_type(s2s_timeout) ->
     fun(I) when is_integer(I), I>=0 -> I;
        (infinity) -> infinity
     end;
+opt_type(s2s_queue_type) ->
+    fun(ram) -> ram; (file) -> file end;
 opt_type(_) ->
     [route_subdomains, s2s_access,  s2s_certfile,
      s2s_ciphers, s2s_dhfile, s2s_cafile, s2s_protocol_options,
-     s2s_tls_compression, s2s_use_starttls, s2s_timeout].
+     s2s_tls_compression, s2s_use_starttls, s2s_timeout, s2s_queue_type].
index 77f5554755ad56892623ffd8ae8643854fea2585..8c9f9d631837b50eb6ba3b6aabfdeb77dcfb4a6e 100644 (file)
@@ -277,8 +277,14 @@ handle_timeout(#{on_route := Action} = State) ->
 
 init([#{server := LServer, remote_server := RServer} = State, Opts]) ->
     ServerHost = ejabberd_router:host_of_route(LServer),
+    QueueType = ejabberd_s2s:queue_type(LServer),
+    QueueLimit = case lists:keyfind(
+                       max_queue, 1, ejabberd_config:fsm_limit_opts([])) of
+                    {_, N} -> N;
+                    false -> unlimited
+                end,
     State1 = State#{on_route => queue,
-                   queue => queue:new(),
+                   queue => p1_queue:new(QueueType, QueueLimit),
                    xmlns => ?NS_SERVER,
                    lang => ?MYLANG,
                    server_host => ServerHost,
@@ -300,7 +306,15 @@ handle_cast(Msg, #{server_host := ServerHost} = State) ->
 
 handle_info({route, Pkt}, #{queue := Q, on_route := Action} = State) ->
     case Action of
-       queue -> State#{queue => queue:in(Pkt, Q)};
+       queue ->
+           try State#{queue => p1_queue:in(Pkt, Q)}
+           catch error:full ->
+                   #{server := LServer, remote_server := RServer} = State,
+                   ?INFO_MSG("Failed to establish outbound s2s connection "
+                             "~s -> ~s: message queue is overloaded",
+                             [LServer, RServer]),
+                   stop(State#{stop_reason => queue_full})
+           end;
        bounce -> bounce_packet(Pkt, State);
        send -> set_idle_timeout(send(State, Pkt))
     end;
@@ -324,20 +338,18 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%===================================================================
 -spec resend_queue(state()) -> state().
-resend_queue(#{queue := Q} = State) ->
-    State1 = State#{queue => queue:new()},
-    jlib:queue_foldl(
+resend_queue(State) ->
+    queue_fold(
       fun(Pkt, AccState) ->
              send(AccState, Pkt)
-      end, State1, Q).
+      end, State).
 
 -spec bounce_queue(state()) -> state().
-bounce_queue(#{queue := Q} = State) ->
-    State1 = State#{queue => queue:new()},
-    jlib:queue_foldl(
+bounce_queue(State) ->
+    queue_fold(
       fun(Pkt, AccState) ->
              bounce_packet(Pkt, AccState)
-      end, State1, Q).
+      end, State).
 
 -spec bounce_message_queue(state()) -> state().
 bounce_message_queue(State) ->
@@ -363,6 +375,8 @@ mk_bounce_error(Lang, #{stop_reason := Why}) ->
     case Why of
        internal_failure ->
            xmpp:err_internal_server_error();
+       queue_full ->
+           xmpp:err_resource_constraint();
        {dns, _} ->
            xmpp:err_remote_server_not_found(Reason, Lang);
                                             _ ->
@@ -387,6 +401,15 @@ set_idle_timeout(#{on_route := send, server := LServer} = State) ->
 set_idle_timeout(State) ->
     State.
 
+queue_fold(F, #{queue := Q} = State) ->
+    case p1_queue:out(Q) of
+       {{value, Pkt}, Q1} ->
+           State1 = F(Pkt, State#{queue => Q1}),
+           queue_fold(F, State1);
+       {empty, Q1} ->
+           State#{queue => Q1}
+    end.
+
 transform_options(Opts) ->
     lists:foldl(fun transform_options/2, [], Opts).
 
index 51ff9d436b72342840a8455e5d47f34ac9cd368c..616b6b73adc1379ca4c4e020ea538a8bdaec418a 100644 (file)
@@ -75,8 +75,7 @@
         db_version = undefined              :: undefined | non_neg_integer(),
         start_interval = 0                  :: non_neg_integer(),
         host = <<"">>                       :: binary(),
-        max_pending_requests_len            :: non_neg_integer(),
-        pending_requests = {0, queue:new()} :: {non_neg_integer(), ?TQUEUE}}).
+        pending_requests                    :: p1_queue:queue()}).
 
 -define(STATE_KEY, ejabberd_sql_state).
 
@@ -271,10 +270,16 @@ init([Host, StartInterval]) ->
     [DBType | _] = db_opts(Host),
     (?GEN_FSM):send_event(self(), connect),
     ejabberd_sql_sup:add_pid(Host, self()),
+    QueueType = case ejabberd_config:get_option(
+                      {sql_queue_type, Host}, opt_type(sql_queue_type)) of
+                   undefined ->
+                       ejabberd_config:default_queue_type(Host);
+                   Type ->
+                       Type
+               end,
     {ok, connecting,
      #state{db_type = DBType, host = Host,
-           max_pending_requests_len = max_fsm_queue(),
-           pending_requests = {0, queue:new()},
+           pending_requests = p1_queue:new(QueueType, max_fsm_queue()),
            start_interval = StartInterval}}.
 
 connecting(connect, #state{host = Host} = State) ->
@@ -285,16 +290,17 @@ connecting(connect, #state{host = Host} = State) ->
                   [mssql | Args] -> apply(fun odbc_connect/1, Args);
                   [odbc | Args] -> apply(fun odbc_connect/1, Args)
                 end,
-    {_, PendingRequests} = State#state.pending_requests,
     case ConnectRes of
         {ok, Ref} ->
             erlang:monitor(process, Ref),
-            lists:foreach(fun (Req) ->
-                                  (?GEN_FSM):send_event(self(), Req)
-                          end,
-                          queue:to_list(PendingRequests)),
+           PendingRequests =
+               p1_queue:dropwhile(
+                 fun(Req) ->
+                         ?GEN_FSM:send_event(self(), Req),
+                         true
+                 end, State#state.pending_requests),
             State1 = State#state{db_ref = Ref,
-                                 pending_requests = {0, queue:new()}},
+                                 pending_requests = PendingRequests},
             State2 = get_db_version(State1),
             {next_state, session_established, State2};
       {error, Reason} ->
@@ -321,26 +327,20 @@ connecting({sql_cmd, Command, Timestamp} = Req, From,
           State) ->
     ?DEBUG("queuing pending request while connecting:~n\t~p",
           [Req]),
-    {Len, PendingRequests} = State#state.pending_requests,
-    NewPendingRequests = if Len <
-                             State#state.max_pending_requests_len ->
-                               {Len + 1,
-                                queue:in({sql_cmd, Command, From, Timestamp},
-                                         PendingRequests)};
-                           true ->
-                               lists:foreach(fun ({sql_cmd, _, To,
-                                                   _Timestamp}) ->
-                                                     (?GEN_FSM):reply(To,
-                                                                      {error,
-                                                                       <<"SQL connection failed">>})
-                                             end,
-                                             queue:to_list(PendingRequests)),
-                               {1,
-                                queue:from_list([{sql_cmd, Command, From,
-                                                  Timestamp}])}
-                        end,
+    PendingRequests =
+       try p1_queue:in({sql_cmd, Command, From, Timestamp},
+                       State#state.pending_requests)
+       catch error:full ->
+               Q = p1_queue:dropwhile(
+                     fun({sql_cmd, _, To, _Timestamp}) ->
+                             (?GEN_FSM):reply(
+                               To, {error, <<"SQL connection failed">>}),
+                             true
+                     end, State#state.pending_requests),
+               p1_queue:in({sql_cmd, Command, From, Timestamp}, Q)
+       end,
     {next_state, connecting,
-     State#state{pending_requests = NewPendingRequests}};
+     State#state{pending_requests = PendingRequests}};
 connecting(Request, {Who, _Ref}, State) ->
     ?WARNING_MSG("unexpected call ~p from ~p in 'connecting'",
                 [Request, Who]),
@@ -1068,15 +1068,10 @@ odbcinst_config() ->
     filename:join(tmp_dir(), "odbcinst.ini").
 
 max_fsm_queue() ->
-    ejabberd_config:get_option(
-      max_fsm_queue,
-      fun(N) when is_integer(N), N > 0 -> N end).
+    proplists:get_value(max_queue, fsm_limit_opts(), unlimited).
 
 fsm_limit_opts() ->
-    case max_fsm_queue() of
-      N when is_integer(N) -> [{max_queue, N}];
-      _ -> []
-    end.
+    ejabberd_config:fsm_limit_opts([]).
 
 check_error({error, Why} = Err, #sql_query{} = Query) ->
     ?ERROR_MSG("SQL query '~s' at ~p failed: ~p",
@@ -1093,8 +1088,6 @@ check_error({error, Why} = Err, Query) ->
 check_error(Result, _Query) ->
     Result.
 
-opt_type(max_fsm_queue) ->
-    fun (N) when is_integer(N), N > 0 -> N end;
 opt_type(sql_database) -> fun iolist_to_binary/1;
 opt_type(sql_keepalive_interval) ->
     fun (I) when is_integer(I), I > 0 -> I end;
@@ -1114,8 +1107,10 @@ opt_type(sql_ssl) -> fun(B) when is_boolean(B) -> B end;
 opt_type(sql_ssl_verify) -> fun(B) when is_boolean(B) -> B end;
 opt_type(sql_ssl_certfile) -> fun iolist_to_binary/1;
 opt_type(sql_ssl_cafile) -> fun iolist_to_binary/1;
+opt_type(sql_queue_type) ->
+    fun(ram) -> ram; (file) -> file end;
 opt_type(_) ->
-    [max_fsm_queue, sql_database, sql_keepalive_interval,
+    [sql_database, sql_keepalive_interval,
      sql_password, sql_port, sql_server, sql_type,
      sql_username, sql_ssl, sql_ssl_verify, sql_ssl_cerfile,
-     sql_ssl_cafile].
+     sql_ssl_cafile, sql_queue_type].
index 580ad1ffaeff3df5699d343b0754e27468d310fa..38a71d7ccd687b945af557410c198485eae55b70 100644 (file)
@@ -38,8 +38,7 @@
 -export([tolower/1, term_to_base64/1, base64_to_term/1,
         decode_base64/1, encode_base64/1, ip_to_list/1,
         atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1,
-        l2i/1, i2l/1, i2l/2, expr_to_term/1, term_to_expr/1,
-        queue_drop_while/2, queue_foldl/3, queue_foldr/3, queue_foreach/2]).
+        l2i/1, i2l/1, i2l/2, expr_to_term/1, term_to_expr/1]).
 
 %% The following functions are used by gen_iq_handler.erl for providing backward
 %% compatibility and must not be used in other parts of the code
@@ -960,48 +959,3 @@ i2l(L, N) when is_binary(L) ->
       C when C > N -> L;
       _ -> i2l(<<$0, L/binary>>, N)
     end.
-
--spec queue_drop_while(fun((term()) -> boolean()), ?TQUEUE) -> ?TQUEUE.
-
-queue_drop_while(F, Q) ->
-    case queue:peek(Q) of
-      {value, Item} ->
-         case F(Item) of
-           true ->
-               queue_drop_while(F, queue:drop(Q));
-           _ ->
-               Q
-         end;
-      empty ->
-         Q
-    end.
-
--spec queue_foldl(fun((term(), T) -> T), T, ?TQUEUE) -> T.
-queue_foldl(F, Acc, Q) ->
-    case queue:out(Q) of
-       {{value, Item}, Q1} ->
-           Acc1 = F(Item, Acc),
-           queue_foldl(F, Acc1, Q1);
-       {empty, _} ->
-           Acc
-    end.
-
--spec queue_foldr(fun((term(), T) -> T), T, ?TQUEUE) -> T.
-queue_foldr(F, Acc, Q) ->
-    case queue:out_r(Q) of
-       {{value, Item}, Q1} ->
-           Acc1 = F(Item, Acc),
-           queue_foldr(F, Acc1, Q1);
-       {empty, _} ->
-           Acc
-    end.
-
--spec queue_foreach(fun((_) -> _), ?TQUEUE) -> ok.
-queue_foreach(F, Q) ->
-    case queue:out(Q) of
-       {{value, Item}, Q1} ->
-           F(Item),
-           queue_foreach(F, Q1);
-       {empty, _} ->
-           ok
-    end.
index abe3c2f16a75c8579bc5b8d13f68df5a7de29ad1..57c81953747b586533aa2b0b45c464ac584a9064 100644 (file)
@@ -158,9 +158,11 @@ mod_opt_type(prebind) ->
     fun (B) when is_boolean(B) -> B end;
 mod_opt_type(ram_db_type) ->
     fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
+mod_opt_type(queue_type) ->
+    fun(ram) -> ram; (file) -> file end;
 mod_opt_type(_) ->
-    [json, max_concat, max_inactivity, max_pause, prebind, ram_db_type].
-
+    [json, max_concat, max_inactivity, max_pause, prebind, ram_db_type,
+     queue_type].
 
 %%%----------------------------------------------------------------------
 %%% Help Web Page
index 46ed8767eaa3aa947dcf315b04eaecfa9845e035..1e90c4005179bfbd7d181e50af0f115e471f7b49 100644 (file)
@@ -50,7 +50,6 @@
          encoding = <<"">>     :: binary(),
          port = 0              :: inet:port_number(),
          password = <<"">>     :: binary(),
-         queue = queue:new()   :: ?TQUEUE,
          user = #jid{}         :: jid(),
          host = <<"">>         :: binary(),
         server = <<"">>       :: binary(),
@@ -112,7 +111,7 @@ init([From, Host, Server, Username, Encoding, Port,
       Password, Ident, RemoteAddr, RealName, WebircPassword, Mod]) ->
     gen_fsm:send_event(self(), init),
     {ok, open_socket,
-     #state{queue = queue:new(), mod = Mod,
+     #state{mod = Mod,
            encoding = Encoding, port = Port, password = Password,
            user = From, nick = Username, host = Host,
            server = Server, ident = Ident, realname = RealName,
@@ -695,15 +694,6 @@ send_text(#state{socket = Socket, encoding = Encoding},
     CText = iconv:convert(<<"utf-8">>, Encoding, iolist_to_binary(Text)),
     gen_tcp:send(Socket, CText).
 
-%send_queue(Socket, Q) ->
-%    case queue:out(Q) of
-%      {{value, El}, Q1} ->
-%          send_element(Socket, El),
-%          send_queue(Socket, Q1);
-%      {empty, Q1} ->
-%          ok
-%    end.
-
 bounce_messages(Reason) ->
     receive
        {send_element, El} ->
index 47a9c6ce12d13b09ebb2026f41ed5ff4db06d978..04d5ec686aa2a5a5efa7fa3d00fbbf09ac0c240d 100644 (file)
@@ -837,7 +837,8 @@ select(_LServer, JidRequestor, JidArchive, Query, RSM,
                                 history = History}} = MsgType) ->
     Start = proplists:get_value(start, Query),
     End = proplists:get_value('end', Query),
-    #lqueue{len = L, queue = Q} = History,
+    #lqueue{queue = Q} = History,
+    L = p1_queue:len(Q),
     Msgs =
        lists:flatmap(
          fun({Nick, Pkt, _HaveSubject, Now, _Size}) ->
@@ -861,7 +862,7 @@ select(_LServer, JidRequestor, JidArchive, Query, RSM,
                      false ->
                          []
                  end
-         end, queue:to_list(Q)),
+         end, p1_queue:to_list(Q)),
     case RSM of
        #rsm_set{max = Max, before = Before} when is_binary(Before) ->
            {NewMsgs, IsComplete} = filter_by_max(lists:reverse(Msgs), Max),
index 563f4c68a65a003af4f62a8094b3ae28c7b62da9..7e8c4e78d7c8b5c703b6ff13a4a29b05933f9816 100644 (file)
@@ -80,6 +80,7 @@
          access = {none, none, none, none} :: {atom(), atom(), atom(), atom()},
          history_size = 20 :: non_neg_integer(),
          max_rooms_discoitems = 100 :: non_neg_integer(),
+        queue_type = ram :: ram | file,
          default_room_opts = [] :: list(),
          room_shaper = none :: shaper:shaper()}).
 
@@ -226,7 +227,7 @@ init([Host, Opts]) ->
     IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
                              one_queue),
     #state{access = Access, host = MyHost,
-          history_size = HistorySize,
+          history_size = HistorySize, queue_type = QueueType,
           room_shaper = RoomShaper} = State = init_state(Host, Opts),
     Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
     RMod = gen_mod:ram_db_mod(Host, Opts, ?MODULE),
@@ -234,7 +235,7 @@ init([Host, Opts]) ->
     RMod:init(Host, [{host, MyHost}|Opts]),
     register_iq_handlers(MyHost, IQDisc),
     ejabberd_router:register_route(MyHost, Host),
-    load_permanent_rooms(MyHost, Host, Access, HistorySize, RoomShaper),
+    load_permanent_rooms(MyHost, Host, Access, HistorySize, RoomShaper, QueueType),
     {ok, State}.
 
 handle_call(stop, _From, State) ->
@@ -242,7 +243,7 @@ handle_call(stop, _From, State) ->
 handle_call({create, Room, From, Nick, Opts}, _From,
            #state{host = Host, server_host = ServerHost,
                   access = Access, default_room_opts = DefOpts,
-                  history_size = HistorySize,
+                  history_size = HistorySize, queue_type = QueueType,
                   room_shaper = RoomShaper} = State) ->
     ?DEBUG("MUC: create new room '~s'~n", [Room]),
     NewOpts = case Opts of
@@ -253,7 +254,7 @@ handle_call({create, Room, From, Nick, Opts}, _From,
                  Host, ServerHost, Access,
                  Room, HistorySize,
                  RoomShaper, From,
-                 Nick, NewOpts),
+                 Nick, NewOpts, QueueType),
     RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
     RMod:register_online_room(Room, Host, Pid),
     {reply, ok, State}.
@@ -300,13 +301,14 @@ handle_cast(Msg, State) ->
 handle_info({route, Packet},
            #state{host = Host, server_host = ServerHost,
                   access = Access, default_room_opts = DefRoomOpts,
-                  history_size = HistorySize,
+                  history_size = HistorySize, queue_type = QueueType,
                   max_rooms_discoitems = MaxRoomsDiscoItems,
                   room_shaper = RoomShaper} = State) ->
     From = xmpp:get_from(Packet),
     To = xmpp:get_to(Packet),
     case catch do_route(Host, ServerHost, Access, HistorySize, RoomShaper,
-                       From, To, Packet, DefRoomOpts, MaxRoomsDiscoItems) of
+                       From, To, Packet, DefRoomOpts, MaxRoomsDiscoItems,
+                       QueueType) of
        {'EXIT', Reason} ->
            ?ERROR_MSG("~p", [Reason]);
        _ ->
@@ -353,6 +355,13 @@ init_state(Host, Opts) ->
     DefRoomOpts1 = gen_mod:get_opt(default_room_options, Opts,
                                   fun(L) when is_list(L) -> L end,
                                   []),
+    QueueType = case gen_mod:get_opt(queue_type, Opts,
+                                    mod_opt_type(queue_type)) of
+                   undefined ->
+                       ejabberd_config:default_queue_type(Host);
+                   Type ->
+                       Type
+               end,
     DefRoomOpts =
        lists:flatmap(
          fun({Opt, Val}) ->
@@ -410,6 +419,7 @@ init_state(Host, Opts) ->
           server_host = Host,
           access = {Access, AccessCreate, AccessAdmin, AccessPersistent},
           default_room_opts = DefRoomOpts,
+          queue_type = QueueType,
           history_size = HistorySize,
           max_rooms_discoitems = MaxRoomsDiscoItems,
           room_shaper = RoomShaper}.
@@ -437,12 +447,12 @@ unregister_iq_handlers(Host) ->
     gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS).
 
 do_route(Host, ServerHost, Access, HistorySize, RoomShaper,
-        From, To, Packet, DefRoomOpts, _MaxRoomsDiscoItems) ->
+        From, To, Packet, DefRoomOpts, _MaxRoomsDiscoItems, QueueType) ->
     {AccessRoute, _AccessCreate, _AccessAdmin, _AccessPersistent} = Access,
     case acl:match_rule(ServerHost, AccessRoute, From) of
        allow ->
            do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
-                     From, To, Packet, DefRoomOpts);
+                     From, To, Packet, DefRoomOpts, QueueType);
        deny ->
            Lang = xmpp:get_lang(Packet),
            ErrText = <<"Access denied by service policy">>,
@@ -452,11 +462,11 @@ do_route(Host, ServerHost, Access, HistorySize, RoomShaper,
 
 do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper,
          _From, #jid{luser = <<"">>, lresource = <<"">>} = _To,
-         #iq{} = IQ, _DefRoomOpts) ->
+         #iq{} = IQ, _DefRoomOpts, _QueueType) ->
     ejabberd_local:process_iq(IQ);
 do_route1(Host, ServerHost, Access, _HistorySize, _RoomShaper,
          From, #jid{luser = <<"">>, lresource = <<"">>} = _To,
-         #message{lang = Lang, body = Body, type = Type} = Packet, _) ->
+         #message{lang = Lang, body = Body, type = Type} = Packet, _, _) ->
     {_AccessRoute, _AccessCreate, AccessAdmin, _AccessPersistent} = Access,
     if Type == error ->
            ok;
@@ -473,11 +483,11 @@ do_route1(Host, ServerHost, Access, _HistorySize, _RoomShaper,
            end
     end;
 do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper,
-         _From, #jid{luser = <<"">>} = _To, Packet, _DefRoomOpts) ->
+         _From, #jid{luser = <<"">>} = _To, Packet, _DefRoomOpts, _) ->
     Err = xmpp:err_service_unavailable(),
     ejabberd_router:route_error(Packet, Err);
 do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
-         From, To, Packet, DefRoomOpts) ->
+         From, To, Packet, DefRoomOpts, QueueType) ->
     {_AccessRoute, AccessCreate, _AccessAdmin, _AccessPersistent} = Access,
     {Room, _, Nick} = jid:tolower(To),
     RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
@@ -492,7 +502,8 @@ do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
                            {ok, Pid} = start_new_room(
                                          Host, ServerHost, Access,
                                          Room, HistorySize,
-                                         RoomShaper, From, Nick, DefRoomOpts),
+                                         RoomShaper, From, Nick, DefRoomOpts,
+                                         QueueType),
                            RMod:register_online_room(Room, Host, Pid),
                            mod_muc_room:route(Pid, Packet),
                            ok;
@@ -659,7 +670,7 @@ get_rooms(ServerHost, Host) ->
     Mod:get_rooms(LServer, Host).
 
 load_permanent_rooms(Host, ServerHost, Access,
-                    HistorySize, RoomShaper) ->
+                    HistorySize, RoomShaper, QueueType) ->
     RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
     lists:foreach(
       fun(R) ->
@@ -669,7 +680,7 @@ load_permanent_rooms(Host, ServerHost, Access,
                        {ok, Pid} = mod_muc_room:start(Host,
                                ServerHost, Access, Room,
                                HistorySize, RoomShaper,
-                               R#muc_room.opts),
+                               R#muc_room.opts, QueueType),
                      RMod:register_online_room(Room, Host, Pid);
                  {ok, _} ->
                      ok
@@ -679,17 +690,17 @@ load_permanent_rooms(Host, ServerHost, Access,
 
 start_new_room(Host, ServerHost, Access, Room,
            HistorySize, RoomShaper, From,
-           Nick, DefRoomOpts) ->
+           Nick, DefRoomOpts, QueueType) ->
     case restore_room(ServerHost, Host, Room) of
        error ->
            ?DEBUG("MUC: open new room '~s'~n", [Room]),
            mod_muc_room:start(Host, ServerHost, Access, Room,
                HistorySize, RoomShaper,
-               From, Nick, DefRoomOpts);
+               From, Nick, DefRoomOpts, QueueType);
        Opts ->
            ?DEBUG("MUC: restore room '~s'~n", [Room]),
            mod_muc_room:start(Host, ServerHost, Access, Room,
-               HistorySize, RoomShaper, Opts)
+               HistorySize, RoomShaper, Opts, QueueType)
     end.
 
 -spec iq_disco_items(binary(), binary(), jid(), binary(), integer(), binary(),
@@ -956,11 +967,13 @@ mod_opt_type(user_message_shaper) ->
     fun (A) when is_atom(A) -> A end;
 mod_opt_type(user_presence_shaper) ->
     fun (A) when is_atom(A) -> A end;
+mod_opt_type(queue_type) ->
+    fun(ram) -> ram; (file) -> file end;
 mod_opt_type(_) ->
     [access, access_admin, access_create, access_persistent,
      db_type, ram_db_type, default_room_options, history_size, host,
      max_room_desc, max_room_id, max_room_name,
      max_rooms_discoitems, max_user_conferences, max_users,
      max_users_admin_threshold, max_users_presence,
-     min_message_interval, min_presence_interval,
+     min_message_interval, min_presence_interval, queue_type,
      regexp_room_id, room_shaper, user_message_shaper, user_presence_shaper].
index 4b1509fc9bae60531de4b386ff424e92059f6c1f..d94eb3086494b4954ec0ca2bdbf6fdea78bd2118 100644 (file)
@@ -471,6 +471,8 @@ create_room_with_opts(Name1, Host1, ServerHost, CustomRoomOpts) ->
     AcPer = gen_mod:get_module_opt(ServerHost, mod_muc, access_persistent, fun(X) -> X end, all),
     HistorySize = gen_mod:get_module_opt(ServerHost, mod_muc, history_size, fun(X) -> X end, 20),
     RoomShaper = gen_mod:get_module_opt(ServerHost, mod_muc, room_shaper, fun(X) -> X end, none),
+    QueueType = gen_mod:get_module_opt(ServerHost, mod_muc, queue_type, fun(X) -> X end,
+                                      ejabberd_config:default_queue_type(ServerHost)),
 
     %% If the room does not exist yet in the muc_online_room
     case mod_muc:find_online_room(Name, Host) of
@@ -483,7 +485,8 @@ create_room_with_opts(Name1, Host1, ServerHost, CustomRoomOpts) ->
                          Name,
                          HistorySize,
                          RoomShaper,
-                         RoomOpts),
+                         RoomOpts,
+                         QueueType),
            mod_muc:register_online_room(Host, Name, Pid),
            ok;
        {ok, _} ->
index 8ab14a92beb22e46f66fe6d7be4b2b62d7bea3a5..2b3390666dc2d62f189d419bd2a6c7789b5092b0 100644 (file)
 -behaviour(gen_fsm).
 
 %% External exports
--export([start_link/9,
-        start_link/7,
-        start/9,
-        start/7,
+-export([start_link/10,
+        start_link/8,
+        start/10,
+        start/8,
         get_role/2,
         get_affiliation/2,
         is_occupant_or_admin/2,
 %%% API
 %%%----------------------------------------------------------------------
 start(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
-      Creator, Nick, DefRoomOpts) ->
+      Creator, Nick, DefRoomOpts, QueueType) ->
     gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
-                           RoomShaper, Creator, Nick, DefRoomOpts],
+                           RoomShaper, Creator, Nick, DefRoomOpts, QueueType],
                    ?FSMOPTS).
 
-start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
+start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) ->
     gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
-                           RoomShaper, Opts],
+                           RoomShaper, Opts, QueueType],
                    ?FSMOPTS).
 
 start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
-          Creator, Nick, DefRoomOpts) ->
+          Creator, Nick, DefRoomOpts, QueueType) ->
     gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
-                                RoomShaper, Creator, Nick, DefRoomOpts],
+                                RoomShaper, Creator, Nick, DefRoomOpts, QueueType],
                       ?FSMOPTS).
 
-start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
+start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) ->
     gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
-                                RoomShaper, Opts],
+                                RoomShaper, Opts, QueueType],
                       ?FSMOPTS).
 
 %%%----------------------------------------------------------------------
@@ -119,15 +119,17 @@ start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
 %%%----------------------------------------------------------------------
 
 init([Host, ServerHost, Access, Room, HistorySize,
-      RoomShaper, Creator, _Nick, DefRoomOpts]) ->
+      RoomShaper, Creator, _Nick, DefRoomOpts, QueueType]) ->
     process_flag(trap_exit, true),
     Shaper = shaper:new(RoomShaper),
+    RoomQueue = room_queue_new(ServerHost, Shaper, QueueType),
     State = set_affiliation(Creator, owner,
            #state{host = Host, server_host = ServerHost,
                   access = Access, room = Room,
-                  history = lqueue_new(HistorySize),
+                  history = lqueue_new(HistorySize, QueueType),
                   jid = jid:make(Room, Host),
                   just_created = true,
+                  room_queue = RoomQueue,
                   room_shaper = Shaper}),
     State1 = set_opts(DefRoomOpts, State),
     store_room(State1),
@@ -136,15 +138,17 @@ init([Host, ServerHost, Access, Room, HistorySize,
     add_to_log(room_existence, created, State1),
     add_to_log(room_existence, started, State1),
     {ok, normal_state, State1};
-init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts]) ->
+init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType]) ->
     process_flag(trap_exit, true),
     Shaper = shaper:new(RoomShaper),
+    RoomQueue = room_queue_new(ServerHost, Shaper, QueueType),
     State = set_opts(Opts, #state{host = Host,
                                  server_host = ServerHost,
                                  access = Access,
                                  room = Room,
-                                 history = lqueue_new(HistorySize),
+                                 history = lqueue_new(HistorySize, QueueType),
                                  jid = jid:make(Room, Host),
+                                 room_queue = RoomQueue,
                                  room_shaper = Shaper}),
     add_to_log(room_existence, started, State),
     {ok, normal_state, State}.
@@ -175,7 +179,10 @@ normal_state({route, <<"">>,
               MessageShaperInterval == 0 ->
                    {RoomShaper, RoomShaperInterval} =
                        shaper:update(StateData#state.room_shaper, Size),
-                   RoomQueueEmpty = queue:is_empty(StateData#state.room_queue),
+                   RoomQueueEmpty = case StateData#state.room_queue of
+                                        undefined -> true;
+                                        RQ -> p1_queue:is_empty(RQ)
+                                    end,
                    if RoomShaperInterval == 0, RoomQueueEmpty ->
                            NewActivity = Activity#activity{
                                            message_time = Now,
@@ -200,8 +207,8 @@ normal_state({route, <<"">>,
                                            message_time = Now,
                                            message_shaper = MessageShaper,
                                            message = Packet},
-                           RoomQueue = queue:in({message, From},
-                                                StateData#state.room_queue),
+                           RoomQueue = p1_queue:in({message, From},
+                                                   StateData#state.room_queue),
                            StateData2 = store_user_activity(From,
                                                             NewActivity,
                                                             StateData1),
@@ -584,8 +591,8 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
     {ok, StateName, StateData}.
 
 handle_info({process_user_presence, From}, normal_state = _StateName, StateData) ->
-    RoomQueueEmpty = queue:is_empty(StateData#state.room_queue),
-    RoomQueue = queue:in({presence, From}, StateData#state.room_queue),
+    RoomQueueEmpty = p1_queue:is_empty(StateData#state.room_queue),
+    RoomQueue = p1_queue:in({presence, From}, StateData#state.room_queue),
     StateData1 = StateData#state{room_queue = RoomQueue},
     if RoomQueueEmpty ->
           StateData2 = prepare_room_queue(StateData1),
@@ -595,9 +602,9 @@ handle_info({process_user_presence, From}, normal_state = _StateName, StateData)
 handle_info({process_user_message, From},
            normal_state = _StateName, StateData) ->
     RoomQueueEmpty =
-       queue:is_empty(StateData#state.room_queue),
-    RoomQueue = queue:in({message, From},
-                        StateData#state.room_queue),
+       p1_queue:is_empty(StateData#state.room_queue),
+    RoomQueue = p1_queue:in({message, From},
+                           StateData#state.room_queue),
     StateData1 = StateData#state{room_queue = RoomQueue},
     if RoomQueueEmpty ->
           StateData2 = prepare_room_queue(StateData1),
@@ -606,7 +613,7 @@ handle_info({process_user_message, From},
     end;
 handle_info(process_room_queue,
            normal_state = StateName, StateData) ->
-    case queue:out(StateData#state.room_queue) of
+    case p1_queue:out(StateData#state.room_queue) of
       {{value, {message, From}}, RoomQueue} ->
          Activity = get_user_activity(From, StateData),
          Packet = Activity#activity.message,
@@ -1418,6 +1425,32 @@ get_max_users_admin_threshold(StateData) ->
                            fun(I) when is_integer(I), I>0 -> I end,
                            5).
 
+-spec room_queue_new(binary(), shaper:shaper(), _) -> p1_queue:queue().
+room_queue_new(ServerHost, Shaper, QueueType) ->
+    HaveRoomShaper = Shaper /= none,
+    HaveMessageShaper = gen_mod:get_module_opt(
+                         ServerHost, mod_muc, user_message_shaper,
+                         fun(A) when is_atom(A) -> A end,
+                         none) /= none,
+    HavePresenceShaper = gen_mod:get_module_opt(
+                          ServerHost, mod_muc, user_presence_shaper,
+                          fun(A) when is_atom(A) -> A end,
+                          none) /= none,
+    HaveMinMessageInterval = gen_mod:get_module_opt(
+                              ServerHost, mod_muc, min_message_interval,
+                              fun(I) when is_number(I), I>=0 -> I end,
+                              0) /= 0,
+    HaveMinPresenceInterval = gen_mod:get_module_opt(
+                               ServerHost, mod_muc, min_presence_interval,
+                               fun(I) when is_number(I), I>=0 -> I end,
+                               0) /= 0,
+    if HaveRoomShaper or HaveMessageShaper or HavePresenceShaper
+       or HaveMinMessageInterval or HaveMinPresenceInterval ->
+           p1_queue:new(QueueType);
+       true ->
+           undefined
+    end.
+
 -spec get_user_activity(jid(), state()) -> #activity{}.
 get_user_activity(JID, StateData) ->
     case treap:lookup(jid:tolower(JID),
@@ -1515,7 +1548,7 @@ clean_treap(Treap, CleanPriority) ->
 
 -spec prepare_room_queue(state()) -> state().
 prepare_room_queue(StateData) ->
-    case queue:out(StateData#state.room_queue) of
+    case p1_queue:out(StateData#state.room_queue) of
       {{value, {message, From}}, _RoomQueue} ->
          Activity = get_user_activity(From, StateData),
          Packet = Activity#activity.message,
@@ -1997,38 +2030,34 @@ get_history(Nick, Packet, #state{history = History}) ->
        #muc{history = #muc_history{} = MUCHistory} ->
            Now = p1_time_compat:timestamp(),
            Q = History#lqueue.queue,
-           {NewQ, Len} = filter_history(Q, MUCHistory, Now, Nick, queue:new(), 0, 0),
-           History#lqueue{queue = NewQ, len = Len};
+           filter_history(Q, Now, Nick, MUCHistory);
        _ ->
-           History
+           p1_queue:to_list(History#lqueue.queue)
     end.
 
--spec filter_history(?TQUEUE, muc_history(), erlang:timestamp(), binary(),
-                    ?TQUEUE, non_neg_integer(), non_neg_integer()) ->
-                           {?TQUEUE, non_neg_integer()}.
-filter_history(Queue, #muc_history{since = Since,
-                                  seconds = Seconds,
-                                  maxstanzas = MaxStanzas,
-                                  maxchars = MaxChars} = MUC,
-              Now, Nick, AccQueue, NumStanzas, NumChars) ->
-    case queue:out_r(Queue) of
-       {{value, {_, _, _, TimeStamp, Size} = Elem}, NewQueue} ->
-           NowDiff = timer:now_diff(Now, TimeStamp) div 1000000,
-           Chars = Size + byte_size(Nick) + 1,
-           if (NumStanzas < MaxStanzas) andalso
-              (TimeStamp > Since) andalso
-              (NowDiff =< Seconds) andalso
-              (NumChars + Chars =< MaxChars) ->
-                   filter_history(NewQueue, MUC, Now, Nick,
-                                  queue:in_r(Elem, AccQueue),
-                                  NumStanzas + 1,
-                                  NumChars + Chars);
-              true ->
-                   {AccQueue, NumStanzas}
-           end;
-       {empty, _} ->
-           {AccQueue, NumStanzas}
-    end.
+-spec filter_history(p1_queue:queue(), erlang:timestamp(),
+                    binary(), muc_history()) -> list().
+filter_history(Queue, Now, Nick,
+              #muc_history{since = Since,
+                           seconds = Seconds,
+                           maxstanzas = MaxStanzas,
+                           maxchars = MaxChars}) ->
+    {History, _, _} =
+       lists:foldr(
+         fun({_, _, _, TimeStamp, Size} = Elem,
+             {Elems, NumStanzas, NumChars} = Acc) ->
+                 NowDiff = timer:now_diff(Now, TimeStamp) div 1000000,
+                 Chars = Size + byte_size(Nick) + 1,
+                 if (NumStanzas < MaxStanzas) andalso
+                    (TimeStamp > Since) andalso
+                    (NowDiff =< Seconds) andalso
+                    (NumChars + Chars =< MaxChars) ->
+                         {[Elem|Elems], NumStanzas + 1, NumChars + Chars};
+                    true ->
+                         Acc
+                 end
+         end, {[], 0, 0}, p1_queue:to_list(Queue)),
+    History.
 
 -spec is_room_overcrowded(state()) -> boolean().
 is_room_overcrowded(StateData) ->
@@ -2381,31 +2410,28 @@ status_codes(IsInitialPresence, _IsSelfPresence = true, StateData) ->
     end;
 status_codes(_IsInitialPresence, _IsSelfPresence = false, _StateData) -> [].
 
--spec lqueue_new(non_neg_integer()) -> lqueue().
-lqueue_new(Max) ->
-    #lqueue{queue = queue:new(), len = 0, max = Max}.
+-spec lqueue_new(non_neg_integer(), ram | file) -> lqueue().
+lqueue_new(Max, Type) ->
+    #lqueue{queue = p1_queue:new(Type), max = Max}.
 
 -spec lqueue_in(term(), lqueue()) -> lqueue().
 %% If the message queue limit is set to 0, do not store messages.
 lqueue_in(_Item, LQ = #lqueue{max = 0}) -> LQ;
 %% Otherwise, rotate messages in the queue store.
-lqueue_in(Item,
-         #lqueue{queue = Q1, len = Len, max = Max}) ->
-    Q2 = queue:in(Item, Q1),
+lqueue_in(Item, #lqueue{queue = Q1, max = Max}) ->
+    Len = p1_queue:len(Q1),
+    Q2 = p1_queue:in(Item, Q1),
     if Len >= Max ->
           Q3 = lqueue_cut(Q2, Len - Max + 1),
-          #lqueue{queue = Q3, len = Max, max = Max};
-       true -> #lqueue{queue = Q2, len = Len + 1, max = Max}
+          #lqueue{queue = Q3, max = Max};
+       true -> #lqueue{queue = Q2, max = Max}
     end.
 
--spec lqueue_cut(queue:queue(), non_neg_integer()) -> queue:queue().
+-spec lqueue_cut(p1_queue:queue(), non_neg_integer()) -> p1_queue:queue().
 lqueue_cut(Q, 0) -> Q;
 lqueue_cut(Q, N) ->
-    {_, Q1} = queue:out(Q), lqueue_cut(Q1, N - 1).
-
--spec lqueue_to_list(lqueue()) -> list().
-lqueue_to_list(#lqueue{queue = Q1}) ->
-    queue:to_list(Q1).
+    {_, Q1} = p1_queue:out(Q),
+    lqueue_cut(Q1, N - 1).
 
 -spec add_message_to_history(binary(), jid(), message(), state()) -> state().
 add_message_to_history(FromNick, FromJID, Packet, StateData) ->
@@ -2436,7 +2462,7 @@ add_message_to_history(FromNick, FromJID, Packet, StateData) ->
            StateData
     end.
 
--spec send_history(jid(), lqueue(), state()) -> ok.
+-spec send_history(jid(), list(), state()) -> ok.
 send_history(JID, History, StateData) ->
     lists:foreach(
       fun({Nick, Packet, _HaveSubject, _TimeStamp, _Size}) ->
@@ -2445,7 +2471,7 @@ send_history(JID, History, StateData) ->
                  Packet,
                  jid:replace_resource(StateData#state.jid, Nick),
                  JID))
-      end, lqueue_to_list(History)).
+      end, History).
 
 -spec send_subject(jid(), state()) -> ok.
 send_subject(JID, #state{subject_author = Nick} = StateData) ->
index f0152a72279b9ac54501f447464406c781c36988..535d014f170a1c158fdd200a9b4241fcd4de6c19 100644 (file)
@@ -36,6 +36,7 @@
 
 -include("xmpp.hrl").
 -include("logger.hrl").
+-include("p1_queue.hrl").
 
 -define(is_sm_packet(Pkt),
        is_record(Pkt, sm_enable) or
@@ -44,7 +45,6 @@
        is_record(Pkt, sm_r)).
 
 -type state() :: ejabberd_c2s:state().
--type lqueue() :: {non_neg_integer(), queue:queue()}.
 
 %%%===================================================================
 %%% API
@@ -102,6 +102,7 @@ c2s_stream_init({ok, State}, Opts) ->
                    ({max_resume_timeout, _}) -> true;
                    ({ack_timeout, _}) -> true;
                    ({resend_on_timeout, _}) -> true;
+                   ({queue_type, _}) -> true;
                    (_) -> false
                 end, Opts),
     {ok, State#{mgmt_options => MgmtOpts}};
@@ -114,6 +115,7 @@ c2s_stream_started(#{lserver := LServer, mgmt_options := Opts} = State,
     ResumeTimeout = get_resume_timeout(LServer, Opts),
     MaxResumeTimeout = get_max_resume_timeout(LServer, Opts, ResumeTimeout),
     State1#{mgmt_state => inactive,
+           mgmt_queue_type => get_queue_type(LServer, Opts),
            mgmt_max_queue => get_max_ack_queue(LServer, Opts),
            mgmt_timeout => ResumeTimeout,
            mgmt_max_timeout => MaxResumeTimeout,
@@ -216,9 +218,10 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
 c2s_handle_send(State, _Pkt, _Result) ->
     State.
 
-c2s_handle_call(#{sid := {Time, _}, mod := Mod} = State,
+c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State,
                {resume_session, Time}, From) ->
-    Mod:reply(From, {resume, State}),
+    State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)},
+    Mod:reply(From, {resume, State1}),
     {stop, State#{mgmt_state => resumed}};
 c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) ->
     Mod:reply(From, {error, <<"Previous session not found">>}),
@@ -316,6 +319,7 @@ perform_stream_mgmt(Pkt, #{mgmt_xmlns := Xmlns} = State) ->
 
 -spec handle_enable(state(), sm_enable()) -> state().
 handle_enable(#{mgmt_timeout := DefaultTimeout,
+               mgmt_queue_type := QueueType,
                mgmt_max_timeout := MaxTimeout,
                mgmt_xmlns := Xmlns, jid := JID} = State,
              #sm_enable{resume = Resume, max = Max}) ->
@@ -339,7 +343,7 @@ handle_enable(#{mgmt_timeout := DefaultTimeout,
                  #sm_enabled{xmlns = Xmlns}
          end,
     State1 = State#{mgmt_state => active,
-                   mgmt_queue => queue_new(),
+                   mgmt_queue => p1_queue:new(QueueType),
                    mgmt_timeout => Timeout},
     send(State1, Res).
 
@@ -446,7 +450,7 @@ resend_rack(#{mgmt_ack_timer := _,
              mgmt_stanzas_out := NumStanzasOut,
              mgmt_stanzas_req := NumStanzasReq} = State) ->
     State1 = cancel_ack_timer(State),
-    case NumStanzasReq < NumStanzasOut andalso not queue_is_empty(Queue) of
+    case NumStanzasReq < NumStanzasOut andalso not p1_queue:is_empty(Queue) of
        true -> send_rack(State1);
        false -> State1
     end;
@@ -460,13 +464,13 @@ mgmt_queue_add(#{mgmt_stanzas_out := NumStanzasOut,
                 4294967295 -> 0;
                 Num -> Num + 1
             end,
-    Queue1 = queue_in({NewNum, p1_time_compat:timestamp(), Pkt}, Queue),
+    Queue1 = p1_queue:in({NewNum, p1_time_compat:timestamp(), Pkt}, Queue),
     State1 = State#{mgmt_queue => Queue1, mgmt_stanzas_out => NewNum},
     check_queue_length(State1).
 
 -spec mgmt_queue_drop(state(), non_neg_integer()) -> state().
 mgmt_queue_drop(#{mgmt_queue := Queue} = State, NumHandled) ->
-    NewQueue = queue_dropwhile(
+    NewQueue = p1_queue:dropwhile(
                 fun({N, _T, _E}) -> N =< NumHandled end, Queue),
     State#{mgmt_queue => NewQueue}.
 
@@ -475,7 +479,7 @@ check_queue_length(#{mgmt_max_queue := Limit} = State)
   when Limit == infinity; Limit == exceeded ->
     State;
 check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) ->
-    case queue_len(Queue) > Limit of
+    case p1_queue:len(Queue) > Limit of
        true ->
            State#{mgmt_max_queue => exceeded};
        false ->
@@ -484,14 +488,14 @@ check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) ->
 
 -spec resend_unacked_stanzas(state()) -> state().
 resend_unacked_stanzas(#{mgmt_state := MgmtState,
-                        mgmt_queue := {QueueLen, _} = Queue,
+                        mgmt_queue := Queue,
                         jid := JID} = State)
   when (MgmtState == active orelse
        MgmtState == pending orelse
-       MgmtState == timeout) andalso QueueLen > 0 ->
+       MgmtState == timeout) andalso ?qlen(Queue) > 0 ->
     ?DEBUG("Resending ~B unacknowledged stanza(s) to ~s",
-          [QueueLen, jid:encode(JID)]),
-    queue_foldl(
+          [p1_queue:len(Queue), jid:encode(JID)]),
+    p1_queue:foldl(
       fun({_, Time, Pkt}, AccState) ->
              NewPkt = add_resent_delay_info(AccState, Pkt, Time),
              send(AccState, xmpp:put_meta(NewPkt, mgmt_is_resent, true))
@@ -504,11 +508,11 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
                        mgmt_resend := MgmtResend,
                        lang := Lang, user := User,
                        jid := JID, lserver := LServer,
-                       mgmt_queue := {QueueLen, _} = Queue,
+                       mgmt_queue := Queue,
                        resource := Resource} = State)
   when (MgmtState == active orelse
        MgmtState == pending orelse
-       MgmtState == timeout) andalso QueueLen > 0 ->
+       MgmtState == timeout) andalso ?qlen(Queue) > 0 ->
     ResendOnTimeout = case MgmtResend of
                          Resend when is_boolean(Resend) ->
                              Resend;
@@ -522,8 +526,8 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
                              end
                      end,
     ?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~s",
-          [QueueLen, jid:encode(JID)]),
-    queue_foreach(
+          [p1_queue:len(Queue), jid:encode(JID)]),
+    p1_queue:foreach(
       fun({_, _Time, #presence{from = From}}) ->
              ?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]);
         ({_, _Time, #iq{} = El}) ->
@@ -564,7 +568,8 @@ route_unacked_stanzas(_State) ->
 -spec inherit_session_state(state(), binary()) -> {ok, state()} |
                                                  {error, binary()} |
                                                  {error, binary(), non_neg_integer()}.
-inherit_session_state(#{user := U, server := S} = State, ResumeID) ->
+inherit_session_state(#{user := U, server := S,
+                       mgmt_queue_type := QueueType} = State, ResumeID) ->
     case jlib:base64_to_term(ResumeID) of
        {term, {R, Time}} ->
            case ejabberd_sm:get_session_pid(U, S, R) of
@@ -589,8 +594,12 @@ inherit_session_state(#{user := U, server := S} = State, ResumeID) ->
                                   mgmt_stanzas_in := NumStanzasIn,
                                   mgmt_stanzas_out := NumStanzasOut} = OldState} ->
                            State1 = ejabberd_c2s:copy_state(State, OldState),
+                           Queue1 = case QueueType of
+                                        ram -> Queue;
+                                        _ -> p1_queue:ram_to_file(Queue)
+                                    end,
                            State2 = State1#{mgmt_xmlns => Xmlns,
-                                            mgmt_queue => Queue,
+                                            mgmt_queue => Queue1,
                                             mgmt_timeout => Timeout,
                                             mgmt_stanzas_in => NumStanzasIn,
                                             mgmt_stanzas_out => NumStanzasOut,
@@ -632,44 +641,6 @@ add_resent_delay_info(_State, El, _Time) ->
 send(#{mod := Mod} = State, Pkt) ->
     Mod:send(State, Pkt).
 
--spec queue_new() -> lqueue().
-queue_new() ->
-    {0, queue:new()}.
-
--spec queue_in(term(), lqueue()) -> lqueue().
-queue_in(Elem, {N, Q}) ->
-    {N+1, queue:in(Elem, Q)}.
-
--spec queue_len(lqueue()) -> non_neg_integer().
-queue_len({N, _}) ->
-    N.
-
--spec queue_foldl(fun((term(), T) -> T), T, lqueue()) -> T.
-queue_foldl(F, Acc, {_N, Q}) ->
-    jlib:queue_foldl(F, Acc, Q).
-
--spec queue_foreach(fun((_) -> _), lqueue()) -> ok.
-queue_foreach(F, {_N, Q}) ->
-    jlib:queue_foreach(F, Q).
-
--spec queue_dropwhile(fun((term()) -> boolean()), lqueue()) -> lqueue().
-queue_dropwhile(F, {N, Q}) ->
-    case queue:peek(Q) of
-       {value, Item} ->
-           case F(Item) of
-               true ->
-                   queue_dropwhile(F, {N-1, queue:drop(Q)});
-               false ->
-                   {N, Q}
-           end;
-       empty ->
-           {N, Q}
-    end.
-
--spec queue_is_empty(lqueue()) -> boolean().
-queue_is_empty({N, _Q}) ->
-    N == 0.
-
 -spec cancel_ack_timer(state()) -> state().
 cancel_ack_timer(#{mgmt_ack_timer := TRef} = State) ->
     case erlang:cancel_timer(TRef) of
@@ -741,6 +712,17 @@ get_resend_on_timeout(Host, Opts) ->
        Resend -> Resend
     end.
 
+get_queue_type(Host, Opts) ->
+    VFun = mod_opt_type(queue_type),
+    case gen_mod:get_module_opt(Host, ?MODULE, queue_type, VFun) of
+       undefined ->
+           case gen_mod:get_opt(queue_type, Opts, VFun) of
+               undefined -> ejabberd_config:default_queue_type(Host);
+               Type -> Type
+           end;
+       Type -> Type
+    end.
+
 mod_opt_type(max_ack_queue) ->
     fun(I) when is_integer(I), I > 0 -> I;
        (infinity) -> infinity
@@ -757,6 +739,8 @@ mod_opt_type(resend_on_timeout) ->
     fun(B) when is_boolean(B) -> B;
        (if_offline) -> if_offline
     end;
+mod_opt_type(queue_type) ->
+    fun(ram) -> ram; (file) -> file end;
 mod_opt_type(_) ->
     [max_ack_queue, resume_timeout, max_resume_timeout, ack_timeout,
-     resend_on_timeout].
+     resend_on_timeout, queue_type].
index 828e0c03c38cf2519b7a6b8c00a44c116de2f93e..efecf6df86e2d1dfdfc082c0ed393b0787c4f3b3 100644 (file)
@@ -180,7 +180,8 @@ Welcome to this XMPP server."
       mod_stats: []
       mod_time: []
       mod_version: []
-  "mnesia.localhost": 
+  "mnesia.localhost":
+    queue_type: ram
     auth_method: internal
     modules: 
       mod_announce: 
@@ -238,7 +239,8 @@ Welcome to this XMPP server."
       mod_stats: []
       mod_time: []
       mod_version: []
-  "redis.localhost": 
+  "redis.localhost":
+    queue_type: ram
     auth_method: internal
     sm_db_type: redis
     modules: 
@@ -297,7 +299,8 @@ Welcome to this XMPP server."
       mod_stats: []
       mod_time: []
       mod_version: []
-  "riak.localhost": 
+  "riak.localhost":
+    queue_type: ram
     auth_method: riak
     modules: 
       mod_announce: 
@@ -341,7 +344,8 @@ Welcome to this XMPP server."
       mod_version: []
   "localhost": 
     auth_method: [internal, anonymous]
-  "ldap.localhost": 
+  "ldap.localhost":
+    queue_type: ram
     ldap_servers: 
       - "localhost"
     ldap_rootdn: "cn=admin,dc=localhost"
@@ -374,7 +378,8 @@ Welcome to this XMPP server."
       mod_stats: []
       mod_time: []
       mod_version: []
-  "extauth.localhost": 
+  "extauth.localhost":
+    queue_type: ram
     extauth_program: "python extauth.py"
     auth_method: external
 hosts: 
@@ -450,6 +455,7 @@ listen:
       @@password@@
 loglevel: @@loglevel@@
 max_fsm_queue: 1000
+queue_type: file
 modules: 
   mod_adhoc: []
   mod_configure: []