]> granicus.if.org Git - ejabberd/commitdiff
Cache number of offline messages
authorEvgeny Khramtsov <ekhramtsov@process-one.net>
Sun, 30 Jun 2019 18:14:37 +0000 (21:14 +0300)
committerEvgeny Khramtsov <ekhramtsov@process-one.net>
Sun, 30 Jun 2019 18:14:37 +0000 (21:14 +0300)
rebar.config
src/mod_offline.erl
src/mod_offline_mnesia.erl
src/mod_offline_opt.erl
src/mod_offline_riak.erl
src/mod_offline_sql.erl

index 4f477ebfad137d4a2e397b7a4ec8abeead24082f..e8c88a18a0a1b353ef2789d4e46038616091ac77 100644 (file)
@@ -20,7 +20,7 @@
 
 {deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", "3.6.10"}},
         {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "2887223"}},
-        {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", "8c4487c"}},
+        {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", "a425873340"}},
         {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.1.1"}}},
         {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.16"}}},
         {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", "7fd02f3a2f"}},
index 01c4e04bcf6724c3a0981d6a42c172904c264bd5..27035e8408fe0df001a408daec9af8943ef8de1b 100644 (file)
@@ -85,6 +85,7 @@
 -define(MAX_USER_MESSAGES, infinity).
 
 -define(EMPTY_SPOOL_CACHE, offline_empty_cache).
+-define(SPOOL_COUNTER_CACHE, offline_msg_counter_cache).
 
 -type c2s_state() :: ejabberd_c2s:state().
 
 -callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}.
 -callback read_all_messages(binary(), binary()) -> [#offline_msg{}].
 -callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
--callback count_messages(binary(), binary()) -> non_neg_integer().
+-callback count_messages(binary(), binary()) -> {ets_cache:tag(), non_neg_integer()}.
+-callback use_cache(binary()) -> boolean().
+-callback cache_nodes(binary()) -> [node()].
 
--optional_callbacks([remove_expired_messages/1, remove_old_messages/2]).
+-optional_callbacks([remove_expired_messages/1, remove_old_messages/2,
+                    use_cache/1, cache_nodes/1]).
 
 depends(_Host, _Opts) ->
     [].
@@ -113,7 +117,7 @@ depends(_Host, _Opts) ->
 start(Host, Opts) ->
     Mod = gen_mod:db_mod(Opts, ?MODULE),
     Mod:init(Host, Opts),
-    init_cache(Opts),
+    init_cache(Mod, Host, Opts),
     ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
                       store_packet, 50),
     ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
@@ -163,33 +167,64 @@ stop(Host) ->
 reload(Host, NewOpts, OldOpts) ->
     NewMod = gen_mod:db_mod(NewOpts, ?MODULE),
     OldMod = gen_mod:db_mod(OldOpts, ?MODULE),
-    init_cache(NewOpts),
+    init_cache(NewMod, Host, NewOpts),
     if NewMod /= OldMod ->
            NewMod:init(Host, NewOpts);
        true ->
            ok
     end.
 
-init_cache(Opts) ->
+init_cache(Mod, Host, Opts) ->
+    CacheOpts = [{max_size, mod_offline_opt:cache_size(Opts)},
+                {life_time, mod_offline_opt:cache_life_time(Opts)},
+                {cache_missed, false}],
     case mod_offline_opt:use_mam_for_storage(Opts) of
         true ->
-           MaxSize = mod_offline_opt:cache_size(Opts),
-           LifeTime = mod_offline_opt:cache_life_time(Opts),
-           COpts = [{max_size, MaxSize}, {cache_missed, false}, {life_time, LifeTime}],
-            ets_cache:new(?EMPTY_SPOOL_CACHE, COpts);
+            ets_cache:new(?EMPTY_SPOOL_CACHE, CacheOpts);
         false ->
             ets_cache:delete(?EMPTY_SPOOL_CACHE)
+    end,
+    case use_cache(Mod, Host) of
+       true ->
+           ets_cache:new(?SPOOL_COUNTER_CACHE, CacheOpts);
+       false ->
+           ets_cache:delete(?SPOOL_COUNTER_CACHE)
+    end.
+
+-spec use_cache(module(), binary()) -> boolean().
+use_cache(Mod, Host) ->
+    case erlang:function_exported(Mod, use_cache, 1) of
+        true -> Mod:use_cache(Host);
+        false -> mod_offline_opt:use_cache(Host)
+    end.
+
+-spec cache_nodes(module(), binary()) -> [node()].
+cache_nodes(Mod, Host) ->
+    case erlang:function_exported(Mod, cache_nodes, 1) of
+        true -> Mod:cache_nodes(Host);
+        false -> ejabberd_cluster:get_nodes()
+    end.
+
+-spec flush_cache(module(), binary(), binary()) -> ok.
+flush_cache(Mod, User, Server) ->
+    case use_cache(Mod, Server) of
+       true ->
+           ets_cache:delete(?SPOOL_COUNTER_CACHE,
+                            {User, Server},
+                            cache_nodes(Mod, Server));
+       false ->
+           ok
     end.
 
 -spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
 store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
     UseMam = use_mam_for_user(User, Server),
+    Mod = gen_mod:db_mod(Server, ?MODULE),
     case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
        true ->
-           Mod = gen_mod:db_mod(Server, ?MODULE),
            ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server},
                fun() ->
-                   case count_messages_in_db(User, Server) of
+                   case count_messages_in_db(Mod, User, Server) of
                        0 ->
                            case Mod:store_message(Msg) of
                                ok ->
@@ -202,15 +237,14 @@ store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
                    end
                end);
        false ->
-           Mod = gen_mod:db_mod(Server, ?MODULE),
            case get_max_user_messages(User, Server) of
                infinity ->
-                   Mod:store_message(Msg);
+                   store_message_in_db(Mod, Msg);
                Limit ->
-                   Num = count_messages_in_db(User, Server),
+                   Num = count_messages_in_db(Mod, User, Server),
                    if Num < Limit ->
-                       Mod:store_message(Msg);
-                       true ->
+                           store_message_in_db(Mod, Msg);
+                      true ->
                            {error, full}
                    end
            end
@@ -407,6 +441,7 @@ remove_msg_by_node(To, Seq) ->
            LServer = To#jid.lserver,
            Mod = gen_mod:db_mod(LServer, ?MODULE),
            Mod:remove_message(LUser, LServer, I),
+           flush_cache(Mod, LUser, LServer),
            true;
        _ ->
            false
@@ -573,6 +608,7 @@ route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State)
                                                packet = Msg}
                               end, read_mam_messages(LUser, LServer, OffMsgs));
                       _ ->
