]> granicus.if.org Git - ejabberd/commitdiff
Clean mod_offline.erl from DB specific code
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Fri, 15 Apr 2016 10:44:33 +0000 (13:44 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Fri, 15 Apr 2016 10:44:33 +0000 (13:44 +0300)
src/mod_offline.erl
src/mod_offline_mnesia.erl [new file with mode: 0644]
src/mod_offline_riak.erl [new file with mode: 0644]
src/mod_offline_sql.erl [new file with mode: 0644]

index 54eda165c2bfe6210b9e6b45dcb31b4893180cd8..2cdd82ae8a297babaae016441ad4245f66a5ee72 100644 (file)
@@ -25,8 +25,6 @@
 
 -module(mod_offline).
 
--compile([{parse_transform, ejabberd_sql_pt}]).
-
 -author('alexey@process-one.net').
 
 -protocol({xep, 13, '1.2'}).
@@ -61,6 +59,7 @@
         get_queue_length/2,
         count_offline_messages/2,
         get_offline_els/2,
+        find_x_expire/2,
         webadmin_page/3,
         webadmin_user/4,
         webadmin_user_parse_query/5]).
@@ -82,8 +81,6 @@
 
 -include("mod_offline.hrl").
 
--include("ejabberd_sql_pt.hrl").
-
 -define(PROCNAME, ejabberd_offline).
 
 -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
 %% default value for the maximum number of user messages
 -define(MAX_USER_MESSAGES, infinity).
 
+-type us() :: {binary(), binary()}.
+-callback init(binary(), gen_mod:opts()) -> any().
+-callback import(binary(), #offline_msg{}) -> ok | pass.
+-callback store_messages(binary(), us(), [#offline_msg{}],
+                        non_neg_integer(), non_neg_integer()) ->
+    {atomic, any()}.
+-callback pop_messages(binary(), binary()) ->
+    {atomic, [#offline_msg{}]} | {aborted, any()}.
+-callback remove_expired_messages(binary()) -> {atomic, any()}.
+-callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}.
+-callback remove_user(binary(), binary()) -> {atomic, any()}.
+-callback read_message_headers(binary(), binary()) -> any().
+-callback read_message(binary(), binary(), non_neg_integer()) ->
+    {ok, #offline_msg{}} | error.
+-callback remove_message(binary(), binary(), non_neg_integer()) -> ok.
+-callback read_all_messages(binary(), binary()) -> [#offline_msg{}].
+-callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
+-callback count_messages(binary(), binary()) -> non_neg_integer().
+
 start_link(Host, Opts) ->
     Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
     ?GEN_SERVER:start_link({local, Proc}, ?MODULE,
@@ -115,14 +131,8 @@ stop(Host) ->
 %%====================================================================
 
 init([Host, Opts]) ->
-    case gen_mod:db_type(Host, Opts) of
-      mnesia ->
-         mnesia:create_table(offline_msg,
-                             [{disc_only_copies, [node()]}, {type, bag},
-                              {attributes, record_info(fields, offline_msg)}]),
-         update_table();
-      _ -> ok
-    end,
+    Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
+    Mod:init(Host, Opts),
     IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
                             no_queue),
     ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
@@ -174,7 +184,7 @@ handle_info(#offline_msg{us = UserServer} = Msg, State) ->
     Len = length(Msgs),
     MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs,
                                            UserServer, Host),
-    store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs, DBType),
+    store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs),
     {noreply, State};
 
 handle_info(_Info, State) ->
@@ -210,68 +220,12 @@ terminate(_Reason, State) ->
 code_change(_OldVsn, State, _Extra) -> {ok, State}.
 
 store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) ->
-    DBType = gen_mod:db_type(Host, ?MODULE),
-    store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs, DBType).
-
-store_offline_msg(_Host, US, Msgs, Len, MaxOfflineMsgs,
-                 mnesia) ->
-    F = fun () ->
-               Count = if MaxOfflineMsgs =/= infinity ->
-                              Len + count_mnesia_records(US);
-                          true -> 0
-                       end,
-               if Count > MaxOfflineMsgs -> discard_warn_sender(Msgs);
-                  true ->
-                      if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) ->
-                             mnesia:write_lock_table(offline_msg);
-                         true -> ok
-                      end,
-                      lists:foreach(fun (M) -> mnesia:write(M) end, Msgs)
-               end
-       end,
-    mnesia:transaction(F);
-store_offline_msg(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs, odbc) ->
-    Count = if MaxOfflineMsgs =/= infinity ->
-                  Len + count_offline_messages(User, Host);
-              true -> 0
-           end,
-    if Count > MaxOfflineMsgs -> discard_warn_sender(Msgs);
-       true ->
-          Query = lists:map(fun (M) ->
-                                    Username =
-                                        ejabberd_odbc:escape((M#offline_msg.to)#jid.luser),
-                                    From = M#offline_msg.from,
-                                    To = M#offline_msg.to,
-                                    Packet =
-                                        jlib:replace_from_to(From, To,
-                                                             M#offline_msg.packet),
-                                    NewPacket =
-                                        jlib:add_delay_info(Packet, Host,
-                                                            M#offline_msg.timestamp,
-                                                            <<"Offline Storage">>),
-                                    XML =
-                                        ejabberd_odbc:escape(fxml:element_to_binary(NewPacket)),
-                                    odbc_queries:add_spool_sql(Username, XML)
-                            end,
-                            Msgs),
-          odbc_queries:add_spool(Host, Query)
-    end;
-store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs,
-                 riak) ->
-    Count = if MaxOfflineMsgs =/= infinity ->
-                    Len + count_offline_messages(User, Host);
-               true -> 0
-            end,
-    if
-        Count > MaxOfflineMsgs ->
-            discard_warn_sender(Msgs);
-        true ->
-            lists:foreach(
-              fun(#offline_msg{us = US,
-                               timestamp = TS} = M) ->
-                      ejabberd_riak:put(M, offline_msg_schema(),
-                                       [{i, TS}, {'2i', [{<<"us">>, US}]}])
-              end, Msgs)
+    Mod = gen_mod:db_mod(Host, ?MODULE),
+    case Mod:store_messages(Host, US, Msgs, Len, MaxOfflineMsgs) of
+       {atomic, discard} ->
+           discard_warn_sender(Msgs);
+       _ ->
+           ok
     end.
 
 get_max_user_messages(AccessRule, {User, Server}, Host) ->
@@ -330,11 +284,11 @@ get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID,
            BareJID = jid:to_string(jid:remove_resource(JID)),
            Pid ! dont_ask_offline,
            {result, lists:map(
-                      fun({Node, From, _OfflineMsg}) ->
+                      fun({Node, From, _To, _El}) ->
                               #xmlel{name = <<"item">>,
                                      attrs = [{<<"jid">>, BareJID},
                                               {<<"node">>, Node},
-                                              {<<"name">>, From}]}
+                                              {<<"name">>, jid:to_string(From)}]}
                       end, Hdrs)};
        none ->
            {result, []}
@@ -452,46 +406,31 @@ handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) ->
        Pid when is_pid(Pid) ->
            Pid ! dont_ask_offline,
            lists:foreach(
-             fun({Node, _, Msg}) ->
-                     case offline_msg_to_route(S, Msg) of
-                         {route, From, To, El} ->
-                             NewEl = set_offline_tag(El, Node),
-                             Pid ! {route, From, To, NewEl};
-                         error ->
-                             ok
-                     end
+             fun({Node, From, To, El}) ->
+                     NewEl = set_offline_tag(El, Node),
+                     Pid ! {route, From, To, NewEl}
              end, read_message_headers(U, S))
     end.
 
