Don't store messages via a single process
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Sun, 21 May 2017 20:21:13 +0000 (23:21 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Sun, 21 May 2017 20:21:13 +0000 (23:21 +0300)
src/mod_offline.erl
src/mod_offline_mnesia.erl
src/mod_offline_riak.erl
src/mod_offline_sql.erl
src/prosody2ejabberd.erl
src/sql_queries.erl

index c06bb89761a4adae00e0dd6be895ecbd684f83c9..2c2c6185a9d5389a484a0341b14f9a627bb0147a 100644 (file)
 -protocol({xep, 160, '1.0'}).
 -protocol({xep, 334, '0.2'}).
 
--behaviour(gen_server).
 -behaviour(gen_mod).
 
 -export([start/2,
         stop/1,
         reload/3,
         store_packet/1,
-        store_offline_msg/5,
+        store_offline_msg/1,
         c2s_self_presence/1,
         get_sm_features/5,
         get_sm_identity/5,
@@ -64,9 +63,7 @@
         webadmin_user/4,
         webadmin_user_parse_query/5]).
 
--export([init/1, handle_call/3, handle_cast/2,
-        handle_info/2, terminate/2, code_change/3,
-        mod_opt_type/1, depends/2]).
+-export([mod_opt_type/1, depends/2]).
 
 -deprecated({get_queue_length,2}).
 
 %% default value for the maximum number of user messages
 -define(MAX_USER_MESSAGES, infinity).
 
--type us() :: {binary(), binary()}.
 -type c2s_state() :: ejabberd_c2s:state().
 
 -callback init(binary(), gen_mod:opts()) -> any().
 -callback import(#offline_msg{}) -> ok.
--callback store_messages(binary(), us(), [#offline_msg{}],
-                        non_neg_integer(), non_neg_integer()) ->
-    {atomic, any()}.
+-callback store_message(#offline_msg{}) -> ok | {error, any()}.
 -callback pop_messages(binary(), binary()) ->
     {ok, [#offline_msg{}]} | {error, any()}.
 -callback remove_expired_messages(binary()) -> {atomic, any()}.
 -callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
 -callback count_messages(binary(), binary()) -> non_neg_integer().
 
-start(Host, Opts) ->
-    gen_mod:start_child(?MODULE, Host, Opts).
-
-stop(Host) ->
-    gen_mod:stop_child(?MODULE, Host).
-
-reload(Host, NewOpts, OldOpts) ->
-    Proc = gen_mod:get_module_proc(Host, ?MODULE),
-    gen_server:cast(Proc, {reload, NewOpts, OldOpts}).
-
 depends(_Host, _Opts) ->
     [].
 
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-init([Host, Opts]) ->
-    process_flag(trap_exit, true),
+start(Host, Opts) ->
     Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
     Mod:init(Host, Opts),
     IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)),
@@ -153,64 +132,9 @@ init([Host, Opts]) ->
     ejabberd_hooks:add(webadmin_user_parse_query, Host,
                       ?MODULE, webadmin_user_parse_query, 50),
     gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
-                                 ?MODULE, handle_offline_query, IQDisc),
-    AccessMaxOfflineMsgs =
-       gen_mod:get_opt(access_max_user_messages, Opts,
-                       max_user_offline_messages),
-    {ok,
-     #state{host = Host,
-            access_max_offline_messages = AccessMaxOfflineMsgs}}.
-
-
-handle_call(stop, _From, State) ->
-    {stop, normal, ok, State}.
-
-handle_cast({reload, NewOpts, OldOpts}, #state{host = Host} = State) ->
-    NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
-    OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
-    if NewMod /= OldMod ->
-           NewMod:init(Host, NewOpts);
-       true ->
-           ok
-    end,
-    case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, gen_iq_handler:iqdisc(Host)) of
-       {false, IQDisc, _} ->
-           gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
-                                         ?MODULE, handle_offline_query, IQDisc);
-       true ->
-           ok
-    end,
-    case gen_mod:is_equal_opt(access_max_user_messages, NewOpts, OldOpts,
-                             max_user_offline_messages) of
-       {false, AccessMaxOfflineMsgs, _} ->
-           {noreply,
-            State#state{access_max_offline_messages = AccessMaxOfflineMsgs}};
-       true ->
-           {noreply, State}
-    end;
-handle_cast(Msg, State) ->
-    ?WARNING_MSG("unexpected cast: ~p", [Msg]),
-    {noreply, State}.
-
+                                 ?MODULE, handle_offline_query, IQDisc).
 
-handle_info(#offline_msg{us = UserServer} = Msg, State) ->
-    #state{host = Host,
-           access_max_offline_messages = AccessMaxOfflineMsgs} = State,
-    DBType = gen_mod:db_type(Host, ?MODULE),
-    Msgs = receive_all(UserServer, [Msg], DBType),
-    Len = length(Msgs),
-    MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs,
-                                           UserServer, Host),
-    store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs),
-    {noreply, State};
-
-handle_info(_Info, State) ->
-    ?ERROR_MSG("got unexpected info: ~p", [_Info]),
-    {noreply, State}.
-
-
-terminate(_Reason, State) ->
-    Host = State#state.host,
+stop(Host) ->
     ejabberd_hooks:delete(offline_message_hook, Host,
                          ?MODULE, store_packet, 50),
     ejabberd_hooks:delete(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
@@ -229,41 +153,48 @@ terminate(_Reason, State) ->
                          ?MODULE, webadmin_user, 50),
     ejabberd_hooks:delete(webadmin_user_parse_query, Host,
                          ?MODULE, webadmin_user_parse_query, 50),