+                          flush_cache(Mod, LUser, LServer),
                           OffMsgs
                   end;
               _ ->
@@ -619,16 +655,24 @@ remove_expired_messages(Server) ->
     LServer = jid:nameprep(Server),
     Mod = gen_mod:db_mod(LServer, ?MODULE),
     case erlang:function_exported(Mod, remove_expired_messages, 1) of
-       true -> Mod:remove_expired_messages(LServer);
-       false -> erlang:error(not_implemented)
+       true ->
+           Ret = Mod:remove_expired_messages(LServer),
+           ets_cache:clear(?SPOOL_COUNTER_CACHE),
+           Ret;
+       false ->
+           erlang:error(not_implemented)
     end.
 
 remove_old_messages(Days, Server) ->
     LServer = jid:nameprep(Server),
     Mod = gen_mod:db_mod(LServer, ?MODULE),
     case erlang:function_exported(Mod, remove_old_messages, 2) of
-       true -> Mod:remove_old_messages(Days, LServer);
-       false -> erlang:error(not_implemented)
+       true ->
+           Ret = Mod:remove_old_messages(Days, LServer),
+           ets_cache:clear(?SPOOL_COUNTER_CACHE),
+           Ret;
+       false ->
+           erlang:error(not_implemented)
     end.
 
 -spec remove_user(binary(), binary()) -> ok.