-fetch_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
-    case jid:from_string(From_s) of
-       From = #jid{} ->
-           case gen_mod:db_type(To#jid.lserver, ?MODULE) of
-               odbc ->
-                   read_message(From, To, Seq, odbc);
-               DBType ->
-                   case binary_to_timestamp(Seq) of
-                       undefined -> ok;
-                       TS -> read_message(From, To, TS, DBType)
-                   end
-           end;
-       error ->
-           ok
+fetch_msg_by_node(To, Seq) ->
+    case catch binary_to_integer(Seq) of
+       I when is_integer(I), I >= 0 ->
+           LUser = To#jid.luser,
+           LServer = To#jid.lserver,
+           Mod = gen_mod:db_mod(LServer, ?MODULE),
+           Mod:read_message(LUser, LServer, I);
+       _ ->
+           error
     end.
 
-remove_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
-    case jid:from_string(From_s) of
-       From = #jid{} ->
-           case gen_mod:db_type(To#jid.lserver, ?MODULE) of
-               odbc ->
-                   remove_message(From, To, Seq, odbc);
-               DBType ->
-                   case binary_to_timestamp(Seq) of
-                       undefined -> ok;
-                       TS -> remove_message(From, To, TS, DBType)
-                   end
-           end;
-       error ->
+remove_msg_by_node(To, Seq) ->
+    case catch binary_to_integer(Seq) of
+       I when is_integer(I), I>= 0 ->
+           LUser = To#jid.luser,
+           LServer = To#jid.lserver,
+           Mod = gen_mod:db_mod(LServer, ?MODULE),
+           Mod:remove_message(LUser, LServer, I);
+       _ ->
            ok
     end.
 
@@ -648,21 +587,11 @@ find_x_expire(TimeStamp, [El | Els]) ->
 resend_offline_messages(User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
-    US = {LUser, LServer},
-    F = fun () ->
-               Rs = mnesia:wread({offline_msg, US}),
-               mnesia:delete({offline_msg, US}),
-               Rs
-       end,
-    case mnesia:transaction(F) of
-      {atomic, Rs} ->
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    case Mod:pop_messages(LUser, LServer) of
+      {ok, Rs} ->
          lists:foreach(fun (R) ->
-                               ejabberd_sm !
-                                 {route, R#offline_msg.from, R#offline_msg.to,
-                                  jlib:add_delay_info(R#offline_msg.packet,
-                                                      LServer,
-                                                      R#offline_msg.timestamp,
-                                                      <<"Offline Storage">>)}
+                               ejabberd_sm ! offline_msg_to_route(LServer, R)
                        end,
                        lists:keysort(#offline_msg.timestamp, Rs));
       _ -> ok
@@ -671,190 +600,47 @@ resend_offline_messages(User, Server) ->
 pop_offline_messages(Ls, User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
-    pop_offline_messages(Ls, LUser, LServer,
-                        gen_mod:db_type(LServer, ?MODULE)).
-
-pop_offline_messages(Ls, LUser, LServer, mnesia) ->
-    US = {LUser, LServer},
-    F = fun () ->
-               Rs = mnesia:wread({offline_msg, US}),
-               mnesia:delete({offline_msg, US}),
-               Rs
-       end,
-    case mnesia:transaction(F) of
-      {atomic, Rs} ->
-         TS = p1_time_compat:timestamp(),
-         Ls ++
-           lists:map(fun (R) ->
-                             offline_msg_to_route(LServer, R)
-                     end,
-                     lists:filter(fun (R) ->
-                                          case R#offline_msg.expire of
-                                            never -> true;
-                                            TimeStamp -> TS < TimeStamp
-                                          end
-                                  end,
-                                  lists:keysort(#offline_msg.timestamp, Rs)));
-      _ -> Ls
-    end;
-pop_offline_messages(Ls, LUser, LServer, odbc) ->
-    case odbc_queries:get_and_del_spool_msg_t(LServer, LUser) of
-      {atomic, {selected, Rs}} ->
-         Ls ++
-           lists:flatmap(fun ({_, XML}) ->
-                                 case fxml_stream:parse_element(XML) of
-                                   {error, _Reason} ->
-                                          [];
-                                   El ->
-                                          case offline_msg_to_route(LServer, El) of
-                                              error ->
-                                                  [];
-                                              RouteMsg ->
-                                                  [RouteMsg]
-                                          end
-                                 end
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    case Mod:pop_messages(LUser, LServer) of
+       {ok, Rs} ->
+           TS = p1_time_compat:timestamp(),
+           Ls ++
+               lists:map(fun (R) ->
+                                 offline_msg_to_route(LServer, R)
                          end,
-                         Rs);
-      _ -> Ls
-    end;
-pop_offline_messages(Ls, LUser, LServer, riak) ->
-    case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(),
-                                    <<"us">>, {LUser, LServer}) of
-        {ok, Rs} ->
-            try
-                lists:foreach(
-                  fun(#offline_msg{timestamp = T}) ->
-                          ok = ejabberd_riak:delete(offline_msg, T)
-                  end, Rs),
-                TS = p1_time_compat:timestamp(),
-                Ls ++ lists:map(
-                        fun (R) ->
-                                offline_msg_to_route(LServer, R)
-                        end,
-                        lists:filter(
-                          fun(R) ->
-                                  case R#offline_msg.expire of
-                                      never -> true;
-                                      TimeStamp -> TS < TimeStamp
-                                  end
-                          end,
-                          lists:keysort(#offline_msg.timestamp, Rs)))
-            catch _:{badmatch, _} ->
-                    Ls
-            end;
+                         lists:filter(
+                           fun(#offline_msg{packet = Pkt} = R) ->
+                                   #xmlel{children = Els} = Pkt,
+                                   Expire = case R#offline_msg.expire of
+                                                undefined ->
+                                                    find_x_expire(TS, Els);
+                                                Exp ->
+                                                    Exp
+                                            end,
+                                   case Expire of
+                                       never -> true;
+                                       TimeStamp -> TS < TimeStamp
+                                   end
+                           end, Rs));
        _ ->
            Ls
     end.
 
 remove_expired_messages(Server) ->
     LServer = jid:nameprep(Server),
-    remove_expired_messages(LServer,
-                           gen_mod:db_type(LServer, ?MODULE)).
-
-remove_expired_messages(_LServer, mnesia) ->
-    TimeStamp = p1_time_compat:timestamp(),
-    F = fun () ->
-               mnesia:write_lock_table(offline_msg),
-               mnesia:foldl(fun (Rec, _Acc) ->
-                                    case Rec#offline_msg.expire of
-                                      never -> ok;
-                                      TS ->
-                                          if TS < TimeStamp ->
-                                                 mnesia:delete_object(Rec);
-                                             true -> ok
-                                          end
-                                    end
-                            end,
-                            ok, offline_msg)
-       end,
-    mnesia:transaction(F);
-remove_expired_messages(_LServer, odbc) -> {atomic, ok};
-remove_expired_messages(_LServer, riak) -> {atomic, ok}.
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    Mod:remove_expired_messages(LServer).
 
 remove_old_messages(Days, Server) ->
     LServer = jid:nameprep(Server),
-    remove_old_messages(Days, LServer,
-                       gen_mod:db_type(LServer, ?MODULE)).
-
-remove_old_messages(Days, _LServer, mnesia) ->
-    S = p1_time_compat:system_time(seconds) - 60 * 60 * 24 * Days,
-    MegaSecs1 = S div 1000000,
-    Secs1 = S rem 1000000,
-    TimeStamp = {MegaSecs1, Secs1, 0},
-    F = fun () ->
-               mnesia:write_lock_table(offline_msg),
-               mnesia:foldl(fun (#offline_msg{timestamp = TS} = Rec,
-                                 _Acc)
-                                    when TS < TimeStamp ->
-                                    mnesia:delete_object(Rec);
-                                (_Rec, _Acc) -> ok
-                            end,
-                            ok, offline_msg)
-       end,
-    mnesia:transaction(F);
-
-remove_old_messages(Days, LServer, odbc) ->
-    case catch ejabberd_odbc:sql_query(
-                LServer,
-                [<<"DELETE FROM spool"
-                  " WHERE created_at < "
-                  "DATE_SUB(CURDATE(), INTERVAL ">>,
-                 integer_to_list(Days), <<" DAY);">>]) of
-       {updated, N} ->
-           ?INFO_MSG("~p message(s) deleted from offline spool", [N]);
-       _Error ->
-           ?ERROR_MSG("Cannot delete message in offline spool: ~p", [_Error])
-    end,
-    {atomic, ok};
-remove_old_messages(_Days, _LServer, riak) ->
-    {atomic, ok}.
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    Mod:remove_old_messages(Days, LServer).
 
 remove_user(User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
-    remove_user(LUser, LServer,
-               gen_mod:db_type(LServer, ?MODULE)).
-
-remove_user(LUser, LServer, mnesia) ->
-    US = {LUser, LServer},
-    F = fun () -> mnesia:delete({offline_msg, US}) end,
-    mnesia:transaction(F);
-remove_user(LUser, LServer, odbc) ->
-    odbc_queries:del_spool_msg(LServer, LUser);
-remove_user(LUser, LServer, riak) ->
-    {atomic, ejabberd_riak:delete_by_index(offline_msg,
-                                           <<"us">>, {LUser, LServer})}.
-
-jid_to_binary(#jid{user = U, server = S, resource = R,
-                   luser = LU, lserver = LS, lresource = LR}) ->
-    #jid{user = iolist_to_binary(U),
-         server = iolist_to_binary(S),
-         resource = iolist_to_binary(R),
-         luser = iolist_to_binary(LU),
-         lserver = iolist_to_binary(LS),
-         lresource = iolist_to_binary(LR)}.
-
-update_table() ->
-    Fields = record_info(fields, offline_msg),
-    case mnesia:table_info(offline_msg, attributes) of
-        Fields ->
-            ejabberd_config:convert_table_to_binary(
-              offline_msg, Fields, bag,
-              fun(#offline_msg{us = {U, _}}) -> U end,
-              fun(#offline_msg{us = {U, S},
-                               from = From,
-                               to = To,
-                               packet = El} = R) ->
-                      R#offline_msg{us = {iolist_to_binary(U),
-                                          iolist_to_binary(S)},
-                                    from = jid_to_binary(From),
-                                    to = jid_to_binary(To),
-                                    packet = fxml:to_xmlel(El)}
-              end);
-        _ ->
-            ?INFO_MSG("Recreating offline_msg table", []),
-            mnesia:transform_table(offline_msg, ignore, Fields)
-    end.
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    Mod:remove_user(LUser, LServer).
 
 %% Helper functions:
 
@@ -880,255 +666,71 @@ webadmin_page(_, Host,
 webadmin_page(Acc, _, _) -> Acc.
 
 get_offline_els(LUser, LServer) ->
-    get_offline_els(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)).
-
-get_offline_els(LUser, LServer, DBType)
-  when DBType == mnesia; DBType == riak ->
-    Msgs = read_all_msgs(LUser, LServer, DBType),
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    Hdrs = Mod:read_message_headers(LUser, LServer),
     lists:map(
-      fun(Msg) ->
-              {route, From, To, Packet} = offline_msg_to_route(LServer, Msg),
-              jlib:replace_from_to(From, To, Packet)
-      end, Msgs);
-get_offline_els(LUser, LServer, odbc) ->
-    case catch ejabberd_odbc:sql_query(
-                 LServer,
-                 ?SQL("select @(xml)s from spool where "
-                      "username=%(LUser)s order by seq")) of
-        {selected, Rs} ->
-            lists:flatmap(
-              fun({XML}) ->
-                      case fxml_stream:parse_element(XML) of
-                          #xmlel{} = El ->
-                              case offline_msg_to_route(LServer, El) of
-                                  {route, _, _, NewEl} ->
-                                      [NewEl];
-                                  error ->
-                                      []
-                              end;
-                          _ ->
-                              []
-                      end
-              end, Rs);
-        _ ->
-            []
-    end.
+      fun({_Seq, From, To, Packet}) ->
+             jlib:replace_from_to(From, To, Packet)
+      end, Hdrs).
 
 offline_msg_to_route(LServer, #offline_msg{} = R) ->
-    {route, R#offline_msg.from, R#offline_msg.to,
-     jlib:add_delay_info(R#offline_msg.packet, LServer, R#offline_msg.timestamp,
-                        <<"Offline Storage">>)};
-offline_msg_to_route(_LServer, #xmlel{} = El) ->
-    To = jid:from_string(fxml:get_tag_attr_s(<<"to">>, El)),
-    From = jid:from_string(fxml:get_tag_attr_s(<<"from">>, El)),
-    if (To /= error) and (From /= error) ->
-            {route, From, To, El};
-       true ->
-            error
-    end.
-
-binary_to_timestamp(TS) ->
-    case catch jlib:binary_to_integer(TS) of
-       Int when is_integer(Int) ->
-           Secs = Int div 1000000,
-           USec = Int rem 1000000,
-           MSec = Secs div 1000000,
-           Sec = Secs rem 1000000,
-           {MSec, Sec, USec};
-       _ ->
-           undefined
-    end.
-
-timestamp_to_binary({MS, S, US}) ->
-    format_timestamp(integer_to_list((MS * 1000000 + S) * 1000000 + US)).
-
-format_timestamp(TS) ->
-    iolist_to_binary(io_lib:format("~20..0s", [TS])).
-
-offline_msg_to_header(#offline_msg{from = From, timestamp = Int} = Msg) ->
-    TS = timestamp_to_binary(Int),
-    From_s = jid:to_string(From),
-    {<<TS/binary, "+", From_s/binary>>, From_s, Msg}.
+    El = case R#offline_msg.timestamp of
+            undefined ->
+                R#offline_msg.packet;
+            TS ->
+                jlib:add_delay_info(R#offline_msg.packet, LServer, TS,
+                                    <<"Offline Storage">>)
+        end,
+    {route, R#offline_msg.from, R#offline_msg.to, El}.
 
 read_message_headers(LUser, LServer) ->
-    DBType = gen_mod:db_type(LServer, ?MODULE),
-    read_message_headers(LUser, LServer, DBType).
-
-read_message_headers(LUser, LServer, mnesia) ->
-    Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}),
-    Hdrs = lists:map(fun offline_msg_to_header/1, Msgs),
-    lists:keysort(1, Hdrs);
-read_message_headers(LUser, LServer, riak) ->
-    case ejabberd_riak:get_by_index(
-           offline_msg, offline_msg_schema(),
-          <<"us">>, {LUser, LServer}) of
-        {ok, Rs} ->
-           Hdrs = lists:map(fun offline_msg_to_header/1, Rs),
-           lists:keysort(1, Hdrs);
-       _Err ->
-           []
-    end;
-read_message_headers(LUser, LServer, odbc) ->
-    Username = ejabberd_odbc:escape(LUser),
-    case catch ejabberd_odbc:sql_query(
-                LServer, [<<"select xml, seq from spool where username ='">>,
-                          Username, <<"' order by seq;">>]) of
-       {selected, [<<"xml">>, <<"seq">>], Rows} ->
-           Hdrs = lists:flatmap(
-                    fun([XML, Seq]) ->
-                            try
-                                #xmlel{} = El = fxml_stream:parse_element(XML),
-                                From = fxml:get_tag_attr_s(<<"from">>, El),
-                                #jid{} = jid:from_string(From),
-                                TS = format_timestamp(Seq),
-                                [{<<TS/binary, "+", From/binary>>, From, El}]
-                            catch _:_ -> []
-                            end
-                    end, Rows),
-           lists:keysort(1, Hdrs);
-       _Err ->
-           []
-    end.
-
-read_message(_From, To, TS, mnesia) ->
-    {U, S, _} = jid:tolower(To),
-    case mnesia:dirty_match_object(
-          offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}) of
-       [Msg|_] ->
-           {ok, Msg};
-       _ ->
-           error
-    end;
-read_message(_From, _To, TS, riak) ->
-    case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of
-       {ok, Msg} ->
-           {ok, Msg};
-       _ ->
-           error
-    end;
-read_message(_From, To, Seq, odbc) ->
-    {LUser, LServer, _} = jid:tolower(To),
-    Username = ejabberd_odbc:escape(LUser),
-    SSeq = ejabberd_odbc:escape(Seq),
-    case ejabberd_odbc:sql_query(
-          LServer,
-          [<<"select xml from spool  where username='">>, Username,
-           <<"'  and seq='">>, SSeq, <<"';">>]) of
-       {selected, [<<"xml">>], [[RawXML]|_]} ->
-           case fxml_stream:parse_element(RawXML) of
-               #xmlel{} = El -> {ok, El};
-               {error, _} -> error
-           end;
-       _ ->
-           error
-    end.
-
-remove_message(_From, To, TS, mnesia) ->
-    {U, S, _} = jid:tolower(To),
-    Msgs = mnesia:dirty_match_object(
-            offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}),
-    lists:foreach(
-      fun(Msg) ->
-             mnesia:dirty_delete_object(Msg)
-      end, Msgs);
-remove_message(_From, _To, TS, riak) ->
-    ejabberd_riak:delete(offline_msg, TS),
-    ok;
-remove_message(_From, To, Seq, odbc) ->
-    {LUser, LServer, _} = jid:tolower(To),
-    Username = ejabberd_odbc:escape(LUser),
-    SSeq = ejabberd_odbc:escape(Seq),
-    ejabberd_odbc:sql_query(
-      LServer,
-      [<<"delete from spool  where username='">>, Username,
-       <<"'  and seq='">>, SSeq, <<"';">>]),
-    ok.
-
-read_all_msgs(LUser, LServer, mnesia) ->
-    US = {LUser, LServer},
-    lists:keysort(#offline_msg.timestamp,
-                 mnesia:dirty_read({offline_msg, US}));
-read_all_msgs(LUser, LServer, riak) ->
-    case ejabberd_riak:get_by_index(
-           offline_msg, offline_msg_schema(),
-          <<"us">>, {LUser, LServer}) of
-        {ok, Rs} ->
-            lists:keysort(#offline_msg.timestamp, Rs);
-        _Err ->
-            []
-    end;
-read_all_msgs(LUser, LServer, odbc) ->
-    case catch ejabberd_odbc:sql_query(
-                 LServer,
-                 ?SQL("select @(xml)s from spool where "
-                      "username=%(LUser)s order by seq")) of
-        {selected, Rs} ->
-            lists:flatmap(
-              fun({XML}) ->
-                      case fxml_stream:parse_element(XML) of
-                          {error, _Reason} -> [];
-                          El -> [El]
-                      end
-              end,
-              Rs);
-        _ -> []
-    end.
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    lists:map(
+      fun({Seq, From, To, El}) ->
+             Node = integer_to_binary(Seq),
+             {Node, From, To, El}
+      end, Mod:read_message_headers(LUser, LServer)).
 
-format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak ->
-    lists:map(fun (#offline_msg{timestamp = TimeStamp,
-                               from = From, to = To,
-                               packet =
-                                   #xmlel{name = Name, attrs = Attrs,
-                                          children = Els}} =
-                      Msg) ->
-                     ID = jlib:encode_base64((term_to_binary(Msg))),
-                     {{Year, Month, Day}, {Hour, Minute, Second}} =
-                         calendar:now_to_local_time(TimeStamp),
-                     Time =
-                         iolist_to_binary(io_lib:format("~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w",
-                                                        [Year, Month, Day,
-                                                         Hour, Minute,
-                                                         Second])),
-                     SFrom = jid:to_string(From),
-                     STo = jid:to_string(To),
-                     Attrs2 = jlib:replace_from_to_attrs(SFrom, STo, Attrs),
-                     Packet = #xmlel{name = Name, attrs = Attrs2,
-                                     children = Els},
-                     FPacket = ejabberd_web_admin:pretty_print_xml(Packet),
-                     ?XE(<<"tr">>,
-                         [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
-                               [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
-                          ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], Time),
-                          ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], SFrom),
-                          ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], STo),
-                          ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
-                               [?XC(<<"pre">>, FPacket)])])
-             end,
-             Msgs);
-format_user_queue(Msgs, odbc) ->
-    lists:map(fun (#xmlel{} = Msg) ->
-                     ID = jlib:encode_base64((term_to_binary(Msg))),
-                     Packet = Msg,
-                     FPacket = ejabberd_web_admin:pretty_print_xml(Packet),
-                     ?XE(<<"tr">>,
-                         [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
-                               [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
-                          ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
-                               [?XC(<<"pre">>, FPacket)])])
-             end,
-             Msgs).
+format_user_queue(Hdrs) ->
+    lists:map(
+      fun({Seq, From, To, El}) ->
+             ID = integer_to_binary(Seq),
+             FPacket = ejabberd_web_admin:pretty_print_xml(El),
+             SFrom = jid:to_string(From),
+             STo = jid:to_string(To),
+             Stamp = fxml:get_path_s(El, [{elem, <<"delay">>},
+                                          {attr, <<"stamp">>}]),
+             Time = case jlib:datetime_string_to_timestamp(Stamp) of
+                        {_, _, _} = Now ->
+                            {{Year, Month, Day}, {Hour, Minute, Second}} =
+                                calendar:now_to_local_time(Now),
+                            iolist_to_binary(
+                              io_lib:format(
+                                "~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w",
+                                [Year, Month, Day, Hour, Minute,
+                                 Second]));
+                        _ ->
+                            <<"">>
+                    end,
+             ?XE(<<"tr">>,
+                 [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
+                       [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
+                  ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], Time),
+                  ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], SFrom),
+                  ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], STo),
+                  ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
+                       [?XC(<<"pre">>, FPacket)])])
+      end, Hdrs).
 
 user_queue(User, Server, Query, Lang) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
     US = {LUser, LServer},
