-type csi_type() :: presence | chatstate | {pep, binary()}.
-type csi_queue() :: {non_neg_integer(), map()}.
-type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}.
+-type csi_key() :: {ljid(), csi_type()}.
+-type csi_element() :: {csi_timestamp(), stanza()}.
-type c2s_state() :: ejabberd_c2s:state().
-type filter_acc() :: {stanza() | drop, c2s_state()}.
C2SState1 = flush_queue(C2SState),
enqueue_stanza(Type, Stanza, C2SState1);
false ->
- #jid{luser = U, lserver = S} = xmpp:get_from(Stanza),
- Q1 = queue_in({U, S}, Type, Stanza, Q),
+ From = jid:tolower(xmpp:get_from(Stanza)),
+ Q1 = queue_in({From, Type}, Stanza, Q),
{stop, {drop, C2SState#{csi_queue => Q1}}}
end;
enqueue_stanza(_Type, Stanza, State) ->
{Stanza, State}.
-spec dequeue_sender(jid(), c2s_state()) -> c2s_state().
-dequeue_sender(#jid{luser = U, lserver = S},
+dequeue_sender(#jid{luser = U, lserver = S} = Sender,
#{csi_queue := Q, jid := JID} = C2SState) ->
?DEBUG("Flushing packets of ~s@~s from CSI queue of ~s",
[U, S, jid:encode(JID)]),
- case queue_take({U, S}, Q) of
- {Stanzas, Q1} ->
- C2SState1 = flush_stanzas(C2SState, Stanzas),
- C2SState1#{csi_queue => Q1};
- error ->
- C2SState
- end.
+ {Elems, Q1} = queue_take(Sender, Q),
+ C2SState1 = flush_stanzas(C2SState, Elems),
+ C2SState1#{csi_queue => Q1}.
-spec flush_queue(c2s_state()) -> c2s_state().
flush_queue(#{csi_queue := Q, jid := JID} = C2SState) ->
[{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state().
flush_stanzas(#{lserver := LServer} = C2SState, Elems) ->
lists:foldl(
- fun({_Type, Time, Stanza}, AccState) ->
+ fun({Time, Stanza}, AccState) ->
Stanza1 = add_delay_info(Stanza, LServer, Time),
ejabberd_c2s:send(AccState, Stanza1)
end, C2SState, Elems).
queue_new() ->
{0, #{}}.
--spec queue_in(term(), term(), term(), csi_queue()) -> csi_queue().
-queue_in(Key, Type, Val, {Seq, Q}) ->
+-spec queue_in(csi_key(), csi_element(), csi_queue()) -> csi_queue().
+queue_in(Key, Val, {Seq, Q}) ->
Seq1 = Seq + 1,
Time = {Seq1, p1_time_compat:timestamp()},
- case maps:get(Key, Q, error) of
- error ->
- Q1 = maps:put(Key, [{Type, Time, Val}], Q),
- {Seq1, Q1};
- TypeVals ->
- case lists:keymember(Type, 1, TypeVals) of
- true ->
- TypeVals1 = lists:keyreplace(
- Type, 1, TypeVals, {Type, Time, Val}),
- Q1 = maps:put(Key, TypeVals1, Q),
- {Seq1, Q1};
- false ->
- TypeVals1 = [{Type, Time, Val}|TypeVals],
- Q1 = maps:put(Key, TypeVals1, Q),
- {Seq1, Q1}
- end
- end.
-
--spec queue_take(term(), csi_queue()) -> {list(), csi_queue()} | error.
-queue_take(Key, {Seq, Q}) ->
- case maps:get(Key, Q, error) of
- error ->
- error;
- TypeVals ->
- Q1 = maps:remove(Key, Q),
- {lists:keysort(2, TypeVals), {Seq, Q1}}
- end.
+ Q1 = maps:put(Key, {Time, Val}, Q),
+ {Seq1, Q1}.
+
+-spec queue_take(jid(), csi_queue()) -> {[csi_element()], csi_queue()}.
+queue_take(#jid{luser = LUser, lserver = LServer}, {Seq, Q}) ->
+ {Vals, Q1} = maps:fold(fun({{U, S, _}, _} = Key, Val, {AccVals, AccQ})
+ when U == LUser, S == LServer ->
+ {[Val | AccVals], maps:remove(Key, AccQ)};
+ (_, _, Acc) ->
+ Acc
+ end, {[], Q}, Q),
+ {lists:keysort(1, Vals), {Seq, Q1}}.
-spec queue_len(csi_queue()) -> non_neg_integer().
queue_len({_, Q}) ->
maps:size(Q).
--spec queue_to_list(csi_queue()) -> [term()].
-queue_to_list({_, _, Q}) ->
- TypeVals = maps:fold(
- fun(_, Vals, Acc) ->
- Vals ++ Acc
- end, [], Q),
- lists:keysort(2, TypeVals).
+-spec queue_to_list(csi_queue()) -> [csi_element()].
+queue_to_list({_, Q}) ->
+ lists:keysort(1, maps:values(Q)).