]> granicus.if.org Git - ejabberd/commitdiff
Don't issue count/message fetch queries for offline from mam when not needed
authorPaweł Chmielowski <pchmielowski@process-one.net>
Thu, 2 May 2019 09:12:22 +0000 (11:12 +0200)
committerPaweł Chmielowski <pchmielowski@process-one.net>
Thu, 2 May 2019 09:12:22 +0000 (11:12 +0200)
src/mod_mam.erl
src/mod_mam_sql.erl
src/mod_offline.erl

index 73a00180e2027359e6a0ada031bdf709d0adf0c9..ba00d74e52b61a738fd0f6b4816273f4bfddbd8c 100644 (file)
@@ -42,7 +42,7 @@
         get_room_config/4, set_room_option/3, offline_message/1, export/1,
         mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
         is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
-        process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/6]).
+        process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7]).
 
 -include("xmpp.hrl").
 -include("logger.hrl").
                 #rsm_set{} | undefined, chat | groupchat) ->
     {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
     {error, db_failure}.
+-callback select(binary(), jid(), jid(), mam_query:result(),
+                #rsm_set{} | undefined, chat | groupchat,
+                all | only_count | only_messages) ->
+                   {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
+                   {error, db_failure}.
 -callback use_cache(binary()) -> boolean().
 -callback cache_nodes(binary()) -> [node()].
 -callback remove_from_archive(binary(), binary(), jid() | none) -> ok | {error, any()}.
 -callback is_empty_for_user(binary(), binary()) -> boolean().
 -callback is_empty_for_room(binary(), binary(), binary()) -> boolean().
 -callback select_with_mucsub(binary(), jid(), jid(), mam_query:result(),
-                            #rsm_set{} | undefined) ->
+                            #rsm_set{} | undefined, all | only_count | only_messages) ->
     {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
     {error, db_failure}.
 
--optional_callbacks([use_cache/1, cache_nodes/1, select_with_mucsub/5]).
+-optional_callbacks([use_cache/1, cache_nodes/1, select_with_mucsub/6, select/6, select/7]).
 
 %%%===================================================================
 %%% API
@@ -1038,9 +1043,12 @@ select_and_send(LServer, Query, RSM, #iq{from = From, to = To} = IQ, MsgType) ->
            xmpp:make_error(IQ, Err)
     end.
 
+select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) ->
+    select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, all).
+
 select(_LServer, JidRequestor, JidArchive, Query, RSM,
        {groupchat, _Role, #state{config = #config{mam = false},
-                                history = History}} = MsgType) ->
+                                history = History}} = MsgType, _Flags) ->
     Start = proplists:get_value(start, Query),
     End = proplists:get_value('end', Query),
     #lqueue{queue = Q} = History,
@@ -1079,21 +1087,20 @@ select(_LServer, JidRequestor, JidArchive, Query, RSM,
        _ ->
            {Msgs, true, L}
     end;
-select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) ->
+select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) ->
     case might_expose_jid(Query, MsgType) of
        true ->
            {[], true, 0};
        false ->
            case {MsgType, gen_mod:get_module_opt(LServer, ?MODULE, user_mucsub_from_muc_archive)} of
                {chat, true} ->
-                   select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM);
+                   select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags);
                _ ->
-                   Mod = gen_mod:db_mod(LServer, ?MODULE),
-                   Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType)
+                   db_select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags)
            end
     end.
 
-select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM) ->
+select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags) ->
     MucHosts = mod_muc_admin:find_hosts(LServer),
     Mod = gen_mod:db_mod(LServer, ?MODULE),
     case proplists:get_value(with, Query) of
@@ -1103,20 +1110,19 @@ select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM) ->
                    select(LServer, JidRequestor, MucJid, Query, RSM,
                           {groupchat, member, #state{config = #config{mam = true}}});
                _ ->
-                   Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, chat)
+                   db_select(LServer, JidRequestor, JidArchive, Query, RSM, chat, Flags)
            end;
        _ ->
-           case erlang:function_exported(Mod, select_with_mucsub, 5) of
+           case erlang:function_exported(Mod, select_with_mucsub, 6) of
                true ->
-                   Mod:select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM);
+                   Mod:select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags);
                false ->
