]> granicus.if.org Git - ejabberd/commitdiff
Add more tests for offline storage
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Tue, 8 Nov 2016 12:15:19 +0000 (15:15 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Tue, 8 Nov 2016 12:15:19 +0000 (15:15 +0300)
src/ejabberd_sm.erl
src/mod_offline.erl
src/mod_offline_mnesia.erl
src/xmpp_util.erl
test/ejabberd_SUITE.erl
test/ejabberd_SUITE_data/ejabberd.yml
test/offline_tests.erl [new file with mode: 0644]
test/suite.erl

index 6f6a196e56c50a39a4037d379669bb623c7585c4..0655bbcf3e8d8d5411c12a9b157cca117935b64f 100644 (file)
@@ -498,7 +498,7 @@ do_route(From, #jid{lresource = <<"">>} = To, #presence{} = Packet) ->
       end, get_user_present_resources(LUser, LServer));
 do_route(From, #jid{lresource = <<"">>} = To, #message{type = T} = Packet) ->
     ?DEBUG("processing message to bare JID:~n~s", [xmpp:pp(Packet)]),
-    if T == chat; T == headline; T == normal ->
+    if T == chat; T == headline; T == normal; T == groupchat ->
            route_message(From, To, Packet, T);
        true ->
            Lang = xmpp:get_lang(Packet),
@@ -516,7 +516,8 @@ do_route(From, To, Packet) ->
     case online(Mod:get_sessions(LUser, LServer, LResource)) of
        [] ->
            case Packet of
-               #message{type = T} when T == chat; T == normal ->
+               #message{type = T} when T == chat; T == normal;
+                                       T == headline; T == groupchat ->
                    route_message(From, To, Packet, T);
                #presence{} ->
                    ?DEBUG("dropping presence to unavalable resource:~n~s",
@@ -586,20 +587,16 @@ route_message(From, To, Packet, Type) ->
                        end,
                        PrioRes);
       _ ->
-         case Type of
-           headline -> ok;
-           _ ->
-               case ejabberd_auth:is_user_exists(LUser, LServer) andalso
-                   is_privacy_allow(From, To, Packet) of
-                 true ->
-                     ejabberd_hooks:run(offline_message_hook, LServer,
-                                        [From, To, Packet]);
-                 false ->
-                     Err = xmpp:make_error(Packet,
-                                           xmpp:err_service_unavailable()),
-                     ejabberd_router:route(To, From, Err)
-               end
-         end
+           case ejabberd_auth:is_user_exists(LUser, LServer) andalso
+               is_privacy_allow(From, To, Packet) of
+               true ->
+                   ejabberd_hooks:run(offline_message_hook, LServer,
+                                      [From, To, Packet]);
+               false ->
+                   Err = xmpp:make_error(Packet,
+                                         xmpp:err_service_unavailable()),
+                   ejabberd_router:route(To, From, Err)
+           end
     end.
 
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
index 240650234084b48b2915d9cd523b492c3258c4bb..6134823c110209e76f81fee97d4cdbfb405c5b29 100644 (file)
 -callback read_message_headers(binary(), binary()) -> any().
 -callback read_message(binary(), binary(), non_neg_integer()) ->
     {ok, #offline_msg{}} | error.
--callback remove_message(binary(), binary(), non_neg_integer()) -> ok.
+-callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}.
 -callback read_all_messages(binary(), binary()) -> [#offline_msg{}].
 -callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
 -callback count_messages(binary(), binary()) -> non_neg_integer().
@@ -315,32 +315,52 @@ get_info(Acc, _From, _To, _Node, _Lang) ->
     Acc.
 
 -spec handle_offline_query(iq()) -> iq().
+handle_offline_query(#iq{from = #jid{luser = U1, lserver = S1},
+                        to = #jid{luser = U2, lserver = S2},
+                        lang = Lang,
+                        sub_els = [#offline{}]} = IQ)
+  when {U1, S1} /= {U2, S2} ->
+    Txt = <<"Query to another users is forbidden">>,
+    xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang));
 handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From,
                         to = #jid{luser = U, lserver = S} = _To,
-                        type = Type,
-                        sub_els = [#offline{purge = Purge,
-                                            items = Items,
-                                            fetch = Fetch}]} = IQ) ->
-    case Type of
-       get ->
-           if Fetch -> handle_offline_fetch(From);
-              true -> handle_offline_items_view(From, Items)
+                        type = Type, lang = Lang,
+                        sub_els = [#offline{} = Offline]} = IQ) ->
+    case {Type, Offline} of
+       {get, #offline{fetch = true, items = [], purge = false}} ->
+           %% TODO: report database errors
+           handle_offline_fetch(From),
+           xmpp:make_iq_result(IQ);
+       {get, #offline{fetch = false, items = [_|_] = Items, purge = false}} ->
+           case handle_offline_items_view(From, Items) of
+               true -> xmpp:make_iq_result(IQ);
+               false -> xmpp:make_error(IQ, xmpp:err_item_not_found())
            end;
-       set ->
-           if Purge -> delete_all_msgs(U, S);
-              true -> handle_offline_items_remove(From, Items)
-           end
-    end,
-    xmpp:make_iq_result(IQ);
+       {set, #offline{fetch = false, items = [], purge = true}} ->
+           case delete_all_msgs(U, S) of
+               {atomic, ok} ->
+                   xmpp:make_iq_result(IQ);
+               _Err ->
+                   Txt = <<"Database failure">>,
+                   xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang))
+           end;
+       {set, #offline{fetch = false, items = [_|_] = Items, purge = false}} ->
+           case handle_offline_items_remove(From, Items) of
+               true -> xmpp:make_iq_result(IQ);
+               false -> xmpp:make_error(IQ, xmpp:err_item_not_found())
+           end;
+       _ ->
+           xmpp:make_error(IQ, xmpp:err_bad_request())
+    end;
 handle_offline_query(#iq{lang = Lang} = IQ) ->
-    Txt = <<"Query to another users is forbidden">>,
-    xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang)).
+    Txt = <<"No module is handling this query">>,
+    xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)).
 
