-define(MAX_USER_MESSAGES, infinity).
-define(EMPTY_SPOOL_CACHE, offline_empty_cache).
+-define(SPOOL_COUNTER_CACHE, offline_msg_counter_cache).
-type c2s_state() :: ejabberd_c2s:state().
-callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}.
-callback read_all_messages(binary(), binary()) -> [#offline_msg{}].
-callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
--callback count_messages(binary(), binary()) -> non_neg_integer().
+-callback count_messages(binary(), binary()) -> {ets_cache:tag(), non_neg_integer()}.
+-callback use_cache(binary()) -> boolean().
+-callback cache_nodes(binary()) -> [node()].
--optional_callbacks([remove_expired_messages/1, remove_old_messages/2]).
+-optional_callbacks([remove_expired_messages/1, remove_old_messages/2,
+ use_cache/1, cache_nodes/1]).
depends(_Host, _Opts) ->
[].
start(Host, Opts) ->
Mod = gen_mod:db_mod(Opts, ?MODULE),
Mod:init(Host, Opts),
- init_cache(Opts),
+ init_cache(Mod, Host, Opts),
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
store_packet, 50),
ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
reload(Host, NewOpts, OldOpts) ->
NewMod = gen_mod:db_mod(NewOpts, ?MODULE),
OldMod = gen_mod:db_mod(OldOpts, ?MODULE),
- init_cache(NewOpts),
+ init_cache(NewMod, Host, NewOpts),
if NewMod /= OldMod ->
NewMod:init(Host, NewOpts);
true ->
ok
end.
-init_cache(Opts) ->
+init_cache(Mod, Host, Opts) ->
+ CacheOpts = [{max_size, mod_offline_opt:cache_size(Opts)},
+ {life_time, mod_offline_opt:cache_life_time(Opts)},
+ {cache_missed, false}],
case mod_offline_opt:use_mam_for_storage(Opts) of
true ->
- MaxSize = mod_offline_opt:cache_size(Opts),
- LifeTime = mod_offline_opt:cache_life_time(Opts),
- COpts = [{max_size, MaxSize}, {cache_missed, false}, {life_time, LifeTime}],
- ets_cache:new(?EMPTY_SPOOL_CACHE, COpts);
+ ets_cache:new(?EMPTY_SPOOL_CACHE, CacheOpts);
false ->
ets_cache:delete(?EMPTY_SPOOL_CACHE)
+ end,
+ case use_cache(Mod, Host) of
+ true ->
+ ets_cache:new(?SPOOL_COUNTER_CACHE, CacheOpts);
+ false ->
+ ets_cache:delete(?SPOOL_COUNTER_CACHE)
+ end.
+
+-spec use_cache(module(), binary()) -> boolean().
+use_cache(Mod, Host) ->
+ case erlang:function_exported(Mod, use_cache, 1) of
+ true -> Mod:use_cache(Host);
+ false -> mod_offline_opt:use_cache(Host)
+ end.
+
+-spec cache_nodes(module(), binary()) -> [node()].
+cache_nodes(Mod, Host) ->
+ case erlang:function_exported(Mod, cache_nodes, 1) of
+ true -> Mod:cache_nodes(Host);
+ false -> ejabberd_cluster:get_nodes()
+ end.
+
+-spec flush_cache(module(), binary(), binary()) -> ok.
+flush_cache(Mod, User, Server) ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:delete(?SPOOL_COUNTER_CACHE,
+ {User, Server},
+ cache_nodes(Mod, Server));
+ false ->
+ ok
end.
-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
UseMam = use_mam_for_user(User, Server),
+ Mod = gen_mod:db_mod(Server, ?MODULE),
case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
true ->
- Mod = gen_mod:db_mod(Server, ?MODULE),
ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server},
fun() ->
- case count_messages_in_db(User, Server) of
+ case count_messages_in_db(Mod, User, Server) of
0 ->
case Mod:store_message(Msg) of
ok ->
end
end);
false ->
- Mod = gen_mod:db_mod(Server, ?MODULE),
case get_max_user_messages(User, Server) of
infinity ->
- Mod:store_message(Msg);
+ store_message_in_db(Mod, Msg);
Limit ->
- Num = count_messages_in_db(User, Server),
+ Num = count_messages_in_db(Mod, User, Server),
if Num < Limit ->
- Mod:store_message(Msg);
- true ->
+ store_message_in_db(Mod, Msg);
+ true ->
{error, full}
end
end
LServer = To#jid.lserver,
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:remove_message(LUser, LServer, I),
+ flush_cache(Mod, LUser, LServer),
true;
_ ->
false
packet = Msg}
end, read_mam_messages(LUser, LServer, OffMsgs));
_ ->
+ flush_cache(Mod, LUser, LServer),
OffMsgs
end;
_ ->
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
case erlang:function_exported(Mod, remove_expired_messages, 1) of
- true -> Mod:remove_expired_messages(LServer);
- false -> erlang:error(not_implemented)
+ true ->
+ Ret = Mod:remove_expired_messages(LServer),
+ ets_cache:clear(?SPOOL_COUNTER_CACHE),
+ Ret;
+ false ->
+ erlang:error(not_implemented)
end.
remove_old_messages(Days, Server) ->
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
case erlang:function_exported(Mod, remove_old_messages, 2) of
- true -> Mod:remove_old_messages(Days, LServer);
- false -> erlang:error(not_implemented)
+ true ->
+ Ret = Mod:remove_old_messages(Days, LServer),
+ ets_cache:clear(?SPOOL_COUNTER_CACHE),
+ Ret;
+ false ->
+ erlang:error(not_implemented)
end.
-spec remove_user(binary(), binary()) -> ok.
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:remove_user(LUser, LServer),
- ok.
+ flush_cache(Mod, LUser, LServer).
%% Helper functions:
Mod = gen_mod:db_mod(LServer, ?MODULE),
case lists:keysearch(<<"delete">>, 1, Query) of
{value, _} ->
- user_queue_parse_query(LUser, LServer, Query, Mod);
+ case user_queue_parse_query(LUser, LServer, Query, Mod, false) of
+ true ->
+ flush_cache(Mod, LUser, LServer);
+ false ->
+ ok
+ end;
_ ->
ok
end.
-user_queue_parse_query(LUser, LServer, Query, Mod) ->
+user_queue_parse_query(LUser, LServer, Query, Mod, Acc) ->
case lists:keytake(<<"selected">>, 1, Query) of
{value, {_, Seq}, Query2} ->
- case catch binary_to_integer(Seq) of
- I when is_integer(I), I>=0 ->
- Mod:remove_message(LUser, LServer, I);
- _ ->
- ok
- end,
- user_queue_parse_query(LUser, LServer, Query2, Mod);
+ NewAcc = case catch binary_to_integer(Seq) of
+ I when is_integer(I), I>=0 ->
+ Mod:remove_message(LUser, LServer, I),
+ true;
+ _ ->
+ Acc
+ end,
+ user_queue_parse_query(LUser, LServer, Query2, Mod, NewAcc);
false ->
- ok
+ Acc
end.
us_to_list({User, Server}) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:remove_all_messages(LUser, LServer).
+ Ret = Mod:remove_all_messages(LUser, LServer),
+ flush_cache(Mod, LUser, LServer),
+ Ret.
webadmin_user_parse_query(_, <<"removealloffline">>,
User, Server, _Query) ->
Res = read_db_messages(LUser, LServer),
count_mam_messages(LUser, LServer, Res);
_ ->
- count_messages_in_db(LUser, LServer)
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ count_messages_in_db(Mod, LUser, LServer)
end.
--spec count_messages_in_db(binary(), binary()) -> non_neg_integer().
-count_messages_in_db(LUser, LServer) ->
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:count_messages(LUser, LServer).
+-spec count_messages_in_db(module(), binary(), binary()) -> non_neg_integer().
+count_messages_in_db(Mod, LUser, LServer) ->
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:lookup(
+ ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+ fun() ->
+ Mod:count_messages(LUser, LServer)
+ end);
+ false ->
+ ets_cache:untag(Mod:count_messages(LUser, LServer))
+ end.
+
+-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}.
+store_message_in_db(Mod, #offline_msg{us = {User, Server}} = Msg) ->
+ case Mod:store_message(Msg) of
+ ok ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ false ->
+ ok
+ end;
+ Err ->
+ Err
+ end.
-spec add_delay_info(message(), binary(),
undefined | erlang:timestamp()) -> message().
econf:bool());
mod_opt_type(db_type) ->
econf:db_type(?MODULE);
+mod_opt_type(use_cache) ->
+ econf:bool();
mod_opt_type(cache_size) ->
econf:pos_int(infinity);
mod_opt_type(cache_life_time) ->
{use_mam_for_storage, false},
{bounce_groupchat, false},
{store_groupchat, false},
+ {use_cache, ejabberd_option:use_cache(Host)},
{cache_size, ejabberd_option:cache_size(Host)},
{cache_life_time, ejabberd_option:cache_life_time(Host)}].