-    gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE),
-    ok.
-
-
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
+    gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
 
-store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) ->
-    Mod = gen_mod:db_mod(Host, ?MODULE),
-    case Mod:store_messages(Host, US, Msgs, Len, MaxOfflineMsgs) of
-       {atomic, discard} ->
-           discard_warn_sender(Msgs);
-       _ ->
+reload(Host, NewOpts, OldOpts) ->
+    NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
+    OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
+    if NewMod /= OldMod ->
+           NewMod:init(Host, NewOpts);
+       true ->
+           ok
+    end,
+    case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, gen_iq_handler:iqdisc(Host)) of
+       {false, IQDisc, _} ->
+           gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
+                                         ?MODULE, handle_offline_query, IQDisc);
+       true ->
            ok
     end.
 
-get_max_user_messages(AccessRule, {User, Server}, Host) ->
-    case acl:match_rule(
-          Host, AccessRule, jid:make(User, Server)) of
+-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
+store_offline_msg(#offline_msg{us = {User, Server}} = Msg) ->
+    Mod = gen_mod:db_mod(Server, ?MODULE),
+    case get_max_user_messages(User, Server) of
+       infinity ->
+           Mod:store_message(Msg);
+       Limit ->
+           Num = count_offline_messages(User, Server),
+           if Num < Limit ->
+                   Mod:store_message(Msg);
+              true ->
+                   {error, full}
+           end
+    end.
+
+get_max_user_messages(User, Server) ->
+    Access = gen_mod:get_module_opt(Server, ?MODULE, access_max_user_messages,
+                                   max_user_offline_messages),
+    case acl:match_rule(Server, Access, jid:make(User, Server)) of
        Max when is_integer(Max) -> Max;
        infinity -> infinity;
        _ -> ?MAX_USER_MESSAGES
     end.
 
-receive_all(US, Msgs, DBType) ->
-    receive
-      #offline_msg{us = US} = Msg ->
-         receive_all(US, [Msg | Msgs], DBType)
-      after 0 ->
-               case DBType of
-                 mnesia -> Msgs;
-                 sql -> lists:reverse(Msgs);
-                 riak -> Msgs
-               end
-    end.
-
 get_sm_features(Acc, _From, _To, <<"">>, _Lang) ->
     Feats = case Acc of
                {result, I} -> I;
@@ -484,14 +415,19 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) ->
                        NewPacket ->
                            TimeStamp = p1_time_compat:timestamp(),
                            Expire = find_x_expire(TimeStamp, NewPacket),
-                           gen_mod:get_module_proc(To#jid.lserver, ?MODULE) !
-                               #offline_msg{us = {LUser, LServer},
-                                            timestamp = TimeStamp,
-                                            expire = Expire,
-                                            from = From,
-                                            to = To,
-                                            packet = NewPacket},
-                           {offlined, NewPacket}
+                           OffMsg = #offline_msg{us = {LUser, LServer},
+                                                 timestamp = TimeStamp,
+                                                 expire = Expire,
+                                                 from = From,
+                                                 to = To,
+                                                 packet = NewPacket},
+                           case store_offline_msg(OffMsg) of
+                               ok ->
+                                   {offlined, NewPacket};
+                               {error, Reason} ->
+                                   discard_warn_sender(Packet, Reason),
+                                   stop
+                           end
                    end;
                _ -> Acc
            end;
@@ -635,15 +571,18 @@ remove_user(User, Server) ->
 %% Helper functions:
 
 %% Warn senders that their messages have been discarded:
