]> granicus.if.org Git - ejabberd/commitdiff
Add mod_offline option for fetching data from mam instead of from spool table
authorPaweł Chmielowski <pchmielowski@process-one.net>
Fri, 26 Apr 2019 17:59:06 +0000 (19:59 +0200)
committerPaweł Chmielowski <pchmielowski@process-one.net>
Fri, 26 Apr 2019 17:59:06 +0000 (19:59 +0200)
This commit introduces `use_mam_for_storage` option that take boolean
argument. Enabling it will make mod_offline not use spool table for storing
offline message, but instead will use mam archive to retrieve messages
stored when offline.

Enabling this option have couple drawback currently, only messages that
were stored in mam will be available, most of flexible message retrieval
queries don't work (those that allow retrieval/deletion of messages by id).

src/mod_mam.erl
src/mod_offline.erl
src/mod_stream_mgmt.erl

index 5e20184faa9acd05eedabe3fb84ef18b948e5609..73a00180e2027359e6a0ada031bdf709d0adf0c9 100644 (file)
@@ -42,7 +42,7 @@
         get_room_config/4, set_room_option/3, offline_message/1, export/1,
         mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
         is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
-        process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2]).
+        process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/6]).
 
 -include("xmpp.hrl").
 -include("logger.hrl").
@@ -112,7 +112,7 @@ start(Host, Opts) ->
            ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
                               user_send_packet_strip_tag, 500),
            ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
-                              offline_message, 50),
+                              offline_message, 49),
            ejabberd_hooks:add(muc_filter_message, Host, ?MODULE,
                               muc_filter_message, 50),
            ejabberd_hooks:add(muc_process_iq, Host, ?MODULE,
@@ -188,7 +188,7 @@ stop(Host) ->
     ejabberd_hooks:delete(user_send_packet, Host, ?MODULE,
                          user_send_packet_strip_tag, 500),
     ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE,
-                         offline_message, 50),
+                         offline_message, 49),
     ejabberd_hooks:delete(muc_filter_message, Host, ?MODULE,
                          muc_filter_message, 50),
     ejabberd_hooks:delete(muc_process_iq, Host, ?MODULE,
index 4a1a4cca274095742e2e0767e4e6bf0236ce74aa..6a9114a922693bb9f702f58772793590106f7598 100644 (file)
@@ -61,7 +61,8 @@
         c2s_copy_session/2,
         webadmin_page/3,
         webadmin_user/4,
-        webadmin_user_parse_query/5]).
+        webadmin_user_parse_query/5,
+        user_unset_presence/4]).
 
 -export([mod_opt_type/1, mod_options/1, depends/2]).
 
@@ -131,6 +132,8 @@ start(Host, Opts) ->
                       ?MODULE, webadmin_user, 50),
     ejabberd_hooks:add(webadmin_user_parse_query, Host,
                       ?MODULE, webadmin_user_parse_query, 50),
+    ejabberd_hooks:add(unset_presence_hook, Host, ?MODULE,
+                      user_unset_presence, 50),
     gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
                                  ?MODULE, handle_offline_query).
 
@@ -153,6 +156,8 @@ stop(Host) ->
                          ?MODULE, webadmin_user, 50),
     ejabberd_hooks:delete(webadmin_user_parse_query, Host,
                          ?MODULE, webadmin_user_parse_query, 50),
+    ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE,
+                         user_unset_presence, 50),
     gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
 
 reload(Host, NewOpts, OldOpts) ->
@@ -165,17 +170,28 @@ reload(Host, NewOpts, OldOpts) ->
     end.
 
 -spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
