-record(lqueue,
{
- queue = p1_queue:new() :: p1_queue:queue(),
+ queue = p1_queue:new() :: p1_queue:queue(lqueue_elem()),
max = 0 :: integer()
}).
just_created = erlang:system_time(microsecond) :: true | integer(),
activity = treap:empty() :: treap:treap(),
room_shaper = none :: ejabberd_shaper:shaper(),
- room_queue :: p1_queue:queue() | undefined
+ room_queue :: p1_queue:queue({message | presence, jid()}) | undefined
}).
-type users() :: #{ljid() => #user{}}.
%%%----------------------------------------------------------------------
{deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", "3.6.10"}},
- {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.15"}}},
+ {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "2887223"}},
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.19"}}},
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.1.1"}}},
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.16"}}},
-record(state, {connection :: pid() | undefined,
num :: pos_integer(),
subscriptions = #{} :: subscriptions(),
- pending_q :: p1_queue:queue()}).
+ pending_q :: queue()}).
+-type queue() :: p1_queue:queue({{pid(), term()}, integer()}).
-type subscriptions() :: #{binary() => [pid()]}.
-type error_reason() :: binary() | timeout | disconnected | overloaded.
-type redis_error() :: {error, error_reason()}.
get_queue_type() ->
ejabberd_option:redis_queue_type().
--spec flush_queue(p1_queue:queue()) -> p1_queue:queue().
+-spec flush_queue(queue()) -> queue().
flush_queue(Q) ->
CurrTime = erlang:monotonic_time(millisecond),
p1_queue:dropwhile(
true
end, Q).
--spec clean_queue(p1_queue:queue(), integer()) -> p1_queue:queue().
+-spec clean_queue(queue(), integer()) -> queue().
clean_queue(Q, CurrTime) ->
Q1 = p1_queue:dropwhile(
fun({_From, Time}) ->
id = 0 :: non_neg_integer(),
in_flight :: undefined | publish() | pubrel(),
codec :: mqtt_codec:state(),
- queue :: undefined | p1_queue:queue(),
+ queue :: undefined | p1_queue:queue(publish()),
tls :: boolean()}).
-type acks() :: #{non_neg_integer() => pubrec()}.
get_max_users_admin_threshold(StateData) ->
mod_muc_opt:max_users_admin_threshold(StateData#state.server_host).
--spec room_queue_new(binary(), ejabberd_shaper:shaper(), _) -> p1_queue:queue().
+-spec room_queue_new(binary(), ejabberd_shaper:shaper(), _) -> p1_queue:queue({message | presence, jid()}) | undefined.
room_queue_new(ServerHost, Shaper, QueueType) ->
HaveRoomShaper = Shaper /= none,
HaveMessageShaper = mod_muc_opt:user_message_shaper(ServerHost) /= none,
p1_queue:to_list(History#lqueue.queue)
end.
--spec filter_history(p1_queue:queue(), erlang:timestamp(),
+-spec filter_history(p1_queue:queue(lqueue_elem()), erlang:timestamp(),
binary(), muc_history()) -> [lqueue_elem()].
filter_history(Queue, Now, Nick,
#muc_history{since = Since,
true -> #lqueue{queue = Q2, max = Max}
end.
--spec lqueue_cut(p1_queue:queue(), non_neg_integer()) -> p1_queue:queue().
+-spec lqueue_cut(p1_queue:queue(lqueue_elem()), non_neg_integer()) -> p1_queue:queue(lqueue_elem()).
lqueue_cut(Q, 0) -> Q;
lqueue_cut(Q, N) ->
{_, Q1} = p1_queue:out(Q),
is_record(Pkt, sm_r)).
-type state() :: ejabberd_c2s:state().
+-type queue() :: p1_queue:queue({non_neg_integer(), erlang:timestamp(), xmpp_element() | xmlel()}).
-type error_reason() :: session_not_found | session_timed_out |
session_is_dead | session_has_exited |
session_was_killed | session_copy_timed_out |
State1 = restart_pending_timer(State, Timeout),
State1#{mgmt_timeout => Timeout}.
--spec queue_find(fun((stanza()) -> boolean()), p1_queue:queue())
+-spec queue_find(fun((stanza()) -> boolean()), queue())
-> stanza() | none.
queue_find(Pred, Queue) ->
case p1_queue:out(Queue) of