-discard_warn_sender(Msgs) ->
-    lists:foreach(
-      fun(#offline_msg{packet = Packet}) ->
-             ErrText = <<"Your contact offline message queue is "
-                         "full. The message has been discarded.">>,
-             Lang = xmpp:get_lang(Packet),
-             Err = xmpp:err_resource_constraint(ErrText, Lang),
-             ejabberd_router:route_error(Packet, Err)
-      end, Msgs).
+-spec discard_warn_sender(message(), full | any()) -> ok.
+discard_warn_sender(Packet, full) ->
+    ErrText = <<"Your contact offline message queue is "
+               "full. The message has been discarded.">>,
+    Lang = xmpp:get_lang(Packet),
+    Err = xmpp:err_resource_constraint(ErrText, Lang),
+    ejabberd_router:route_error(Packet, Err);
+discard_warn_sender(Packet, _) ->
+    ErrText = <<"Database failure">>,
+    Lang = xmpp:get_lang(Packet),
+    Err = xmpp:err_internal_server_error(ErrText, Lang),
+    ejabberd_router:route_error(Packet, Err).
 
 webadmin_page(_, Host,
              #request{us = _US, path = [<<"user">>, U, <<"queue">>],
@@ -790,11 +729,7 @@ get_queue_length(LUser, LServer) ->
     count_offline_messages(LUser, LServer).
 
 get_messages_subset(User, Host, MsgsAll) ->
-    Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages,
-                                   max_user_offline_messages),
-    MaxOfflineMsgs = case get_max_user_messages(Access,
-                                               User, Host)
-                        of
+    MaxOfflineMsgs = case get_max_user_messages(User, Host) of
                       Number when is_integer(Number) -> Number;
                       _ -> 100
                     end,
index d0d0de418fd344696e6ae190cff143ffb0cf9e5e..a725ab003d01589a3b5423ef260336d1f0f92edd 100644 (file)
@@ -26,7 +26,7 @@
 
 -behaviour(mod_offline).
 
--export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
         remove_old_messages/2, remove_user/2, read_message_headers/2,
         read_message/3, remove_message/3, read_all_messages/2,
         remove_all_messages/2, count_messages/2, import/1]).
@@ -36,8 +36,6 @@
 -include("mod_offline.hrl").
 -include("logger.hrl").
 
--define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
-
 %%%===================================================================
 %%% API
 %%%===================================================================
@@ -46,26 +44,9 @@ init(_Host, _Opts) ->
                           [{disc_only_copies, [node()]}, {type, bag},
                            {attributes, record_info(fields, offline_msg)}]).
 
-store_messages(_Host, US, Msgs, Len, MaxOfflineMsgs) ->
-    F = fun () ->
-               Count = if MaxOfflineMsgs =/= infinity ->
-                               Len + count_mnesia_records(US);
-                          true -> 0
-                       end,
-               if Count > MaxOfflineMsgs -> discard;
-                  true ->
-                       if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) ->
-                               mnesia:write_lock_table(offline_msg);
-                          true -> ok
-                       end,
-                       lists:foreach(
-                         fun(#offline_msg{packet = Pkt} = M) ->
-                                 El = xmpp:encode(Pkt),
-                                 mnesia:write(M#offline_msg{packet = El})
-                         end, Msgs)
-               end
-       end,
-    mnesia:transaction(F).
+store_message(#offline_msg{packet = Pkt} = OffMsg) ->
+    El = xmpp:encode(Pkt),
+    mnesia:dirty_write(OffMsg#offline_msg{packet = El}).
 
 pop_messages(LUser, LServer) ->
     US = {LUser, LServer},
index ffc1450aad4a5cc6b168c139b182a561b8d78c99..5d0fd1af89702dc2b39bd5846a3df3c4700aecd4 100644 (file)
@@ -26,7 +26,7 @@
 
 -behaviour(mod_offline).
 
--export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
         remove_old_messages/2, remove_user/2, read_message_headers/2,
         read_message/3, remove_message/3, read_all_messages/2,
         remove_all_messages/2, count_messages/2, import/1]).
 init(_Host, _Opts) ->
     ok.
 
-store_messages(Host, {User, _}, Msgs, Len, MaxOfflineMsgs) ->
-    Count = if MaxOfflineMsgs =/= infinity ->
-                    Len + count_messages(User, Host);
-               true -> 0
-            end,
-    if
-        Count > MaxOfflineMsgs ->
-            {atomic, discard};
-        true ->
-           try
-               lists:foreach(
-                 fun(#offline_msg{us = US,
-                                  packet = Pkt,
-                                  timestamp = TS} = M) ->
-                         El = xmpp:encode(Pkt),
-                         ok = ejabberd_riak:put(
-                                M#offline_msg{packet = El},
-                                offline_msg_schema(),
-                                [{i, TS}, {'2i', [{<<"us">>, US}]}])
-                 end, Msgs),
-               {atomic, ok}
-           catch _:{badmatch, Err} ->
-                   {atomic, Err}
-           end
-    end.
+store_message(#offline_msg{us = US, packet = Pkt, timestamp = TS} = M) ->
+    El = xmpp:encode(Pkt),
+    ejabberd_riak:put(M#offline_msg{packet = El},
+                     offline_msg_schema(),
+                     [{i, TS}, {'2i', [{<<"us">>, US}]}]).
 
 pop_messages(LUser, LServer) ->
     case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(),
index a8c587679fdfb2287ad5abc0a8f00c07ed5cf669..48b32be81aa56fc4f7c99970b9795f1d7e47694e 100644 (file)
@@ -28,7 +28,7 @@
 
 -behaviour(mod_offline).
 
--export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
         remove_old_messages/2, remove_user/2, read_message_headers/2,
         read_message/3, remove_message/3, read_all_messages/2,
         remove_all_messages/2, count_messages/2, import/1, export/1]).
 init(_Host, _Opts) ->
     ok.
 
