get_room_config/4, set_room_option/3, offline_message/1, export/1,
mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
- process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/6]).
+ process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7]).
-include("xmpp.hrl").
-include("logger.hrl").
#rsm_set{} | undefined, chat | groupchat) ->
{[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
{error, db_failure}.
+-callback select(binary(), jid(), jid(), mam_query:result(),
+ #rsm_set{} | undefined, chat | groupchat,
+ all | only_count | only_messages) ->
+ {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
+ {error, db_failure}.
-callback use_cache(binary()) -> boolean().
-callback cache_nodes(binary()) -> [node()].
-callback remove_from_archive(binary(), binary(), jid() | none) -> ok | {error, any()}.
-callback is_empty_for_user(binary(), binary()) -> boolean().
-callback is_empty_for_room(binary(), binary(), binary()) -> boolean().
-callback select_with_mucsub(binary(), jid(), jid(), mam_query:result(),
- #rsm_set{} | undefined) ->
+ #rsm_set{} | undefined, all | only_count | only_messages) ->
{[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
{error, db_failure}.
--optional_callbacks([use_cache/1, cache_nodes/1, select_with_mucsub/5]).
+-optional_callbacks([use_cache/1, cache_nodes/1, select_with_mucsub/6, select/6, select/7]).
%%%===================================================================
%%% API
xmpp:make_error(IQ, Err)
end.
+select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) ->
+ select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, all).
+
select(_LServer, JidRequestor, JidArchive, Query, RSM,
{groupchat, _Role, #state{config = #config{mam = false},
- history = History}} = MsgType) ->
+ history = History}} = MsgType, _Flags) ->
Start = proplists:get_value(start, Query),
End = proplists:get_value('end', Query),
#lqueue{queue = Q} = History,
_ ->
{Msgs, true, L}
end;
-select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) ->
+select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) ->
case might_expose_jid(Query, MsgType) of
true ->
{[], true, 0};
false ->
case {MsgType, gen_mod:get_module_opt(LServer, ?MODULE, user_mucsub_from_muc_archive)} of
{chat, true} ->
- select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM);
+ select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags);
_ ->
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType)
+ db_select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags)
end
end.
-select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM) ->
+select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags) ->
MucHosts = mod_muc_admin:find_hosts(LServer),
Mod = gen_mod:db_mod(LServer, ?MODULE),
case proplists:get_value(with, Query) of
select(LServer, JidRequestor, MucJid, Query, RSM,
{groupchat, member, #state{config = #config{mam = true}}});
_ ->
- Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, chat)
+ db_select(LServer, JidRequestor, JidArchive, Query, RSM, chat, Flags)
end;
_ ->
- case erlang:function_exported(Mod, select_with_mucsub, 5) of
+ case erlang:function_exported(Mod, select_with_mucsub, 6) of
true ->
- Mod:select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM);
+ Mod:select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags);
false ->
- select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM)
+ select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM, Flags)
end
end.
-select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM) ->
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- case Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, chat) of
+select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM, Flags) ->
+ case db_select(LServer, JidRequestor, JidArchive, Query, RSM, chat, Flags) of
{error, _} = Err ->
Err;
{Entries, All, Count} ->
end
end.
+db_select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) ->
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ case erlang:function_exported(Mod, select, 7) of
+ true ->
+ Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags);
+ _ ->
+ Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType)
+ end.
+
wrap_as_mucsub(Messages, #jid{lserver = LServer} = Requester) ->
ReqBare = jid:remove_resource(Requester),
ReqServer = jid:make(<<>>, LServer, <<>>),
%% API
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
- extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, export/1, remove_from_archive/3,
- is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/5]).
+ extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/7, export/1, remove_from_archive/3,
+ is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6]).
-include_lib("stdlib/include/ms_transform.hrl").
-include("xmpp.hrl").
end.
select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
- MAMQuery, RSM, MsgType) ->
+ MAMQuery, RSM, MsgType, Flags) ->
User = case MsgType of
chat -> LUser;
_ -> jid:encode(JidArchive)
end,
{Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM, none),
- do_select_query(LServer, JidRequestor, JidArchive, RSM, MsgType, Query, CountQuery).
+ do_select_query(LServer, JidRequestor, JidArchive, RSM, MsgType, Query, CountQuery, Flags).
-spec select_with_mucsub(binary(), jid(), jid(), mam_query:result(),
- #rsm_set{} | undefined) ->
+ #rsm_set{} | undefined, all | only_count | only_messages) ->
{[{binary(), non_neg_integer(), xmlel()}], boolean(), integer()} |
{error, db_failure}.
select_with_mucsub(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
- MAMQuery, RSM) ->
+ MAMQuery, RSM, Flags) ->
Extra = case gen_mod:db_mod(LServer, mod_muc) of
mod_muc_sql ->
subscribers_table;
[jid:encode(Jid) || {Jid, _} <- SubRooms]
end,
{Query, CountQuery} = make_sql_query(LUser, LServer, MAMQuery, RSM, Extra),
- do_select_query(LServer, JidRequestor, JidArchive, RSM, chat, Query, CountQuery).
+ do_select_query(LServer, JidRequestor, JidArchive, RSM, chat, Query, CountQuery, Flags).
-do_select_query(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, RSM, MsgType, Query, CountQuery) ->
+do_select_query(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, RSM,
+ MsgType, Query, CountQuery, Flags) ->
% TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a
% reasonable limit on how many stanzas may be pushed to a client in one
% request. If a query returns a number of stanzas greater than this limit
% and the client did not specify a limit using RSM then the server should
% return a policy-violation error to the client." We currently don't do this
% for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer.
- case {ejabberd_sql:sql_query(LServer, Query),
- ejabberd_sql:sql_query(LServer, CountQuery)} of
+ QRes = case Flags of
+ all ->
+ {ejabberd_sql:sql_query(LServer, Query), ejabberd_sql:sql_query(LServer, CountQuery)};
+ only_messages ->
+ {ejabberd_sql:sql_query(LServer, Query), {selected, ok, [[<<"0">>]]}};
+ only_count ->
+ {{selected, ok, []}, ejabberd_sql:sql_query(LServer, CountQuery)}
+ end,
+ case QRes of
{{selected, _, Res}, {selected, _, [[Count]]}} ->
{Max, Direction, _} = get_max_direction_id(RSM),
{Res1, IsComplete} =
Res
end.
+-spec read_db_messages(binary(), binary()) -> [{binary(), message()}].
read_db_messages(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
CodecOpts = ejabberd_config:codec_options(LServer),
end
end, Mod:read_message_headers(LUser, LServer)).
--spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
- [{integer(), message()}].
-read_mam_messages(LUser, LServer, ReadMsgs) ->
+-spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) ->
+ {integer() | none, [message()]}.
+parse_marker_messages(LServer, ReadMsgs) ->
{Timestamp, ExtraMsgs} = lists:foldl(
fun({_Node, #message{id = <<"ActivityMarker">>,
body = [], type = error} = Msg}, {T, E}) ->
Decoded ->
Pkt1 = add_delay_info(Decoded, LServer, TS),
{T, [xmpp:set_from_to(Pkt1, From, To) | E]}
- catch _:{xmpp_codec, _Why} ->
- {T, E}
+ catch _:{xmpp_codec, _Why} ->
+ {T, E}
end;
({_Node, Msg}, {T, E}) ->
{T, [Msg | E]}
_ ->
Timestamp
end,
+ {Start, ExtraMsgs}.
+
+-spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
+ [{integer(), message()}].
+read_mam_messages(LUser, LServer, ReadMsgs) ->
+ {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
AllMsgs = case Start of
none ->
ExtraMsgs;
[{start, Start}],
#rsm_set{max = MaxOfflineMsgs,
before = <<"9999999999999999">>},
- chat),
+ chat, only_messages),
MamMsgs2 = lists:map(
fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) ->
add_delay_info(MM, LServer, MMT)
end, 1, AllMsgs2),
AllMsgs3.
+-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
+ integer().
+count_mam_messages(LUser, LServer, ReadMsgs) ->
+ {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
+ case Start of
+ none ->
+ length(ExtraMsgs);
+ _ ->
+ MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
+ Number when is_integer(Number) -> Number - length(ExtraMsgs);
+ infinity -> undefined;
+ _ -> 100 - length(ExtraMsgs)
+ end,
+ JID = jid:make(LUser, LServer, <<>>),
+ {_, _, Count} = mod_mam:select(LServer, JID, JID,
+ [{start, Start}],
+ #rsm_set{max = MaxOfflineMsgs,
+ before = <<"9999999999999999">>},
+ chat, only_count),
+ Count + length(ExtraMsgs)
+ end.
+
format_user_queue(Hdrs) ->
lists:map(
fun({Seq, From, To, TS, El}) ->
case use_mam_for_user(User, Server) of
true ->
Res = read_db_messages(LUser, LServer),
- length(read_mam_messages(LUser, LServer, Res));
+ count_mam_messages(LUser, LServer, Res);
_ ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:count_messages(LUser, LServer)