From 02064ae12afe9ebbe92196575b427436398fd680 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Fri, 10 Mar 2017 15:12:43 +0300 Subject: [PATCH] Add support for file-based queues It's now possible to use files as internal packet queues. The following options are introduced: * queue_type: the option can be set to `ram` (default) or `file`. The option can be set per virtual host. * queue_dir: path to the directory where queues will be allocated. The default is 'queue' directory inside Mnesia directory. This is a global option and cannot be set per virtual host. --- include/mod_muc_room.hrl | 9 +- rebar.config | 4 +- src/ejabberd_app.erl | 11 ++ src/ejabberd_bosh.erl | 73 ++++++----- src/ejabberd_config.erl | 18 ++- src/ejabberd_s2s.erl | 14 ++- src/ejabberd_s2s_out.erl | 43 +++++-- src/ejabberd_sql.erl | 75 ++++++------ src/jlib.erl | 48 +------- src/mod_bosh.erl | 6 +- src/mod_irc_connection.erl | 12 +- src/mod_mam.erl | 5 +- src/mod_muc.erl | 51 +++++--- src/mod_muc_admin.erl | 5 +- src/mod_muc_room.erl | 168 +++++++++++++++----------- src/mod_stream_mgmt.erl | 98 +++++++-------- test/ejabberd_SUITE_data/ejabberd.yml | 16 ++- 17 files changed, 350 insertions(+), 306 deletions(-) diff --git a/include/mod_muc_room.hrl b/include/mod_muc_room.hrl index c0d8f50bd..cf30153ac 100644 --- a/include/mod_muc_room.hrl +++ b/include/mod_muc_room.hrl @@ -28,9 +28,8 @@ -record(lqueue, { - queue = queue:new() :: ?TQUEUE, - len = 0 :: integer(), - max = 0 :: integer() + queue :: p1_queue:queue(), + max = 0 :: integer() }). -type lqueue() :: #lqueue{}. @@ -112,11 +111,11 @@ robots = (?DICT):new() :: ?TDICT, nicks = (?DICT):new() :: ?TDICT, affiliations = (?DICT):new() :: ?TDICT, - history = #lqueue{} :: lqueue(), + history :: lqueue(), subject = <<"">> :: binary(), subject_author = <<"">> :: binary(), just_created = false :: boolean(), activity = treap:empty() :: treap:treap(), room_shaper = none :: shaper:shaper(), - room_queue = queue:new() :: ?TQUEUE + room_queue :: p1_queue:queue() | undefined }). diff --git a/rebar.config b/rebar.config index f38591955..7088cfefe 100644 --- a/rebar.config +++ b/rebar.config @@ -19,7 +19,7 @@ %%%---------------------------------------------------------------------- {deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}}, - {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.7"}}}, + {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "13b03e1c8c7a5777de728f759809142f997f8af3"}}, {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.6"}}}, {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "afdd07811e0e6eff444c035ffeb2aa9efb4dbe6d"}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.7"}}}, @@ -81,6 +81,7 @@ {i, "include"}, {i, "deps/fast_xml/include"}, {i, "deps/xmpp/include"}, + {i, "deps/p1_utils/include"}, {if_var_false, debug, no_debug_info}, {if_var_true, debug, debug_info}, {if_var_true, roster_gateway_workaround, {d, 'ROSTER_GATWAY_WORKAROUND'}}, @@ -134,6 +135,7 @@ {eunit_compile_opts, [{i, "tools"}, {i, "include"}, + {i, "deps/p1_utils/include"}, {i, "deps/fast_xml/include"}, {i, "deps/xmpp/include"}]}. diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index f4d10e5a5..01f0c4c99 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -49,6 +49,7 @@ start(normal, _Args) -> setup_if_elixir_conf_used(), ejabberd_config:start(), set_settings_from_config(), + file_queue_init(), maybe_add_nameservers(), connect_nodes(), case ejabberd_sup:start_link() of @@ -167,6 +168,16 @@ set_settings_from_config() -> 60), net_kernel:set_net_ticktime(Ticktime). +file_queue_init() -> + QueueDir = case ejabberd_config:queue_dir() of + undefined -> + {ok, MnesiaDir} = application:get_env(mnesia, dir), + filename:join(MnesiaDir, "queue"); + Path -> + Path + end, + p1_queue:start(QueueDir). + start_apps() -> crypto:start(), ejabberd:start_app(sasl), diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl index 204c7b6e5..d34736a44 100644 --- a/src/ejabberd_bosh.erl +++ b/src/ejabberd_bosh.erl @@ -96,8 +96,8 @@ -record(state, {host = <<"">> :: binary(), sid = <<"">> :: binary(), - el_ibuf = buf_new() :: ?TQUEUE, - el_obuf = buf_new() :: ?TQUEUE, + el_ibuf :: p1_queue:queue(), + el_obuf :: p1_queue:queue(), shaper_state = none :: shaper:shaper(), c2s_pid :: pid() | undefined, xmpp_ver = <<"">> :: binary(), @@ -111,7 +111,7 @@ max_concat = unlimited :: unlimited | non_neg_integer(), responses = gb_trees:empty() :: ?TGB_TREE, receivers = gb_trees:empty() :: ?TGB_TREE, - shaped_receivers = queue:new() :: ?TQUEUE, + shaped_receivers :: p1_queue:queue(), ip :: inet:ip_address(), max_requests = 1 :: non_neg_integer()}). @@ -305,10 +305,10 @@ init([#body{attrs = Attrs}, IP, SID]) -> false) of true -> JID = make_random_jid(XMPPDomain), - {buf_new(), [{jid, JID} | Opts2]}; + {buf_new(XMPPDomain), [{jid, JID} | Opts2]}; false -> {buf_in([make_xmlstreamstart(XMPPDomain, XMPPVer)], - buf_new()), + buf_new(XMPPDomain)), Opts2} end, ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, @@ -321,10 +321,12 @@ init([#body{attrs = Attrs}, IP, SID]) -> fun(unlimited) -> unlimited; (N) when is_integer(N), N>0 -> N end, unlimited), + ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN), State = #state{host = XMPPDomain, sid = SID, ip = IP, xmpp_ver = XMPPVer, el_ibuf = InBuf, - max_concat = MaxConcat, el_obuf = buf_new(), + max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain), inactivity_timeout = Inactivity, + shaped_receivers = ShapedReceivers, shaper_state = ShaperState}, NewState = restart_inactivity_timer(State), mod_bosh:open_session(SID, self()), @@ -417,15 +419,15 @@ active(#body{attrs = Attrs, size = Size} = Req, From, shaper:update(State#state.shaper_state, Size), State1 = State#state{shaper_state = ShaperState}, if Pause > 0 -> - QLen = queue:len(State1#state.shaped_receivers), - if QLen < (?MAX_SHAPED_REQUESTS_QUEUE_LEN) -> - TRef = start_shaper_timer(Pause), - Q = queue:in({TRef, From, Req}, - State1#state.shaped_receivers), - State2 = stop_inactivity_timer(State1), - {next_state, active, - State2#state{shaped_receivers = Q}}; - true -> + TRef = start_shaper_timer(Pause), + try p1_queue:in({TRef, From, Req}, + State1#state.shaped_receivers) of + Q -> + State2 = stop_inactivity_timer(State1), + {next_state, active, + State2#state{shaped_receivers = Q}} + catch error:full -> + cancel_timer(TRef), RID = get_attr(rid, Attrs), reply_stop(State1, #body{http_reason = <<"Too many requests">>, @@ -572,7 +574,7 @@ handle_sync_event({send_xml, El}, _From, StateName, reply(State2, Body#body{els = Els}, State2#state.prev_rid, From)}; none -> - State2 = case queue:out(State1#state.shaped_receivers) + State2 = case p1_queue:out(State1#state.shaped_receivers) of {{value, {TRef, From, Body}}, Q} -> cancel_timer(TRef), @@ -601,7 +603,7 @@ handle_info({timeout, TRef, inactive}, _StateName, {stop, normal, State}; handle_info({timeout, TRef, shaper_timeout}, StateName, State) -> - case queue:out(State#state.shaped_receivers) of + case p1_queue:out(State#state.shaped_receivers) of {{value, {TRef, From, Req}}, Q} -> (?GEN_FSM):send_event(self(), {Req, From}), {next_state, StateName, @@ -646,9 +648,13 @@ code_change(_OldVsn, StateName, State, _Extra) -> print_state(State) -> State. -route_els(#state{el_ibuf = Buf} = State) -> - route_els(State#state{el_ibuf = buf_new()}, - buf_to_list(Buf)). +route_els(#state{el_ibuf = Buf, c2s_pid = C2SPid} = State) -> + NewBuf = p1_queue:dropwhile( + fun(El) -> + ?GEN_FSM:send_event(C2SPid, El), + true + end, Buf), + State#state{el_ibuf = NewBuf}. route_els(State, Els) -> case State#state.c2s_pid of @@ -734,7 +740,7 @@ bounce_receivers(State, Reason) -> RID = get_attr(rid, Attrs), {RID, {From, Body}} end, - queue:to_list(State#state.shaped_receivers)), + p1_queue:to_list(State#state.shaped_receivers)), lists:foldl(fun ({RID, {From, Body}}, AccState) -> NewBody = if Reason == closed -> #body{http_reason = @@ -752,7 +758,7 @@ bounce_receivers(State, Reason) -> State, Receivers ++ ShapedReceivers). bounce_els_from_obuf(State) -> - lists:foreach( + p1_queue:foreach( fun({xmlstreamelement, El}) -> try xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of Pkt when ?is_stanza(Pkt) -> @@ -769,7 +775,7 @@ bounce_els_from_obuf(State) -> end; (_) -> ok - end, buf_to_list(State#state.el_obuf)). + end, State#state.el_obuf). is_valid_key(<<"">>, <<"">>) -> true; is_valid_key(PrevKey, Key) -> @@ -1029,26 +1035,33 @@ get_attr(Attr, Attrs, Default) -> _ -> Default end. -buf_new() -> queue:new(). +buf_new(Host) -> + buf_new(Host, unlimited). + +buf_new(Host, Limit) -> + QueueType = case gen_mod:get_module_opt( + Host, mod_bosh, queue_type, + mod_bosh:mod_opt_type(queue_type)) of + undefined -> ejabberd_config:default_queue_type(Host); + T -> T + end, + p1_queue:new(QueueType, Limit). buf_in(Xs, Buf) -> - lists:foldl(fun (X, Acc) -> queue:in(X, Acc) end, Buf, - Xs). + lists:foldl(fun p1_queue:in/2, Buf, Xs). buf_out(Buf, Num) when is_integer(Num), Num > 0 -> buf_out(Buf, Num, []); -buf_out(Buf, _) -> {queue:to_list(Buf), buf_new()}. +buf_out(Buf, _) -> {p1_queue:to_list(Buf), p1_queue:clear(Buf)}. buf_out(Buf, 0, Els) -> {lists:reverse(Els), Buf}; buf_out(Buf, I, Els) -> - case queue:out(Buf) of + case p1_queue:out(Buf) of {{value, El}, NewBuf} -> buf_out(NewBuf, I - 1, [El | Els]); {empty, _} -> buf_out(Buf, 0, Els) end. -buf_to_list(Buf) -> queue:to_list(Buf). - cancel_timer(TRef) when is_reference(TRef) -> (?GEN_FSM):cancel_timer(TRef); cancel_timer(_) -> false. diff --git a/src/ejabberd_config.erl b/src/ejabberd_config.erl index 720e4cafa..5856546ce 100644 --- a/src/ejabberd_config.erl +++ b/src/ejabberd_config.erl @@ -37,7 +37,7 @@ env_binary_to_list/2, opt_type/1, may_hide_data/1, is_elixir_enabled/0, v_dbs/1, v_dbs_mods/1, default_db/1, default_db/2, default_ram_db/1, default_ram_db/2, - fsm_limit_opts/1]). + default_queue_type/1, queue_dir/0, fsm_limit_opts/1]). -export([start/2]). @@ -1455,9 +1455,13 @@ opt_type(default_ram_db) -> fun(T) when is_atom(T) -> T end; opt_type(loglevel) -> fun (P) when P >= 0, P =< 5 -> P end; +opt_type(queue_dir) -> + fun iolist_to_binary/1; +opt_type(queue_type) -> + fun(ram) -> ram; (file) -> file end; opt_type(_) -> - [hide_sensitive_log_data, hosts, language, - default_db, default_ram_db, loglevel]. + [hide_sensitive_log_data, hosts, language, max_fsm_queue, + default_db, default_ram_db, queue_type, queue_dir, loglevel]. -spec may_hide_data(any()) -> any(). may_hide_data(Data) -> @@ -1486,3 +1490,11 @@ fsm_limit_opts(Opts) -> N -> [{max_queue, N}] end end. + +-spec queue_dir() -> binary() | undefined. +queue_dir() -> + get_option(queue_dir, opt_type(queue_dir)). + +-spec default_queue_type(binary()) -> ram | file. +default_queue_type(Host) -> + get_option({queue_type, Host}, opt_type(queue_type), ram). diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl index 99ee6de9f..40492d8da 100644 --- a/src/ejabberd_s2s.erl +++ b/src/ejabberd_s2s.erl @@ -45,7 +45,7 @@ external_host_overloaded/1, is_temporarly_blocked/1, get_commands_spec/0, zlib_enabled/1, get_idle_timeout/1, tls_required/1, tls_verify/1, tls_enabled/1, tls_options/2, - host_up/1, host_down/1]). + host_up/1, host_down/1, queue_type/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, @@ -285,6 +285,14 @@ get_idle_timeout(LServer) -> (infinity) -> infinity end, timer:minutes(10)). +-spec queue_type(binary()) -> ram | file. +queue_type(LServer) -> + case ejabberd_config:get_option( + {s2s_queue_type, LServer}, opt_type(s2s_queue_type)) of + undefined -> ejabberd_config:default_queue_type(LServer); + Type -> Type + end. + %%==================================================================== %% gen_server callbacks %%==================================================================== @@ -739,7 +747,9 @@ opt_type(s2s_timeout) -> fun(I) when is_integer(I), I>=0 -> I; (infinity) -> infinity end; +opt_type(s2s_queue_type) -> + fun(ram) -> ram; (file) -> file end; opt_type(_) -> [route_subdomains, s2s_access, s2s_certfile, s2s_ciphers, s2s_dhfile, s2s_cafile, s2s_protocol_options, - s2s_tls_compression, s2s_use_starttls, s2s_timeout]. + s2s_tls_compression, s2s_use_starttls, s2s_timeout, s2s_queue_type]. diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index 77f555475..8c9f9d631 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -277,8 +277,14 @@ handle_timeout(#{on_route := Action} = State) -> init([#{server := LServer, remote_server := RServer} = State, Opts]) -> ServerHost = ejabberd_router:host_of_route(LServer), + QueueType = ejabberd_s2s:queue_type(LServer), + QueueLimit = case lists:keyfind( + max_queue, 1, ejabberd_config:fsm_limit_opts([])) of + {_, N} -> N; + false -> unlimited + end, State1 = State#{on_route => queue, - queue => queue:new(), + queue => p1_queue:new(QueueType, QueueLimit), xmlns => ?NS_SERVER, lang => ?MYLANG, server_host => ServerHost, @@ -300,7 +306,15 @@ handle_cast(Msg, #{server_host := ServerHost} = State) -> handle_info({route, Pkt}, #{queue := Q, on_route := Action} = State) -> case Action of - queue -> State#{queue => queue:in(Pkt, Q)}; + queue -> + try State#{queue => p1_queue:in(Pkt, Q)} + catch error:full -> + #{server := LServer, remote_server := RServer} = State, + ?INFO_MSG("Failed to establish outbound s2s connection " + "~s -> ~s: message queue is overloaded", + [LServer, RServer]), + stop(State#{stop_reason => queue_full}) + end; bounce -> bounce_packet(Pkt, State); send -> set_idle_timeout(send(State, Pkt)) end; @@ -324,20 +338,18 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== -spec resend_queue(state()) -> state(). -resend_queue(#{queue := Q} = State) -> - State1 = State#{queue => queue:new()}, - jlib:queue_foldl( +resend_queue(State) -> + queue_fold( fun(Pkt, AccState) -> send(AccState, Pkt) - end, State1, Q). + end, State). -spec bounce_queue(state()) -> state(). -bounce_queue(#{queue := Q} = State) -> - State1 = State#{queue => queue:new()}, - jlib:queue_foldl( +bounce_queue(State) -> + queue_fold( fun(Pkt, AccState) -> bounce_packet(Pkt, AccState) - end, State1, Q). + end, State). -spec bounce_message_queue(state()) -> state(). bounce_message_queue(State) -> @@ -363,6 +375,8 @@ mk_bounce_error(Lang, #{stop_reason := Why}) -> case Why of internal_failure -> xmpp:err_internal_server_error(); + queue_full -> + xmpp:err_resource_constraint(); {dns, _} -> xmpp:err_remote_server_not_found(Reason, Lang); _ -> @@ -387,6 +401,15 @@ set_idle_timeout(#{on_route := send, server := LServer} = State) -> set_idle_timeout(State) -> State. +queue_fold(F, #{queue := Q} = State) -> + case p1_queue:out(Q) of + {{value, Pkt}, Q1} -> + State1 = F(Pkt, State#{queue => Q1}), + queue_fold(F, State1); + {empty, Q1} -> + State#{queue => Q1} + end. + transform_options(Opts) -> lists:foldl(fun transform_options/2, [], Opts). diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl index 51ff9d436..616b6b73a 100644 --- a/src/ejabberd_sql.erl +++ b/src/ejabberd_sql.erl @@ -75,8 +75,7 @@ db_version = undefined :: undefined | non_neg_integer(), start_interval = 0 :: non_neg_integer(), host = <<"">> :: binary(), - max_pending_requests_len :: non_neg_integer(), - pending_requests = {0, queue:new()} :: {non_neg_integer(), ?TQUEUE}}). + pending_requests :: p1_queue:queue()}). -define(STATE_KEY, ejabberd_sql_state). @@ -271,10 +270,16 @@ init([Host, StartInterval]) -> [DBType | _] = db_opts(Host), (?GEN_FSM):send_event(self(), connect), ejabberd_sql_sup:add_pid(Host, self()), + QueueType = case ejabberd_config:get_option( + {sql_queue_type, Host}, opt_type(sql_queue_type)) of + undefined -> + ejabberd_config:default_queue_type(Host); + Type -> + Type + end, {ok, connecting, #state{db_type = DBType, host = Host, - max_pending_requests_len = max_fsm_queue(), - pending_requests = {0, queue:new()}, + pending_requests = p1_queue:new(QueueType, max_fsm_queue()), start_interval = StartInterval}}. connecting(connect, #state{host = Host} = State) -> @@ -285,16 +290,17 @@ connecting(connect, #state{host = Host} = State) -> [mssql | Args] -> apply(fun odbc_connect/1, Args); [odbc | Args] -> apply(fun odbc_connect/1, Args) end, - {_, PendingRequests} = State#state.pending_requests, case ConnectRes of {ok, Ref} -> erlang:monitor(process, Ref), - lists:foreach(fun (Req) -> - (?GEN_FSM):send_event(self(), Req) - end, - queue:to_list(PendingRequests)), + PendingRequests = + p1_queue:dropwhile( + fun(Req) -> + ?GEN_FSM:send_event(self(), Req), + true + end, State#state.pending_requests), State1 = State#state{db_ref = Ref, - pending_requests = {0, queue:new()}}, + pending_requests = PendingRequests}, State2 = get_db_version(State1), {next_state, session_established, State2}; {error, Reason} -> @@ -321,26 +327,20 @@ connecting({sql_cmd, Command, Timestamp} = Req, From, State) -> ?DEBUG("queuing pending request while connecting:~n\t~p", [Req]), - {Len, PendingRequests} = State#state.pending_requests, - NewPendingRequests = if Len < - State#state.max_pending_requests_len -> - {Len + 1, - queue:in({sql_cmd, Command, From, Timestamp}, - PendingRequests)}; - true -> - lists:foreach(fun ({sql_cmd, _, To, - _Timestamp}) -> - (?GEN_FSM):reply(To, - {error, - <<"SQL connection failed">>}) - end, - queue:to_list(PendingRequests)), - {1, - queue:from_list([{sql_cmd, Command, From, - Timestamp}])} - end, + PendingRequests = + try p1_queue:in({sql_cmd, Command, From, Timestamp}, + State#state.pending_requests) + catch error:full -> + Q = p1_queue:dropwhile( + fun({sql_cmd, _, To, _Timestamp}) -> + (?GEN_FSM):reply( + To, {error, <<"SQL connection failed">>}), + true + end, State#state.pending_requests), + p1_queue:in({sql_cmd, Command, From, Timestamp}, Q) + end, {next_state, connecting, - State#state{pending_requests = NewPendingRequests}}; + State#state{pending_requests = PendingRequests}}; connecting(Request, {Who, _Ref}, State) -> ?WARNING_MSG("unexpected call ~p from ~p in 'connecting'", [Request, Who]), @@ -1068,15 +1068,10 @@ odbcinst_config() -> filename:join(tmp_dir(), "odbcinst.ini"). max_fsm_queue() -> - ejabberd_config:get_option( - max_fsm_queue, - fun(N) when is_integer(N), N > 0 -> N end). + proplists:get_value(max_queue, fsm_limit_opts(), unlimited). fsm_limit_opts() -> - case max_fsm_queue() of - N when is_integer(N) -> [{max_queue, N}]; - _ -> [] - end. + ejabberd_config:fsm_limit_opts([]). check_error({error, Why} = Err, #sql_query{} = Query) -> ?ERROR_MSG("SQL query '~s' at ~p failed: ~p", @@ -1093,8 +1088,6 @@ check_error({error, Why} = Err, Query) -> check_error(Result, _Query) -> Result. -opt_type(max_fsm_queue) -> - fun (N) when is_integer(N), N > 0 -> N end; opt_type(sql_database) -> fun iolist_to_binary/1; opt_type(sql_keepalive_interval) -> fun (I) when is_integer(I), I > 0 -> I end; @@ -1114,8 +1107,10 @@ opt_type(sql_ssl) -> fun(B) when is_boolean(B) -> B end; opt_type(sql_ssl_verify) -> fun(B) when is_boolean(B) -> B end; opt_type(sql_ssl_certfile) -> fun iolist_to_binary/1; opt_type(sql_ssl_cafile) -> fun iolist_to_binary/1; +opt_type(sql_queue_type) -> + fun(ram) -> ram; (file) -> file end; opt_type(_) -> - [max_fsm_queue, sql_database, sql_keepalive_interval, + [sql_database, sql_keepalive_interval, sql_password, sql_port, sql_server, sql_type, sql_username, sql_ssl, sql_ssl_verify, sql_ssl_cerfile, - sql_ssl_cafile]. + sql_ssl_cafile, sql_queue_type]. diff --git a/src/jlib.erl b/src/jlib.erl index 580ad1ffa..38a71d7cc 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -38,8 +38,7 @@ -export([tolower/1, term_to_base64/1, base64_to_term/1, decode_base64/1, encode_base64/1, ip_to_list/1, atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1, - l2i/1, i2l/1, i2l/2, expr_to_term/1, term_to_expr/1, - queue_drop_while/2, queue_foldl/3, queue_foldr/3, queue_foreach/2]). + l2i/1, i2l/1, i2l/2, expr_to_term/1, term_to_expr/1]). %% The following functions are used by gen_iq_handler.erl for providing backward %% compatibility and must not be used in other parts of the code @@ -960,48 +959,3 @@ i2l(L, N) when is_binary(L) -> C when C > N -> L; _ -> i2l(<<$0, L/binary>>, N) end. - --spec queue_drop_while(fun((term()) -> boolean()), ?TQUEUE) -> ?TQUEUE. - -queue_drop_while(F, Q) -> - case queue:peek(Q) of - {value, Item} -> - case F(Item) of - true -> - queue_drop_while(F, queue:drop(Q)); - _ -> - Q - end; - empty -> - Q - end. - --spec queue_foldl(fun((term(), T) -> T), T, ?TQUEUE) -> T. -queue_foldl(F, Acc, Q) -> - case queue:out(Q) of - {{value, Item}, Q1} -> - Acc1 = F(Item, Acc), - queue_foldl(F, Acc1, Q1); - {empty, _} -> - Acc - end. - --spec queue_foldr(fun((term(), T) -> T), T, ?TQUEUE) -> T. -queue_foldr(F, Acc, Q) -> - case queue:out_r(Q) of - {{value, Item}, Q1} -> - Acc1 = F(Item, Acc), - queue_foldr(F, Acc1, Q1); - {empty, _} -> - Acc - end. - --spec queue_foreach(fun((_) -> _), ?TQUEUE) -> ok. -queue_foreach(F, Q) -> - case queue:out(Q) of - {{value, Item}, Q1} -> - F(Item), - queue_foreach(F, Q1); - {empty, _} -> - ok - end. diff --git a/src/mod_bosh.erl b/src/mod_bosh.erl index abe3c2f16..57c819537 100644 --- a/src/mod_bosh.erl +++ b/src/mod_bosh.erl @@ -158,9 +158,11 @@ mod_opt_type(prebind) -> fun (B) when is_boolean(B) -> B end; mod_opt_type(ram_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; +mod_opt_type(queue_type) -> + fun(ram) -> ram; (file) -> file end; mod_opt_type(_) -> - [json, max_concat, max_inactivity, max_pause, prebind, ram_db_type]. - + [json, max_concat, max_inactivity, max_pause, prebind, ram_db_type, + queue_type]. %%%---------------------------------------------------------------------- %%% Help Web Page diff --git a/src/mod_irc_connection.erl b/src/mod_irc_connection.erl index 46ed8767e..1e90c4005 100644 --- a/src/mod_irc_connection.erl +++ b/src/mod_irc_connection.erl @@ -50,7 +50,6 @@ encoding = <<"">> :: binary(), port = 0 :: inet:port_number(), password = <<"">> :: binary(), - queue = queue:new() :: ?TQUEUE, user = #jid{} :: jid(), host = <<"">> :: binary(), server = <<"">> :: binary(), @@ -112,7 +111,7 @@ init([From, Host, Server, Username, Encoding, Port, Password, Ident, RemoteAddr, RealName, WebircPassword, Mod]) -> gen_fsm:send_event(self(), init), {ok, open_socket, - #state{queue = queue:new(), mod = Mod, + #state{mod = Mod, encoding = Encoding, port = Port, password = Password, user = From, nick = Username, host = Host, server = Server, ident = Ident, realname = RealName, @@ -695,15 +694,6 @@ send_text(#state{socket = Socket, encoding = Encoding}, CText = iconv:convert(<<"utf-8">>, Encoding, iolist_to_binary(Text)), gen_tcp:send(Socket, CText). -%send_queue(Socket, Q) -> -% case queue:out(Q) of -% {{value, El}, Q1} -> -% send_element(Socket, El), -% send_queue(Socket, Q1); -% {empty, Q1} -> -% ok -% end. - bounce_messages(Reason) -> receive {send_element, El} -> diff --git a/src/mod_mam.erl b/src/mod_mam.erl index 47a9c6ce1..04d5ec686 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -837,7 +837,8 @@ select(_LServer, JidRequestor, JidArchive, Query, RSM, history = History}} = MsgType) -> Start = proplists:get_value(start, Query), End = proplists:get_value('end', Query), - #lqueue{len = L, queue = Q} = History, + #lqueue{queue = Q} = History, + L = p1_queue:len(Q), Msgs = lists:flatmap( fun({Nick, Pkt, _HaveSubject, Now, _Size}) -> @@ -861,7 +862,7 @@ select(_LServer, JidRequestor, JidArchive, Query, RSM, false -> [] end - end, queue:to_list(Q)), + end, p1_queue:to_list(Q)), case RSM of #rsm_set{max = Max, before = Before} when is_binary(Before) -> {NewMsgs, IsComplete} = filter_by_max(lists:reverse(Msgs), Max), diff --git a/src/mod_muc.erl b/src/mod_muc.erl index 563f4c68a..7e8c4e78d 100644 --- a/src/mod_muc.erl +++ b/src/mod_muc.erl @@ -80,6 +80,7 @@ access = {none, none, none, none} :: {atom(), atom(), atom(), atom()}, history_size = 20 :: non_neg_integer(), max_rooms_discoitems = 100 :: non_neg_integer(), + queue_type = ram :: ram | file, default_room_opts = [] :: list(), room_shaper = none :: shaper:shaper()}). @@ -226,7 +227,7 @@ init([Host, Opts]) -> IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1, one_queue), #state{access = Access, host = MyHost, - history_size = HistorySize, + history_size = HistorySize, queue_type = QueueType, room_shaper = RoomShaper} = State = init_state(Host, Opts), Mod = gen_mod:db_mod(Host, Opts, ?MODULE), RMod = gen_mod:ram_db_mod(Host, Opts, ?MODULE), @@ -234,7 +235,7 @@ init([Host, Opts]) -> RMod:init(Host, [{host, MyHost}|Opts]), register_iq_handlers(MyHost, IQDisc), ejabberd_router:register_route(MyHost, Host), - load_permanent_rooms(MyHost, Host, Access, HistorySize, RoomShaper), + load_permanent_rooms(MyHost, Host, Access, HistorySize, RoomShaper, QueueType), {ok, State}. handle_call(stop, _From, State) -> @@ -242,7 +243,7 @@ handle_call(stop, _From, State) -> handle_call({create, Room, From, Nick, Opts}, _From, #state{host = Host, server_host = ServerHost, access = Access, default_room_opts = DefOpts, - history_size = HistorySize, + history_size = HistorySize, queue_type = QueueType, room_shaper = RoomShaper} = State) -> ?DEBUG("MUC: create new room '~s'~n", [Room]), NewOpts = case Opts of @@ -253,7 +254,7 @@ handle_call({create, Room, From, Nick, Opts}, _From, Host, ServerHost, Access, Room, HistorySize, RoomShaper, From, - Nick, NewOpts), + Nick, NewOpts, QueueType), RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), RMod:register_online_room(Room, Host, Pid), {reply, ok, State}. @@ -300,13 +301,14 @@ handle_cast(Msg, State) -> handle_info({route, Packet}, #state{host = Host, server_host = ServerHost, access = Access, default_room_opts = DefRoomOpts, - history_size = HistorySize, + history_size = HistorySize, queue_type = QueueType, max_rooms_discoitems = MaxRoomsDiscoItems, room_shaper = RoomShaper} = State) -> From = xmpp:get_from(Packet), To = xmpp:get_to(Packet), case catch do_route(Host, ServerHost, Access, HistorySize, RoomShaper, - From, To, Packet, DefRoomOpts, MaxRoomsDiscoItems) of + From, To, Packet, DefRoomOpts, MaxRoomsDiscoItems, + QueueType) of {'EXIT', Reason} -> ?ERROR_MSG("~p", [Reason]); _ -> @@ -353,6 +355,13 @@ init_state(Host, Opts) -> DefRoomOpts1 = gen_mod:get_opt(default_room_options, Opts, fun(L) when is_list(L) -> L end, []), + QueueType = case gen_mod:get_opt(queue_type, Opts, + mod_opt_type(queue_type)) of + undefined -> + ejabberd_config:default_queue_type(Host); + Type -> + Type + end, DefRoomOpts = lists:flatmap( fun({Opt, Val}) -> @@ -410,6 +419,7 @@ init_state(Host, Opts) -> server_host = Host, access = {Access, AccessCreate, AccessAdmin, AccessPersistent}, default_room_opts = DefRoomOpts, + queue_type = QueueType, history_size = HistorySize, max_rooms_discoitems = MaxRoomsDiscoItems, room_shaper = RoomShaper}. @@ -437,12 +447,12 @@ unregister_iq_handlers(Host) -> gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS). do_route(Host, ServerHost, Access, HistorySize, RoomShaper, - From, To, Packet, DefRoomOpts, _MaxRoomsDiscoItems) -> + From, To, Packet, DefRoomOpts, _MaxRoomsDiscoItems, QueueType) -> {AccessRoute, _AccessCreate, _AccessAdmin, _AccessPersistent} = Access, case acl:match_rule(ServerHost, AccessRoute, From) of allow -> do_route1(Host, ServerHost, Access, HistorySize, RoomShaper, - From, To, Packet, DefRoomOpts); + From, To, Packet, DefRoomOpts, QueueType); deny -> Lang = xmpp:get_lang(Packet), ErrText = <<"Access denied by service policy">>, @@ -452,11 +462,11 @@ do_route(Host, ServerHost, Access, HistorySize, RoomShaper, do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper, _From, #jid{luser = <<"">>, lresource = <<"">>} = _To, - #iq{} = IQ, _DefRoomOpts) -> + #iq{} = IQ, _DefRoomOpts, _QueueType) -> ejabberd_local:process_iq(IQ); do_route1(Host, ServerHost, Access, _HistorySize, _RoomShaper, From, #jid{luser = <<"">>, lresource = <<"">>} = _To, - #message{lang = Lang, body = Body, type = Type} = Packet, _) -> + #message{lang = Lang, body = Body, type = Type} = Packet, _, _) -> {_AccessRoute, _AccessCreate, AccessAdmin, _AccessPersistent} = Access, if Type == error -> ok; @@ -473,11 +483,11 @@ do_route1(Host, ServerHost, Access, _HistorySize, _RoomShaper, end end; do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper, - _From, #jid{luser = <<"">>} = _To, Packet, _DefRoomOpts) -> + _From, #jid{luser = <<"">>} = _To, Packet, _DefRoomOpts, _) -> Err = xmpp:err_service_unavailable(), ejabberd_router:route_error(Packet, Err); do_route1(Host, ServerHost, Access, HistorySize, RoomShaper, - From, To, Packet, DefRoomOpts) -> + From, To, Packet, DefRoomOpts, QueueType) -> {_AccessRoute, AccessCreate, _AccessAdmin, _AccessPersistent} = Access, {Room, _, Nick} = jid:tolower(To), RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), @@ -492,7 +502,8 @@ do_route1(Host, ServerHost, Access, HistorySize, RoomShaper, {ok, Pid} = start_new_room( Host, ServerHost, Access, Room, HistorySize, - RoomShaper, From, Nick, DefRoomOpts), + RoomShaper, From, Nick, DefRoomOpts, + QueueType), RMod:register_online_room(Room, Host, Pid), mod_muc_room:route(Pid, Packet), ok; @@ -659,7 +670,7 @@ get_rooms(ServerHost, Host) -> Mod:get_rooms(LServer, Host). load_permanent_rooms(Host, ServerHost, Access, - HistorySize, RoomShaper) -> + HistorySize, RoomShaper, QueueType) -> RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), lists:foreach( fun(R) -> @@ -669,7 +680,7 @@ load_permanent_rooms(Host, ServerHost, Access, {ok, Pid} = mod_muc_room:start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, - R#muc_room.opts), + R#muc_room.opts, QueueType), RMod:register_online_room(Room, Host, Pid); {ok, _} -> ok @@ -679,17 +690,17 @@ load_permanent_rooms(Host, ServerHost, Access, start_new_room(Host, ServerHost, Access, Room, HistorySize, RoomShaper, From, - Nick, DefRoomOpts) -> + Nick, DefRoomOpts, QueueType) -> case restore_room(ServerHost, Host, Room) of error -> ?DEBUG("MUC: open new room '~s'~n", [Room]), mod_muc_room:start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, - From, Nick, DefRoomOpts); + From, Nick, DefRoomOpts, QueueType); Opts -> ?DEBUG("MUC: restore room '~s'~n", [Room]), mod_muc_room:start(Host, ServerHost, Access, Room, - HistorySize, RoomShaper, Opts) + HistorySize, RoomShaper, Opts, QueueType) end. -spec iq_disco_items(binary(), binary(), jid(), binary(), integer(), binary(), @@ -956,11 +967,13 @@ mod_opt_type(user_message_shaper) -> fun (A) when is_atom(A) -> A end; mod_opt_type(user_presence_shaper) -> fun (A) when is_atom(A) -> A end; +mod_opt_type(queue_type) -> + fun(ram) -> ram; (file) -> file end; mod_opt_type(_) -> [access, access_admin, access_create, access_persistent, db_type, ram_db_type, default_room_options, history_size, host, max_room_desc, max_room_id, max_room_name, max_rooms_discoitems, max_user_conferences, max_users, max_users_admin_threshold, max_users_presence, - min_message_interval, min_presence_interval, + min_message_interval, min_presence_interval, queue_type, regexp_room_id, room_shaper, user_message_shaper, user_presence_shaper]. diff --git a/src/mod_muc_admin.erl b/src/mod_muc_admin.erl index 4b1509fc9..d94eb3086 100644 --- a/src/mod_muc_admin.erl +++ b/src/mod_muc_admin.erl @@ -471,6 +471,8 @@ create_room_with_opts(Name1, Host1, ServerHost, CustomRoomOpts) -> AcPer = gen_mod:get_module_opt(ServerHost, mod_muc, access_persistent, fun(X) -> X end, all), HistorySize = gen_mod:get_module_opt(ServerHost, mod_muc, history_size, fun(X) -> X end, 20), RoomShaper = gen_mod:get_module_opt(ServerHost, mod_muc, room_shaper, fun(X) -> X end, none), + QueueType = gen_mod:get_module_opt(ServerHost, mod_muc, queue_type, fun(X) -> X end, + ejabberd_config:default_queue_type(ServerHost)), %% If the room does not exist yet in the muc_online_room case mod_muc:find_online_room(Name, Host) of @@ -483,7 +485,8 @@ create_room_with_opts(Name1, Host1, ServerHost, CustomRoomOpts) -> Name, HistorySize, RoomShaper, - RoomOpts), + RoomOpts, + QueueType), mod_muc:register_online_room(Host, Name, Pid), ok; {ok, _} -> diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index 8ab14a92b..2b3390666 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -30,10 +30,10 @@ -behaviour(gen_fsm). %% External exports --export([start_link/9, - start_link/7, - start/9, - start/7, +-export([start_link/10, + start_link/8, + start/10, + start/8, get_role/2, get_affiliation/2, is_occupant_or_admin/2, @@ -93,25 +93,25 @@ %%% API %%%---------------------------------------------------------------------- start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, - Creator, Nick, DefRoomOpts) -> + Creator, Nick, DefRoomOpts, QueueType) -> gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Creator, Nick, DefRoomOpts], + RoomShaper, Creator, Nick, DefRoomOpts, QueueType], ?FSMOPTS). -start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) -> +start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) -> gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Opts], + RoomShaper, Opts, QueueType], ?FSMOPTS). start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, - Creator, Nick, DefRoomOpts) -> + Creator, Nick, DefRoomOpts, QueueType) -> gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Creator, Nick, DefRoomOpts], + RoomShaper, Creator, Nick, DefRoomOpts, QueueType], ?FSMOPTS). -start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) -> +start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) -> gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Opts], + RoomShaper, Opts, QueueType], ?FSMOPTS). %%%---------------------------------------------------------------------- @@ -119,15 +119,17 @@ start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) -> %%%---------------------------------------------------------------------- init([Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Creator, _Nick, DefRoomOpts]) -> + RoomShaper, Creator, _Nick, DefRoomOpts, QueueType]) -> process_flag(trap_exit, true), Shaper = shaper:new(RoomShaper), + RoomQueue = room_queue_new(ServerHost, Shaper, QueueType), State = set_affiliation(Creator, owner, #state{host = Host, server_host = ServerHost, access = Access, room = Room, - history = lqueue_new(HistorySize), + history = lqueue_new(HistorySize, QueueType), jid = jid:make(Room, Host), just_created = true, + room_queue = RoomQueue, room_shaper = Shaper}), State1 = set_opts(DefRoomOpts, State), store_room(State1), @@ -136,15 +138,17 @@ init([Host, ServerHost, Access, Room, HistorySize, add_to_log(room_existence, created, State1), add_to_log(room_existence, started, State1), {ok, normal_state, State1}; -init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts]) -> +init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType]) -> process_flag(trap_exit, true), Shaper = shaper:new(RoomShaper), + RoomQueue = room_queue_new(ServerHost, Shaper, QueueType), State = set_opts(Opts, #state{host = Host, server_host = ServerHost, access = Access, room = Room, - history = lqueue_new(HistorySize), + history = lqueue_new(HistorySize, QueueType), jid = jid:make(Room, Host), + room_queue = RoomQueue, room_shaper = Shaper}), add_to_log(room_existence, started, State), {ok, normal_state, State}. @@ -175,7 +179,10 @@ normal_state({route, <<"">>, MessageShaperInterval == 0 -> {RoomShaper, RoomShaperInterval} = shaper:update(StateData#state.room_shaper, Size), - RoomQueueEmpty = queue:is_empty(StateData#state.room_queue), + RoomQueueEmpty = case StateData#state.room_queue of + undefined -> true; + RQ -> p1_queue:is_empty(RQ) + end, if RoomShaperInterval == 0, RoomQueueEmpty -> NewActivity = Activity#activity{ message_time = Now, @@ -200,8 +207,8 @@ normal_state({route, <<"">>, message_time = Now, message_shaper = MessageShaper, message = Packet}, - RoomQueue = queue:in({message, From}, - StateData#state.room_queue), + RoomQueue = p1_queue:in({message, From}, + StateData#state.room_queue), StateData2 = store_user_activity(From, NewActivity, StateData1), @@ -584,8 +591,8 @@ code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. handle_info({process_user_presence, From}, normal_state = _StateName, StateData) -> - RoomQueueEmpty = queue:is_empty(StateData#state.room_queue), - RoomQueue = queue:in({presence, From}, StateData#state.room_queue), + RoomQueueEmpty = p1_queue:is_empty(StateData#state.room_queue), + RoomQueue = p1_queue:in({presence, From}, StateData#state.room_queue), StateData1 = StateData#state{room_queue = RoomQueue}, if RoomQueueEmpty -> StateData2 = prepare_room_queue(StateData1), @@ -595,9 +602,9 @@ handle_info({process_user_presence, From}, normal_state = _StateName, StateData) handle_info({process_user_message, From}, normal_state = _StateName, StateData) -> RoomQueueEmpty = - queue:is_empty(StateData#state.room_queue), - RoomQueue = queue:in({message, From}, - StateData#state.room_queue), + p1_queue:is_empty(StateData#state.room_queue), + RoomQueue = p1_queue:in({message, From}, + StateData#state.room_queue), StateData1 = StateData#state{room_queue = RoomQueue}, if RoomQueueEmpty -> StateData2 = prepare_room_queue(StateData1), @@ -606,7 +613,7 @@ handle_info({process_user_message, From}, end; handle_info(process_room_queue, normal_state = StateName, StateData) -> - case queue:out(StateData#state.room_queue) of + case p1_queue:out(StateData#state.room_queue) of {{value, {message, From}}, RoomQueue} -> Activity = get_user_activity(From, StateData), Packet = Activity#activity.message, @@ -1418,6 +1425,32 @@ get_max_users_admin_threshold(StateData) -> fun(I) when is_integer(I), I>0 -> I end, 5). +-spec room_queue_new(binary(), shaper:shaper(), _) -> p1_queue:queue(). +room_queue_new(ServerHost, Shaper, QueueType) -> + HaveRoomShaper = Shaper /= none, + HaveMessageShaper = gen_mod:get_module_opt( + ServerHost, mod_muc, user_message_shaper, + fun(A) when is_atom(A) -> A end, + none) /= none, + HavePresenceShaper = gen_mod:get_module_opt( + ServerHost, mod_muc, user_presence_shaper, + fun(A) when is_atom(A) -> A end, + none) /= none, + HaveMinMessageInterval = gen_mod:get_module_opt( + ServerHost, mod_muc, min_message_interval, + fun(I) when is_number(I), I>=0 -> I end, + 0) /= 0, + HaveMinPresenceInterval = gen_mod:get_module_opt( + ServerHost, mod_muc, min_presence_interval, + fun(I) when is_number(I), I>=0 -> I end, + 0) /= 0, + if HaveRoomShaper or HaveMessageShaper or HavePresenceShaper + or HaveMinMessageInterval or HaveMinPresenceInterval -> + p1_queue:new(QueueType); + true -> + undefined + end. + -spec get_user_activity(jid(), state()) -> #activity{}. get_user_activity(JID, StateData) -> case treap:lookup(jid:tolower(JID), @@ -1515,7 +1548,7 @@ clean_treap(Treap, CleanPriority) -> -spec prepare_room_queue(state()) -> state(). prepare_room_queue(StateData) -> - case queue:out(StateData#state.room_queue) of + case p1_queue:out(StateData#state.room_queue) of {{value, {message, From}}, _RoomQueue} -> Activity = get_user_activity(From, StateData), Packet = Activity#activity.message, @@ -1997,38 +2030,34 @@ get_history(Nick, Packet, #state{history = History}) -> #muc{history = #muc_history{} = MUCHistory} -> Now = p1_time_compat:timestamp(), Q = History#lqueue.queue, - {NewQ, Len} = filter_history(Q, MUCHistory, Now, Nick, queue:new(), 0, 0), - History#lqueue{queue = NewQ, len = Len}; + filter_history(Q, Now, Nick, MUCHistory); _ -> - History + p1_queue:to_list(History#lqueue.queue) end. --spec filter_history(?TQUEUE, muc_history(), erlang:timestamp(), binary(), - ?TQUEUE, non_neg_integer(), non_neg_integer()) -> - {?TQUEUE, non_neg_integer()}. -filter_history(Queue, #muc_history{since = Since, - seconds = Seconds, - maxstanzas = MaxStanzas, - maxchars = MaxChars} = MUC, - Now, Nick, AccQueue, NumStanzas, NumChars) -> - case queue:out_r(Queue) of - {{value, {_, _, _, TimeStamp, Size} = Elem}, NewQueue} -> - NowDiff = timer:now_diff(Now, TimeStamp) div 1000000, - Chars = Size + byte_size(Nick) + 1, - if (NumStanzas < MaxStanzas) andalso - (TimeStamp > Since) andalso - (NowDiff =< Seconds) andalso - (NumChars + Chars =< MaxChars) -> - filter_history(NewQueue, MUC, Now, Nick, - queue:in_r(Elem, AccQueue), - NumStanzas + 1, - NumChars + Chars); - true -> - {AccQueue, NumStanzas} - end; - {empty, _} -> - {AccQueue, NumStanzas} - end. +-spec filter_history(p1_queue:queue(), erlang:timestamp(), + binary(), muc_history()) -> list(). +filter_history(Queue, Now, Nick, + #muc_history{since = Since, + seconds = Seconds, + maxstanzas = MaxStanzas, + maxchars = MaxChars}) -> + {History, _, _} = + lists:foldr( + fun({_, _, _, TimeStamp, Size} = Elem, + {Elems, NumStanzas, NumChars} = Acc) -> + NowDiff = timer:now_diff(Now, TimeStamp) div 1000000, + Chars = Size + byte_size(Nick) + 1, + if (NumStanzas < MaxStanzas) andalso + (TimeStamp > Since) andalso + (NowDiff =< Seconds) andalso + (NumChars + Chars =< MaxChars) -> + {[Elem|Elems], NumStanzas + 1, NumChars + Chars}; + true -> + Acc + end + end, {[], 0, 0}, p1_queue:to_list(Queue)), + History. -spec is_room_overcrowded(state()) -> boolean(). is_room_overcrowded(StateData) -> @@ -2381,31 +2410,28 @@ status_codes(IsInitialPresence, _IsSelfPresence = true, StateData) -> end; status_codes(_IsInitialPresence, _IsSelfPresence = false, _StateData) -> []. --spec lqueue_new(non_neg_integer()) -> lqueue(). -lqueue_new(Max) -> - #lqueue{queue = queue:new(), len = 0, max = Max}. +-spec lqueue_new(non_neg_integer(), ram | file) -> lqueue(). +lqueue_new(Max, Type) -> + #lqueue{queue = p1_queue:new(Type), max = Max}. -spec lqueue_in(term(), lqueue()) -> lqueue(). %% If the message queue limit is set to 0, do not store messages. lqueue_in(_Item, LQ = #lqueue{max = 0}) -> LQ; %% Otherwise, rotate messages in the queue store. -lqueue_in(Item, - #lqueue{queue = Q1, len = Len, max = Max}) -> - Q2 = queue:in(Item, Q1), +lqueue_in(Item, #lqueue{queue = Q1, max = Max}) -> + Len = p1_queue:len(Q1), + Q2 = p1_queue:in(Item, Q1), if Len >= Max -> Q3 = lqueue_cut(Q2, Len - Max + 1), - #lqueue{queue = Q3, len = Max, max = Max}; - true -> #lqueue{queue = Q2, len = Len + 1, max = Max} + #lqueue{queue = Q3, max = Max}; + true -> #lqueue{queue = Q2, max = Max} end. --spec lqueue_cut(queue:queue(), non_neg_integer()) -> queue:queue(). +-spec lqueue_cut(p1_queue:queue(), non_neg_integer()) -> p1_queue:queue(). lqueue_cut(Q, 0) -> Q; lqueue_cut(Q, N) -> - {_, Q1} = queue:out(Q), lqueue_cut(Q1, N - 1). - --spec lqueue_to_list(lqueue()) -> list(). -lqueue_to_list(#lqueue{queue = Q1}) -> - queue:to_list(Q1). + {_, Q1} = p1_queue:out(Q), + lqueue_cut(Q1, N - 1). -spec add_message_to_history(binary(), jid(), message(), state()) -> state(). add_message_to_history(FromNick, FromJID, Packet, StateData) -> @@ -2436,7 +2462,7 @@ add_message_to_history(FromNick, FromJID, Packet, StateData) -> StateData end. --spec send_history(jid(), lqueue(), state()) -> ok. +-spec send_history(jid(), list(), state()) -> ok. send_history(JID, History, StateData) -> lists:foreach( fun({Nick, Packet, _HaveSubject, _TimeStamp, _Size}) -> @@ -2445,7 +2471,7 @@ send_history(JID, History, StateData) -> Packet, jid:replace_resource(StateData#state.jid, Nick), JID)) - end, lqueue_to_list(History)). + end, History). -spec send_subject(jid(), state()) -> ok. send_subject(JID, #state{subject_author = Nick} = StateData) -> diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index f0152a722..535d014f1 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -36,6 +36,7 @@ -include("xmpp.hrl"). -include("logger.hrl"). +-include("p1_queue.hrl"). -define(is_sm_packet(Pkt), is_record(Pkt, sm_enable) or @@ -44,7 +45,6 @@ is_record(Pkt, sm_r)). -type state() :: ejabberd_c2s:state(). --type lqueue() :: {non_neg_integer(), queue:queue()}. %%%=================================================================== %%% API @@ -102,6 +102,7 @@ c2s_stream_init({ok, State}, Opts) -> ({max_resume_timeout, _}) -> true; ({ack_timeout, _}) -> true; ({resend_on_timeout, _}) -> true; + ({queue_type, _}) -> true; (_) -> false end, Opts), {ok, State#{mgmt_options => MgmtOpts}}; @@ -114,6 +115,7 @@ c2s_stream_started(#{lserver := LServer, mgmt_options := Opts} = State, ResumeTimeout = get_resume_timeout(LServer, Opts), MaxResumeTimeout = get_max_resume_timeout(LServer, Opts, ResumeTimeout), State1#{mgmt_state => inactive, + mgmt_queue_type => get_queue_type(LServer, Opts), mgmt_max_queue => get_max_ack_queue(LServer, Opts), mgmt_timeout => ResumeTimeout, mgmt_max_timeout => MaxResumeTimeout, @@ -216,9 +218,10 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, c2s_handle_send(State, _Pkt, _Result) -> State. -c2s_handle_call(#{sid := {Time, _}, mod := Mod} = State, +c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State, {resume_session, Time}, From) -> - Mod:reply(From, {resume, State}), + State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)}, + Mod:reply(From, {resume, State1}), {stop, State#{mgmt_state => resumed}}; c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) -> Mod:reply(From, {error, <<"Previous session not found">>}), @@ -316,6 +319,7 @@ perform_stream_mgmt(Pkt, #{mgmt_xmlns := Xmlns} = State) -> -spec handle_enable(state(), sm_enable()) -> state(). handle_enable(#{mgmt_timeout := DefaultTimeout, + mgmt_queue_type := QueueType, mgmt_max_timeout := MaxTimeout, mgmt_xmlns := Xmlns, jid := JID} = State, #sm_enable{resume = Resume, max = Max}) -> @@ -339,7 +343,7 @@ handle_enable(#{mgmt_timeout := DefaultTimeout, #sm_enabled{xmlns = Xmlns} end, State1 = State#{mgmt_state => active, - mgmt_queue => queue_new(), + mgmt_queue => p1_queue:new(QueueType), mgmt_timeout => Timeout}, send(State1, Res). @@ -446,7 +450,7 @@ resend_rack(#{mgmt_ack_timer := _, mgmt_stanzas_out := NumStanzasOut, mgmt_stanzas_req := NumStanzasReq} = State) -> State1 = cancel_ack_timer(State), - case NumStanzasReq < NumStanzasOut andalso not queue_is_empty(Queue) of + case NumStanzasReq < NumStanzasOut andalso not p1_queue:is_empty(Queue) of true -> send_rack(State1); false -> State1 end; @@ -460,13 +464,13 @@ mgmt_queue_add(#{mgmt_stanzas_out := NumStanzasOut, 4294967295 -> 0; Num -> Num + 1 end, - Queue1 = queue_in({NewNum, p1_time_compat:timestamp(), Pkt}, Queue), + Queue1 = p1_queue:in({NewNum, p1_time_compat:timestamp(), Pkt}, Queue), State1 = State#{mgmt_queue => Queue1, mgmt_stanzas_out => NewNum}, check_queue_length(State1). -spec mgmt_queue_drop(state(), non_neg_integer()) -> state(). mgmt_queue_drop(#{mgmt_queue := Queue} = State, NumHandled) -> - NewQueue = queue_dropwhile( + NewQueue = p1_queue:dropwhile( fun({N, _T, _E}) -> N =< NumHandled end, Queue), State#{mgmt_queue => NewQueue}. @@ -475,7 +479,7 @@ check_queue_length(#{mgmt_max_queue := Limit} = State) when Limit == infinity; Limit == exceeded -> State; check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) -> - case queue_len(Queue) > Limit of + case p1_queue:len(Queue) > Limit of true -> State#{mgmt_max_queue => exceeded}; false -> @@ -484,14 +488,14 @@ check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) -> -spec resend_unacked_stanzas(state()) -> state(). resend_unacked_stanzas(#{mgmt_state := MgmtState, - mgmt_queue := {QueueLen, _} = Queue, + mgmt_queue := Queue, jid := JID} = State) when (MgmtState == active orelse MgmtState == pending orelse - MgmtState == timeout) andalso QueueLen > 0 -> + MgmtState == timeout) andalso ?qlen(Queue) > 0 -> ?DEBUG("Resending ~B unacknowledged stanza(s) to ~s", - [QueueLen, jid:encode(JID)]), - queue_foldl( + [p1_queue:len(Queue), jid:encode(JID)]), + p1_queue:foldl( fun({_, Time, Pkt}, AccState) -> NewPkt = add_resent_delay_info(AccState, Pkt, Time), send(AccState, xmpp:put_meta(NewPkt, mgmt_is_resent, true)) @@ -504,11 +508,11 @@ route_unacked_stanzas(#{mgmt_state := MgmtState, mgmt_resend := MgmtResend, lang := Lang, user := User, jid := JID, lserver := LServer, - mgmt_queue := {QueueLen, _} = Queue, + mgmt_queue := Queue, resource := Resource} = State) when (MgmtState == active orelse MgmtState == pending orelse - MgmtState == timeout) andalso QueueLen > 0 -> + MgmtState == timeout) andalso ?qlen(Queue) > 0 -> ResendOnTimeout = case MgmtResend of Resend when is_boolean(Resend) -> Resend; @@ -522,8 +526,8 @@ route_unacked_stanzas(#{mgmt_state := MgmtState, end end, ?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~s", - [QueueLen, jid:encode(JID)]), - queue_foreach( + [p1_queue:len(Queue), jid:encode(JID)]), + p1_queue:foreach( fun({_, _Time, #presence{from = From}}) -> ?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]); ({_, _Time, #iq{} = El}) -> @@ -564,7 +568,8 @@ route_unacked_stanzas(_State) -> -spec inherit_session_state(state(), binary()) -> {ok, state()} | {error, binary()} | {error, binary(), non_neg_integer()}. -inherit_session_state(#{user := U, server := S} = State, ResumeID) -> +inherit_session_state(#{user := U, server := S, + mgmt_queue_type := QueueType} = State, ResumeID) -> case jlib:base64_to_term(ResumeID) of {term, {R, Time}} -> case ejabberd_sm:get_session_pid(U, S, R) of @@ -589,8 +594,12 @@ inherit_session_state(#{user := U, server := S} = State, ResumeID) -> mgmt_stanzas_in := NumStanzasIn, mgmt_stanzas_out := NumStanzasOut} = OldState} -> State1 = ejabberd_c2s:copy_state(State, OldState), + Queue1 = case QueueType of + ram -> Queue; + _ -> p1_queue:ram_to_file(Queue) + end, State2 = State1#{mgmt_xmlns => Xmlns, - mgmt_queue => Queue, + mgmt_queue => Queue1, mgmt_timeout => Timeout, mgmt_stanzas_in => NumStanzasIn, mgmt_stanzas_out => NumStanzasOut, @@ -632,44 +641,6 @@ add_resent_delay_info(_State, El, _Time) -> send(#{mod := Mod} = State, Pkt) -> Mod:send(State, Pkt). --spec queue_new() -> lqueue(). -queue_new() -> - {0, queue:new()}. - --spec queue_in(term(), lqueue()) -> lqueue(). -queue_in(Elem, {N, Q}) -> - {N+1, queue:in(Elem, Q)}. - --spec queue_len(lqueue()) -> non_neg_integer(). -queue_len({N, _}) -> - N. - --spec queue_foldl(fun((term(), T) -> T), T, lqueue()) -> T. -queue_foldl(F, Acc, {_N, Q}) -> - jlib:queue_foldl(F, Acc, Q). - --spec queue_foreach(fun((_) -> _), lqueue()) -> ok. -queue_foreach(F, {_N, Q}) -> - jlib:queue_foreach(F, Q). - --spec queue_dropwhile(fun((term()) -> boolean()), lqueue()) -> lqueue(). -queue_dropwhile(F, {N, Q}) -> - case queue:peek(Q) of - {value, Item} -> - case F(Item) of - true -> - queue_dropwhile(F, {N-1, queue:drop(Q)}); - false -> - {N, Q} - end; - empty -> - {N, Q} - end. - --spec queue_is_empty(lqueue()) -> boolean(). -queue_is_empty({N, _Q}) -> - N == 0. - -spec cancel_ack_timer(state()) -> state(). cancel_ack_timer(#{mgmt_ack_timer := TRef} = State) -> case erlang:cancel_timer(TRef) of @@ -741,6 +712,17 @@ get_resend_on_timeout(Host, Opts) -> Resend -> Resend end. +get_queue_type(Host, Opts) -> + VFun = mod_opt_type(queue_type), + case gen_mod:get_module_opt(Host, ?MODULE, queue_type, VFun) of + undefined -> + case gen_mod:get_opt(queue_type, Opts, VFun) of + undefined -> ejabberd_config:default_queue_type(Host); + Type -> Type + end; + Type -> Type + end. + mod_opt_type(max_ack_queue) -> fun(I) when is_integer(I), I > 0 -> I; (infinity) -> infinity @@ -757,6 +739,8 @@ mod_opt_type(resend_on_timeout) -> fun(B) when is_boolean(B) -> B; (if_offline) -> if_offline end; +mod_opt_type(queue_type) -> + fun(ram) -> ram; (file) -> file end; mod_opt_type(_) -> [max_ack_queue, resume_timeout, max_resume_timeout, ack_timeout, - resend_on_timeout]. + resend_on_timeout, queue_type]. diff --git a/test/ejabberd_SUITE_data/ejabberd.yml b/test/ejabberd_SUITE_data/ejabberd.yml index 828e0c03c..efecf6df8 100644 --- a/test/ejabberd_SUITE_data/ejabberd.yml +++ b/test/ejabberd_SUITE_data/ejabberd.yml @@ -180,7 +180,8 @@ Welcome to this XMPP server." mod_stats: [] mod_time: [] mod_version: [] - "mnesia.localhost": + "mnesia.localhost": + queue_type: ram auth_method: internal modules: mod_announce: @@ -238,7 +239,8 @@ Welcome to this XMPP server." mod_stats: [] mod_time: [] mod_version: [] - "redis.localhost": + "redis.localhost": + queue_type: ram auth_method: internal sm_db_type: redis modules: @@ -297,7 +299,8 @@ Welcome to this XMPP server." mod_stats: [] mod_time: [] mod_version: [] - "riak.localhost": + "riak.localhost": + queue_type: ram auth_method: riak modules: mod_announce: @@ -341,7 +344,8 @@ Welcome to this XMPP server." mod_version: [] "localhost": auth_method: [internal, anonymous] - "ldap.localhost": + "ldap.localhost": + queue_type: ram ldap_servers: - "localhost" ldap_rootdn: "cn=admin,dc=localhost" @@ -374,7 +378,8 @@ Welcome to this XMPP server." mod_stats: [] mod_time: [] mod_version: [] - "extauth.localhost": + "extauth.localhost": + queue_type: ram extauth_program: "python extauth.py" auth_method: external hosts: @@ -450,6 +455,7 @@ listen: @@password@@ loglevel: @@loglevel@@ max_fsm_queue: 1000 +queue_type: file modules: mod_adhoc: [] mod_configure: [] -- 2.40.0