-store_offline_msg(#offline_msg{us = {User, Server}} = Msg) ->
-    Mod = gen_mod:db_mod(Server, ?MODULE),
-    case get_max_user_messages(User, Server) of
-       infinity ->
-           Mod:store_message(Msg);
-       Limit ->
-           Num = count_offline_messages(User, Server),
-           if Num < Limit ->
+store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
+    case (not xmpp:get_meta(Pkt, activity_marker, false)) andalso
+        use_mam_for_user(User, Server) of
+       true ->
+           case xmpp:get_meta(Pkt, first_from_queue, false) of
+               true ->
+                   store_last_activity_marker(User, Server, xmpp:get_meta(Pkt, stanza_id));
+               _ ->
+                   ok
+           end;
+       _ ->
+           Mod = gen_mod:db_mod(Server, ?MODULE),
+           case get_max_user_messages(User, Server) of
+               infinity ->
                    Mod:store_message(Msg);
-              true ->
-                   {error, full}
+               Limit ->
+                   Num = count_offline_messages(User, Server),
+                   if Num < Limit ->
+                       Mod:store_message(Msg);
+                       true ->
+                           {error, full}
+                   end
            end
     end.
 
@@ -298,34 +314,44 @@ handle_offline_query(#iq{lang = Lang} = IQ) ->
 -spec handle_offline_items_view(jid(), [offline_item()]) -> boolean().
 handle_offline_items_view(JID, Items) ->
     {U, S, R} = jid:tolower(JID),
-    lists:foldl(
-      fun(#offline_item{node = Node, action = view}, Acc) ->
-             case fetch_msg_by_node(JID, Node) of
-                 {ok, OfflineMsg} ->
-                     case offline_msg_to_route(S, OfflineMsg) of
-                         {route, El} ->
-                             NewEl = set_offline_tag(El, Node),
-                             case ejabberd_sm:get_session_pid(U, S, R) of
-                                 Pid when is_pid(Pid) ->
-                                     Pid ! {route, NewEl};
-                                 none ->
-                                     ok
-                             end,
-                             Acc or true;
-                         error ->
-                             Acc or false
-                     end;
-                 error ->
-                     Acc or false
-             end
-      end, false, Items).
+    case use_mam_for_user(U, S) of
+       true ->
+           false;
+       _ ->
+           lists:foldl(
+               fun(#offline_item{node = Node, action = view}, Acc) ->
+                   case fetch_msg_by_node(JID, Node) of
+                       {ok, OfflineMsg} ->
+                           case offline_msg_to_route(S, OfflineMsg) of
+                               {route, El} ->
+                                   NewEl = set_offline_tag(El, Node),
+                                   case ejabberd_sm:get_session_pid(U, S, R) of
+                                       Pid when is_pid(Pid) ->
+                                           Pid ! {route, NewEl};
+                                       none ->
+                                           ok
+                                   end,
+                                   Acc or true;
+                               error ->
+                                   Acc or false
+                           end;
+                       error ->
+                           Acc or false
+                   end
+               end, false, Items)    end.
 
 -spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean().
 handle_offline_items_remove(JID, Items) ->
-    lists:foldl(
-      fun(#offline_item{node = Node, action = remove}, Acc) ->
-             Acc or remove_msg_by_node(JID, Node)
-      end, false, Items).
+    {U, S, _R} = jid:tolower(JID),
+    case use_mam_for_user(U, S) of
+       true ->
+           false;
+       _ ->
+           lists:foldl(
+               fun(#offline_item{node = Node, action = remove}, Acc) ->
+                   Acc or remove_msg_by_node(JID, Node)
+               end, false, Items)
+    end.
 
 -spec set_offline_tag(message(), binary()) -> message().
 set_offline_tag(Msg, Node) ->
@@ -334,11 +360,11 @@ set_offline_tag(Msg, Node) ->
 -spec handle_offline_fetch(jid()) -> ok.
 handle_offline_fetch(#jid{luser = U, lserver = S} = JID) ->
     ejabberd_sm:route(JID, {resend_offline, false}),
-           lists:foreach(
-             fun({Node, El}) ->
-             El1 = set_offline_tag(El, Node),
-             ejabberd_router:route(El1)
-      end, read_messages(U, S)).
+    lists:foreach(
+       fun({Node, El}) ->
+           El1 = set_offline_tag(El, Node),
+           ejabberd_router:route(El1)
+       end, read_messages(U, S)).
 
 -spec fetch_msg_by_node(jid(), binary()) -> error | {ok, #offline_msg{}}.
 fetch_msg_by_node(To, Seq) ->
@@ -508,15 +534,26 @@ c2s_self_presence(Acc) ->
 -spec route_offline_messages(c2s_state()) -> ok.
 route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) ->
     Mod = gen_mod:db_mod(LServer, ?MODULE),
-    case Mod:pop_messages(LUser, LServer) of
-       {ok, OffMsgs} ->
-           lists:foreach(
-             fun(OffMsg) ->
-                     route_offline_message(State, OffMsg)
-             end, OffMsgs);
-       _ ->
-           ok
-    end.
+    Msgs = case Mod:pop_messages(LUser, LServer) of
+              {ok, OffMsgs} ->
+                  case use_mam_for_user(LUser, LServer) of
+                      true ->
+                          lists:map(
+                              fun({_, #message{from = From, to = To} = Msg}) ->
+                                  #offline_msg{from = From, to = To,
+                                               us = {LUser, LServer},
+                                               packet = Msg}
+                              end, read_mam_messages(LUser, LServer, OffMsgs));
+                      _ ->
+                          OffMsgs
+                  end;
+              _ ->
+                  []
+          end,
+    lists:foreach(
+       fun(OffMsg) ->
+           route_offline_message(State, OffMsg)
+       end, Msgs).
 
 -spec route_offline_message(c2s_state(), #offline_msg{}) -> ok.
 route_offline_message(#{lserver := LServer} = State,
@@ -574,6 +611,31 @@ remove_user(User, Server) ->
     Mod:remove_user(LUser, LServer),
     ok.
 
+-spec user_unset_presence(binary(), binary(), binary(), binary()) -> any().
+user_unset_presence(User, Server, _Resource, _Status) ->
+    case use_mam_for_user(User, Server) of
+       true ->
+           case ejabberd_sm:get_user_present_resources(User, Server) of
+               [] ->
+                   TimeStamp = erlang:system_time(microsecond),
+                   store_last_activity_marker(User, Server, TimeStamp);
+               _ ->
+                   ok
+           end;
+       _ ->
+           ok
+    end.
+
+store_last_activity_marker(User, Server, Timestamp) ->
+    Jid = jid:make(User, Server, <<>>),
+    Pkt = xmpp:put_meta(#message{id = <<"ActivityMarker">>, type = error},
+                       activity_marker, true),
+
+    Msg = #offline_msg{us = {User, Server}, from = Jid, to = Jid,
+                      timestamp = misc:usec_to_now(Timestamp),
+                      packet = Pkt},
+    store_offline_msg(Msg).
+
 %% Helper functions:
 
 -spec check_if_message_should_be_bounced(message()) -> boolean().
@@ -641,25 +703,123 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
 
 -spec read_messages(binary(), binary()) -> [{binary(), message()}].
 read_messages(LUser, LServer) ->
+    Res = read_db_messages(LUser, LServer),
+    case use_mam_for_user(LUser, LServer) of
+       true ->
+           read_mam_messages(LUser, LServer, Res);
+       _ ->
+           Res
+    end.
+
+read_db_messages(LUser, LServer) ->
     Mod = gen_mod:db_mod(LServer, ?MODULE),
     CodecOpts = ejabberd_config:codec_options(LServer),
     lists:flatmap(
-      fun({Seq, From, To, TS, El}) ->
-             Node = integer_to_binary(Seq),
-             try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
-                 Pkt ->
-                     Node = integer_to_binary(Seq),
-                     Pkt1 = add_delay_info(Pkt, LServer, TS),
-                     Pkt2 = xmpp:set_from_to(Pkt1, From, To),
-                     [{Node, Pkt2}]
-             catch _:{xmpp_codec, Why} ->
-                     ?ERROR_MSG("failed to decode packet ~p "
-                                "of user ~s: ~s",
-                                [El, jid:encode(To),
-                                 xmpp:format_error(Why)]),
-                     []
-             end
-      end, Mod:read_message_headers(LUser, LServer)).
+       fun({Seq, From, To, TS, El}) ->
+           Node = integer_to_binary(Seq),
+           try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
+               Pkt ->
+                   Node = integer_to_binary(Seq),
+                   Pkt1 = add_delay_info(Pkt, LServer, TS),
+                   Pkt2 = xmpp:set_from_to(Pkt1, From, To),
+                   [{Node, Pkt2}]
+           catch _:{xmpp_codec, Why} ->
+               ?ERROR_MSG("failed to decode packet ~p "
+                          "of user ~s: ~s",
+                          [El, jid:encode(To),
+                           xmpp:format_error(Why)]),
+               []
+           end
+       end, Mod:read_message_headers(LUser, LServer)).
+
+read_mam_messages(LUser, LServer, ReadMsgs) ->
+    {Timestamp, ExtraMsgs} = lists:foldl(
+       fun({_Node, #message{id = <<"ActivityMarker">>,
+                            body = [], type = error} = Msg}, {T, E}) ->
+           case xmpp:get_subtag(Msg, #delay{}) of
+               #delay{stamp = Time} ->
+                   if T == none orelse T > Time ->
+                       {Time, E};
+                       true ->
+                           {T, E}
+                   end
+           end;
+          (#offline_msg{from = From, to = To, timestamp = TS, packet = Pkt},
+           {T, E}) ->
+              try xmpp:decode(Pkt) of
+                  #message{id = <<"ActivityMarker">>,
+                           body = [], type = error} = Msg ->
+                      TS2 = case TS of
+                                undefined ->
+                                    case xmpp:get_subtag(Msg, #delay{}) of
+                                        #delay{stamp = TS0} ->
+                                            TS0;
+                                        _ ->
+                                            erlang:timestamp()
+                                    end
+                            end,
+                      if T == none orelse T > TS2 ->
+                          {TS2, E};
+                          true ->
+                              {T, E}
+                      end;
+                  Decoded ->
+                      Pkt1 = add_delay_info(Decoded, LServer, TS),
+                      {T, [xmpp:set_from_to(Pkt1, From, To) | E]}
+                  catch _:{xmpp_codec, _Why} ->
+                      {T, E}
+              end;
+          ({_Node, Msg}, {T, E}) ->
+              {T, [Msg | E]}
+       end, {none, []}, ReadMsgs),
+    Start = case {Timestamp, ExtraMsgs} of
+               {none, [First|_]} ->
+                   case xmpp:get_subtag(First, #delay{}) of
+                       #delay{stamp = {Mega, Sec, Micro}} ->
+                           {Mega, Sec, Micro+1};
+                       _ ->
+                           none
+                   end;
+               {none, _} ->
+                   none;
+               _ ->
+                   Timestamp
+           end,
+    AllMsgs = case Start of
+                 none ->
+                     ExtraMsgs;
+                 _ ->
+                     MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
+                                          Number when is_integer(Number) -> Number;
+                                          _ -> 100
+                                      end,
+                     JID = jid:make(LUser, LServer, <<>>),
+                     {MamMsgs, _, _} = mod_mam:select(LServer, JID, JID,
+                                                      [{start, Start}],
+                                                      #rsm_set{max = MaxOfflineMsgs,
+                                                               before = <<"9999999999999999">>},
+                                                      chat),
+                     MamMsgs2 = lists:map(
+                         fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) ->
+                             add_delay_info(MM, LServer, MMT)
+                         end, MamMsgs),
+
+                     ExtraMsgs ++ MamMsgs2
+             end,
+    AllMsgs2 = lists:sort(
+       fun(A, B) ->
+           case {xmpp:get_subtag(A, #delay{}), xmpp:get_subtag(B, #delay{})} of
+               {#delay{stamp = TA}, #delay{stamp = TB}} ->
+                   TA < TB;
+               _ ->
+                   true
+           end
+       end, AllMsgs),
+    {AllMsgs3, _} = lists:mapfoldl(
+       fun(Msg, Counter) ->
+           {{Counter, Msg}, Counter + 1}
+       end, 1, AllMsgs2),
+    AllMsgs3.
 
 format_user_queue(Hdrs) ->
     lists:map(
@@ -873,6 +1033,9 @@ import(LServer, {sql, _}, DBType, <<"spool">>,
     Mod = gen_mod:db_mod(DBType, ?MODULE),
     Mod:import(OffMsg).
 
+use_mam_for_user(_User, Server) ->
+    gen_mod:get_module_opt(Server, ?MODULE, use_mam_for_storage).
+
 mod_opt_type(access_max_user_messages) ->
     fun acl:shaper_rules_validator/1;
 mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
@@ -880,6 +1043,8 @@ mod_opt_type(store_groupchat) ->
     fun(V) when is_boolean(V) -> V end;
 mod_opt_type(bounce_groupchat) ->
     fun(V) when is_boolean(V) -> V end;
+mod_opt_type(use_mam_for_storage) ->
+    fun(V) when is_boolean(V) -> V end;
 mod_opt_type(store_empty_body) ->
     fun (V) when is_boolean(V) -> V;
         (unless_chat_state) -> unless_chat_state
@@ -889,5 +1054,6 @@ mod_options(Host) ->
     [{db_type, ejabberd_config:default_db(Host, ?MODULE)},
      {access_max_user_messages, max_user_offline_messages},
      {store_empty_body, unless_chat_state},
+     {use_mam_for_storage, false},
      {bounce_groupchat, false},
      {store_groupchat, false}].
index 1a4308c58cb992dc6c5ea52ce9b66ea53045042b..34ce4e53dfdfdcf126e46111bcab1e70f1fb8fe2 100644 (file)
@@ -591,22 +591,25 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
                      end,
     ?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~s",
           [p1_queue:len(Queue), jid:encode(JID)]),
-    p1_queue:foreach(
-      fun({_, _Time, #presence{from = From}}) ->
-             ?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]);
-        ({_, _Time, #iq{} = El}) ->
+    p1_queue:foldl(
+      fun({_, _Time, #presence{from = From}}, Acc) ->
+             ?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]),
+             Acc;
+        ({_, _Time, #iq{} = El}, Acc) ->
              Txt = <<"User session terminated">>,
              ejabberd_router:route_error(
-               El, xmpp:err_service_unavailable(Txt, Lang));
-        ({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}) ->
+               El, xmpp:err_service_unavailable(Txt, Lang)),
+             Acc;
+        ({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}, Acc) ->
              %% XEP-0280 says: "When a receiving server attempts to deliver a
              %% forked message, and that message bounces with an error for
              %% any reason, the receiving server MUST NOT forward that error
              %% back to the original sender."  Resending such a stanza could
              %% easily lead to unexpected results as well.
              ?DEBUG("Dropping forwarded message stanza from ~s",
-                    [jid:encode(From)]);
-        ({_, Time, #message{} = Msg}) ->
+                    [jid:encode(From)]),
+             Acc;
+        ({_, Time, #message{} = Msg}, Acc) ->
              case ejabberd_hooks:run_fold(message_is_archived,
                                           LServer, false,
                                           [State, Msg]) of
@@ -615,17 +618,26 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
                             [jid:encode(xmpp:get_from(Msg))]);
                  false when ResendOnTimeout ->
                      NewEl = add_resent_delay_info(State, Msg, Time),
-                     ejabberd_router:route(NewEl);
+                     NewEl2 = case Acc of
+                                  first_resend ->
+                                      xmpp:put_meta(NewEl, first_from_queue, true);
+                                  _ ->
+                                      NewEl
+                              end,
+                     ejabberd_router:route(NewEl2),
+                     false;
                  false ->
                      Txt = <<"User session terminated">>,
                      ejabberd_router:route_error(
-                       Msg, xmpp:err_service_unavailable(Txt, Lang))
+                       Msg, xmpp:err_service_unavailable(Txt, Lang)),
+                     Acc
              end;
-        ({_, _Time, El}) ->
+          ({_, _Time, El}, Acc) ->
              %% Raw element of type 'error' resulting from a validation error
              %% We cannot pass it to the router, it will generate an error
-             ?DEBUG("Do not route raw element from ack queue: ~p", [El])
-      end, Queue);
+             ?DEBUG("Do not route raw element from ack queue: ~p", [El]),
+             Acc
+      end, first_resend, Queue);
 route_unacked_stanzas(_State) ->
     ok.