-    DBType = gen_mod:db_type(LServer, ?MODULE),
-    Res = user_queue_parse_query(LUser, LServer, Query,
-                                DBType),
-    MsgsAll = read_all_msgs(LUser, LServer, DBType),
-    Msgs = get_messages_subset(US, Server, MsgsAll,
-                              DBType),
-    FMsgs = format_user_queue(Msgs, DBType),
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    Res = user_queue_parse_query(LUser, LServer, Query),
+    HdrsAll = Mod:read_message_headers(LUser, LServer),
+    Hdrs = get_messages_subset(US, Server, HdrsAll),
+    FMsgs = format_user_queue(Hdrs),
     [?XC(<<"h1">>,
         list_to_binary(io_lib:format(?T(<<"~s's Offline Messages Queue">>),
                                       [us_to_list(US)])))]
@@ -1158,96 +760,24 @@ user_queue(User, Server, Query, Lang) ->
               ?INPUTT(<<"submit">>, <<"delete">>,
                       <<"Delete Selected">>)])].
 
-user_queue_parse_query(LUser, LServer, Query, mnesia) ->
-    US = {LUser, LServer},
-    case lists:keysearch(<<"delete">>, 1, Query) of
-      {value, _} ->
-         Msgs = lists:keysort(#offline_msg.timestamp,
-                              mnesia:dirty_read({offline_msg, US})),
-         F = fun () ->
-                     lists:foreach(fun (Msg) ->
-                                           ID =
-                                               jlib:encode_base64((term_to_binary(Msg))),
-                                           case lists:member({<<"selected">>,
-                                                              ID},
-                                                             Query)
-                                               of
-                                             true -> mnesia:delete_object(Msg);
-                                             false -> ok
-                                           end
-                                   end,
-                                   Msgs)
-             end,
-         mnesia:transaction(F),
-         ok;
-      false -> nothing
-    end;
-user_queue_parse_query(LUser, LServer, Query, riak) ->
+user_queue_parse_query(LUser, LServer, Query) ->
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
     case lists:keysearch(<<"delete">>, 1, Query) of