@@ -637,7 +681,7 @@ remove_user(User, Server) ->
     LServer = jid:nameprep(Server),
     Mod = gen_mod:db_mod(LServer, ?MODULE),
     Mod:remove_user(LUser, LServer),
-    ok.
+    flush_cache(Mod, LUser, LServer).
 
 %% Helper functions:
 
@@ -944,23 +988,29 @@ user_queue_parse_query(LUser, LServer, Query) ->
     Mod = gen_mod:db_mod(LServer, ?MODULE),
     case lists:keysearch(<<"delete">>, 1, Query) of
        {value, _} ->
-           user_queue_parse_query(LUser, LServer, Query, Mod);
+           case user_queue_parse_query(LUser, LServer, Query, Mod, false) of
+               true ->
+                   flush_cache(Mod, LUser, LServer);
+               false ->
+                   ok
+           end;
        _ ->
            ok
     end.
 
-user_queue_parse_query(LUser, LServer, Query, Mod) ->
+user_queue_parse_query(LUser, LServer, Query, Mod, Acc) ->
     case lists:keytake(<<"selected">>, 1, Query) of
        {value, {_, Seq}, Query2} ->
-           case catch binary_to_integer(Seq) of
-               I when is_integer(I), I>=0 ->
-                   Mod:remove_message(LUser, LServer, I);
-               _ ->
-                   ok
-           end,
-           user_queue_parse_query(LUser, LServer, Query2, Mod);
+           NewAcc = case catch binary_to_integer(Seq) of
+                        I when is_integer(I), I>=0 ->
+                            Mod:remove_message(LUser, LServer, I),
+                            true;
+                        _ ->
+                            Acc
+                    end,
+           user_queue_parse_query(LUser, LServer, Query2, Mod, NewAcc);
        false ->
-           ok
+           Acc
     end.
 
 us_to_list({User, Server}) ->
@@ -1007,7 +1057,9 @@ delete_all_msgs(User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
     Mod = gen_mod:db_mod(LServer, ?MODULE),
-    Mod:remove_all_messages(LUser, LServer).
+    Ret = Mod:remove_all_messages(LUser, LServer),
+    flush_cache(Mod, LUser, LServer),
+    Ret.
 
 webadmin_user_parse_query(_, <<"removealloffline">>,
                          User, Server, _Query) ->
@@ -1035,13 +1087,39 @@ count_offline_messages(User, Server) ->
            Res = read_db_messages(LUser, LServer),
            count_mam_messages(LUser, LServer, Res);
        _ ->
-           count_messages_in_db(LUser, LServer)
+           Mod = gen_mod:db_mod(LServer, ?MODULE),
+           count_messages_in_db(Mod, LUser, LServer)
     end.
 
--spec count_messages_in_db(binary(), binary()) -> non_neg_integer().
-count_messages_in_db(LUser, LServer) ->
-    Mod = gen_mod:db_mod(LServer, ?MODULE),
-    Mod:count_messages(LUser, LServer).
+-spec count_messages_in_db(module(), binary(), binary()) -> non_neg_integer().
+count_messages_in_db(Mod, LUser, LServer) ->
+    case use_cache(Mod, LServer) of
+       true ->
+           ets_cache:lookup(
+             ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+             fun() ->
+                     Mod:count_messages(LUser, LServer)
+             end);
+       false ->
+           ets_cache:untag(Mod:count_messages(LUser, LServer))
+    end.
+
+-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}.
+store_message_in_db(Mod, #offline_msg{us = {User, Server}} = Msg) ->
+    case Mod:store_message(Msg) of
+       ok ->
+           case use_cache(Mod, Server) of
+               true ->
+                   ets_cache:incr(
+                     ?SPOOL_COUNTER_CACHE,
+                     {User, Server}, 1,
+                     cache_nodes(Mod, Server));
+               false ->
+                   ok
+           end;
+       Err ->
+           Err
+    end.
 
 -spec add_delay_info(message(), binary(),
                     undefined | erlang:timestamp()) -> message().
@@ -1107,6 +1185,8 @@ mod_opt_type(store_empty_body) ->
       econf:bool());
 mod_opt_type(db_type) ->
     econf:db_type(?MODULE);