-                   select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM)
+                   select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM, Flags)
            end
     end.
 
-select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM) ->
-    Mod = gen_mod:db_mod(LServer, ?MODULE),
-    case Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, chat) of
+select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM, Flags) ->
+    case db_select(LServer, JidRequestor, JidArchive, Query, RSM, chat, Flags) of
        {error, _} = Err ->
            Err;
        {Entries, All, Count} ->
@@ -1166,6 +1172,15 @@ select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM) ->
            end
     end.
 
+db_select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) ->
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    case erlang:function_exported(Mod, select, 7) of
+       true ->
+           Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags);
+       _ ->
+       Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType)
+    end.
+
 wrap_as_mucsub(Messages, #jid{lserver = LServer} = Requester) ->
     ReqBare = jid:remove_resource(Requester),
     ReqServer = jid:make(<<>>, LServer, <<>>),
index 5379a055ad946c0dbc0cbfb440f757029fcb59de..be87e64da1162aed57d99e060982903be287a7cf 100644 (file)
@@ -30,8 +30,8 @@
 
 %% API
 -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
-        extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, export/1, remove_from_archive/3,
-        is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/5]).
+        extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/7, export/1, remove_from_archive/3,
+        is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6]).
 
 -include_lib("stdlib/include/ms_transform.hrl").
 -include("xmpp.hrl").
@@ -174,20 +174,20 @@ get_prefs(LUser, LServer) ->
     end.
 
 select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
-       MAMQuery, RSM, MsgType) ->
+       MAMQuery, RSM, MsgType, Flags) ->
     User = case MsgType of
               chat -> LUser;
               _ -> jid:encode(JidArchive)
           end,
     {Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM, none),
-    do_select_query(LServer, JidRequestor, JidArchive, RSM, MsgType, Query, CountQuery).
+    do_select_query(LServer, JidRequestor, JidArchive, RSM, MsgType, Query, CountQuery, Flags).
 
 -spec select_with_mucsub(binary(), jid(), jid(), mam_query:result(),
-                            #rsm_set{} | undefined) ->
+                            #rsm_set{} | undefined, all | only_count | only_messages) ->
                                {[{binary(), non_neg_integer(), xmlel()}], boolean(), integer()} |
                                {error, db_failure}.
 select_with_mucsub(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
-                  MAMQuery, RSM) ->
+                  MAMQuery, RSM, Flags) ->
     Extra = case gen_mod:db_mod(LServer, mod_muc) of
                mod_muc_sql ->
                    subscribers_table;
@@ -204,17 +204,25 @@ select_with_mucsub(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
                    [jid:encode(Jid) || {Jid, _} <- SubRooms]
            end,
     {Query, CountQuery} = make_sql_query(LUser, LServer, MAMQuery, RSM, Extra),
-    do_select_query(LServer, JidRequestor, JidArchive, RSM, chat, Query, CountQuery).
+    do_select_query(LServer, JidRequestor, JidArchive, RSM, chat, Query, CountQuery, Flags).
 
-do_select_query(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, RSM, MsgType, Query, CountQuery) ->
+do_select_query(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, RSM,
+               MsgType, Query, CountQuery, Flags) ->
     % TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a
     % reasonable limit on how many stanzas may be pushed to a client in one
     % request. If a query returns a number of stanzas greater than this limit
     % and the client did not specify a limit using RSM then the server should
     % return a policy-violation error to the client." We currently don't do this
     % for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer.
-    case {ejabberd_sql:sql_query(LServer, Query),
-         ejabberd_sql:sql_query(LServer, CountQuery)} of
+    QRes = case Flags of
+                  all ->
+                      {ejabberd_sql:sql_query(LServer, Query), ejabberd_sql:sql_query(LServer, CountQuery)};
+                  only_messages ->
+                      {ejabberd_sql:sql_query(LServer, Query), {selected, ok, [[<<"0">>]]}};
+                  only_count ->
+                      {{selected, ok, []}, ejabberd_sql:sql_query(LServer, CountQuery)}
+              end,
+    case QRes of
        {{selected, _, Res}, {selected, _, [[Count]]}} ->
            {Max, Direction, _} = get_max_direction_id(RSM),
            {Res1, IsComplete} =
index c4ac0eb654f2eb7181b6c75544cc98d519d8a472..ff3d06aa2ce95492420a9db3b777bf9ede84b307 100644 (file)
@@ -712,6 +712,7 @@ read_messages(LUser, LServer) ->
            Res
     end.
 
+-spec read_db_messages(binary(), binary()) -> [{binary(), message()}].
 read_db_messages(LUser, LServer) ->
     Mod = gen_mod:db_mod(LServer, ?MODULE),
     CodecOpts = ejabberd_config:codec_options(LServer),
@@ -733,9 +734,9 @@ read_db_messages(LUser, LServer) ->
            end
        end, Mod:read_message_headers(LUser, LServer)).
 
--spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
-    [{integer(), message()}].
-read_mam_messages(LUser, LServer, ReadMsgs) ->
+-spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) ->
+    {integer() | none, [message()]}.
+parse_marker_messages(LServer, ReadMsgs) ->
     {Timestamp, ExtraMsgs} = lists:foldl(
        fun({_Node, #message{id = <<"ActivityMarker">>,
                             body = [], type = error} = Msg}, {T, E}) ->
@@ -771,8 +772,8 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
                   Decoded ->
                       Pkt1 = add_delay_info(Decoded, LServer, TS),
                       {T, [xmpp:set_from_to(Pkt1, From, To) | E]}
-                  catch _:{xmpp_codec, _Why} ->
-                      {T, E}
+              catch _:{xmpp_codec, _Why} ->
+                  {T, E}
               end;
           ({_Node, Msg}, {T, E}) ->
               {T, [Msg | E]}
@@ -790,6 +791,12 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
                _ ->
                    Timestamp
            end,
+    {Start, ExtraMsgs}.
+
+-spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
+    [{integer(), message()}].
+read_mam_messages(LUser, LServer, ReadMsgs) ->
+    {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
     AllMsgs = case Start of
                  none ->
                      ExtraMsgs;
@@ -804,7 +811,7 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
                                                       [{start, Start}],
                                                       #rsm_set{max = MaxOfflineMsgs,
                                                                before = <<"9999999999999999">>},
-                                                      chat),
+                                                      chat, only_messages),
                      MamMsgs2 = lists:map(
                          fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) ->
                              add_delay_info(MM, LServer, MMT)
@@ -842,6 +849,28 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
        end, 1, AllMsgs2),
     AllMsgs3.
 
+-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
+    integer().
+count_mam_messages(LUser, LServer, ReadMsgs) ->
+    {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
+    case Start of
+       none ->
+           length(ExtraMsgs);
+       _ ->
+           MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
+                                Number when is_integer(Number) -> Number - length(ExtraMsgs);
+                                infinity -> undefined;
+                                _ -> 100 - length(ExtraMsgs)
+                            end,
+           JID = jid:make(LUser, LServer, <<>>),
+           {_, _, Count} = mod_mam:select(LServer, JID, JID,
+                                          [{start, Start}],
+                                          #rsm_set{max = MaxOfflineMsgs,
+                                                   before = <<"9999999999999999">>},
+                                          chat, only_count),
+           Count + length(ExtraMsgs)
+    end.
+
 format_user_queue(Hdrs) ->
     lists:map(
       fun({Seq, From, To, TS, El}) ->
@@ -1007,7 +1036,7 @@ count_offline_messages(User, Server) ->
     case use_mam_for_user(User, Server) of
        true ->
            Res = read_db_messages(LUser, LServer),
-           length(read_mam_messages(LUser, LServer, Res));
+           count_mam_messages(LUser, LServer, Res);
        _ ->
            Mod = gen_mod:db_mod(LServer, ?MODULE),
            Mod:count_messages(LUser, LServer)