-store_messages(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs) ->
-    Count = if MaxOfflineMsgs =/= infinity ->
-                   Len + count_messages(User, Host);
-              true -> 0
-           end,
-    if Count > MaxOfflineMsgs -> {atomic, discard};
-       true ->
-           Query = lists:map(
-                     fun(M) ->
-                             LUser = (M#offline_msg.to)#jid.luser,
-                             From = M#offline_msg.from,
-                             To = M#offline_msg.to,
-                             Packet = xmpp:set_from_to(
-                                        M#offline_msg.packet, From, To),
-                             NewPacket = xmpp_util:add_delay_info(
-                                           Packet, jid:make(Host),
-                                           M#offline_msg.timestamp,
-                                           <<"Offline Storage">>),
-                             XML = fxml:element_to_binary(
-                                     xmpp:encode(NewPacket)),
-                              sql_queries:add_spool_sql(LUser, XML)
-                     end,
-                     Msgs),
-           sql_queries:add_spool(Host, Query)
+store_message(#offline_msg{us = {LUser, LServer}} = M) ->
+    From = M#offline_msg.from,
+    To = M#offline_msg.to,
+    Packet = xmpp:set_from_to(M#offline_msg.packet, From, To),
+    NewPacket = xmpp_util:add_delay_info(
+                 Packet, jid:make(LServer),
+                 M#offline_msg.timestamp,
+                 <<"Offline Storage">>),
+    XML = fxml:element_to_binary(
+           xmpp:encode(NewPacket)),
+    case sql_queries:add_spool(LUser, LServer, XML) of
+       {updated, _} ->
+           ok;
+       _ ->
+           {error, db_failure}
     end.
 
 pop_messages(LUser, LServer) ->
index 072da090840f6ed8257780fe67292af6038d8711..312a177be4c245464c5792d6ae8d62da5fb78e47 100644 (file)
@@ -185,15 +185,16 @@ convert_data(_Host, "config", _User, [Data]) ->
 convert_data(Host, "offline", User, [Data]) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Host),
-    Msgs = lists:flatmap(
-            fun({_, RawXML}) ->
-                    case deserialize(RawXML) of
-                        [El] -> el_to_offline_msg(LUser, LServer, El);
-                        _ -> []
-                    end
-            end, Data),
-    mod_offline:store_offline_msg(
-      LServer, {LUser, LServer}, Msgs, length(Msgs), infinity);
+    lists:foreach(
+      fun({_, RawXML}) ->
+             case deserialize(RawXML) of
+                 [El] ->
+                     Msg = el_to_offline_msg(LUser, LServer, El),
+                     ok = mod_offline:store_offline_msg(Msg);
+                 _ ->
+                     ok
+             end
+      end, Data);
 convert_data(Host, "privacy", User, [Data]) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Host),
index 2f2e5586365cfda197c18283adac4acd9728c906..0cf595bdff9ea170260b43efe80f09afdac8e691 100644 (file)
@@ -37,7 +37,7 @@
         set_password_scram_t/6, add_user/3, add_user_scram/6,
         del_user/2, del_user_return_password/3, list_users/1,
         list_users/2, users_number/1, users_number/2,
-        add_spool_sql/2, add_spool/2, get_and_del_spool_msg_t/2,
+        add_spool/3, get_and_del_spool_msg_t/2,
         del_spool_msg/2, get_roster/2, get_roster_jid_groups/2,
         get_roster_groups/3, del_user_roster_t/2,
         get_roster_by_jid/3, get_rostergroup_by_jid/3,
@@ -273,11 +273,10 @@ users_number(LServer, [{prefix, Prefix}])
 users_number(LServer, []) ->
     users_number(LServer).
 
-add_spool_sql(LUser, XML) ->
-    ?SQL("insert into spool(username, xml) values (%(LUser)s, %(XML)s)").
-
-add_spool(LServer, Queries) ->
-    ejabberd_sql:sql_transaction(LServer, Queries).
+add_spool(LUser, LServer, XML) ->
+    ejabberd_sql:sql_query(
+      LServer,
+      ?SQL("insert into spool(username, xml) values (%(LUser)s, %(XML)s)")).
 
 get_and_del_spool_msg_t(LServer, LUser) ->
     F = fun () ->