+mod_opt_type(use_cache) ->
+    econf:bool();
 mod_opt_type(cache_size) ->
     econf:pos_int(infinity);
 mod_opt_type(cache_life_time) ->
@@ -1119,5 +1199,6 @@ mod_options(Host) ->
      {use_mam_for_storage, false},
      {bounce_groupchat, false},
      {store_groupchat, false},
+     {use_cache, ejabberd_option:use_cache(Host)},
      {cache_size, ejabberd_option:cache_size(Host)},
      {cache_life_time, ejabberd_option:cache_life_time(Host)}].
index d82735dee923a4e5ed3cc6fb1d15dc3130c24424..7fec22a6cb755a13c5d829d0522ef00f3c904eb3 100644 (file)
@@ -156,10 +156,10 @@ count_messages(LUser, LServer) ->
     F = fun () ->
                count_mnesia_records(US)
        end,
-    case catch mnesia:async_dirty(F) of
-       I when is_integer(I) -> I;
-       _ -> 0
-    end.
+    {cache, case mnesia:async_dirty(F) of
+               I when is_integer(I) -> I;
+               _ -> 0
+           end}.
 
 import(#offline_msg{} = Msg) ->
     mnesia:dirty_write(Msg).
index bb5eac6d9c7dd78b589d20de417199608647a862..e9ab7c71b875b237d9007e78b737d9b2b0d09993 100644 (file)
@@ -10,6 +10,7 @@
 -export([db_type/1]).
 -export([store_empty_body/1]).
 -export([store_groupchat/1]).
+-export([use_cache/1]).
 -export([use_mam_for_storage/1]).
 
 -spec access_max_user_messages(gen_mod:opts() | global | binary()) -> atom() | [ejabberd_shaper:shaper_rule()].
@@ -54,6 +55,12 @@ store_groupchat(Opts) when is_map(Opts) ->
 store_groupchat(Host) ->
     gen_mod:get_module_opt(Host, mod_offline, store_groupchat).
 
+-spec use_cache(gen_mod:opts() | global | binary()) -> boolean().
+use_cache(Opts) when is_map(Opts) ->
+    gen_mod:get_opt(use_cache, Opts);
+use_cache(Host) ->
+    gen_mod:get_module_opt(Host, mod_offline, use_cache).
+
 -spec use_mam_for_storage(gen_mod:opts() | global | binary()) -> boolean().
 use_mam_for_storage(Opts) when is_map(Opts) ->
     gen_mod:get_opt(use_mam_for_storage, Opts);
index db86767ce531cce6f116ec1ca13b23fb62c52ece..2c1ddea7c4309c0b4fc4dad23f5bea2f80dc7a40 100644 (file)
@@ -124,9 +124,9 @@ count_messages(LUser, LServer) ->
     case ejabberd_riak:count_by_index(
            offline_msg, <<"us">>, {LUser, LServer}) of
         {ok, Res} ->
-            Res;
+            {cache, Res};
         _ ->
-            0
+            {nocache, 0}
     end.
 
 import(#offline_msg{us = US, timestamp = TS} = M) ->
index 822ca7f434011255f560dc03b411ea2cf8c88c17..640cc071ede98bbbdcf3e09883e9240cb5084512 100644 (file)
@@ -185,8 +185,11 @@ count_messages(LUser, LServer) ->
                  ?SQL("select @(count(*))d from spool "
                       "where username=%(LUser)s and %(LServer)H")) of
         {selected, [{Res}]} ->
-            Res;
-        _ -> 0
+            {cache, Res};
+       {selected, []} ->
+           {cache, 0};
+        _ ->
+           {nocache, 0}
     end.
 
 export(_Server) ->