]> granicus.if.org Git - ejabberd/commitdiff
XEP-0013: Flexible Offline Message Retrieval support
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Tue, 9 Feb 2016 14:59:54 +0000 (17:59 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Tue, 9 Feb 2016 14:59:54 +0000 (17:59 +0300)
include/ns.hrl
src/ejabberd_c2s.erl
src/mod_disco.erl
src/mod_offline.erl

index 372b68b7c54e418a97b9f7713fd8a69b71c058b8..6934195d957c0f6ea377f5c1caba2a5fa6c3f99a 100644 (file)
 -define(NS_FEATURE_COMPRESS,
        <<"http://jabber.org/features/compress">>).
 -define(NS_FEATURE_MSGOFFLINE, <<"msgoffline">>).
+-define(NS_FLEX_OFFLINE, <<"http://jabber.org/protocol/offline">>).
 -define(NS_COMPRESS,
        <<"http://jabber.org/protocol/compress">>).
 -define(NS_CAPS, <<"http://jabber.org/protocol/caps">>).
index 8959ae50e571d36d725a8025044d7d47c7dd1eba..b9f9d815d3154c1d738e546fe0dfb0672453ea20 100644 (file)
                mgmt_resend,
                mgmt_stanzas_in = 0,
                mgmt_stanzas_out = 0,
+               ask_offline = true,
                lang = <<"">>}).
 
 %-define(DBGFSM, true).
@@ -1737,6 +1738,8 @@ handle_info({broadcast, Type, From, Packet}, StateName, StateData) ->
                From, jid:make(USR), Packet)
       end, lists:usort(Recipients)),
     fsm_next_state(StateName, StateData);
+handle_info(dont_ask_offline, StateName, StateData) ->
+    fsm_next_state(StateName, StateData#state{ask_offline = false});
 handle_info(Info, StateName, StateData) ->
     ?ERROR_MSG("Unexpected info: ~p", [Info]),
     fsm_next_state(StateName, StateData).
@@ -2310,7 +2313,7 @@ process_privacy_iq(From, To,
     ejabberd_router:route(To, From, jlib:iq_to_xml(IQRes)),
     NewStateData.
 
-resend_offline_messages(StateData) ->
+resend_offline_messages(#state{ask_offline = true} = StateData) ->
     case ejabberd_hooks:run_fold(resend_offline_messages_hook,
                                 StateData#state.server, [],
                                 [StateData#state.user, StateData#state.server])
@@ -2331,7 +2334,9 @@ resend_offline_messages(StateData) ->
                                end
                        end,
                        Rs)
-    end.
+    end;
+resend_offline_messages(_StateData) ->
+    ok.
 
 resend_subscription_requests(#state{user = User,
                                    server = Server} = StateData) ->
index fc3397e17e645fb6c26ac04809a5fd420facf769..734e90d367f2d814946a7e9e0359a356132153ec 100644 (file)
@@ -382,6 +382,8 @@ process_sm_iq_info(From, To,
                Identity = ejabberd_hooks:run_fold(disco_sm_identity,
                                                   Host, [],
                                                   [From, To, Node, Lang]),
+               Info = ejabberd_hooks:run_fold(disco_info, Host, [],
+                                              [From, To, Node, Lang]),
                case ejabberd_hooks:run_fold(disco_sm_features, Host,
                                             empty, [From, To, Node, Lang])
                    of
@@ -397,7 +399,7 @@ process_sm_iq_info(From, To,
                                            [{<<"xmlns">>, ?NS_DISCO_INFO}
                                             | ANode],
                                        children =
-                                           Identity ++
+                                           Identity ++ Info ++
                                              features_to_xml(Features)}]};
                  {error, Error} ->
                      IQ#iq{type = error, sub_el = [SubEl, Error]}
index 28a8aa4ffc0e7f88e535cf777001eca8c6c9b0d1..5e566011bd95dd1c4d83ead3e519b9a58214cdfc 100644 (file)
         resend_offline_messages/2,
         pop_offline_messages/3,
         get_sm_features/5,
+        get_sm_identity/5,
+        get_sm_items/5,
+        get_info/5,
+        handle_offline_query/3,
         remove_expired_messages/1,
         remove_old_messages/2,
         remove_user/2,
@@ -113,6 +117,8 @@ init([Host, Opts]) ->
          update_table();
       _ -> ok
     end,
+    IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
+                            no_queue),
     ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
                       store_packet, 50),
     ejabberd_hooks:add(resend_offline_messages_hook, Host,
@@ -125,12 +131,19 @@ init([Host, Opts]) ->
                       ?MODULE, get_sm_features, 50),
     ejabberd_hooks:add(disco_local_features, Host,
                       ?MODULE, get_sm_features, 50),
