]> granicus.if.org Git - ejabberd/commitdiff
Make count_offline_messages cache work when offline uses mam for storage
authorPaweł Chmielowski <pchmielowski@process-one.net>
Mon, 1 Jul 2019 11:36:05 +0000 (13:36 +0200)
committerPaweł Chmielowski <pchmielowski@process-one.net>
Mon, 1 Jul 2019 11:36:41 +0000 (13:36 +0200)
This also replace existing cache for checking if spool is empty with this
cache.

src/mod_offline.erl
src/mod_offline_riak.erl
src/mod_offline_sql.erl
test/ejabberd_SUITE_data/ejabberd.mysql.yml
test/ejabberd_SUITE_data/ejabberd.pgsql.yml

index 27035e8408fe0df001a408daec9af8943ef8de1b..3b41ac97d6dc00de95da0393e4e11a6b6ea34fd3 100644 (file)
@@ -98,7 +98,7 @@
 -callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}.
 -callback remove_user(binary(), binary()) -> any().
 -callback read_message_headers(binary(), binary()) ->
-    [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}].
+    [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}] | error.
 -callback read_message(binary(), binary(), non_neg_integer()) ->
     {ok, #offline_msg{}} | error.
 -callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}.
@@ -222,26 +222,26 @@ store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
     Mod = gen_mod:db_mod(Server, ?MODULE),
     case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
        true ->
-           ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server},
-               fun() ->
-                   case count_messages_in_db(Mod, User, Server) of
-                       0 ->
-                           case Mod:store_message(Msg) of
-                               ok ->
-                                   {cache, ok};
-                               Err ->
-                                   {nocache, Err}
-                           end;
-                       _ ->
-                           {cache, ok}
+           case count_offline_messages(User, Server) of
+               0 ->
+                   store_message_in_db(Mod, Msg);
+               _ ->
+                   case use_cache(Mod, Server) of
+                       true ->
+                           ets_cache:incr(
+                               ?SPOOL_COUNTER_CACHE,
+                               {User, Server}, 1,
+                               cache_nodes(Mod, Server));
+                       false ->
+                           ok
                    end
-               end);
+           end;
        false ->
            case get_max_user_messages(User, Server) of
                infinity ->
                    store_message_in_db(Mod, Msg);
                Limit ->
-                   Num = count_messages_in_db(Mod, User, Server),
+                   Num = count_offline_messages(User, Server),
                    if Num < Limit ->
                            store_message_in_db(Mod, Msg);
                       true ->
@@ -288,7 +288,12 @@ get_sm_items(_Acc, #jid{luser = U, lserver = S} = JID,
             ?NS_FLEX_OFFLINE, _Lang) ->
     ejabberd_sm:route(JID, {resend_offline, false}),
            Mod = gen_mod:db_mod(S, ?MODULE),
-           Hdrs = Mod:read_message_headers(U, S),
+           Hdrs = case Mod:read_message_headers(U, S) of
+                      L when is_list(L) ->
+                          L;
+                      _ ->
+                          []
+                  end,
            BareJID = jid:remove_resource(JID),
            {result, lists:map(
                       fun({Seq, From, _To, _TS, _El}) ->
@@ -516,9 +521,31 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) ->
                                    stop
                            end
                    end;
-               _ -> Acc
+               _ ->
+                   maybe_update_cache(To, Packet),
+                   Acc
+           end;
+       false ->
+           maybe_update_cache(To, Packet),
+           Acc
+    end.
+
+-spec maybe_update_cache(jid(), message()) -> ok.
+maybe_update_cache(#jid{lserver = Server, luser = User}, Packet) ->
+    case xmpp:get_meta(Packet, mam_archived, false) of
+       true ->
+           Mod = gen_mod:db_mod(Server, ?MODULE),
+           case use_mam_for_user(User, Server) andalso use_cache(Mod, Server) of
+               true ->
+                   ets_cache:incr(
+                       ?SPOOL_COUNTER_CACHE,
+                       {User, Server}, 1,
+                       cache_nodes(Mod, Server));
+               _ ->
+                   ok
            end;
-       false -> Acc
+       _ ->
+           ok
     end.
 
 -spec check_store_hint(message()) -> store | no_store | none.
@@ -750,7 +777,12 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
 
 -spec read_messages(binary(), binary()) -> [{binary(), message()}].
 read_messages(LUser, LServer) ->
-    Res = read_db_messages(LUser, LServer),
+    Res = case read_db_messages(LUser, LServer) of
+             error ->
+                 [];
+             L when is_list(L) ->
+                 L
+         end,
     case use_mam_for_user(LUser, LServer) of
        true ->
            read_mam_messages(LUser, LServer, Res);
@@ -758,27 +790,32 @@ read_messages(LUser, LServer) ->
            Res
     end.
 
--spec read_db_messages(binary(), binary()) -> [{binary(), message()}].
+-spec read_db_messages(binary(), binary()) -> [{binary(), message()}] | error.
 read_db_messages(LUser, LServer) ->
     Mod = gen_mod:db_mod(LServer, ?MODULE),
     CodecOpts = ejabberd_config:codec_options(),
-    lists:flatmap(
-       fun({Seq, From, To, TS, El}) ->
-           Node = integer_to_binary(Seq),
-           try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
-               Pkt ->
+    case Mod:read_message_headers(LUser, LServer) of
+       error ->
+           error;
+       L ->
+           lists:flatmap(
+               fun({Seq, From, To, TS, El}) ->
                    Node = integer_to_binary(Seq),
-                   Pkt1 = add_delay_info(Pkt, LServer, TS),
-                   Pkt2 = xmpp:set_from_to(Pkt1, From, To),
-                   [{Node, Pkt2}]
-           catch _:{xmpp_codec, Why} ->
-               ?ERROR_MSG("Failed to decode packet ~p "
-                          "of user ~s: ~s",
-                          [El, jid:encode(To),
-                           xmpp:format_error(Why)]),
-               []
-           end
-       end, Mod:read_message_headers(LUser, LServer)).
+                   try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
+                       Pkt ->
+                           Node = integer_to_binary(Seq),
+                           Pkt1 = add_delay_info(Pkt, LServer, TS),
+                           Pkt2 = xmpp:set_from_to(Pkt1, From, To),
+                           [{Node, Pkt2}]
+                   catch _:{xmpp_codec, Why} ->
+                       ?ERROR_MSG("Failed to decode packet ~p "
+                                  "of user ~s: ~s",
+                                  [El, jid:encode(To),
+                                   xmpp:format_error(Why)]),
+                       []
+                   end
+               end, L)
+    end.
 
 -spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) ->
     {integer() | none, [message()]}.
@@ -896,13 +933,15 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
        end, 1, AllMsgs2),
     AllMsgs3.
 
--spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
-    integer().
+-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}] | error) ->
+    {cache, integer()} | {nocache, integer()}.
+count_mam_messages(_LUser, _LServer, error) ->
+    {nocache, 0};
 count_mam_messages(LUser, LServer, ReadMsgs) ->
     {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
     case Start of
        none ->
-           length(ExtraMsgs);
+           {cache, length(ExtraMsgs)};
        _ ->
            MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
                                 Number when is_integer(Number) -> Number - length(ExtraMsgs);
@@ -914,7 +953,7 @@ count_mam_messages(LUser, LServer, ReadMsgs) ->
                                           #rsm_set{max = MaxOfflineMsgs,
                                                    before = <<"9999999999999999">>},
                                           chat, only_count),
-           Count + length(ExtraMsgs)
+           {cache, Count + length(ExtraMsgs)}
     end.
 
 format_user_queue(Hdrs) ->
@@ -957,7 +996,10 @@ user_queue(User, Server, Query, Lang) ->
     US = {LUser, LServer},
     Mod = gen_mod:db_mod(LServer, ?MODULE),
     user_queue_parse_query(LUser, LServer, Query),
-    HdrsAll = Mod:read_message_headers(LUser, LServer),
+    HdrsAll = case Mod:read_message_headers(LUser, LServer) of
+                 error -> [];
+                 L -> L
+             end,
     Hdrs = get_messages_subset(User, Server, HdrsAll),
     FMsgs = format_user_queue(Hdrs),
     [?XC(<<"h1">>,
@@ -1082,26 +1124,32 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
 count_offline_messages(User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
     case use_mam_for_user(User, Server) of
        true ->
-           Res = read_db_messages(LUser, LServer),
-           count_mam_messages(LUser, LServer, Res);
+           case use_cache(Mod, LServer) of
+               true ->
+                   ets_cache:lookup(
+                       ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+                       fun() ->
+                           Res = read_db_messages(LUser, LServer),
+                           count_mam_messages(LUser, LServer, Res)
+                       end);
+               false ->
+                   Res = read_db_messages(LUser, LServer),
+                   ets_cache:untag(count_mam_messages(LUser, LServer, Res))
+           end;
        _ ->
-           Mod = gen_mod:db_mod(LServer, ?MODULE),
-           count_messages_in_db(Mod, LUser, LServer)
-    end.
-
--spec count_messages_in_db(module(), binary(), binary()) -> non_neg_integer().
-count_messages_in_db(Mod, LUser, LServer) ->
-    case use_cache(Mod, LServer) of
-       true ->
-           ets_cache:lookup(
-             ?SPOOL_COUNTER_CACHE, {LUser, LServer},
-             fun() ->
-                     Mod:count_messages(LUser, LServer)
-             end);
-       false ->
-           ets_cache:untag(Mod:count_messages(LUser, LServer))
+           case use_cache(Mod, LServer) of
+               true ->
+                   ets_cache:lookup(
+                       ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+                       fun() ->
+                           Mod:count_messages(LUser, LServer)
+                       end);
+               false ->
+                   ets_cache:untag(Mod:count_messages(LUser, LServer))
+           end
     end.
 
 -spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}.
index 2c1ddea7c4309c0b4fc4dad23f5bea2f80dc7a40..3e126c12cccfe5896043d8bb2755f4e9cd6be1fd 100644 (file)
@@ -88,7 +88,7 @@ read_message_headers(LUser, LServer) ->
                     end, Rs),
            lists:keysort(1, Hdrs);
        _Err ->
-           []
+           error
     end.
 
 read_message(_LUser, _LServer, I) ->
index 640cc071ede98bbbdcf3e09883e9240cb5084512..2846b28b05e5d5cb70b8a09c9b7574940ab00ccf 100644 (file)
@@ -131,7 +131,7 @@ read_message_headers(LUser, LServer) ->
                      end
              end, Rows);
        _Err ->
-           []
+           error
     end.
 
 read_message(LUser, LServer, Seq) ->
index 3de3c3f603f595a07dbcf9944bdb5123b6008999..0b0550e18b2b2ae907e078a0ec1cf58227f8fab0 100644 (file)
@@ -22,6 +22,7 @@ define_macro:
         db_type: sql
         ram_db_type: sql
       mod_offline:
+        use_cache: true
         db_type: sql
       mod_privacy:
         db_type: sql
index c64ccdd96d8dcaa54c4225593f15283364eee0fb..637fc61bce60ec8ae7356bdf06da4d78107e3ddd 100644 (file)
@@ -22,6 +22,7 @@ define_macro:
         db_type: sql
         ram_db_type: sql
       mod_offline:
+        use_cache: true
         db_type: sql
       mod_privacy:
         db_type: sql