-        {value, _} ->
-            Msgs = read_all_msgs(LUser, LServer, riak),
-            lists:foreach(
-              fun (Msg) ->
-                      ID = jlib:encode_base64((term_to_binary(Msg))),
-                      case lists:member({<<"selected">>, ID}, Query) of
-                          true ->
-                              ejabberd_riak:delete(offline_msg,
-                                                   Msg#offline_msg.timestamp);
-                          false ->
-                              ok
-                      end
-              end,
-              Msgs),
-            ok;
-        false ->
-            nothing
-    end;
-user_queue_parse_query(LUser, LServer, Query, odbc) ->
-    Username = ejabberd_odbc:escape(LUser),
-    case lists:keysearch(<<"delete">>, 1, Query) of
-      {value, _} ->
-         Msgs = case catch ejabberd_odbc:sql_query(LServer,
-                                                   [<<"select xml, seq from spool  where username='">>,
-                                                    Username,
-                                                    <<"'  order by seq;">>])
-                    of
-                  {selected, [<<"xml">>, <<"seq">>], Rs} ->
-                      lists:flatmap(fun ([XML, Seq]) ->
-                                            case fxml_stream:parse_element(XML)
-                                                of
-                                              {error, _Reason} -> [];
-                                              El -> [{El, Seq}]
-                                            end
-                                    end,
-                                    Rs);
-                  _ -> []
-                end,
-         F = fun () ->
-                     lists:foreach(fun ({Msg, Seq}) ->
-                                           ID =
-                                               jlib:encode_base64((term_to_binary(Msg))),
-                                           case lists:member({<<"selected">>,
-                                                              ID},
-                                                             Query)
-                                               of
-                                             true ->
-                                                 SSeq =
-                                                     ejabberd_odbc:escape(Seq),
-                                                 catch
-                                                   ejabberd_odbc:sql_query(LServer,
-                                                                           [<<"delete from spool  where username='">>,
-                                                                            Username,
-                                                                            <<"'  and seq='">>,
-                                                                            SSeq,
-                                                                            <<"';">>]);
-                                             false -> ok
-                                           end
-                                   end,
-                                   Msgs)
-             end,
-         mnesia:transaction(F),
-         ok;
-      false -> nothing
+       {value, _} ->
+           case lists:keyfind(<<"selected">>, 1, Query) of
+               {_, Seq} ->
+                   case catch binary_to_integer(Seq) of
+                       I when is_integer(I), I>=0 ->
+                           Mod:remove_message(LUser, LServer, I),
+                           ok;
+                       _ ->
+                           nothing
+                   end;
+               false ->
+                   nothing
+           end;
+       _ ->
+           nothing
     end.
 
 us_to_list({User, Server}) ->
@@ -1256,7 +786,7 @@ us_to_list({User, Server}) ->
 get_queue_length(LUser, LServer) ->
     count_offline_messages(LUser, LServer).
 
-get_messages_subset(User, Host, MsgsAll, DBType) ->
+get_messages_subset(User, Host, MsgsAll) ->
     Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages,
                                     fun(A) when is_atom(A) -> A end,
                                    max_user_offline_messages),