+    ejabberd_hooks:add(disco_sm_identity, Host,
+                      ?MODULE, get_sm_identity, 50),
+    ejabberd_hooks:add(disco_sm_items, Host,
+                      ?MODULE, get_sm_items, 50),
+    ejabberd_hooks:add(disco_info, Host, ?MODULE, get_info, 50),
     ejabberd_hooks:add(webadmin_page_host, Host,
                       ?MODULE, webadmin_page, 50),
     ejabberd_hooks:add(webadmin_user, Host,
                       ?MODULE, webadmin_user, 50),
     ejabberd_hooks:add(webadmin_user_parse_query, Host,
                       ?MODULE, webadmin_user_parse_query, 50),
+    gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
+                                 ?MODULE, handle_offline_query, IQDisc),
     AccessMaxOfflineMsgs =
        gen_mod:get_opt(access_max_user_messages, Opts,
                        fun(A) when is_atom(A) -> A end,
@@ -175,12 +188,16 @@ terminate(_Reason, State) ->
                          ?MODULE, remove_user, 50),
     ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50),
     ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50),
+    ejabberd_hooks:delete(disco_sm_identity, Host, ?MODULE, get_sm_identity, 50),
+    ejabberd_hooks:delete(disco_sm_items, Host, ?MODULE, get_sm_items, 50),
+    ejabberd_hooks:delete(disco_info, Host, ?MODULE, get_info, 50),
     ejabberd_hooks:delete(webadmin_page_host, Host,
                          ?MODULE, webadmin_page, 50),
     ejabberd_hooks:delete(webadmin_user, Host,
                          ?MODULE, webadmin_user, 50),
     ejabberd_hooks:delete(webadmin_user_parse_query, Host,
                          ?MODULE, webadmin_user_parse_query, 50),
+    gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE),
     ok.
 
 
@@ -276,38 +293,223 @@ get_sm_features(Acc, _From, _To, <<"">>, _Lang) ->
                {result, I} -> I;
                _ -> []
            end,
-    {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]};
+    {result, Feats ++ [?NS_FEATURE_MSGOFFLINE, ?NS_FLEX_OFFLINE]};
 
 get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) ->
     %% override all lesser features...
     {result, []};
 
+get_sm_features(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
+               ?NS_FLEX_OFFLINE, _Lang) ->
+    {result, [?NS_FLEX_OFFLINE]};
+
 get_sm_features(Acc, _From, _To, _Node, _Lang) ->
     Acc.
 
