]> granicus.if.org Git - ejabberd/commitdiff
Change implementation of mod_offline use_mam_for_storage 19.05
authorPaweł Chmielowski <pchmielowski@process-one.net>
Tue, 28 May 2019 12:32:09 +0000 (14:32 +0200)
committerPaweł Chmielowski <pchmielowski@process-one.net>
Tue, 28 May 2019 12:32:17 +0000 (14:32 +0200)
Previous version was trying to determine range of messages that should
be fetched from mam by storing time when last user resource disconnected.

But that had couple edge cases that could cause problems, for example in
case of node crash we could not store information about user disconnect
and with that we didn't have data to initiate mam query.

New version don't track user disconnects, but simply ensure that we have
timestamp of first message that is gonna be put in storage, after some
measurements cost of that check with caching on top is not that costly,
and as much more robust i decided to introduce that change.

src/mod_offline.erl

index 63a0d6763d76797105f5b63892dde3ed9024379b..76682d06c86b23d13e9da0298b0024f5e61fb0a4 100644 (file)
@@ -61,8 +61,7 @@
         c2s_copy_session/2,
         webadmin_page/3,
         webadmin_user/4,
-        webadmin_user_parse_query/5,
-        user_unset_presence/4]).
+        webadmin_user_parse_query/5]).
 
 -export([mod_opt_type/1, mod_options/1, depends/2]).
 
@@ -83,6 +82,8 @@
 %% default value for the maximum number of user messages
 -define(MAX_USER_MESSAGES, infinity).
 
+-define(EMPTY_SPOOL_CACHE, offline_empty_cache).
+
 -type c2s_state() :: ejabberd_c2s:state().
 
 -callback init(binary(), gen_mod:opts()) -> any().
@@ -110,6 +111,7 @@ depends(_Host, _Opts) ->
 start(Host, Opts) ->
     Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
     Mod:init(Host, Opts),
+    init_cache(Opts),
     ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
                       store_packet, 50),
     ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
@@ -132,8 +134,6 @@ start(Host, Opts) ->
                       ?MODULE, webadmin_user, 50),
     ejabberd_hooks:add(webadmin_user_parse_query, Host,
                       ?MODULE, webadmin_user_parse_query, 50),
-    ejabberd_hooks:add(unset_presence_hook, Host, ?MODULE,
-                      user_unset_presence, 50),
     gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
                                  ?MODULE, handle_offline_query).
 
@@ -156,39 +156,52 @@ stop(Host) ->
                          ?MODULE, webadmin_user, 50),
     ejabberd_hooks:delete(webadmin_user_parse_query, Host,
                          ?MODULE, webadmin_user_parse_query, 50),
-    ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE,
-                         user_unset_presence, 50),
     gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
 
 reload(Host, NewOpts, OldOpts) ->
     NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
     OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
+    init_cache(NewOpts),
     if NewMod /= OldMod ->
            NewMod:init(Host, NewOpts);
        true ->
            ok
     end.
 
+init_cache(Opts) ->
+    case gen_mod:get_opt(use_mam_for_storage, Opts) of
+        true ->
+           MaxSize = gen_mod:get_opt(cache_size, Opts),
+           LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of
+                          infinity -> infinity;
+                          I -> timer:seconds(I)
+                      end,
+           COpts = [{max_size, MaxSize}, {cache_missed, false}, {life_time, LifeTime}],
+            ets_cache:new(?EMPTY_SPOOL_CACHE, COpts);
+        false ->
+            ets_cache:delete(?EMPTY_SPOOL_CACHE)
+    end.
+
 -spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
 store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
-    {UseMam, ActivityMarker} = case use_mam_for_user(User, Server) of
-                                  true ->
-                                      {true, xmpp:get_meta(Pkt, activity_marker, false)};
-                                  _ ->
-                                      {false, false}
-                              end,
-    case UseMam andalso (not ActivityMarker) andalso
-        xmpp:get_meta(Pkt, mam_archived, false) of
+    UseMam = use_mam_for_user(User, Server),
+    case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
        true ->
-           case xmpp:get_meta(Pkt, first_from_queue, false) of
-               true ->
-                   store_last_activity_marker(User, Server, xmpp:get_meta(Pkt, stanza_id));
-               _ ->
-                   ok
-           end;
-       false when ActivityMarker ->
            Mod = gen_mod:db_mod(Server, ?MODULE),
-           Mod:store_message(Msg);
+           ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server},
+               fun() ->
+                   case count_messages_in_db(User, Server) of
+                       0 ->
+                           case Mod:store_message(Msg) of
+                               ok ->
+                                   {cache, ok};
+                               Err ->
+                                   {nocache, Err}
+                           end;
+                       _ ->
+                           {cache, ok}
+                   end
+               end);
        false ->
            Mod = gen_mod:db_mod(Server, ?MODULE),
            case get_max_user_messages(User, Server) of
@@ -554,6 +567,8 @@ route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State)
               {ok, OffMsgs} ->
                   case use_mam_for_user(LUser, LServer) of
                       true ->
+                          ets_cache:delete(?EMPTY_SPOOL_CACHE, {LUser, LServer},
+                                           ejabberd_cluster:get_nodes()),
                           lists:map(
                               fun({_, #message{from = From, to = To} = Msg}) ->
                                   #offline_msg{from = From, to = To,
@@ -627,31 +642,6 @@ remove_user(User, Server) ->
     Mod:remove_user(LUser, LServer),
     ok.
 
--spec user_unset_presence(binary(), binary(), binary(), binary()) -> any().
-user_unset_presence(User, Server, _Resource, _Status) ->
-    case use_mam_for_user(User, Server) of
-       true ->
-           case ejabberd_sm:get_user_present_resources(User, Server) of
-               [] ->
-                   TimeStamp = erlang:system_time(microsecond),
-                   store_last_activity_marker(User, Server, TimeStamp);
-               _ ->
-                   ok
-           end;
-       _ ->
-           ok
-    end.
-
-store_last_activity_marker(User, Server, Timestamp) ->
-    Jid = jid:make(User, Server, <<>>),
-    Pkt = xmpp:put_meta(#message{id = <<"ActivityMarker">>, type = error, from = Jid, to = Jid},
-                       activity_marker, true),
-
-    Msg = #offline_msg{us = {User, Server}, from = Jid, to = Jid,
-                      timestamp = misc:usec_to_now(Timestamp),
-                      packet = Pkt},
-    store_offline_msg(Msg).
-
 %% Helper functions:
 
 -spec check_if_message_should_be_bounced(message()) -> boolean().
@@ -1123,12 +1113,19 @@ mod_opt_type(use_mam_for_storage) ->
 mod_opt_type(store_empty_body) ->
     fun (V) when is_boolean(V) -> V;
         (unless_chat_state) -> unless_chat_state
+    end;
+mod_opt_type(O) when O == cache_life_time; O == cache_size ->
+    fun (I) when is_integer(I), I > 0 -> I;
+        (infinity) -> infinity
     end.
 
+
 mod_options(Host) ->
     [{db_type, ejabberd_config:default_db(Host, ?MODULE)},
      {access_max_user_messages, max_user_offline_messages},
      {store_empty_body, unless_chat_state},
      {use_mam_for_storage, false},
      {bounce_groupchat, false},
-     {store_groupchat, false}].
+     {store_groupchat, false},
+     {cache_size, ejabberd_config:cache_size(Host)},
+     {cache_life_time, ejabberd_config:cache_life_time(Host)}].