@@ -1267,33 +797,20 @@ get_messages_subset(User, Host, MsgsAll, DBType) ->
                       _ -> 100
                     end,
     Length = length(MsgsAll),
-    get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll,
-                        DBType).
+    get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll).
 
-get_messages_subset2(Max, Length, MsgsAll, _DBType)
-    when Length =< Max * 2 ->
+get_messages_subset2(Max, Length, MsgsAll) when Length =< Max * 2 ->
     MsgsAll;
-get_messages_subset2(Max, Length, MsgsAll, DBType)
-  when DBType == mnesia; DBType == riak ->
+get_messages_subset2(Max, Length, MsgsAll) ->
     FirstN = Max,
     {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
     MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
                              Msgs2),
     NoJID = jid:make(<<"...">>, <<"...">>, <<"">>),
-    IntermediateMsg = #offline_msg{timestamp = p1_time_compat:timestamp(),
-                                  from = NoJID, to = NoJID,
-                                  packet =
-                                      #xmlel{name = <<"...">>, attrs = [],
-                                             children = []}},
-    MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN;
-get_messages_subset2(Max, Length, MsgsAll, odbc) ->
-    FirstN = Max,
-    {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
-    MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
-                             Msgs2),
+    Seq = <<"0">>,
     IntermediateMsg = #xmlel{name = <<"...">>, attrs = [],
                             children = []},