+get_sm_identity(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
+               ?NS_FLEX_OFFLINE, _Lang) ->
+    Identity = #xmlel{name = <<"identity">>,
+                     attrs = [{<<"category">>, <<"automation">>},
+                              {<<"type">>, <<"message-list">>}]},
+    [Identity];
+get_sm_identity(Acc, _From, _To, _Node, _Lang) ->
+    Acc.
+
+get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID,
+            #jid{luser = U, lserver = S},
+            ?NS_FLEX_OFFLINE, _Lang) ->
+    case ejabberd_sm:get_session_pid(U, S, R) of
+       Pid when is_pid(Pid) ->
+           Hdrs = read_message_headers(U, S),
+           BareJID = jid:to_string(jid:remove_resource(JID)),
+           Pid ! dont_ask_offline,
+           {result, lists:map(
+                      fun({Node, From, _OfflineMsg}) ->
+                              #xmlel{name = <<"item">>,
+                                     attrs = [{<<"jid">>, BareJID},
+                                              {<<"node">>, Node},
+                                              {<<"name">>, From}]}
+                      end, Hdrs)};
+       none ->
+           {result, []}
+    end;
+get_sm_items(Acc, _From, _To, _Node, _Lang) ->
+    Acc.
+
+get_info(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
+        ?NS_FLEX_OFFLINE, _Lang) ->
+    N = jlib:integer_to_binary(count_offline_messages(U, S)),
+    [#xmlel{name = <<"x">>,
+           attrs = [{<<"xmlns">>, ?NS_XDATA},
+                    {<<"type">>, <<"result">>}],
+           children = [#xmlel{name = <<"field">>,
+                              attrs = [{<<"var">>, <<"FORM_TYPE">>},
+                                       {<<"type">>, <<"hidden">>}],
+                              children = [#xmlel{name = <<"value">>,
+                                                 children = [{xmlcdata,
+                                                              ?NS_FLEX_OFFLINE}]}]},
+                       #xmlel{name = <<"field">>,
+                              attrs = [{<<"var">>, <<"number_of_messages">>}],
+                              children = [#xmlel{name = <<"value">>,
+                                                 children = [{xmlcdata, N}]}]}]}];
+get_info(Acc, _From, _To, _Node, _Lang) ->
+    Acc.
+
+handle_offline_query(#jid{luser = U, lserver = S} = From,
+                    #jid{luser = U, lserver = S} = _To,
+                    #iq{type = Type, sub_el = SubEl} = IQ) ->
+    case Type of
+       get ->
+           case fxml:get_subtag(SubEl, <<"fetch">>) of
+               #xmlel{} ->
+                   handle_offline_fetch(From);
+               false ->
+                   handle_offline_items_view(From, SubEl)
+           end;
+       set ->
+           case fxml:get_subtag(SubEl, <<"purge">>) of
+               #xmlel{} ->
+                   delete_all_msgs(U, S);
+               false ->
+                   handle_offline_items_remove(From, SubEl)
+           end
+    end,
+    IQ#iq{type = result, sub_el = []};
+handle_offline_query(_From, _To, #iq{sub_el = SubEl} = IQ) ->
+    IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]}.
+
+handle_offline_items_view(JID, #xmlel{children = Items}) ->
+    {U, S, R} = jid:tolower(JID),
+    lists:foreach(
+      fun(Node) ->
+             case fetch_msg_by_node(JID, Node) of
+                 {ok, OfflineMsg} ->
+                     case offline_msg_to_route(S, OfflineMsg) of
+                         {route, From, To, El} ->
+                             NewEl = set_offline_tag(El, Node),
+                             case ejabberd_sm:get_session_pid(U, S, R) of
+                                 Pid when is_pid(Pid) ->
+                                     Pid ! {route, From, To, NewEl};
+                                 none ->
+                                     ok
+                             end;
+                         error ->
+                             ok
+                     end;
+                 error ->
+                     ok
+             end
+      end, get_nodes_from_items(Items, <<"view">>)).
+
+handle_offline_items_remove(JID, #xmlel{children = Items}) ->
+    lists:foreach(
+      fun(Node) ->
+             remove_msg_by_node(JID, Node)
+      end, get_nodes_from_items(Items, <<"remove">>)).
+
+get_nodes_from_items(Items, Action) ->
+    lists:flatmap(
+      fun(#xmlel{name = <<"item">>, attrs = Attrs}) ->
+             case fxml:get_attr_s(<<"action">>, Attrs) of
+                 Action ->
+                     case fxml:get_attr_s(<<"node">>, Attrs) of
+                         <<"">> ->
+                             [];
+                         TS ->
+                             [TS]
+                     end;
+                 _ ->
+                     []
+             end;
+        (_) ->
+             []
+      end, Items).
+
+set_offline_tag(#xmlel{children = Els} = El, Node) ->
+    OfflineEl = #xmlel{name = <<"offline">>,
+                      attrs = [{<<"xmlns">>, ?NS_FLEX_OFFLINE}],
+                      children = [#xmlel{name = <<"item">>,
+                                         attrs = [{<<"node">>, Node}]}]},
+    El#xmlel{children = [OfflineEl|Els]}.
+
+handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) ->
+    case ejabberd_sm:get_session_pid(U, S, R) of
+       none ->
+           ok;
+       Pid when is_pid(Pid) ->
+           Pid ! dont_ask_offline,
+           lists:foreach(
+             fun({Node, _, Msg}) ->
+                     case offline_msg_to_route(S, Msg) of
+                         {route, From, To, El} ->
+                             NewEl = set_offline_tag(El, Node),
+                             Pid ! {route, From, To, NewEl};
+                         error ->
+                             ok
+                     end
+             end, read_message_headers(U, S))
+    end.
+
+fetch_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
+    case jid:from_string(From_s) of
+       From = #jid{} ->
+           case gen_mod:db_type(To#jid.lserver, ?MODULE) of
+               odbc ->
+                   read_message(From, To, Seq, odbc);
+               DBType ->
+                   case binary_to_timestamp(Seq) of
+                       undefined -> ok;
+                       TS -> read_message(From, To, TS, DBType)
+                   end
+           end;
+       error ->
+           ok
+    end.
+
+remove_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
+    case jid:from_string(From_s) of
+       From = #jid{} ->
+           case gen_mod:db_type(To#jid.lserver, ?MODULE) of
+               odbc ->
+                   remove_message(From, To, Seq, odbc);
+               DBType ->
+                   case binary_to_timestamp(Seq) of
+                       undefined -> ok;
+                       TS -> remove_message(From, To, TS, DBType)
+                   end
+           end;
+       error ->
+           ok
+    end.
+
 need_to_store(LServer, Packet) ->
     Type = fxml:get_tag_attr_s(<<"type">>, Packet),
     if (Type /= <<"error">>) and (Type /= <<"groupchat">>)
        and (Type /= <<"headline">>) ->
-           case check_store_hint(Packet) of
-               store ->
-                   true;
-               no_store ->
-                   false;
-               none ->
-                   case gen_mod:get_module_opt(
-                          LServer, ?MODULE, store_empty_body,
-                          fun(V) when is_boolean(V) -> V;
-                             (unless_chat_state) -> unless_chat_state
-                          end,
-                          unless_chat_state) of
-                       false ->
-                           fxml:get_subtag(Packet, <<"body">>) /= false;
-                       unless_chat_state ->
-                           not jlib:is_standalone_chat_state(Packet);
-                       true ->
-                           true
-                   end
+           case has_offline_tag(Packet) of
+               false ->
+                   case check_store_hint(Packet) of
+                       store ->
+                           true;
+                       no_store ->
+                           false;
+                       none ->
+                           case gen_mod:get_module_opt(
+                                  LServer, ?MODULE, store_empty_body,
+                                  fun(V) when is_boolean(V) -> V;
+                                     (unless_chat_state) -> unless_chat_state
+                                  end,
+                                  unless_chat_state) of
+                               false ->
+                                   fxml:get_subtag(Packet, <<"body">>) /= false;
+                               unless_chat_state ->
+                                   not jlib:is_standalone_chat_state(Packet);
+                               true ->
+                                   true
+                           end
+                   end;
+               true ->
+                   false
            end;
        true ->
            false
@@ -353,6 +555,9 @@ has_no_store_hint(Packet) ->
       orelse
       fxml:get_subtag_with_xmlns(Packet, <<"no-storage">>, ?NS_HINTS) =/= false.
 
+has_offline_tag(Packet) ->
+    fxml:get_subtag_with_xmlns(Packet, <<"offline">>, ?NS_FLEX_OFFLINE) =/= false.
+
 %% Check if the packet has any content about XEP-0022
 check_event(From, To, Packet) ->
     #xmlel{name = Name, attrs = Attrs, children = Els} =
@@ -713,6 +918,123 @@ offline_msg_to_route(_LServer, #xmlel{} = El) ->
             error
     end.
 
+binary_to_timestamp(TS) ->
+    case catch jlib:binary_to_integer(TS) of
+       Int when is_integer(Int) ->
+           Secs = Int div 1000000,
+           USec = Int rem 1000000,
+           MSec = Secs div 1000000,
+           Sec = Secs rem 1000000,
+           {MSec, Sec, USec};
+       _ ->
+           undefined
+    end.
+
+timestamp_to_binary({MS, S, US}) ->
+    format_timestamp(integer_to_list((MS * 1000000 + S) * 1000000 + US)).
+
+format_timestamp(TS) ->
+    iolist_to_binary(io_lib:format("~20..0s", [TS])).
+
+offline_msg_to_header(#offline_msg{from = From, timestamp = Int} = Msg) ->
+    TS = timestamp_to_binary(Int),
+    From_s = jid:to_string(From),
+    {<<TS/binary, "+", From_s/binary>>, From_s, Msg}.
+
+read_message_headers(LUser, LServer) ->
+    DBType = gen_mod:db_type(LServer, ?MODULE),
+    read_message_headers(LUser, LServer, DBType).
+
+read_message_headers(LUser, LServer, mnesia) ->
+    Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}),
+    Hdrs = lists:map(fun offline_msg_to_header/1, Msgs),
+    lists:keysort(1, Hdrs);
+read_message_headers(LUser, LServer, riak) ->
+    case ejabberd_riak:get_by_index(
+           offline_msg, offline_msg_schema(),
+          <<"us">>, {LUser, LServer}) of
+        {ok, Rs} ->
+           Hdrs = lists:map(fun offline_msg_to_header/1, Rs),
+           lists:keysort(1, Hdrs);
+       _Err ->
+           []
+    end;
+read_message_headers(LUser, LServer, odbc) ->
+    Username = ejabberd_odbc:escape(LUser),
+    case catch ejabberd_odbc:sql_query(
+                LServer, [<<"select xml, seq from spool where username ='">>,
+                          Username, <<"' order by seq;">>]) of
+       {selected, [<<"xml">>, <<"seq">>], Rows} ->
+           Hdrs = lists:flatmap(
+                    fun([XML, Seq]) ->
+                            try
+                                #xmlel{} = El = fxml_stream:parse_element(XML),
+                                From = fxml:get_tag_attr_s(<<"from">>, El),
+                                #jid{} = jid:from_string(From),
+                                TS = format_timestamp(Seq),
+                                [{<<TS/binary, "+", From/binary>>, From, El}]
+                            catch _:_ -> []
+                            end
+                    end, Rows),
+           lists:keysort(1, Hdrs);
+       _Err ->
+           []
+    end.
+
+read_message(_From, To, TS, mnesia) ->
+    {U, S, _} = jid:tolower(To),
+    case mnesia:dirty_match_object(
+          offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}) of
+       [Msg|_] ->
+           {ok, Msg};
+       _ ->
+           error
+    end;
+read_message(_From, _To, TS, riak) ->
+    case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of
+       {ok, Msg} ->
+           {ok, Msg};
+       _ ->
+           error
+    end;
+read_message(_From, To, Seq, odbc) ->
+    {LUser, LServer, _} = jid:tolower(To),
+    Username = ejabberd_odbc:escape(LUser),
+    SSeq = ejabberd_odbc:escape(Seq),
+    case ejabberd_odbc:sql_query(
+          LServer,
+          [<<"select xml from spool  where username='">>, Username,
+           <<"'  and seq='">>, SSeq, <<"';">>]) of
+       {selected, [<<"xml">>], [[RawXML]|_]} ->
+           case fxml_stream:parse_element(RawXML) of
+               #xmlel{} = El -> {ok, El};
+               {error, _} -> error
+           end;
+       _ ->
+           error
+    end.
+
+remove_message(_From, To, TS, mnesia) ->
+    {U, S, _} = jid:tolower(To),
+    Msgs = mnesia:dirty_match_object(
+            offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}),
+    lists:foreach(
+      fun(Msg) ->
+             mnesia:dirty_delete_object(Msg)
+      end, Msgs);
+remove_message(_From, _To, TS, riak) ->
+    ejabberd_riak:delete(offline_msg, TS),
+    ok;
+remove_message(_From, To, Seq, odbc) ->
+    {LUser, LServer, _} = jid:tolower(To),
+    Username = ejabberd_odbc:escape(LUser),
+    SSeq = ejabberd_odbc:escape(Seq),
+    ejabberd_odbc:sql_query(
+      LServer,
+      [<<"delete from spool  where username='">>, Username,
+       <<"'  and seq='">>, SSeq, <<"';">>]),
+    ok.
+
 read_all_msgs(LUser, LServer, mnesia) ->
     US = {LUser, LServer},
     lists:keysort(#offline_msg.timestamp,