--spec handle_offline_items_view(jid(), [offline_item()]) -> ok.
+-spec handle_offline_items_view(jid(), [offline_item()]) -> boolean().
 handle_offline_items_view(JID, Items) ->
     {U, S, R} = jid:tolower(JID),
-    lists:foreach(
-      fun(#offline_item{node = Node, action = view}) ->
+    lists:foldl(
+      fun(#offline_item{node = Node, action = view}, Acc) ->
              case fetch_msg_by_node(JID, Node) of
                  {ok, OfflineMsg} ->
                      case offline_msg_to_route(S, OfflineMsg) of
@@ -351,25 +371,22 @@ handle_offline_items_view(JID, Items) ->
                                      Pid ! {route, From, To, NewEl};
                                  none ->
                                      ok
-                             end;
+                             end,
+                             Acc or true;
                          error ->
-                             ok
+                             Acc or false
                      end;
                  error ->
-                     ok
-             end;
-        (_) ->
-             ok
-      end, Items).
+                     Acc or false
+             end
+      end, false, Items).
 
--spec handle_offline_items_remove(jid(), [offline_item()]) -> ok.
+-spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean().
 handle_offline_items_remove(JID, Items) ->
-    lists:foreach(
-      fun(#offline_item{node = Node, action = remove}) ->
-             remove_msg_by_node(JID, Node);
-        (_) ->
-             ok
-      end, Items).
+    lists:foldl(
+      fun(#offline_item{node = Node, action = remove}, Acc) ->
+             Acc or remove_msg_by_node(JID, Node)
+      end, false, Items).
 
 -spec set_offline_tag(message(), binary()) -> message().
 set_offline_tag(Msg, Node) ->
@@ -401,23 +418,22 @@ fetch_msg_by_node(To, Seq) ->
            error
     end.
 
--spec remove_msg_by_node(jid(), binary()) -> ok.
+-spec remove_msg_by_node(jid(), binary()) -> boolean().
 remove_msg_by_node(To, Seq) ->
     case catch binary_to_integer(Seq) of
        I when is_integer(I), I>= 0 ->
            LUser = To#jid.luser,
            LServer = To#jid.lserver,
            Mod = gen_mod:db_mod(LServer, ?MODULE),
-           Mod:remove_message(LUser, LServer, I);
+           Mod:remove_message(LUser, LServer, I),
+           true;
        _ ->
-           ok
+           false
     end.
 
 -spec need_to_store(binary(), message()) -> boolean().
 need_to_store(_LServer, #message{type = error}) -> false;
-need_to_store(_LServer, #message{type = groupchat}) -> false;
-need_to_store(_LServer, #message{type = headline}) -> false;
-need_to_store(LServer, Packet) ->
+need_to_store(LServer, #message{type = Type} = Packet) ->
     case xmpp:has_subtag(Packet, #offline{}) of
        false ->
            case check_store_hint(Packet) of
@@ -425,6 +441,8 @@ need_to_store(LServer, Packet) ->
                    true;
                no_store ->
                    false;
+               none when Type == headline; Type == groupchat ->
+                   false;
                none ->
                    case gen_mod:get_module_opt(
                           LServer, ?MODULE, store_empty_body,
index e8db08ddf181537b8c2d07ed50ec0eabff7c5bc8..c9f088fa48074b46888262db1e21f57082e83603 100644 (file)
@@ -127,12 +127,16 @@ read_message(LUser, LServer, I) ->
 remove_message(LUser, LServer, I) ->
     US = {LUser, LServer},
     TS = integer_to_now(I),
-    Msgs = mnesia:dirty_match_object(
-            offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}),
-    lists:foreach(
-      fun(Msg) ->
-             mnesia:dirty_delete_object(Msg)
-      end, Msgs).
+    case mnesia:dirty_match_object(
+          offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}) of
+       [] ->
+           {error, notfound};
+       Msgs ->
+           lists:foreach(
+             fun(Msg) ->
+                     mnesia:dirty_delete_object(Msg)
+             end, Msgs)
+    end.
 
 read_all_messages(LUser, LServer) ->
     US = {LUser, LServer},
index 102d884125d9cb2d5f9a6889fb29c9f4fd9d9cb9..fb3bbc7ab41dfa12d2d5ea8e481c090f6b2ab4c2 100644 (file)
@@ -70,7 +70,7 @@ unwrap_carbon(Stanza) -> Stanza.
 is_standalone_chat_state(Stanza) ->
     case unwrap_carbon(Stanza) of
        #message{body = [], subject = [], sub_els = Els} ->
-           IgnoreNS = [?NS_CHATSTATES, ?NS_DELAY],
+           IgnoreNS = [?NS_CHATSTATES, ?NS_DELAY, ?NS_EVENT],
            Stripped = [El || El <- Els,
                              not lists:member(xmpp:get_ns(El), IgnoreNS)],
            Stripped == [];
index a13d801ea6af6eadd3f3fb54b5741a851c15daa8..121719cdfb862d1c06efc093b3ea5f7cc336fcbc 100644 (file)
@@ -399,14 +399,12 @@ db_tests(riak) ->
        privacy_tests:single_cases(),
        vcard,
        muc_tests:single_cases(),
+       offline_tests:master_slave_cases(),
        test_unregister]},
      muc_tests:master_slave_cases(),
      privacy_tests:master_slave_cases(),
      roster_tests:master_slave_cases(),
-     {test_flex_offline, [sequence],
-      [flex_offline_master, flex_offline_slave]},
-     {test_offline, [sequence],
-      [offline_master, offline_slave]},
+     offline_tests:master_slave_cases(),
      {test_announce, [sequence],
       [announce_master, announce_slave]},
      {test_vcard_xupdate, [parallel],
@@ -425,17 +423,15 @@ db_tests(DB) when DB == mnesia; DB == redis ->
        vcard,
        pubsub_single_tests(),
        muc_tests:single_cases(),
+       offline_tests:single_cases(),
        test_unregister]},
      muc_tests:master_slave_cases(),
      privacy_tests:master_slave_cases(),
      pubsub_multiple_tests(),
      roster_tests:master_slave_cases(),
+     offline_tests:master_slave_cases(),
      {test_mix, [parallel],
       [mix_master, mix_slave]},
-     {test_flex_offline, [sequence],
-      [flex_offline_master, flex_offline_slave]},
-     {test_offline, [sequence],
-      [offline_master, offline_slave]},
      {test_old_mam, [parallel],
       [mam_old_master, mam_old_slave]},
      {test_new_mam, [parallel],
@@ -465,17 +461,15 @@ db_tests(_) ->
        vcard,
        pubsub_single_tests(),
        muc_tests:single_cases(),
+       offline_tests:single_cases(),
        test_unregister]},
      muc_tests:master_slave_cases(),
      privacy_tests:master_slave_cases(),
      pubsub_multiple_tests(),
      roster_tests:master_slave_cases(),
+     offline_tests:master_slave_cases(),
      {test_mix, [parallel],
       [mix_master, mix_slave]},
-     {test_flex_offline, [sequence],
-      [flex_offline_master, flex_offline_slave]},
-     {test_offline, [sequence],
-      [offline_master, offline_slave]},
      {test_old_mam, [parallel],
       [mam_old_master, mam_old_slave]},
      {test_new_mam, [parallel],
@@ -2294,6 +2288,35 @@ muc_config_visitor_nickchange_master(Config) ->
 muc_config_visitor_nickchange_slave(Config) ->
     muc_tests:muc_config_visitor_nickchange_slave(Config).
 
+offline_feature_enabled(Config) ->
+    offline_tests:feature_enabled(Config).
+offline_check_identity(Config) ->
+    offline_tests:check_identity(Config).
+offline_send_non_existent(Config) ->
+    offline_tests:send_non_existent(Config).
+offline_view_non_existent(Config) ->
+    offline_tests:view_non_existent(Config).
+offline_remove_non_existent(Config) ->
+    offline_tests:remove_non_existent(Config).
+offline_view_non_integer(Config) ->
+    offline_tests:view_non_integer(Config).
+offline_remove_non_integer(Config) ->
+    offline_tests:remove_non_integer(Config).
+offline_malformed_iq(Config) ->
+    offline_tests:malformed_iq(Config).
+offline_wrong_user(Config) ->
+    offline_tests:wrong_user(Config).
+offline_unsupported_iq(Config) ->
+    offline_tests:unsupported_iq(Config).
+offline_flex_master(Config) ->
+    offline_tests:flex_master(Config).
+offline_flex_slave(Config) ->
+    offline_tests:flex_slave(Config).
+offline_send_all_master(Config) ->
+    offline_tests:send_all_master(Config).
+offline_send_all_slave(Config) ->
+    offline_tests:send_all_slave(Config).
+
 announce_master(Config) ->
     MyJID = my_jid(Config),
     ServerJID = server_jid(Config),
@@ -2317,155 +2340,6 @@ announce_slave(Config) ->
     send(Config, #message{to = MotdDelJID}),
     disconnect(Config).
 
-flex_offline_master(Config) ->
-    Peer = ?config(slave, Config),
-    LPeer = jid:remove_resource(Peer),
-    lists:foreach(
-      fun(I) ->
-             Body = integer_to_binary(I),
-             send(Config, #message{to = LPeer,
-                                   body = [#text{data = Body}],
-                                   subject = [#text{data = <<"subject">>}]})
-      end, lists:seq(1, 5)),
-    disconnect(Config).
-
-flex_offline_slave(Config) ->
-    MyJID = my_jid(Config),
-    MyBareJID = jid:remove_resource(MyJID),
-    Peer = ?config(master, Config),
-    Peer_s = jid:to_string(Peer),
-    true = is_feature_advertised(Config, ?NS_FLEX_OFFLINE),
-    %% Request disco#info
-    #iq{type = result,
-       sub_els = [#disco_info{
-                     node = ?NS_FLEX_OFFLINE,
-                     identities = Ids,
-                     features = Fts,
-                     xdata = [X]}]} =
-       send_recv(Config, #iq{type = get,
-                             sub_els = [#disco_info{
-                                           node = ?NS_FLEX_OFFLINE}]}),
-    %% Check if we have correct identities
-    true = lists:any(
-            fun(#identity{category = <<"automation">>,
-                          type = <<"message-list">>}) -> true;
-               (_) -> false
-            end, Ids),
-    %% Check if we have needed feature
-    true = lists:member(?NS_FLEX_OFFLINE, Fts),
-    %% Check xdata, the 'number_of_messages' should be 5
-    #xdata{type = result,
-          fields = [#xdata_field{type = hidden,
-                                 var = <<"FORM_TYPE">>},
-                    #xdata_field{var = <<"number_of_messages">>,
-                                 values = [<<"5">>]}]} = X,
-    %% Fetch headers,
-    #iq{type = result,
-       sub_els = [#disco_items{
-                     node = ?NS_FLEX_OFFLINE,
-                     items = DiscoItems}]} =
-       send_recv(Config, #iq{type = get,
-                             sub_els = [#disco_items{
-                                           node = ?NS_FLEX_OFFLINE}]}),
-    %% Check if headers are correct
-    Nodes = lists:sort(
-             lists:map(
-               fun(#disco_item{jid = J, name = P, node = N})
-                     when (J == MyBareJID) and (P == Peer_s) ->
-                       N
-               end, DiscoItems)),
-    %% Since headers are received we can send initial presence without a risk
-    %% of getting offline messages flood
-    #presence{from = MyJID} = send_recv(Config, #presence{}),
-    %% Check full fetch
-    #iq{type = result, sub_els = []} =
-       send_recv(Config, #iq{type = get, sub_els = [#offline{fetch = true}]}),
-    lists:foreach(
-      fun({I, N}) ->
-             Text = integer_to_binary(I),
-             #message{body = Body, sub_els = SubEls} = recv_message(Config),
-             [#text{data = Text}] = Body,
-             #offline{items = [#offline_item{node = N}]} =
-                 lists:keyfind(offline, 1, SubEls),
-             #delay{} = lists:keyfind(delay, 1, SubEls)
-      end, lists:zip(lists:seq(1, 5), Nodes)),
-    %% Fetch 2nd and 4th message
-    #iq{type = result, sub_els = []} =
-       send_recv(
-         Config,
-         #iq{type = get,
-             sub_els = [#offline{
-                           items = [#offline_item{
-                                       action = view,
-                                       node = lists:nth(2, Nodes)},
-                                    #offline_item{
-                                       action = view,
-                                       node = lists:nth(4, Nodes)}]}]}),
-    lists:foreach(
-      fun({I, N}) ->
-             Text = integer_to_binary(I),
-             #message{body = [#text{data = Text}],
-                      sub_els = SubEls} = recv_message(Config),
-             #offline{items = [#offline_item{node = N}]} =
-                 lists:keyfind(offline, 1, SubEls)
-      end, lists:zip([2, 4], [lists:nth(2, Nodes), lists:nth(4, Nodes)])),
-    %% Delete 2nd and 4th message
-    #iq{type = result, sub_els = []} =
-       send_recv(
-         Config,
-         #iq{type = set,
-             sub_els = [#offline{
-                           items = [#offline_item{
-                                       action = remove,
-                                       node = lists:nth(2, Nodes)},
-                                    #offline_item{
-                                       action = remove,
-                                       node = lists:nth(4, Nodes)}]}]}),
-    %% Check if messages were deleted
-    #iq{type = result,
-       sub_els = [#disco_items{
-                     node = ?NS_FLEX_OFFLINE,
-                     items = RemainedItems}]} =
-       send_recv(Config, #iq{type = get,
-                             sub_els = [#disco_items{
-                                           node = ?NS_FLEX_OFFLINE}]}),
-    RemainedNodes = [lists:nth(1, Nodes),
-                    lists:nth(3, Nodes),
-                    lists:nth(5, Nodes)],
-    RemainedNodes = lists:sort(
-                     lists:map(
-                       fun(#disco_item{node = N}) -> N end,
-                       RemainedItems)),
-    %% Purge everything left
-    #iq{type = result, sub_els = []} =
-       send_recv(Config, #iq{type = set, sub_els = [#offline{purge = true}]}),
-    %% Check if there is no offline messages
-    #iq{type = result,
-       sub_els = [#disco_items{node = ?NS_FLEX_OFFLINE, items = []}]} =
-       send_recv(Config, #iq{type = get,
-                             sub_els = [#disco_items{
-                                           node = ?NS_FLEX_OFFLINE}]}),
-    disconnect(Config).
-
-offline_master(Config) ->
-    Peer = ?config(slave, Config),
-    LPeer = jid:remove_resource(Peer),
-    send(Config, #message{to = LPeer,
-                          body = [#text{data = <<"body">>}],
-                          subject = [#text{data = <<"subject">>}]}),
-    disconnect(Config).
-
-offline_slave(Config) ->
-    Peer = ?config(master, Config),
-    #presence{} = send_recv(Config, #presence{}),
-    #message{sub_els = SubEls,
-            from = Peer,
-            body = [#text{data = <<"body">>}],
-            subject = [#text{data = <<"subject">>}]} =
-       recv_message(Config),
-    true = lists:keymember(delay, 1, SubEls),
-    disconnect(Config).
-
 carbons_master(Config) ->
     MyJID = my_jid(Config),
     MyBareJID = jid:remove_resource(MyJID),
index 3a6d4947f972a9dd29baa43adc35cfd1fbc3b581..29243d6837de415ed0331d829f5f263df6d15d78 100644 (file)
@@ -388,8 +388,7 @@ access:
   local: 
     local: allow
   max_user_offline_messages: 
-    admin: 5000
-    all: 100
+    all: infinity
   max_user_sessions: 
     all: 10
   muc: 
@@ -459,4 +458,4 @@ s2s_use_starttls: false
 s2s_cafile: CAFILE
 shaper: 
   fast: 50000
-  normal: 1000
+  normal: 10000
diff --git a/test/offline_tests.erl b/test/offline_tests.erl
new file mode 100644 (file)
index 0000000..ea34544
--- /dev/null
@@ -0,0 +1,406 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2016, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created :  7 Nov 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(offline_tests).
+
+%% API
+-compile(export_all).
+-import(suite, [send/2, disconnect/1, my_jid/1, send_recv/2, recv_message/1,
+               get_features/1, recv/1, get_event/1, server_jid/1,
+               wait_for_master/1, wait_for_slave/1]).
+-include("suite.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+single_cases() ->
+    {offline_single, [sequence],
+     [single_test(feature_enabled),
+      single_test(check_identity),
+      single_test(send_non_existent),
+      single_test(view_non_existent),
+      single_test(remove_non_existent),
+      single_test(view_non_integer),
+      single_test(remove_non_integer),
+      single_test(malformed_iq),
+      single_test(wrong_user),
+      single_test(unsupported_iq)]}.
+
+feature_enabled(Config) ->
+    Features = get_features(Config),
+    ct:comment("Checking if offline features are set"),
+    true = lists:member(?NS_FEATURE_MSGOFFLINE, Features),
+    true = lists:member(?NS_FLEX_OFFLINE, Features),
+    disconnect(Config).
+
+check_identity(Config) ->
+    #iq{type = result,
+       sub_els = [#disco_info{
+                     node = ?NS_FLEX_OFFLINE,
+                     identities = Ids}]} =
+       send_recv(Config, #iq{type = get,
+                             sub_els = [#disco_info{
+                                           node = ?NS_FLEX_OFFLINE}]}),
+    true = lists:any(
+            fun(#identity{category = <<"automation">>,
+                          type = <<"message-list">>}) -> true;
+               (_) -> false
+            end, Ids),
+    disconnect(Config).
+
+send_non_existent(Config) ->
+    Server = ?config(server, Config),
+    To = jid:make(<<"non-existent">>, Server),
+    #message{type = error} = Err = send_recv(Config, #message{to = To}),
+    #stanza_error{reason = 'service-unavailable'} = xmpp:get_error(Err),
+    disconnect(Config).
+
+view_non_existent(Config) ->
+    #stanza_error{reason = 'item-not-found'} = view(Config, [randoms:get_string()], false),
+    disconnect(Config).
+
+remove_non_existent(Config) ->
+    ok = remove(Config, [randoms:get_string()]),
+    disconnect(Config).
+
+view_non_integer(Config) ->
+    #stanza_error{reason = 'item-not-found'} = view(Config, [<<"foo">>], false),
+    disconnect(Config).
+
+remove_non_integer(Config) ->
+    #stanza_error{reason = 'item-not-found'} = remove(Config, [<<"foo">>]),
+    disconnect(Config).
+
+malformed_iq(Config) ->
+    Item = #offline_item{node = randoms:get_string()},
+    Range = [{Type, SubEl} || Type <- [set, get],
+                             SubEl <- [#offline{items = [], _ = false},
+                                       #offline{items = [Item], _ = true}]]
+       ++ [{set, #offline{items = [], fetch = true, purge = false}},
+           {set, #offline{items = [Item], fetch = true, purge = false}},
+           {get, #offline{items = [], fetch = false, purge = true}},
+           {get, #offline{items = [Item], fetch = false, purge = true}}],
+    lists:foreach(
+      fun({Type, SubEl}) ->
+             #iq{type = error} = Err =
+                 send_recv(Config, #iq{type = Type, sub_els = [SubEl]}),
+             #stanza_error{reason = 'bad-request'} = xmpp:get_error(Err)
+      end, Range),
+    disconnect(Config).
+
+wrong_user(Config) ->
+    Server = ?config(server, Config),
+    To = jid:make(<<"foo">>, Server),
+    Item = #offline_item{node = randoms:get_string()},
+    Range = [{Type, Items, Purge, Fetch} ||
+               Type <- [set, get],
+               Items <- [[], [Item]],
+               Purge <- [false, true],
+               Fetch <- [false, true]],
+    lists:foreach(
+      fun({Type, Items, Purge, Fetch}) ->
+             #iq{type = error} = Err =
+                 send_recv(Config, #iq{type = Type, to = To,
+                                       sub_els = [#offline{items = Items,
+                                                           purge = Purge,
+                                                           fetch = Fetch}]}),
+             #stanza_error{reason = 'forbidden'} = xmpp:get_error(Err)
+      end, Range),
+    disconnect(Config).
+
+unsupported_iq(Config) ->
+    Item = #offline_item{node = randoms:get_string()},
+    lists:foreach(
+      fun(Type) ->
+             #iq{type = error} = Err =
+                 send_recv(Config, #iq{type = Type, sub_els = [Item]}),
+             #stanza_error{reason = 'service-unavailable'} = xmpp:get_error(Err)
+      end, [set, get]),
+    disconnect(Config).
+
+%%%===================================================================
+%%% Master-slave tests
+%%%===================================================================
+master_slave_cases() ->
+    {offline_master_slave, [sequence],
+     [master_slave_test(flex),
+      master_slave_test(send_all)]}.
+
+flex_master(Config) ->
+    send_messages(Config, 5),
+    disconnect(Config).
+
+flex_slave(Config) ->
+    wait_for_master(Config),
+    peer_down = get_event(Config),
+    5 = get_number(Config),
+    Nodes = get_nodes(Config),
+    %% Since headers are received we can send initial presence without a risk
+    %% of getting offline messages flood
+    #presence{} = send_recv(Config, #presence{}),
+    ct:comment("Checking fetch"),
+    Nodes = fetch(Config, lists:seq(1, 5)),
+    ct:comment("Fetching 2nd and 4th message"),
+    [2, 4] = view(Config, [lists:nth(2, Nodes), lists:nth(4, Nodes)]),
+    ct:comment("Deleting 2nd and 4th message"),
+    ok = remove(Config, [lists:nth(2, Nodes), lists:nth(4, Nodes)]),
+    ct:comment("Checking if messages were deleted"),
+    [1, 3, 5] = view(Config, [lists:nth(1, Nodes),
+                             lists:nth(3, Nodes),
+                             lists:nth(5, Nodes)]),
+    ct:comment("Purging everything left"),
+    ok = purge(Config),
+    ct:comment("Checking if there are no offline messages"),
+    0 = get_number(Config),
+    clean(disconnect(Config)).
+
+send_all_master(Config) ->
+    wait_for_slave(Config),
+    Peer = ?config(peer, Config),
+    BarePeer = jid:remove_resource(Peer),
+    {Deliver, Errors} = message_iterator(Config),
+    N = lists:foldl(
+         fun(#message{type = error} = Msg, Acc) ->
+                 send(Config, Msg#message{to = BarePeer}),
+                 Acc;
+            (Msg, Acc) ->
+                 I = send(Config, Msg#message{to = BarePeer}),
+                 case xmpp:get_subtag(Msg, #xevent{}) of
+                     #xevent{offline = true, id = undefined} ->
+                         ct:comment("Receiving event-reply for:~n~s",
+                                    [xmpp:pp(Msg)]),
+                         #message{} = Reply = recv_message(Config),
+                         #xevent{id = I} = xmpp:get_subtag(Reply, #xevent{});
+                     _ ->
+                         ok
+                 end,
+                 Acc + 1
+         end, 0, Deliver),
+    lists:foreach(
+      fun(Msg) ->
+             #message{type = error} = Err =
+                 send_recv(Config, Msg#message{to = BarePeer}),
+             #stanza_error{reason = 'service-unavailable'} = xmpp:get_error(Err)
+      end, Errors),
+    ok = wait_for_complete(Config, N),
+    disconnect(Config).
+
+send_all_slave(Config) ->
+    ServerJID = server_jid(Config),
+    Peer = ?config(peer, Config),
+    wait_for_master(Config),
+    peer_down = get_event(Config),
+    #presence{} = send_recv(Config, #presence{}),
+    {Deliver, _Errors} = message_iterator(Config),
+    lists:foreach(
+      fun(#message{type = error}) ->
+             ok;
+        (#message{type = Type, body = Body, subject = Subject} = Msg) ->
+             ct:comment("Receiving message:~n~s", [xmpp:pp(Msg)]),
+             #message{from = Peer,
+                      type = Type,
+                      body = Body,
+                      subject = Subject} = RecvMsg = recv_message(Config),
+             ct:comment("Checking if delay tag is correctly set"),
+             #delay{from = ServerJID} = xmpp:get_subtag(RecvMsg, #delay{})
+      end, Deliver),
+    disconnect(Config).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+single_test(T) ->
+    list_to_atom("offline_" ++ atom_to_list(T)).
+
+master_slave_test(T) ->
+    {list_to_atom("offline_" ++ atom_to_list(T)), [parallel],
+     [list_to_atom("offline_" ++ atom_to_list(T) ++ "_master"),
+      list_to_atom("offline_" ++ atom_to_list(T) ++ "_slave")]}.
+
+clean(Config) ->
+    {U, S, _} = jid:tolower(my_jid(Config)),
+    mod_offline:remove_user(U, S),
+    Config.
+
+send_messages(Config, Num) ->
+    send_messages(Config, Num, normal, []).
+
+send_messages(Config, Num, Type, SubEls) ->
+    wait_for_slave(Config),
+    Peer = ?config(peer, Config),
+    BarePeer = jid:remove_resource(Peer),
+    lists:foreach(
+      fun(I) ->
+             Body = integer_to_binary(I),
+             send(Config,
+                  #message{to = BarePeer,
+                           type = Type,
+                           body = [#text{data = Body}],
+                           subject = [#text{data = <<"subject">>}],
+                           sub_els = SubEls})
+      end, lists:seq(1, Num)),
+    ct:comment("Waiting for all messages to be delivered to offline spool"),
+    ok = wait_for_complete(Config, Num).
+
+recv_messages(Config, Num) ->
+    wait_for_master(Config),
+    peer_down = get_event(Config),
+    Peer = ?config(peer, Config),
+    #presence{} = send_recv(Config, #presence{}),
+    lists:foreach(
+      fun(I) ->
+             Text = integer_to_binary(I),
+             #message{sub_els = SubEls,
+                      from = Peer,
+                      body = [#text{data = Text}],
+                      subject = [#text{data = <<"subject">>}]} =
+                 recv_message(Config),
+             true = lists:keymember(delay, 1, SubEls)
+      end, lists:seq(1, Num)),
+    clean(disconnect(Config)).
+
+get_number(Config) ->
+    ct:comment("Getting offline message number"),
+    #iq{type = result,
+       sub_els = [#disco_info{
+                     node = ?NS_FLEX_OFFLINE,
+                     xdata = [X]}]} =
+       send_recv(Config, #iq{type = get,
+                             sub_els = [#disco_info{
+                                           node = ?NS_FLEX_OFFLINE}]}),
+    Form = flex_offline:decode(X#xdata.fields),
+    proplists:get_value(number_of_messages, Form).
+
+get_nodes(Config) ->
+    MyJID = my_jid(Config),
+    MyBareJID = jid:remove_resource(MyJID),
+    Peer = ?config(peer, Config),
+    Peer_s = jid:to_string(Peer),
+    ct:comment("Getting headers"), 
+    #iq{type = result,
+       sub_els = [#disco_items{
+                     node = ?NS_FLEX_OFFLINE,
+                     items = DiscoItems}]} =
+       send_recv(Config, #iq{type = get,
+                             sub_els = [#disco_items{
+                                           node = ?NS_FLEX_OFFLINE}]}),
+    ct:comment("Checking if headers are correct"),
+    lists:sort(
+      lists:map(
+       fun(#disco_item{jid = J, name = P, node = N})
+             when (J == MyBareJID) and (P == Peer_s) ->
+               N
+       end, DiscoItems)).
+
+fetch(Config, Range) ->
+    ID = send(Config, #iq{type = get, sub_els = [#offline{fetch = true}]}),
+    Nodes = lists:map(
+             fun(I) ->
+                     Text = integer_to_binary(I),
+                     #message{body = Body, sub_els = SubEls} = recv(Config),
+                     [#text{data = Text}] = Body,
+                     #offline{items = [#offline_item{node = Node}]} =
+                         lists:keyfind(offline, 1, SubEls),
+                     #delay{} = lists:keyfind(delay, 1, SubEls),
+                     Node
+             end, Range),
+    #iq{id = ID, type = result, sub_els = []} = recv(Config),
+    Nodes.
+
+view(Config, Nodes) ->
+    view(Config, Nodes, true).
+
+view(Config, Nodes, NeedReceive) ->
+    Items = lists:map(
+             fun(Node) ->
+                     #offline_item{action = view, node = Node}
+             end, Nodes),
+    I = send(Config,
+            #iq{type = get, sub_els = [#offline{items = Items}]}),
+    Range = if NeedReceive ->
+                   lists:map(
+                     fun(Node) ->
+                             #message{body = [#text{data = Text}],
+                                      sub_els = SubEls} = recv(Config),
+                             #offline{items = [#offline_item{node = Node}]} =
+                                 lists:keyfind(offline, 1, SubEls),
+                             binary_to_integer(Text)
+                     end, Nodes);
+              true ->
+                   []
+           end,
+    case recv(Config) of
+       #iq{id = I, type = result, sub_els = []} -> Range;
+       #iq{id = I, type = error} = Err -> xmpp:get_error(Err)
+    end.
+
+remove(Config, Nodes) ->
+    Items = lists:map(
+             fun(Node) ->
+                     #offline_item{action = remove, node = Node}
+             end, Nodes),
+    case send_recv(Config, #iq{type = set,
+                              sub_els = [#offline{items = Items}]}) of
+       #iq{type = result, sub_els = []} ->
+           ok;
+       #iq{type = error} = Err ->
+           xmpp:get_error(Err)
+    end.
+
+purge(Config) ->
+    case send_recv(Config, #iq{type = set,
+                              sub_els = [#offline{purge = true}]}) of
+       #iq{type = result, sub_els = []} ->
+           ok;
+       #iq{type = error} = Err ->
+           xmpp:get_error(Err)
+    end.
+
+wait_for_complete(_Config, 0) ->
+    ok;
+wait_for_complete(Config, N) ->
+    {U, S, _} = jid:tolower(?config(peer, Config)),
+    lists:foldl(
+      fun(_Time, ok) ->
+             ok;
+        (Time, Acc) ->
+             timer:sleep(Time),
+             case mod_offline:count_offline_messages(U, S) of
+                 N -> ok;
+                 _ -> Acc
+             end
+      end, error, [0, 100, 200, 2000, 5000, 10000]).
+
+message_iterator(Config) ->
+    ServerJID = server_jid(Config),
+    ChatStates = [[#chatstate{type = composing}]],
+    Offline = [[#offline{}]],
+    Hints = [[#hint{type = T}] || T <- [store, 'no-store']],
+    XEvent = [[#xevent{id = ID, offline = OfflineFlag}]
+             || ID <- [undefined, randoms:get_string()],
+                OfflineFlag <- [false, true]],
+    Delay = [[#delay{stamp = p1_time_compat:timestamp(), from = ServerJID}]],
+    AllEls = [Els1 ++ Els2 || Els1 <- [[]] ++ ChatStates ++ Delay ++ Hints ++ Offline,
+                             Els2 <- [[]] ++ XEvent],
+    All = [#message{type = Type, body = Body, subject = Subject, sub_els = Els}
+          || %%Type <- [chat],
+             Type <- [error, chat, normal, groupchat, headline],
+             Body <- [[], xmpp:mk_text(<<"body">>)],
+             Subject <- [[], xmpp:mk_text(<<"subject">>)],
+             Els <- AllEls],
+    lists:partition(
+      fun(#message{type = error}) -> true;
+        (#message{sub_els = [#offline{}|_]}) -> false;
+        (#message{sub_els = [_, #xevent{id = I}]}) when I /= undefined -> false;
+        (#message{sub_els = [#xevent{id = I}]}) when I /= undefined -> false;
+        (#message{sub_els = [#hint{type = store}|_]}) -> true;
+        (#message{sub_els = [#hint{type = 'no-store'}|_]}) -> false;
+        (#message{body = [], subject = []}) -> false;
+        (#message{type = Type}) -> (Type == chat) or (Type == normal);
+        (_) -> false
+      end, All).
index f88ac5a5e1b3334bc2cf9b7dec825cc152086ac0..52c030df117f7d7ddce09d30f3d165ca3bb7f845 100644 (file)
@@ -199,27 +199,31 @@ init_stream(Config) ->
                component -> ?NS_COMPONENT;
                server -> ?NS_SERVER
            end,
-    #stream_start{id = ID, xmlns = XMLNS, version = Version} = recv(Config),
-    set_opt(stream_id, ID, NewConfig).
+    receive
+       #stream_start{id = ID, xmlns = XMLNS, version = Version} ->
+           set_opt(stream_id, ID, NewConfig)
+    end.
 
 process_stream_features(Config) ->
-    #stream_features{sub_els = Fs} = recv(Config),
-    Mechs = lists:flatmap(
-              fun(#sasl_mechanisms{list = Ms}) ->
-                      Ms;
-                 (_) ->
-                      []
-              end, Fs),
-    lists:foldl(
-      fun(#feature_register{}, Acc) ->
-              set_opt(register, true, Acc);
-         (#starttls{}, Acc) ->
-              set_opt(starttls, true, Acc);
-         (#compression{methods = Ms}, Acc) ->
-              set_opt(compression, Ms, Acc);
-         (_, Acc) ->
-              Acc
-      end, set_opt(mechs, Mechs, Config), Fs).
+    receive
+       #stream_features{sub_els = Fs} ->
+           Mechs = lists:flatmap(
+                     fun(#sasl_mechanisms{list = Ms}) ->
+                             Ms;
+                        (_) ->
+                             []
+                     end, Fs),
+           lists:foldl(
+             fun(#feature_register{}, Acc) ->
+                     set_opt(register, true, Acc);
+                (#starttls{}, Acc) ->
+                     set_opt(starttls, true, Acc);
+                (#compression{methods = Ms}, Acc) ->
+                     set_opt(compression, Ms, Acc);
+                (_, Acc) ->
+                     Acc
+             end, set_opt(mechs, Mechs, Config), Fs)
+    end.
 
 disconnect(Config) ->
     ct:comment("Disconnecting"),
@@ -245,7 +249,7 @@ starttls(Config) ->
 
 starttls(Config, ShouldFail) ->
     send(Config, #starttls{}),
-    case recv(Config) of
+    receive
        #starttls_proceed{} when ShouldFail ->
            ct:fail(starttls_should_have_failed);
        #starttls_failure{} when ShouldFail ->
@@ -262,7 +266,7 @@ starttls(Config, ShouldFail) ->
 
 zlib(Config) ->
     send(Config, #compress{methods = [<<"zlib">>]}),
-    #compressed{} = recv(Config),
+    receive #compressed{} -> ok end,
     ZlibSocket = ejabberd_socket:compress(?config(socket, Config)),
     process_stream_features(init_stream(set_opt(socket, ZlibSocket, Config))).
 
@@ -376,7 +380,7 @@ auth_component(Config, ShouldFail) ->
     Password = ?config(password, Config),
     Digest = p1_sha:sha(<<StreamID/binary, Password/binary>>),
     send(Config, #handshake{data = Digest}),
-    case recv(Config) of
+    receive
        #handshake{} when ShouldFail ->
            ct:fail(component_auth_should_have_failed);
        #handshake{} ->
@@ -399,7 +403,7 @@ auth_SASL(Mech, Config, ShouldFail) ->
     wait_auth_SASL_result(set_opt(sasl, SASL, Config), ShouldFail).
 
 wait_auth_SASL_result(Config, ShouldFail) ->
-    case recv(Config) of
+    receive
        #sasl_success{} when ShouldFail ->
            ct:fail(sasl_auth_should_have_failed);
         #sasl_success{} ->
@@ -409,24 +413,25 @@ wait_auth_SASL_result(Config, ShouldFail) ->
            NS = if Type == client -> ?NS_CLIENT;
                    Type == server -> ?NS_SERVER
                 end,
-           #stream_start{xmlns = NS, version = {1,0}} = recv(Config),
-            #stream_features{sub_els = Fs} = recv(Config),
-           if Type == client ->
-                   #xmpp_session{optional = true} =
-                       lists:keyfind(xmpp_session, 1, Fs);
-              true ->
-                   ok
-           end,
-           lists:foldl(
-             fun(#feature_sm{}, ConfigAcc) ->
-                     set_opt(sm, true, ConfigAcc);
-                (#feature_csi{}, ConfigAcc) ->
-                     set_opt(csi, true, ConfigAcc);
-                (#rosterver_feature{}, ConfigAcc) ->
-                     set_opt(rosterver, true, ConfigAcc);
-                (_, ConfigAcc) ->
-                     ConfigAcc
-             end, Config, Fs);
+           receive #stream_start{xmlns = NS, version = {1,0}} -> ok end,
+            receive #stream_features{sub_els = Fs} ->
+                   if Type == client ->
+                           #xmpp_session{optional = true} =
+                               lists:keyfind(xmpp_session, 1, Fs);
+                      true ->
+                           ok
+                   end,
+                   lists:foldl(
+                     fun(#feature_sm{}, ConfigAcc) ->
+                             set_opt(sm, true, ConfigAcc);
+                        (#feature_csi{}, ConfigAcc) ->
+                             set_opt(csi, true, ConfigAcc);
+                        (#rosterver_feature{}, ConfigAcc) ->
+                             set_opt(rosterver, true, ConfigAcc);
+                        (_, ConfigAcc) ->
+                             ConfigAcc
+                     end, Config, Fs)
+           end;
         #sasl_challenge{text = ClientIn} ->
             {Response, SASL} = (?config(sasl, Config))(ClientIn),
             send(Config, #sasl_response{text = Response}),