]> granicus.if.org Git - ejabberd/commitdiff
Improve mod_multicast
authorPaweł Chmielowski <pchmielowski@process-one.net>
Wed, 4 Apr 2018 10:05:37 +0000 (12:05 +0200)
committerPaweł Chmielowski <pchmielowski@process-one.net>
Wed, 4 Apr 2018 10:06:35 +0000 (12:06 +0200)
src/mod_multicast.erl

index 9f02995fea0a0ea937715a6235627e2f9b566e7e..846049a55fdfde263b3973136eeeef215362d55b 100644 (file)
 -include("translate.hrl").
 -include("xmpp.hrl").
 
--record(state,
-       {lserver, lservice, access, service_limits}).
+-record(multicastc, {rserver :: binary(),
+                    response,
+                    ts :: integer()}).
+
+-record(dest, {jid_string :: binary() | none,
+              jid_jid :: xmpp:jid(),
+              type :: to | cc | bcc,
+              address :: address()}).
+
+-type limit_value() :: {default | custom, integer()}.
+-record(limits, {message :: limit_value(),
+                presence :: limit_value()}).
+
+-record(service_limits, {local :: #limits{},
+                        remote :: #limits{}}).
+
+-type routing() :: route_single | {route_multicast, binary(), #service_limits{}}.
+
+-record(group, {server :: binary(),
+               dests :: [#dest{}],
+               multicast :: routing(),
+               others :: [#address{}],
+               addresses :: [#address{}]}).
+
+-record(state, {lserver :: binary(),
+               lservice :: binary(),
+               access :: atom(),
+               service_limits :: #service_limits{}}).
 -type state() :: #state{}.
 
--record(multicastc, {rserver, response, ts}).
-
-%% ts: timestamp (in seconds) when the cache item was last updated
-
--record(dest, {jid_string = none :: binary(),
-              jid_jid :: jid(),
-              type :: atom(),
-              full_xml :: address()}).
-
-%% jid_string = string()
-%% jid_jid = jid()
-%% full_xml = xml()
-
--record(group,
-       {server, dests, multicast, others, addresses}).
-
-%% server = string()
-%% dests = [string()]
-%% multicast = {cached, local_server} | {cached, string()} | {cached, not_supported} | {obsolete, not_supported} | {obsolete, string()} | not_cached
-%%  after being updated, possible values are: local | multicast_not_supported | {multicast_supported, string(), limits()}
-%% others = [xml()]
-%% packet = xml()
-
--record(waiter,
-       {awaiting, group, renewal = false, sender, packet,
-        aattrs, addresses}).
-
-%% awaiting = {[Remote_service], Local_service, Type_awaiting}
-%%  Remote_service = Local_service = string()
-%%  Type_awaiting = info | items
-%% group = #group
-%% renewal = true | false
-%% sender = From
-%% packet = xml()
-%% aattrs = [xml()]
-
--record(limits, {message, presence}).
-
-%% message = presence = integer() | infinite
-
--record(service_limits, {local, remote}).
-
 %% All the elements are of type value()
 
 -define(VERSION_MULTICAST, <<"$Revision: 440 $ ">>).
 
 -define(MAXTIME_CACHE_NEGATIVE, 86400).
 
+-define(MAXTIME_CACHE_NEGOTIATING, 600).
+
 -define(CACHE_PURGE_TIMER, 86400000).
 
 -define(DISCO_QUERY_TIMEOUT, 10000).
@@ -130,6 +116,7 @@ reload(LServerS, NewOpts, OldOpts) ->
 %% gen_server callbacks
 %%====================================================================
 
+-spec init(list()) -> {ok, state()}.
 init([LServerS, Opts]) ->
     process_flag(trap_exit, true),
     [LServiceS|_] = gen_mod:get_opt_hosts(LServerS, Opts),
@@ -137,7 +124,6 @@ init([LServerS, Opts]) ->
     SLimits = build_service_limit_record(gen_mod:get_opt(limits, Opts)),
     create_cache(),
     try_start_loop(),
-    create_pool(),
     ejabberd_router_multicast:register_route(LServerS),
     ejabberd_router:register_route(LServiceS, LServerS),
     {ok,
@@ -277,21 +263,22 @@ iq_vcard(Lang) ->
 %%% Route
 %%%-------------------------
 
+-spec route_trusted(binary(), binary(), jid(), [jid()], stanza()) -> 'ok'.
 route_trusted(LServiceS, LServerS, FromJID,
              Destinations, Packet) ->
     Packet_stripped = Packet,
-    AAttrs = [],
     Delivereds = [],
     Dests2 = lists:map(
               fun(D) ->
                       #dest{jid_string = jid:encode(D),
-                            jid_jid = D, type = bcc,
-                            full_xml = #address{type = bcc, jid = D}}
+                            jid_jid    = D, type = bcc,
+                            address    = #address{type = bcc, jid = D}}
               end, Destinations),
     Groups = group_dests(Dests2),
     route_common(LServerS, LServiceS, FromJID, Groups,
-                Delivereds, Packet_stripped, AAttrs).
+                Delivereds, Packet_stripped).
 
+-spec route_untrusted(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'.
 route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) ->
     try route_untrusted2(LServiceS, LServerS, Access,
                         SLimits, Packet)
@@ -321,6 +308,7 @@ route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) ->
                      <<"Unknown problem">>)
     end.
 
+-spec route_untrusted2(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'.
 route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) ->
     FromJID = xmpp:get_from(Packet),
     ok = check_access(LServerS, Access, FromJID),
@@ -333,53 +321,40 @@ route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) ->
     Groups = group_dests(Dests2),
     ok = check_relay(FromJID#jid.server, LServerS, Groups),
     route_common(LServerS, LServiceS, FromJID, Groups,
-                Delivereds, Packet_stripped, []).
+                Delivereds, Packet_stripped).
 
 -spec route_common(binary(), binary(), jid(), [#group{}],
-                  [address()], stanza(), list()) -> any().
+                  [address()], stanza()) -> 'ok'.
 route_common(LServerS, LServiceS, FromJID, Groups,
-            Delivereds, Packet_stripped, AAttrs) ->
-    Groups2 = look_cached_servers(LServerS, Groups),
+            Delivereds, Packet_stripped) ->
+    Groups2 = look_cached_servers(LServerS, LServiceS, Groups),
     Groups3 = build_others_xml(Groups2),
     Groups4 = add_addresses(Delivereds, Groups3),
     AGroups = decide_action_groups(Groups4),
-    act_groups(FromJID, Packet_stripped, AAttrs, LServiceS,
+    act_groups(FromJID, Packet_stripped, LServiceS,
               AGroups).
 
-act_groups(FromJID, Packet_stripped, AAttrs, LServiceS,
-          AGroups) ->
-    [perform(FromJID, Packet_stripped, AAttrs, LServiceS,
-            AGroup)
-     || AGroup <- AGroups].
-
-perform(From, Packet, AAttrs, _,
+-spec act_groups(jid(), stanza(), binary(), [{routing(), #group{}}]) -> 'ok'.
+act_groups(FromJID, Packet_stripped, LServiceS, AGroups) ->
+    lists:foreach(
+       fun(AGroup) ->
+           perform(FromJID, Packet_stripped, LServiceS,
+                   AGroup)
+       end, AGroups).
+
+-spec perform(jid(), stanza(), binary(),
+             {routing(), #group{}}) -> 'ok'.
+perform(From, Packet, _,
        {route_single, Group}) ->
-    [route_packet(From, ToUser, Packet, AAttrs,
-                 Group#group.others, Group#group.addresses)
-     || ToUser <- Group#group.dests];
-perform(From, Packet, AAttrs, _,
+    lists:foreach(
+       fun(ToUser) ->
+           route_packet(From, ToUser, Packet,
+                        Group#group.others, Group#group.addresses)
+       end, Group#group.dests);
+perform(From, Packet, _,
        {{route_multicast, JID, RLimits}, Group}) ->
-    route_packet_multicast(From, JID, Packet, AAttrs,
-                          Group#group.dests, Group#group.addresses, RLimits);
-perform(From, Packet, AAttrs, LServiceS,
-       {{ask, Old_service, renewal}, Group}) ->
-    send_query_info(Old_service, LServiceS),
-    add_waiter(#waiter{awaiting =
-                          {[Old_service], LServiceS, info},
-                      group = Group, renewal = true, sender = From,
-                      packet = Packet, aattrs = AAttrs,
-                      addresses = Group#group.addresses});
-perform(_From, _Packet, _AAttrs, LServiceS,
-       {{ask, LServiceS, _}, _Group}) ->
-    ok;
-perform(From, Packet, AAttrs, LServiceS,
-       {{ask, Server, not_renewal}, Group}) ->
-    send_query_info(Server, LServiceS),
-    add_waiter(#waiter{awaiting =
-                          {[Server], LServiceS, info},
-                      group = Group, renewal = false, sender = From,
-                      packet = Packet, aattrs = AAttrs,
-                      addresses = Group#group.addresses}).
+    route_packet_multicast(From, JID, Packet,
+                          Group#group.dests, Group#group.addresses, RLimits).
 
 %%%-------------------------
 %%% Check access permission
@@ -427,7 +402,7 @@ split_addresses_todeliver(Addresses) ->
 %%% Check does not exceed limit of destinations
 %%%-------------------------
 
--spec check_limit_dests(_, jid(), stanza(), [address()]) -> ok.
+-spec check_limit_dests(#service_limits{}, jid(), stanza(), [address()]) -> ok.
 check_limit_dests(SLimits, FromJID, Packet,
                  Addresses) ->
     SenderT = sender_type(FromJID),
@@ -448,10 +423,10 @@ check_limit_dests(SLimits, FromJID, Packet,
 convert_dest_record(Addrs) ->
     lists:map(
       fun(#address{jid = undefined} = Addr) ->
-             #dest{jid_string = none, full_xml = Addr};
+             #dest{jid_string = none, address = Addr};
         (#address{jid = JID, type = Type} = Addr) ->
              #dest{jid_string = jid:encode(JID), jid_jid = JID,
-                   type = Type, full_xml = Addr}
+                   type = Type, address = Addr}
       end, Addrs).
 
 %%%-------------------------
@@ -469,9 +444,9 @@ split_dests_jid(Dests) ->
                    end,
                    Dests).
 
--spec report_not_jid(jid(), stanza(), #dest{}) -> any().
+-spec report_not_jid(jid(), stanza(), [#dest{}]) -> any().
 report_not_jid(From, Packet, Dests) ->
-    Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.full_xml))
+    Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.address))
              || Dest <- Dests],
     [route_error(xmpp:set_from_to(Packet, From, From), jid_malformed,
                 <<"This service can not process the address: ",
@@ -497,14 +472,14 @@ group_dests(Dests) ->
 %%% Look for cached responses
 %%%-------------------------
 
-look_cached_servers(LServerS, Groups) ->
-    [look_cached(LServerS, Group) || Group <- Groups].
+look_cached_servers(LServerS, LServiceS, Groups) ->
+    [look_cached(LServerS, LServiceS, Group) || Group <- Groups].
 
-look_cached(LServerS, G) ->
+look_cached(LServerS, LServiceS, G) ->
     Maxtime_positive = (?MAXTIME_CACHE_POSITIVE),
     Maxtime_negative = (?MAXTIME_CACHE_NEGATIVE),
     Cached_response = search_server_on_cache(G#group.server,
-                                            LServerS,
+                                            LServerS, LServiceS,
                                             {Maxtime_positive,
                                              Maxtime_negative}),
     G#group{multicast = Cached_response}.
@@ -520,7 +495,7 @@ build_others_xml(Groups) ->
 
 build_other_xml(Dests) ->
     lists:foldl(fun (Dest, R) ->
-                       XML = Dest#dest.full_xml,
+                       XML = Dest#dest.address,
                        case Dest#dest.type of
                          to -> [add_delivered(XML) | R];
                          cc -> [add_delivered(XML) | R];
@@ -554,53 +529,38 @@ add_addresses2(Delivereds, [Group | Groups], Res, Pa,
 %%% Decide action groups
 %%%-------------------------
 
+-spec decide_action_groups([#group{}]) -> [{routing(), #group{}}].
 decide_action_groups(Groups) ->
-    [{decide_action_group(Group), Group}
+    [{Group#group.multicast, Group}
      || Group <- Groups].
 
-decide_action_group(Group) ->
-    Server = Group#group.server,
-    case Group#group.multicast of
-      {cached, local_server} ->
-         %% Send a copy of the packet to each local user on Dests
-         route_single;
-      {cached, not_supported} ->
-         %% Send a copy of the packet to each remote user on Dests
-         route_single;
-      {cached, {multicast_supported, JID, RLimits}} ->
-         {route_multicast, JID, RLimits};
-      {obsolete,
-       {multicast_supported, Old_service, _RLimits}} ->
-         {ask, Old_service, renewal};
-      {obsolete, not_supported} -> {ask, Server, not_renewal};
-      not_cached -> {ask, Server, not_renewal}
-    end.
-
 %%%-------------------------
 %%% Route packet
 %%%-------------------------
 
-route_packet(From, ToDest, Packet, AAttrs, Others, Addresses) ->
+-spec route_packet(jid(), #dest{}, xmpp:stanza(), [addresses()], [addresses()]) -> 'ok'.
+route_packet(From, ToDest, Packet, Others, Addresses) ->
     Dests = case ToDest#dest.type of
              bcc -> [];
              _ -> [ToDest]
            end,
     route_packet2(From, ToDest#dest.jid_string, Dests,
-                 Packet, AAttrs, {Others, Addresses}).
+                 Packet, {Others, Addresses}).
 
-route_packet_multicast(From, ToS, Packet, AAttrs, Dests,
+-spec route_packet_multicast(jid(), binary(), xmpp:stanza(), [#dest{}], [address()], #limits{}) -> 'ok'.
+route_packet_multicast(From, ToS, Packet, Dests,
                       Addresses, Limits) ->
     Type_of_stanza = type_of_stanza(Packet),
     {_Type, Limit_number} = get_limit_number(Type_of_stanza,
                                             Limits),
     Fragmented_dests = fragment_dests(Dests, Limit_number),
-    [route_packet2(From, ToS, DFragment, Packet, AAttrs,
-                  Addresses)
-     || DFragment <- Fragmented_dests].
+    lists:foreach(fun(DFragment) ->
+       route_packet2(From, ToS, DFragment, Packet,
+                     Addresses)
+       end, Fragmented_dests).
 
--spec route_packet2(jid(), binary(), [#dest{}], stanza(), list(), [address()]) -> ok.
-route_packet2(From, ToS, Dests, Packet, _AAttrs,
-             Addresses) ->
+-spec route_packet2(jid(), binary(), [#dest{}], xmpp:stanza(), {[address()], [address()]} | [address()]) -> 'ok'.
+route_packet2(From, ToS, Dests, Packet, Addresses) ->
     Els = case append_dests(Dests, Addresses) of
              [] ->
                  xmpp:get_els(Packet);
@@ -613,10 +573,10 @@ route_packet2(From, ToS, Dests, Packet, _AAttrs,
 
 -spec append_dests([#dest{}], {[address()], [address()]} | [address()]) -> [address()].
 append_dests(_Dests, {Others, Addresses}) ->
-    Addresses++Others;
+    Addresses ++ Others;
 append_dests([], Addresses) -> Addresses;
 append_dests([Dest | Dests], Addresses) ->
-    append_dests(Dests, [Dest#dest.full_xml | Addresses]).
+    append_dests(Dests, [Dest#dest.address | Addresses]).
 
 %%%-------------------------
 %%% Check relay
@@ -647,20 +607,22 @@ check_relay_required(LServerS, Groups) ->
 %%% Check protocol support: Send request
 %%%-------------------------
 
-send_query_info(RServerS, LServiceS) ->
+-spec send_query_info(binary(), binary(), binary()) -> ok.
+send_query_info(RServerS, LServiceS, ID) ->
     case str:str(RServerS, <<"echo.">>) of
-      1 -> false;
-      _ -> send_query(RServerS, LServiceS, #disco_info{})
+      1 -> ok;
+      _ -> send_query(RServerS, LServiceS, ID, #disco_info{})
     end.
 
-send_query_items(RServerS, LServiceS) ->
-    send_query(RServerS, LServiceS, #disco_items{}).
+-spec send_query_items(binary(), binary(), binary()) -> ok.
+send_query_items(RServerS, LServiceS, ID) ->
+    send_query(RServerS, LServiceS, ID, #disco_items{}).
 
--spec send_query(binary(), binary(), [disco_info()|disco_items()]) -> ok.
-send_query(RServerS, LServiceS, SubEl) ->
+-spec send_query(binary(), binary(), binary(), disco_info()|disco_items()) -> ok.
+send_query(RServerS, LServiceS, ID, SubEl) ->
     Packet = #iq{from = stj(LServiceS),
                 to = stj(RServerS),
-                id = randoms:get_string(),
+                id = ID,
                 type = get, sub_els = [SubEl]},
     ejabberd_router:route(Packet).
 
@@ -670,10 +632,31 @@ send_query(RServerS, LServiceS, SubEl) ->
 
 process_iqreply_error(LServiceS, Packet) ->
     FromS = jts(xmpp:get_from(Packet)),
-    case search_waiter(FromS, LServiceS, info) of
-      {found_waiter, Waiter} ->
-         received_awaiter(FromS, Waiter, LServiceS);
-      _ -> ok
+    ID = Packet#iq.id,
+    case str:tokens(ID, <<"/">>) of
+        [RServer, _] ->
+            case look_server(RServer) of
+                {cached, {_Response, {wait_for_info, ID}}, _TS}
+                when RServer == FromS ->
+                    add_response(RServer, not_supported, cached);
+                {cached, {_Response, {wait_for_items, ID}}, _TS}
+                when RServer == FromS ->
+                    add_response(RServer, not_supported, cached);
+                {cached, {Response, {wait_for_items_info, ID, Items}},
+                 _TS} ->
+                    case lists:member(FromS, Items) of
+                        true ->
+                            received_awaiter(
+                              FromS, RServer, Response, ID, Items,
+                              LServiceS);
+                        false ->
+                            ok
+                    end;
+                _ ->
+                    ok
+            end;
+        _ ->
+            ok
     end.
 
 %%%-------------------------
@@ -681,12 +664,12 @@ process_iqreply_error(LServiceS, Packet) ->
 %%%-------------------------
 
 -spec process_iqreply_result(binary(), iq()) -> any().
-process_iqreply_result(LServiceS, #iq{from = From, sub_els = [SubEl]}) ->
+process_iqreply_result(LServiceS, #iq{from = From, id = ID, sub_els = [SubEl]}) ->
     case SubEl of
        #disco_info{} ->
-           process_discoinfo_result(From, LServiceS, SubEl);
+           process_discoinfo_result(From, LServiceS, ID, SubEl);
        #disco_items{} ->
-           process_discoitems_result(From, LServiceS, SubEl);
+           process_discoitems_result(From, LServiceS, ID, SubEl);
        _ ->
            ok
     end.
@@ -695,46 +678,53 @@ process_iqreply_result(LServiceS, #iq{from = From, sub_els = [SubEl]}) ->
 %%% Check protocol support: Receive response: Disco Info
 %%%-------------------------
 
-process_discoinfo_result(From, LServiceS, DiscoInfo) ->
+process_discoinfo_result(From, LServiceS, ID, DiscoInfo) ->
     FromS = jts(From),
-    case search_waiter(FromS, LServiceS, info) of
-      {found_waiter, Waiter} ->
-         process_discoinfo_result2(From, FromS, LServiceS, DiscoInfo,
-                                   Waiter);
-      _ -> ok
+    case str:tokens(ID, <<"/">>) of
+        [RServer, _] ->
+            case look_server(RServer) of
+                {cached, {Response, {wait_for_info, ID} = ST}, _TS}
+                when RServer == FromS ->
+                    process_discoinfo_result2(
+                      From, FromS, LServiceS, DiscoInfo,
+                      RServer, Response, ST);
+                {cached, {Response, {wait_for_items_info, ID, Items} = ST},
+                 _TS} ->
+                    case lists:member(FromS, Items) of
+                        true ->
+                            process_discoinfo_result2(
+                              From, FromS, LServiceS, DiscoInfo,
+                              RServer, Response, ST);
+                        false ->
+                            ok
+                    end;
+                _ ->
+                    ok
+            end;
+        _ ->
+            ok
     end.
 
 process_discoinfo_result2(From, FromS, LServiceS,
                          #disco_info{features = Feats} = DiscoInfo,
-                         Waiter) ->
+                         RServer, Response, ST) ->
     Multicast_support = lists:member(?NS_ADDRESS, Feats),
-    Group = Waiter#waiter.group,
-    RServer = Group#group.server,
     case Multicast_support of
        true ->
            SenderT = sender_type(From),
            RLimits = get_limits_xml(DiscoInfo, SenderT),
-           add_response(RServer, {multicast_supported, FromS, RLimits}),
-           FromM = Waiter#waiter.sender,
-           DestsM = Group#group.dests,
-           PacketM = Waiter#waiter.packet,
-           AAttrsM = Waiter#waiter.aattrs,
-           AddressesM = Waiter#waiter.addresses,
-           RServiceM = FromS,
-           route_packet_multicast(FromM, RServiceM, PacketM,
-               AAttrsM, DestsM, AddressesM, RLimits),
-           delo_waiter(Waiter);
+           add_response(RServer, {multicast_supported, FromS, RLimits}, cached);
        false ->
-           case FromS of
-               RServer ->
-               send_query_items(FromS, LServiceS),
-               delo_waiter(Waiter),
-               add_waiter(Waiter#waiter{awaiting =
-                                            {[FromS], LServiceS, items},
-                                        renewal = false});
-           %% We asked a component, and it does not support XEP33
-           _ -> received_awaiter(FromS, Waiter, LServiceS)
-         end
+           case ST of
+               {wait_for_info, _ID} ->
+                   Random = randoms:get_string(),
+                   ID = <<RServer/binary, $/, Random/binary>>,
+                   send_query_items(FromS, LServiceS, ID),
+                   add_response(RServer, Response, {wait_for_items, ID});
+               %% We asked a component, and it does not support XEP33
+               {wait_for_items_info, ID, Items} ->
+                   received_awaiter(FromS, RServer, Response, ID, Items, LServiceS)
+           end
     end.
 
 get_limits_xml(DiscoInfo, SenderT) ->
@@ -778,27 +768,32 @@ get_limits_values(Fields) ->
 %%% Check protocol support: Receive response: Disco Items
 %%%-------------------------
 
-process_discoitems_result(From, LServiceS, #disco_items{items = Items}) ->
+process_discoitems_result(From, LServiceS, ID, #disco_items{items = Items}) ->
     FromS = jts(From),
-    case search_waiter(FromS, LServiceS, items) of
-        {found_waiter, Waiter} ->
-            List = lists:flatmap(
-                    fun(#disco_item{jid = #jid{luser = <<"">>,
-                                               lserver = LServer,
-                                               lresource = <<"">>}}) ->
-                            [LServer];
-                       (_) ->
-                            []
-                    end, Items),
-            case List of
-                [] ->
-                    received_awaiter(FromS, Waiter, LServiceS);
+    case str:tokens(ID, <<"/">>) of
+        [FromS = RServer, _] ->
+            case look_server(RServer) of
+                {cached, {Response, {wait_for_items, ID}}, _TS} ->
+                    List = lists:flatmap(
+                             fun(#disco_item{jid = #jid{luser = <<"">>,
+                                                        lserver = LServer,
+                                                        lresource = <<"">>}}) ->
+                                     [LServer];
+                                (_) ->
+                                     []
+                             end, Items),
+                    case List of
+                        [] ->
+                            add_response(RServer, not_supported, cached);
+                        _ ->
+                            Random = randoms:get_string(),
+                            ID2 = <<RServer/binary, $/, Random/binary>>,
+                            [send_query_info(Item, LServiceS, ID2) || Item <- List],
+                            add_response(RServer, Response,
+                                         {wait_for_items_info, ID2, List})
+                    end;
                 _ ->
-                    [send_query_info(Item, LServiceS) || Item <- List],
-                    delo_waiter(Waiter),
-                    add_waiter(Waiter#waiter{awaiting =
-                                             {List, LServiceS, info},
-                                             renewal = false})
+                    ok
             end;
         _ ->
             ok
@@ -808,33 +803,12 @@ process_discoitems_result(From, LServiceS, #disco_items{items = Items}) ->
 %%% Check protocol support: Receive response: Received awaiter
 %%%-------------------------
 
-received_awaiter(JID, Waiter, LServiceS) ->
-    {JIDs, LServiceS, _} = Waiter#waiter.awaiting,
-    delo_waiter(Waiter),
-    Group = Waiter#waiter.group,
-    RServer = Group#group.server,
+received_awaiter(JID, RServer, Response, ID, JIDs, _LServiceS) ->
     case lists:delete(JID, JIDs) of
-      [] ->
-         case Waiter#waiter.renewal of
-           false ->
-               add_response(RServer, not_supported),
-               From = Waiter#waiter.sender,
-               Packet = Waiter#waiter.packet,
-               AAttrs = Waiter#waiter.aattrs,
-               Others = Group#group.others,
-               Addresses = Waiter#waiter.addresses,
-               [route_packet(From, ToUser, Packet, AAttrs, Others, Addresses)
-                || ToUser <- Group#group.dests];
-           true ->
-               send_query_info(RServer, LServiceS),
-               add_waiter(Waiter#waiter{awaiting =
-                                            {[RServer], LServiceS, info},
-                                        renewal = false})
-         end;
-      JIDs2 ->
-         add_waiter(Waiter#waiter{awaiting =
-                                      {JIDs2, LServiceS, info},
-                                  renewal = false})
+        [] ->
+            add_response(RServer, not_supported, cached);
+        JIDs2 ->
+            add_response(RServer, Response, {wait_for_items_info, ID, JIDs2})
     end.
 
 %%%-------------------------
@@ -846,25 +820,52 @@ create_cache() ->
                        [{ram_copies, [node()]},
                         {attributes, record_info(fields, multicastc)}]).
 
-add_response(RServer, Response) ->
+add_response(RServer, Response, State) ->
     Secs = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
     mnesia:dirty_write(#multicastc{rserver = RServer,
-                                  response = Response, ts = Secs}).
+                                  response = {Response, State}, ts = Secs}).
 
-search_server_on_cache(RServer, LServerS, _Maxmins)
+search_server_on_cache(RServer, LServerS, _LServiceS, _Maxmins)
     when RServer == LServerS ->
-    {cached, local_server};
-search_server_on_cache(RServer, _LServerS, Maxmins) ->
+    route_single;
+search_server_on_cache(RServer, _LServerS, LServiceS, Maxmins) ->
     case look_server(RServer) of
-      not_cached -> not_cached;
-      {cached, Response, Ts} ->
-         Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
-         case is_obsolete(Response, Ts, Now, Maxmins) of
-           false -> {cached, Response};
-           true -> {obsolete, Response}
-         end
+        not_cached ->
+            query_info(RServer, LServiceS, not_supported),
+            route_single;
+        {cached, {Response, State}, TS} ->
+            Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
+            Response2 =
+                case State of
+                    cached ->
+                        case is_obsolete(Response, TS, Now, Maxmins) of
+                            false -> ok;
+                            true ->
+                                query_info(RServer, LServiceS, Response)
+                        end,
+                        Response;
+                    _ ->
+                        if
+                            Now - TS > ?MAXTIME_CACHE_NEGOTIATING ->
+                                query_info(RServer, LServiceS, not_supported),
+                                not_supported;
+                            true ->
+                                Response
+                        end
+                end,
+            case Response2 of
+                not_supported -> route_single;
+                {multicast_supported, Service, Limits} ->
+                    {route_multicast, Service, Limits}
+            end
     end.
 
+query_info(RServer, LServiceS, Response) ->
+    Random = randoms:get_string(),
+    ID = <<RServer/binary, $/, Random/binary>>,
+    send_query_info(RServer, LServiceS, ID),
+    add_response(RServer, Response, {wait_for_info, ID}).
+
 look_server(RServer) ->
     case mnesia:dirty_read(multicastc, RServer) of
       [] -> not_cached;
@@ -935,44 +936,6 @@ purge_loop(NM) ->
       try_stop -> purge_loop_finished
     end.
 
-%%%-------------------------
-%%% Pool
-%%%-------------------------
-
-create_pool() ->
-    catch
-      begin
-          ets:new(multicastp,
-                  [duplicate_bag, public, named_table, {keypos, 2}]),
-          ets:give_away(multicastp, whereis(ejabberd), ok)
-      end.
-
-add_waiter(Waiter) ->
-    true = ets:insert(multicastp, Waiter).
-
-delo_waiter(Waiter) ->
-    true = ets:delete_object(multicastp, Waiter).
-
--spec search_waiter(binary(), binary(), info | items) ->
-    {found_waiter, #waiter{}} | waiter_not_found.
-
-search_waiter(JID, LServiceS, Type) ->
-    Rs = ets:foldl(fun (W, Res) ->
-                          {JIDs, LServiceS1, Type1} = W#waiter.awaiting,
-                          case lists:member(JID, JIDs) and
-                                 (LServiceS == LServiceS1)
-                                 and (Type1 == Type)
-                              of
-                            true -> Res ++ [W];
-                            false -> Res
-                          end
-                  end,
-                  [], multicastp),
-    case Rs of
-      [R | _] -> {found_waiter, R};
-      [] -> waiter_not_found
-    end.
-
 %%%-------------------------
 %%% Limits: utils
 %%%-------------------------
@@ -1006,11 +969,13 @@ get_from_limitopts(LimitOpts, SenderT) ->
 build_remote_limit_record(LimitOpts, SenderT) ->
     build_limit_record(LimitOpts, SenderT).
 
+-spec build_limit_record(any(), local | remote) -> #limits{}.
 build_limit_record(LimitOpts, SenderT) ->
     Limits = [get_limit_value(Name, Default, LimitOpts)
              || {Name, Default} <- list_of_limits(SenderT)],
     list_to_tuple([limits | Limits]).
 
+-spec get_limit_value(atom(), integer(), any()) -> limit_value().
 get_limit_value(Name, Default, LimitOpts) ->
     case lists:keysearch(Name, 1, LimitOpts) of
       {value, {Name, Number}} -> {custom, Number};
@@ -1019,11 +984,13 @@ get_limit_value(Name, Default, LimitOpts) ->
 
 type_of_stanza(Stanza) -> element(1, Stanza).
 
+-spec get_limit_number(message | presence, #limits{}) -> limit_value().
 get_limit_number(message, Limits) ->
     Limits#limits.message;
 get_limit_number(presence, Limits) ->
     Limits#limits.presence.
 
+-spec get_slimit_group(local | remote, #service_limits{}) -> #limits{}.
 get_slimit_group(local, SLimits) ->
     SLimits#service_limits.local;
 get_slimit_group(remote, SLimits) ->