]> granicus.if.org Git - ejabberd/commitdiff
mod_client_state: Queue stanzas of each full JID
authorHolger Weiss <holger@zedat.fu-berlin.de>
Thu, 6 Apr 2017 20:19:00 +0000 (22:19 +0200)
committerHolger Weiss <holger@zedat.fu-berlin.de>
Thu, 6 Apr 2017 20:19:00 +0000 (22:19 +0200)
Keep the latest stanzas of each given full JID, rather than dropping
them when stanzas from a different resource are received.  This change
makes sure the recipient receives the latest status of all clients of
each contact.  It also ensures the recipient will see the current list
of occupants of joined MUC rooms.

src/mod_client_state.erl

index a5ac611f65b24fedb09f40524b403d53036ce6ba..0d92cb2df8ba8a6041cfc536de89e82af7df1c2c 100644 (file)
@@ -49,6 +49,8 @@
 -type csi_type() :: presence | chatstate | {pep, binary()}.
 -type csi_queue() :: {non_neg_integer(), map()}.
 -type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}.
+-type csi_key() :: {ljid(), csi_type()}.
+-type csi_element() :: {csi_timestamp(), stanza()}.
 -type c2s_state() :: ejabberd_c2s:state().
 -type filter_acc() :: {stanza() | drop, c2s_state()}.
 
@@ -320,25 +322,21 @@ enqueue_stanza(Type, Stanza, #{csi_state := inactive,
            C2SState1 = flush_queue(C2SState),
            enqueue_stanza(Type, Stanza, C2SState1);
        false ->
-           #jid{luser = U, lserver = S} = xmpp:get_from(Stanza),
-           Q1 = queue_in({U, S}, Type, Stanza, Q),
+           From = jid:tolower(xmpp:get_from(Stanza)),
+           Q1 = queue_in({From, Type}, Stanza, Q),
            {stop, {drop, C2SState#{csi_queue => Q1}}}
     end;
 enqueue_stanza(_Type, Stanza, State) ->
     {Stanza, State}.
 
 -spec dequeue_sender(jid(), c2s_state()) -> c2s_state().
-dequeue_sender(#jid{luser = U, lserver = S},
+dequeue_sender(#jid{luser = U, lserver = S} = Sender,
               #{csi_queue := Q, jid := JID} = C2SState) ->
     ?DEBUG("Flushing packets of ~s@~s from CSI queue of ~s",
           [U, S, jid:encode(JID)]),
-    case queue_take({U, S}, Q) of
-       {Stanzas, Q1} ->
-           C2SState1 = flush_stanzas(C2SState, Stanzas),
-           C2SState1#{csi_queue => Q1};
-      error ->
-           C2SState
-    end.
+    {Elems, Q1} = queue_take(Sender, Q),
+    C2SState1 = flush_stanzas(C2SState, Elems),
+    C2SState1#{csi_queue => Q1}.
 
 -spec flush_queue(c2s_state()) -> c2s_state().
 flush_queue(#{csi_queue := Q, jid := JID} = C2SState) ->
@@ -350,7 +348,7 @@ flush_queue(#{csi_queue := Q, jid := JID} = C2SState) ->
                    [{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state().
 flush_stanzas(#{lserver := LServer} = C2SState, Elems) ->
     lists:foldl(
-      fun({_Type, Time, Stanza}, AccState) ->
+      fun({Time, Stanza}, AccState) ->
              Stanza1 = add_delay_info(Stanza, LServer, Time),
              ejabberd_c2s:send(AccState, Stanza1)
       end, C2SState, Elems).
@@ -381,46 +379,27 @@ get_pep_node(#message{} = Msg) ->
 queue_new() ->
     {0, #{}}.
 
--spec queue_in(term(), term(), term(), csi_queue()) -> csi_queue().
-queue_in(Key, Type, Val, {Seq, Q}) ->
+-spec queue_in(csi_key(), csi_element(), csi_queue()) -> csi_queue().
+queue_in(Key, Val, {Seq, Q}) ->
     Seq1 = Seq + 1,
     Time = {Seq1, p1_time_compat:timestamp()},
-    case maps:get(Key, Q, error) of
-       error ->
-           Q1 = maps:put(Key, [{Type, Time, Val}], Q),
-           {Seq1, Q1};
-       TypeVals ->
-           case lists:keymember(Type, 1, TypeVals) of
-               true ->
-                   TypeVals1 = lists:keyreplace(
-                                 Type, 1, TypeVals, {Type, Time, Val}),
-                   Q1 = maps:put(Key, TypeVals1, Q),
-                   {Seq1, Q1};
-               false ->
-                   TypeVals1 = [{Type, Time, Val}|TypeVals],
-                   Q1 = maps:put(Key, TypeVals1, Q),
-                   {Seq1, Q1}
-           end
-    end.
-
--spec queue_take(term(), csi_queue()) -> {list(), csi_queue()} | error.
-queue_take(Key, {Seq, Q}) ->
-    case maps:get(Key, Q, error) of
-       error ->
-           error;
-       TypeVals ->
-           Q1 = maps:remove(Key, Q),
-           {lists:keysort(2, TypeVals), {Seq, Q1}}
-    end.
+    Q1 = maps:put(Key, {Time, Val}, Q),
+    {Seq1, Q1}.
+
+-spec queue_take(jid(), csi_queue()) -> {[csi_element()], csi_queue()}.
+queue_take(#jid{luser = LUser, lserver = LServer}, {Seq, Q}) ->
+    {Vals, Q1} = maps:fold(fun({{U, S, _}, _} = Key, Val, {AccVals, AccQ})
+                                  when U == LUser, S == LServer ->
+                                  {[Val | AccVals], maps:remove(Key, AccQ)};
+                              (_, _, Acc) ->
+                                  Acc
+                           end, {[], Q}, Q),
+    {lists:keysort(1, Vals), {Seq, Q1}}.
 
 -spec queue_len(csi_queue()) -> non_neg_integer().
 queue_len({_, Q}) ->
     maps:size(Q).
 
--spec queue_to_list(csi_queue()) -> [term()].
-queue_to_list({_, _, Q}) ->
-    TypeVals = maps:fold(
-                fun(_, Vals, Acc) ->
-                        Vals ++ Acc
-                end, [], Q),
-    lists:keysort(2, TypeVals).
+-spec queue_to_list(csi_queue()) -> [csi_element()].
+queue_to_list({_, Q}) ->
+    lists:keysort(1, maps:values(Q)).