-callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}.
-callback remove_user(binary(), binary()) -> any().
-callback read_message_headers(binary(), binary()) ->
- [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}].
+ [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}] | error.
-callback read_message(binary(), binary(), non_neg_integer()) ->
{ok, #offline_msg{}} | error.
-callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}.
Mod = gen_mod:db_mod(Server, ?MODULE),
case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
true ->
- ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server},
- fun() ->
- case count_messages_in_db(Mod, User, Server) of
- 0 ->
- case Mod:store_message(Msg) of
- ok ->
- {cache, ok};
- Err ->
- {nocache, Err}
- end;
- _ ->
- {cache, ok}
+ case count_offline_messages(User, Server) of
+ 0 ->
+ store_message_in_db(Mod, Msg);
+ _ ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ false ->
+ ok
end
- end);
+ end;
false ->
case get_max_user_messages(User, Server) of
infinity ->
store_message_in_db(Mod, Msg);
Limit ->
- Num = count_messages_in_db(Mod, User, Server),
+ Num = count_offline_messages(User, Server),
if Num < Limit ->
store_message_in_db(Mod, Msg);
true ->
?NS_FLEX_OFFLINE, _Lang) ->
ejabberd_sm:route(JID, {resend_offline, false}),
Mod = gen_mod:db_mod(S, ?MODULE),
- Hdrs = Mod:read_message_headers(U, S),
+ Hdrs = case Mod:read_message_headers(U, S) of
+ L when is_list(L) ->
+ L;
+ _ ->
+ []
+ end,
BareJID = jid:remove_resource(JID),
{result, lists:map(
fun({Seq, From, _To, _TS, _El}) ->
stop
end
end;
- _ -> Acc
+ _ ->
+ maybe_update_cache(To, Packet),
+ Acc
+ end;
+ false ->
+ maybe_update_cache(To, Packet),
+ Acc
+ end.
+
+-spec maybe_update_cache(jid(), message()) -> ok.
+maybe_update_cache(#jid{lserver = Server, luser = User}, Packet) ->
+ case xmpp:get_meta(Packet, mam_archived, false) of
+ true ->
+ Mod = gen_mod:db_mod(Server, ?MODULE),
+ case use_mam_for_user(User, Server) andalso use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ _ ->
+ ok
end;
- false -> Acc
+ _ ->
+ ok
end.
-spec check_store_hint(message()) -> store | no_store | none.
-spec read_messages(binary(), binary()) -> [{binary(), message()}].
read_messages(LUser, LServer) ->
- Res = read_db_messages(LUser, LServer),
+ Res = case read_db_messages(LUser, LServer) of
+ error ->
+ [];
+ L when is_list(L) ->
+ L
+ end,
case use_mam_for_user(LUser, LServer) of
true ->
read_mam_messages(LUser, LServer, Res);
Res
end.
--spec read_db_messages(binary(), binary()) -> [{binary(), message()}].
+-spec read_db_messages(binary(), binary()) -> [{binary(), message()}] | error.
read_db_messages(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
CodecOpts = ejabberd_config:codec_options(),
- lists:flatmap(
- fun({Seq, From, To, TS, El}) ->
- Node = integer_to_binary(Seq),
- try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
- Pkt ->
+ case Mod:read_message_headers(LUser, LServer) of
+ error ->
+ error;
+ L ->
+ lists:flatmap(
+ fun({Seq, From, To, TS, El}) ->
Node = integer_to_binary(Seq),
- Pkt1 = add_delay_info(Pkt, LServer, TS),
- Pkt2 = xmpp:set_from_to(Pkt1, From, To),
- [{Node, Pkt2}]
- catch _:{xmpp_codec, Why} ->
- ?ERROR_MSG("Failed to decode packet ~p "
- "of user ~s: ~s",
- [El, jid:encode(To),
- xmpp:format_error(Why)]),
- []
- end
- end, Mod:read_message_headers(LUser, LServer)).
+ try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
+ Pkt ->
+ Node = integer_to_binary(Seq),
+ Pkt1 = add_delay_info(Pkt, LServer, TS),
+ Pkt2 = xmpp:set_from_to(Pkt1, From, To),
+ [{Node, Pkt2}]
+ catch _:{xmpp_codec, Why} ->
+ ?ERROR_MSG("Failed to decode packet ~p "
+ "of user ~s: ~s",
+ [El, jid:encode(To),
+ xmpp:format_error(Why)]),
+ []
+ end
+ end, L)
+ end.
-spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) ->
{integer() | none, [message()]}.
end, 1, AllMsgs2),
AllMsgs3.
--spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
- integer().
+-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}] | error) ->
+ {cache, integer()} | {nocache, integer()}.
+count_mam_messages(_LUser, _LServer, error) ->
+ {nocache, 0};
count_mam_messages(LUser, LServer, ReadMsgs) ->
{Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
case Start of
none ->
- length(ExtraMsgs);
+ {cache, length(ExtraMsgs)};
_ ->
MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
Number when is_integer(Number) -> Number - length(ExtraMsgs);
#rsm_set{max = MaxOfflineMsgs,
before = <<"9999999999999999">>},
chat, only_count),
- Count + length(ExtraMsgs)
+ {cache, Count + length(ExtraMsgs)}
end.
format_user_queue(Hdrs) ->
US = {LUser, LServer},
Mod = gen_mod:db_mod(LServer, ?MODULE),
user_queue_parse_query(LUser, LServer, Query),
- HdrsAll = Mod:read_message_headers(LUser, LServer),
+ HdrsAll = case Mod:read_message_headers(LUser, LServer) of
+ error -> [];
+ L -> L
+ end,
Hdrs = get_messages_subset(User, Server, HdrsAll),
FMsgs = format_user_queue(Hdrs),
[?XC(<<"h1">>,
count_offline_messages(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
case use_mam_for_user(User, Server) of
true ->
- Res = read_db_messages(LUser, LServer),
- count_mam_messages(LUser, LServer, Res);
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:lookup(
+ ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+ fun() ->
+ Res = read_db_messages(LUser, LServer),
+ count_mam_messages(LUser, LServer, Res)
+ end);
+ false ->
+ Res = read_db_messages(LUser, LServer),
+ ets_cache:untag(count_mam_messages(LUser, LServer, Res))
+ end;
_ ->
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- count_messages_in_db(Mod, LUser, LServer)
- end.
-
--spec count_messages_in_db(module(), binary(), binary()) -> non_neg_integer().
-count_messages_in_db(Mod, LUser, LServer) ->
- case use_cache(Mod, LServer) of
- true ->
- ets_cache:lookup(
- ?SPOOL_COUNTER_CACHE, {LUser, LServer},
- fun() ->
- Mod:count_messages(LUser, LServer)
- end);
- false ->
- ets_cache:untag(Mod:count_messages(LUser, LServer))
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:lookup(
+ ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+ fun() ->
+ Mod:count_messages(LUser, LServer)
+ end);
+ false ->
+ ets_cache:untag(Mod:count_messages(LUser, LServer))
+ end
end.
-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}.