-    MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN.
+    MsgsFirstN ++ [{Seq, NoJID, NoJID, IntermediateMsg}] ++ MsgsLastN.
 
 webadmin_user(Acc, User, Server, Lang) ->
     QueueLen = count_offline_messages(jid:nodeprep(User),
@@ -1310,25 +827,8 @@ webadmin_user(Acc, User, Server, Lang) ->
 delete_all_msgs(User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
-    delete_all_msgs(LUser, LServer,
-                   gen_mod:db_type(LServer, ?MODULE)).
-
-delete_all_msgs(LUser, LServer, mnesia) ->
-    US = {LUser, LServer},
-    F = fun () ->
-               mnesia:write_lock_table(offline_msg),
-               lists:foreach(fun (Msg) -> mnesia:delete_object(Msg)
-                             end,
-                             mnesia:dirty_read({offline_msg, US}))
-       end,
-    mnesia:transaction(F);
-delete_all_msgs(LUser, LServer, riak) ->
-    Res = ejabberd_riak:delete_by_index(offline_msg,
-                                        <<"us">>, {LUser, LServer}),
-    {atomic, Res};
-delete_all_msgs(LUser, LServer, odbc) ->
-    odbc_queries:del_spool_msg(LServer, LUser),
-    {atomic, ok}.
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    Mod:remove_all_messages(LUser, LServer).
 
 webadmin_user_parse_query(_, <<"removealloffline">>,
                          User, Server, _Query) ->
@@ -1350,112 +850,20 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
 count_offline_messages(User, Server) ->
     LUser = jid:nodeprep(User),
     LServer = jid:nameprep(Server),
-    DBType = gen_mod:db_type(LServer, ?MODULE),
-    count_offline_messages(LUser, LServer, DBType).
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    Mod:count_messages(LUser, LServer).
 
-count_offline_messages(LUser, LServer, mnesia) ->
-    US = {LUser, LServer},
-    F = fun () ->
-               count_mnesia_records(US)
-       end,
-    case catch mnesia:async_dirty(F) of
-      I when is_integer(I) -> I;
-      _ -> 0
-    end;
-count_offline_messages(LUser, LServer, odbc) ->
-    case catch ejabberd_odbc:sql_query(
-                 LServer,
-                 ?SQL("select @(count(*))d from spool "
-                      "where username=%(LUser)s")) of
-        {selected, [{Res}]} ->
-            Res;
-        _ -> 0
-    end;
-count_offline_messages(LUser, LServer, riak) ->
-    case ejabberd_riak:count_by_index(
-           offline_msg, <<"us">>, {LUser, LServer}) of
-        {ok, Res} ->
-            Res;
-        _ ->
-            0
-    end.
-
-%% Return the number of records matching a given match expression.
-%% This function is intended to be used inside a Mnesia transaction.
-%% The count has been written to use the fewest possible memory by
-%% getting the record by small increment and by using continuation.
--define(BATCHSIZE, 100).
-
-count_mnesia_records(US) ->
-    MatchExpression = #offline_msg{us = US,  _ = '_'},
-    case mnesia:select(offline_msg, [{MatchExpression, [], [[]]}],
-                      ?BATCHSIZE, read) of
-       {Result, Cont} ->
-           Count = length(Result),
-           count_records_cont(Cont, Count);
-       '$end_of_table' ->
-           0
-    end.
-
-count_records_cont(Cont, Count) ->
-    case mnesia:select(Cont) of
-       {Result, Cont} ->
-           NewCount = Count + length(Result),
-           count_records_cont(Cont, NewCount);
-       '$end_of_table' ->
-           Count
-    end.
-
-offline_msg_schema() ->
-    {record_info(fields, offline_msg), #offline_msg{}}.
-
-export(_Server) ->
-    [{offline_msg,
-      fun(Host, #offline_msg{us = {LUser, LServer},
-                             timestamp = TimeStamp, from = From, to = To,
-                             packet = Packet})
-            when LServer == Host ->
-              Username = ejabberd_odbc:escape(LUser),
-              Packet1 = jlib:replace_from_to(From, To, Packet),
-              Packet2 = jlib:add_delay_info(Packet1, LServer, TimeStamp,
-                                            <<"Offline Storage">>),
-              XML = ejabberd_odbc:escape(fxml:element_to_binary(Packet2)),
-              [[<<"delete from spool where username='">>, Username, <<"';">>],
-               [<<"insert into spool(username, xml) values ('">>,
-                Username, <<"', '">>, XML, <<"');">>]];
-         (_Host, _R) ->
-              []
-      end}].
+export(LServer) ->
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    Mod:export(LServer).
 
 import(LServer) ->
-    [{<<"select username, xml from spool;">>,
-      fun([LUser, XML]) ->
-              El = #xmlel{} = fxml_stream:parse_element(XML),
-              From = #jid{} = jid:from_string(
-                                fxml:get_attr_s(<<"from">>, El#xmlel.attrs)),
-              To = #jid{} = jid:from_string(
-                              fxml:get_attr_s(<<"to">>, El#xmlel.attrs)),
-              Stamp = fxml:get_path_s(El, [{elem, <<"delay">>},
-                                          {attr, <<"stamp">>}]),
-              TS = case jlib:datetime_string_to_timestamp(Stamp) of
-                       {_, _, _} = Now ->
-                           Now;
-                       undefined ->
-                           p1_time_compat:timestamp()
-                   end,
-              Expire = find_x_expire(TS, El#xmlel.children),
-              #offline_msg{us = {LUser, LServer},
-                           from = From, to = To,
-                           timestamp = TS, expire = Expire}
-      end}].
-
-import(_LServer, mnesia, #offline_msg{} = Msg) ->
-    mnesia:dirty_write(Msg);
-import(_LServer, riak, #offline_msg{us = US, timestamp = TS} = M) ->
-    ejabberd_riak:put(M, offline_msg_schema(),
-                     [{i, TS}, {'2i', [{<<"us">>, US}]}]);
-import(_, _, _) ->
-    pass.
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    Mod:import(LServer).
+
+import(LServer, DBType, Data) ->
+    Mod = gen_mod:db_mod(DBType, ?MODULE),
+    Mod:import(LServer, Data).
 
 mod_opt_type(access_max_user_messages) ->
     fun (A) -> A end;
diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl
new file mode 100644 (file)
index 0000000..6a1d9e3
--- /dev/null
@@ -0,0 +1,232 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2016, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_offline_mnesia).
+
+-behaviour(mod_offline).
+
+-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+        remove_old_messages/2, remove_user/2, read_message_headers/2,
+        read_message/3, remove_message/3, read_all_messages/2,
+        remove_all_messages/2, count_messages/2, import/2]).
+
+-include("jlib.hrl").
+-include("mod_offline.hrl").
+-include("logger.hrl").
+
+-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+    mnesia:create_table(offline_msg,
+                       [{disc_only_copies, [node()]}, {type, bag},
+                        {attributes, record_info(fields, offline_msg)}]),
+    update_table().
+
+store_messages(_Host, US, Msgs, Len, MaxOfflineMsgs) ->
+    F = fun () ->
+               Count = if MaxOfflineMsgs =/= infinity ->
+                               Len + count_mnesia_records(US);
+                          true -> 0
+                       end,
+               if Count > MaxOfflineMsgs -> discard;
+                  true ->
+                       if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) ->
+                               mnesia:write_lock_table(offline_msg);
+                          true -> ok
+                       end,
+                       lists:foreach(fun (M) -> mnesia:write(M) end, Msgs)
+               end
+       end,
+    mnesia:transaction(F).
+
+pop_messages(LUser, LServer) ->
+    US = {LUser, LServer},
+    F = fun () ->
+               Rs = mnesia:wread({offline_msg, US}),
+               mnesia:delete({offline_msg, US}),
+               Rs
+       end,
+    case mnesia:transaction(F) of
+       {atomic, L} ->
+           {ok, lists:keysort(#offline_msg.timestamp, L)};
+       {aborted, Reason} ->
+           {error, Reason}
+    end.
+
+remove_expired_messages(_LServer) ->
+    TimeStamp = p1_time_compat:timestamp(),
+    F = fun () ->
+               mnesia:write_lock_table(offline_msg),
+               mnesia:foldl(fun (Rec, _Acc) ->
+                                    case Rec#offline_msg.expire of
+                                        never -> ok;
+                                        TS ->
+                                            if TS < TimeStamp ->
+                                                    mnesia:delete_object(Rec);
+                                               true -> ok
+                                            end
+                                    end
+                            end,
+                            ok, offline_msg)
+       end,
+    mnesia:transaction(F).
+
+remove_old_messages(Days, _LServer) ->
+    S = p1_time_compat:system_time(seconds) - 60 * 60 * 24 * Days,
+    MegaSecs1 = S div 1000000,
+    Secs1 = S rem 1000000,
+    TimeStamp = {MegaSecs1, Secs1, 0},
+    F = fun () ->
+               mnesia:write_lock_table(offline_msg),
+               mnesia:foldl(fun (#offline_msg{timestamp = TS} = Rec,
+                                 _Acc)
+                                    when TS < TimeStamp ->
+                                    mnesia:delete_object(Rec);
+                                (_Rec, _Acc) -> ok
+                            end,
+                            ok, offline_msg)
+       end,
+    mnesia:transaction(F).
+
+remove_user(LUser, LServer) ->
+    US = {LUser, LServer},
+    F = fun () -> mnesia:delete({offline_msg, US}) end,
+    mnesia:transaction(F).
+
+read_message_headers(LUser, LServer) ->
+    Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}),
+    Hdrs = lists:map(
+            fun(#offline_msg{from = From, to = To, packet = Pkt,
+                             timestamp = TS}) ->
+                    Seq = now_to_integer(TS),
+                    NewPkt = jlib:add_delay_info(Pkt, LServer, TS,
+                                                 <<"Offline Storage">>),
+                    {Seq, From, To, NewPkt}
+            end, Msgs),
+    lists:keysort(1, Hdrs).
+
+read_message(LUser, LServer, I) ->
+    US = {LUser, LServer},
+    TS = integer_to_now(I),
+    case mnesia:dirty_match_object(
+          offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}) of
+       [Msg|_] ->
+           {ok, Msg};
+       _ ->
+           error
+    end.
+
+remove_message(LUser, LServer, I) ->
+    US = {LUser, LServer},
+    TS = integer_to_now(I),
+    Msgs = mnesia:dirty_match_object(
+            offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}),
+    lists:foreach(
+      fun(Msg) ->
+             mnesia:dirty_delete_object(Msg)
+      end, Msgs).
+
+read_all_messages(LUser, LServer) ->
+    US = {LUser, LServer},
+    lists:keysort(#offline_msg.timestamp,
+                 mnesia:dirty_read({offline_msg, US})).
+
+remove_all_messages(LUser, LServer) ->
+    US = {LUser, LServer},
+    F = fun () ->
+               mnesia:write_lock_table(offline_msg),
+               lists:foreach(fun (Msg) -> mnesia:delete_object(Msg) end,
+                             mnesia:dirty_read({offline_msg, US}))
+       end,
+    mnesia:transaction(F).
+
+count_messages(LUser, LServer) ->
+    US = {LUser, LServer},
+    F = fun () ->
+               count_mnesia_records(US)
+       end,
+    case catch mnesia:async_dirty(F) of
+       I when is_integer(I) -> I;
+       _ -> 0
+    end.
+
+import(_LServer, #offline_msg{} = Msg) ->
+    mnesia:dirty_write(Msg).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+%% Return the number of records matching a given match expression.
+%% This function is intended to be used inside a Mnesia transaction.
+%% The count has been written to use the fewest possible memory by
+%% getting the record by small increment and by using continuation.
+-define(BATCHSIZE, 100).
+
+count_mnesia_records(US) ->
+    MatchExpression = #offline_msg{us = US,  _ = '_'},
+    case mnesia:select(offline_msg, [{MatchExpression, [], [[]]}],
+                      ?BATCHSIZE, read) of
+       {Result, Cont} ->
+           Count = length(Result),
+           count_records_cont(Cont, Count);
+       '$end_of_table' ->
+           0
+    end.
+
+count_records_cont(Cont, Count) ->
+    case mnesia:select(Cont) of
+       {Result, Cont} ->
+           NewCount = Count + length(Result),
+           count_records_cont(Cont, NewCount);
+       '$end_of_table' ->
+           Count
+    end.
+
+jid_to_binary(#jid{user = U, server = S, resource = R,
+                   luser = LU, lserver = LS, lresource = LR}) ->
+    #jid{user = iolist_to_binary(U),
+         server = iolist_to_binary(S),
+         resource = iolist_to_binary(R),
+         luser = iolist_to_binary(LU),
+         lserver = iolist_to_binary(LS),
+         lresource = iolist_to_binary(LR)}.
+
+now_to_integer({MS, S, US}) ->
+    (MS * 1000000 + S) * 1000000 + US.
+
+integer_to_now(Int) ->
+    Secs = Int div 1000000,
+    USec = Int rem 1000000,
+    MSec = Secs div 1000000,
+    Sec = Secs rem 1000000,
+    {MSec, Sec, USec}.
+
+update_table() ->
+    Fields = record_info(fields, offline_msg),
+    case mnesia:table_info(offline_msg, attributes) of
+        Fields ->
+            ejabberd_config:convert_table_to_binary(
+              offline_msg, Fields, bag,
+              fun(#offline_msg{us = {U, _}}) -> U end,
+              fun(#offline_msg{us = {U, S},
+                               from = From,
+                               to = To,
+                               packet = El} = R) ->
+                      R#offline_msg{us = {iolist_to_binary(U),
+                                          iolist_to_binary(S)},
+                                    from = jid_to_binary(From),
+                                    to = jid_to_binary(To),
+                                    packet = fxml:to_xmlel(El)}
+              end);
+        _ ->
+            ?INFO_MSG("Recreating offline_msg table", []),
+            mnesia:transform_table(offline_msg, ignore, Fields)
+    end.
diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl
new file mode 100644 (file)
index 0000000..217e8f8
--- /dev/null
@@ -0,0 +1,153 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2016, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_offline_riak).
+
+-behaviour(mod_offline).
+
+-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+        remove_old_messages/2, remove_user/2, read_message_headers/2,
+        read_message/3, remove_message/3, read_all_messages/2,
+        remove_all_messages/2, count_messages/2, import/2]).
+
+-include("jlib.hrl").
+-include("mod_offline.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+    ok.
+
+store_messages(Host, {User, _}, Msgs, Len, MaxOfflineMsgs) ->
+    Count = if MaxOfflineMsgs =/= infinity ->
+                    Len + count_messages(User, Host);
+               true -> 0
+            end,
+    if
+        Count > MaxOfflineMsgs ->
+            {atomic, discard};
+        true ->
+           try
+               lists:foreach(
+                 fun(#offline_msg{us = US,
+                                  timestamp = TS} = M) ->
+                         ok = ejabberd_riak:put(
+                                M, offline_msg_schema(),
+                                [{i, TS}, {'2i', [{<<"us">>, US}]}])
+                 end, Msgs),
+               {atomic, ok}
+           catch _:{badmatch, Err} ->
+                   {atomic, Err}
+           end
+    end.
+
+pop_messages(LUser, LServer) ->
+    case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(),
+                                    <<"us">>, {LUser, LServer}) of
+        {ok, Rs} ->
+           try
+                lists:foreach(
+                  fun(#offline_msg{timestamp = T}) ->
+                          ok = ejabberd_riak:delete(offline_msg, T)
+                  end, Rs),
+               {ok, lists:keysort(#offline_msg.timestamp, Rs)}
+           catch _:{badmatch, Err} ->
+                   Err
+           end;
+       Err ->
+           Err
+    end.
+
+remove_expired_messages(_LServer) ->
+    %% TODO
+    {atomic, ok}.
+
+remove_old_messages(_Days, _LServer) ->
+    %% TODO
+    {atomic, ok}.
+
+remove_user(LUser, LServer) ->
+    {atomic, ejabberd_riak:delete_by_index(offline_msg,
+                                           <<"us">>, {LUser, LServer})}.
+
+read_message_headers(LUser, LServer) ->
+    case ejabberd_riak:get_by_index(
+           offline_msg, offline_msg_schema(),
+          <<"us">>, {LUser, LServer}) of
+        {ok, Rs} ->
+           Hdrs = lists:map(
+                    fun(#offline_msg{from = From, to = To, packet = Pkt,
+                                     timestamp = TS}) ->
+                            Seq = now_to_integer(TS),
+                            NewPkt = jlib:add_delay_info(
+                                       Pkt, LServer, TS, <<"Offline Storage">>),
+                            {Seq, From, To, NewPkt}
+                    end, Rs),
+           lists:keysort(1, Hdrs);
+       _Err ->
+           []
+    end.
+
+read_message(_LUser, _LServer, I) ->
+    TS = integer_to_now(I),
+    case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of
+       {ok, Msg} ->
+           {ok, Msg};
+       _ ->
+           error
+    end.
+
+remove_message(_LUser, _LServer, I) ->
+    TS = integer_to_now(I),
+    ejabberd_riak:delete(offline_msg, TS),
+    ok.
+
+read_all_messages(LUser, LServer) ->
+    case ejabberd_riak:get_by_index(
+           offline_msg, offline_msg_schema(),
+          <<"us">>, {LUser, LServer}) of
+        {ok, Rs} ->
+            lists:keysort(#offline_msg.timestamp, Rs);
+        _Err ->
+            []
+    end.
+
+remove_all_messages(LUser, LServer) ->
+    Res = ejabberd_riak:delete_by_index(offline_msg,
+                                        <<"us">>, {LUser, LServer}),
+    {atomic, Res}.
+
+count_messages(LUser, LServer) ->
+    case ejabberd_riak:count_by_index(
+           offline_msg, <<"us">>, {LUser, LServer}) of
+        {ok, Res} ->
+            Res;
+        _ ->
+            0
+    end.
+
+import(_LServer, #offline_msg{us = US, timestamp = TS} = M) ->
+    ejabberd_riak:put(M, offline_msg_schema(),
+                     [{i, TS}, {'2i', [{<<"us">>, US}]}]).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+offline_msg_schema() ->
+    {record_info(fields, offline_msg), #offline_msg{}}.
+
+now_to_integer({MS, S, US}) ->
+    (MS * 1000000 + S) * 1000000 + US.
+
+integer_to_now(Int) ->
+    Secs = Int div 1000000,
+    USec = Int rem 1000000,
+    MSec = Secs div 1000000,
+    Sec = Secs rem 1000000,
+    {MSec, Sec, USec}.
diff --git a/src/mod_offline_sql.erl b/src/mod_offline_sql.erl
new file mode 100644 (file)
index 0000000..37b9016
--- /dev/null
@@ -0,0 +1,252 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2016, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_offline_sql).
+
+-compile([{parse_transform, ejabberd_sql_pt}]).
+
+-behaviour(mod_offline).
+
+-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+        remove_old_messages/2, remove_user/2, read_message_headers/2,
+        read_message/3, remove_message/3, read_all_messages/2,
+        remove_all_messages/2, count_messages/2, import/1, import/2,
+        export/1]).
+
+-include("jlib.hrl").
+-include("mod_offline.hrl").
+-include("logger.hrl").
+-include("ejabberd_sql_pt.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+    ok.
+
+store_messages(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs) ->
+    Count = if MaxOfflineMsgs =/= infinity ->
+                   Len + count_messages(User, Host);
+              true -> 0
+           end,
+    if Count > MaxOfflineMsgs -> {atomic, discard};
+       true ->
+           Query = lists:map(
+                     fun(M) ->
+                             Username =
+                                 ejabberd_odbc:escape((M#offline_msg.to)#jid.luser),
+                             From = M#offline_msg.from,
+                             To = M#offline_msg.to,
+                             Packet =
+                                 jlib:replace_from_to(From, To,
+                                                      M#offline_msg.packet),
+                             NewPacket =
+                                 jlib:add_delay_info(Packet, Host,
+                                                     M#offline_msg.timestamp,
+                                                     <<"Offline Storage">>),
+                             XML =
+                                 ejabberd_odbc:escape(fxml:element_to_binary(NewPacket)),
+                                    odbc_queries:add_spool_sql(Username, XML)
+                     end,
+                     Msgs),
+           odbc_queries:add_spool(Host, Query)
+    end.
+
+pop_messages(LUser, LServer) ->
+    case odbc_queries:get_and_del_spool_msg_t(LServer, LUser) of
+       {atomic, {selected, Rs}} ->
+           {ok, lists:flatmap(
+                  fun({_, XML}) ->
+                          case xml_to_offline_msg(XML) of
+                              {ok, Msg} ->
+                                  [Msg];
+                              _Err ->
+                                  []
+                          end
+                  end, Rs)};
+       Err ->
+           {error, Err}
+    end.
+
+remove_expired_messages(_LServer) ->
+    %% TODO
+    {atomic, ok}.
+
+remove_old_messages(Days, LServer) ->
+    case catch ejabberd_odbc:sql_query(
+                LServer,
+                [<<"DELETE FROM spool"
+                  " WHERE created_at < "
+                  "DATE_SUB(CURDATE(), INTERVAL ">>,
+                 integer_to_list(Days), <<" DAY);">>]) of
+       {updated, N} ->
+           ?INFO_MSG("~p message(s) deleted from offline spool", [N]);
+       _Error ->
+           ?ERROR_MSG("Cannot delete message in offline spool: ~p", [_Error])
+    end,
+    {atomic, ok}.
+
+remove_user(LUser, LServer) ->
+    odbc_queries:del_spool_msg(LServer, LUser).
+
+read_message_headers(LUser, LServer) ->
+    Username = ejabberd_odbc:escape(LUser),
+    case catch ejabberd_odbc:sql_query(
+                LServer, [<<"select xml, seq from spool where username ='">>,
+                          Username, <<"' order by seq;">>]) of
+       {selected, [<<"xml">>, <<"seq">>], Rows} ->
+           lists:flatmap(
+             fun([XML, Seq]) ->
+                     case xml_to_offline_msg(XML) of
+                         {ok, #offline_msg{from = From,
+                                           to = To,
+                                           packet = El}} ->
+                             Seq0 = binary_to_integer(Seq),
+                             [{Seq0, From, To, El}];
+                         _ ->
+                             []
+                     end
+             end, Rows);
+       _Err ->
+           []
+    end.
+
+read_message(LUser, LServer, Seq) ->
+    Username = ejabberd_odbc:escape(LUser),
+    SSeq = ejabberd_odbc:escape(integer_to_binary(Seq)),
+    case ejabberd_odbc:sql_query(
+          LServer,
+          [<<"select xml from spool  where username='">>, Username,
+           <<"'  and seq='">>, SSeq, <<"';">>]) of
+       {selected, [<<"xml">>], [[RawXML]|_]} ->
+           case xml_to_offline_msg(RawXML) of
+               {ok, Msg} ->
+                   {ok, Msg};
+               _ ->
+                   error
+           end;
+       _ ->
+           error
+    end.
+
+remove_message(LUser, LServer, Seq) ->
+    Username = ejabberd_odbc:escape(LUser),
+    SSeq = ejabberd_odbc:escape(integer_to_binary(Seq)),
+    ejabberd_odbc:sql_query(
+      LServer,
+      [<<"delete from spool  where username='">>, Username,
+       <<"'  and seq='">>, SSeq, <<"';">>]),
+    ok.
+
+read_all_messages(LUser, LServer) ->
+    case catch ejabberd_odbc:sql_query(
+                 LServer,
+                 ?SQL("select @(xml)s from spool where "
+                      "username=%(LUser)s order by seq")) of
+        {selected, Rs} ->
+            lists:flatmap(
+              fun({XML}) ->
+                     case xml_to_offline_msg(XML) of
+                         {ok, Msg} -> [Msg];
+                         _ -> []
+                     end
+              end, Rs);
+        _ ->
+           []
+    end.
+
+remove_all_messages(LUser, LServer) ->
+    odbc_queries:del_spool_msg(LServer, LUser),
+    {atomic, ok}.
+
+count_messages(LUser, LServer) ->
+    case catch ejabberd_odbc:sql_query(
+                 LServer,
+                 ?SQL("select @(count(*))d from spool "
+                      "where username=%(LUser)s")) of
+        {selected, [{Res}]} ->
+            Res;
+        _ -> 0
+    end.
+
+export(_Server) ->
+    [{offline_msg,
+      fun(Host, #offline_msg{us = {LUser, LServer},
+                             timestamp = TimeStamp, from = From, to = To,
+                             packet = Packet})
+            when LServer == Host ->
+              Username = ejabberd_odbc:escape(LUser),
+              Packet1 = jlib:replace_from_to(From, To, Packet),
+              Packet2 = jlib:add_delay_info(Packet1, LServer, TimeStamp,
+                                            <<"Offline Storage">>),
+              XML = ejabberd_odbc:escape(fxml:element_to_binary(Packet2)),
+              [[<<"delete from spool where username='">>, Username, <<"';">>],
+               [<<"insert into spool(username, xml) values ('">>,
+                Username, <<"', '">>, XML, <<"');">>]];
+         (_Host, _R) ->
+              []
+      end}].
+
+import(LServer) ->
+    [{<<"select username, xml from spool;">>,
+      fun([LUser, XML]) ->
+              El = #xmlel{} = fxml_stream:parse_element(XML),
+              From = #jid{} = jid:from_string(
+                                fxml:get_attr_s(<<"from">>, El#xmlel.attrs)),
+              To = #jid{} = jid:from_string(
+                              fxml:get_attr_s(<<"to">>, El#xmlel.attrs)),
+              Stamp = fxml:get_path_s(El, [{elem, <<"delay">>},
+                                          {attr, <<"stamp">>}]),
+              TS = case jlib:datetime_string_to_timestamp(Stamp) of
+                       {_, _, _} = Now ->
+                           Now;
+                       undefined ->
+                           p1_time_compat:timestamp()
+                   end,
+              Expire = mod_offline:find_x_expire(TS, El#xmlel.children),
+              #offline_msg{us = {LUser, LServer},
+                           from = From, to = To,
+                          packet = El,
+                           timestamp = TS, expire = Expire}
+      end}].
+
+import(_, _) ->
+    pass.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+xml_to_offline_msg(XML) ->
+    case fxml_stream:parse_element(XML) of
+       #xmlel{} = El ->
+           el_to_offline_msg(El);
+       Err ->
+           ?ERROR_MSG("got ~p when parsing XML packet ~s",
+                      [Err, XML]),
+           Err
+    end.
+
+el_to_offline_msg(El) ->
+    To_s = fxml:get_tag_attr_s(<<"to">>, El),
+    From_s = fxml:get_tag_attr_s(<<"from">>, El),
+    To = jid:from_string(To_s),
+    From = jid:from_string(From_s),
+    if To == error ->
+           ?ERROR_MSG("failed to get 'to' JID from offline XML ~p", [El]),
+           {error, bad_jid_to};
+       From == error ->
+           ?ERROR_MSG("failed to get 'from' JID from offline XML ~p", [El]),
+           {error, bad_jid_from};
+       true ->
+           {ok, #offline_msg{us = {To#jid.luser, To#jid.lserver},
+                             from = From,
+                             to = To,
+                             timestamp = undefined,
+                             expire = undefined,
+                             packet = El}}
+    end.