-include("translate.hrl").
-include("xmpp.hrl").
--record(state,
- {lserver, lservice, access, service_limits}).
+-record(multicastc, {rserver :: binary(),
+ response,
+ ts :: integer()}).
+
+-record(dest, {jid_string :: binary() | none,
+ jid_jid :: xmpp:jid(),
+ type :: to | cc | bcc,
+ address :: address()}).
+
+-type limit_value() :: {default | custom, integer()}.
+-record(limits, {message :: limit_value(),
+ presence :: limit_value()}).
+
+-record(service_limits, {local :: #limits{},
+ remote :: #limits{}}).
+
+-type routing() :: route_single | {route_multicast, binary(), #service_limits{}}.
+
+-record(group, {server :: binary(),
+ dests :: [#dest{}],
+ multicast :: routing(),
+ others :: [#address{}],
+ addresses :: [#address{}]}).
+
+-record(state, {lserver :: binary(),
+ lservice :: binary(),
+ access :: atom(),
+ service_limits :: #service_limits{}}).
-type state() :: #state{}.
--record(multicastc, {rserver, response, ts}).
-
-%% ts: timestamp (in seconds) when the cache item was last updated
-
--record(dest, {jid_string = none :: binary(),
- jid_jid :: jid(),
- type :: atom(),
- full_xml :: address()}).
-
-%% jid_string = string()
-%% jid_jid = jid()
-%% full_xml = xml()
-
--record(group,
- {server, dests, multicast, others, addresses}).
-
-%% server = string()
-%% dests = [string()]
-%% multicast = {cached, local_server} | {cached, string()} | {cached, not_supported} | {obsolete, not_supported} | {obsolete, string()} | not_cached
-%% after being updated, possible values are: local | multicast_not_supported | {multicast_supported, string(), limits()}
-%% others = [xml()]
-%% packet = xml()
-
--record(waiter,
- {awaiting, group, renewal = false, sender, packet,
- aattrs, addresses}).
-
-%% awaiting = {[Remote_service], Local_service, Type_awaiting}
-%% Remote_service = Local_service = string()
-%% Type_awaiting = info | items
-%% group = #group
-%% renewal = true | false
-%% sender = From
-%% packet = xml()
-%% aattrs = [xml()]
-
--record(limits, {message, presence}).
-
-%% message = presence = integer() | infinite
-
--record(service_limits, {local, remote}).
-
%% All the elements are of type value()
-define(VERSION_MULTICAST, <<"$Revision: 440 $ ">>).
-define(MAXTIME_CACHE_NEGATIVE, 86400).
+-define(MAXTIME_CACHE_NEGOTIATING, 600).
+
-define(CACHE_PURGE_TIMER, 86400000).
-define(DISCO_QUERY_TIMEOUT, 10000).
%% gen_server callbacks
%%====================================================================
+-spec init(list()) -> {ok, state()}.
init([LServerS, Opts]) ->
process_flag(trap_exit, true),
[LServiceS|_] = gen_mod:get_opt_hosts(LServerS, Opts),
SLimits = build_service_limit_record(gen_mod:get_opt(limits, Opts)),
create_cache(),
try_start_loop(),
- create_pool(),
ejabberd_router_multicast:register_route(LServerS),
ejabberd_router:register_route(LServiceS, LServerS),
{ok,
%%% Route
%%%-------------------------
+-spec route_trusted(binary(), binary(), jid(), [jid()], stanza()) -> 'ok'.
route_trusted(LServiceS, LServerS, FromJID,
Destinations, Packet) ->
Packet_stripped = Packet,
- AAttrs = [],
Delivereds = [],
Dests2 = lists:map(
fun(D) ->
#dest{jid_string = jid:encode(D),
- jid_jid = D, type = bcc,
- full_xml = #address{type = bcc, jid = D}}
+ jid_jid = D, type = bcc,
+ address = #address{type = bcc, jid = D}}
end, Destinations),
Groups = group_dests(Dests2),
route_common(LServerS, LServiceS, FromJID, Groups,
- Delivereds, Packet_stripped, AAttrs).
+ Delivereds, Packet_stripped).
+-spec route_untrusted(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'.
route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) ->
try route_untrusted2(LServiceS, LServerS, Access,
SLimits, Packet)
<<"Unknown problem">>)
end.
+-spec route_untrusted2(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'.
route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) ->
FromJID = xmpp:get_from(Packet),
ok = check_access(LServerS, Access, FromJID),
Groups = group_dests(Dests2),
ok = check_relay(FromJID#jid.server, LServerS, Groups),
route_common(LServerS, LServiceS, FromJID, Groups,
- Delivereds, Packet_stripped, []).
+ Delivereds, Packet_stripped).
-spec route_common(binary(), binary(), jid(), [#group{}],
- [address()], stanza(), list()) -> any().
+ [address()], stanza()) -> 'ok'.
route_common(LServerS, LServiceS, FromJID, Groups,
- Delivereds, Packet_stripped, AAttrs) ->
- Groups2 = look_cached_servers(LServerS, Groups),
+ Delivereds, Packet_stripped) ->
+ Groups2 = look_cached_servers(LServerS, LServiceS, Groups),
Groups3 = build_others_xml(Groups2),
Groups4 = add_addresses(Delivereds, Groups3),
AGroups = decide_action_groups(Groups4),
- act_groups(FromJID, Packet_stripped, AAttrs, LServiceS,
+ act_groups(FromJID, Packet_stripped, LServiceS,
AGroups).
-act_groups(FromJID, Packet_stripped, AAttrs, LServiceS,
- AGroups) ->
- [perform(FromJID, Packet_stripped, AAttrs, LServiceS,
- AGroup)
- || AGroup <- AGroups].
-
-perform(From, Packet, AAttrs, _,
+-spec act_groups(jid(), stanza(), binary(), [{routing(), #group{}}]) -> 'ok'.
+act_groups(FromJID, Packet_stripped, LServiceS, AGroups) ->
+ lists:foreach(
+ fun(AGroup) ->
+ perform(FromJID, Packet_stripped, LServiceS,
+ AGroup)
+ end, AGroups).
+
+-spec perform(jid(), stanza(), binary(),
+ {routing(), #group{}}) -> 'ok'.
+perform(From, Packet, _,
{route_single, Group}) ->
- [route_packet(From, ToUser, Packet, AAttrs,
- Group#group.others, Group#group.addresses)
- || ToUser <- Group#group.dests];
-perform(From, Packet, AAttrs, _,
+ lists:foreach(
+ fun(ToUser) ->
+ route_packet(From, ToUser, Packet,
+ Group#group.others, Group#group.addresses)
+ end, Group#group.dests);
+perform(From, Packet, _,
{{route_multicast, JID, RLimits}, Group}) ->
- route_packet_multicast(From, JID, Packet, AAttrs,
- Group#group.dests, Group#group.addresses, RLimits);
-perform(From, Packet, AAttrs, LServiceS,
- {{ask, Old_service, renewal}, Group}) ->
- send_query_info(Old_service, LServiceS),
- add_waiter(#waiter{awaiting =
- {[Old_service], LServiceS, info},
- group = Group, renewal = true, sender = From,
- packet = Packet, aattrs = AAttrs,
- addresses = Group#group.addresses});
-perform(_From, _Packet, _AAttrs, LServiceS,
- {{ask, LServiceS, _}, _Group}) ->
- ok;
-perform(From, Packet, AAttrs, LServiceS,
- {{ask, Server, not_renewal}, Group}) ->
- send_query_info(Server, LServiceS),
- add_waiter(#waiter{awaiting =
- {[Server], LServiceS, info},
- group = Group, renewal = false, sender = From,
- packet = Packet, aattrs = AAttrs,
- addresses = Group#group.addresses}).
+ route_packet_multicast(From, JID, Packet,
+ Group#group.dests, Group#group.addresses, RLimits).
%%%-------------------------
%%% Check access permission
%%% Check does not exceed limit of destinations
%%%-------------------------
--spec check_limit_dests(_, jid(), stanza(), [address()]) -> ok.
+-spec check_limit_dests(#service_limits{}, jid(), stanza(), [address()]) -> ok.
check_limit_dests(SLimits, FromJID, Packet,
Addresses) ->
SenderT = sender_type(FromJID),
convert_dest_record(Addrs) ->
lists:map(
fun(#address{jid = undefined} = Addr) ->
- #dest{jid_string = none, full_xml = Addr};
+ #dest{jid_string = none, address = Addr};
(#address{jid = JID, type = Type} = Addr) ->
#dest{jid_string = jid:encode(JID), jid_jid = JID,
- type = Type, full_xml = Addr}
+ type = Type, address = Addr}
end, Addrs).
%%%-------------------------
end,
Dests).
--spec report_not_jid(jid(), stanza(), #dest{}) -> any().
+-spec report_not_jid(jid(), stanza(), [#dest{}]) -> any().
report_not_jid(From, Packet, Dests) ->
- Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.full_xml))
+ Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.address))
|| Dest <- Dests],
[route_error(xmpp:set_from_to(Packet, From, From), jid_malformed,
<<"This service can not process the address: ",
%%% Look for cached responses
%%%-------------------------
-look_cached_servers(LServerS, Groups) ->
- [look_cached(LServerS, Group) || Group <- Groups].
+look_cached_servers(LServerS, LServiceS, Groups) ->
+ [look_cached(LServerS, LServiceS, Group) || Group <- Groups].
-look_cached(LServerS, G) ->
+look_cached(LServerS, LServiceS, G) ->
Maxtime_positive = (?MAXTIME_CACHE_POSITIVE),
Maxtime_negative = (?MAXTIME_CACHE_NEGATIVE),
Cached_response = search_server_on_cache(G#group.server,
- LServerS,
+ LServerS, LServiceS,
{Maxtime_positive,
Maxtime_negative}),
G#group{multicast = Cached_response}.
build_other_xml(Dests) ->
lists:foldl(fun (Dest, R) ->
- XML = Dest#dest.full_xml,
+ XML = Dest#dest.address,
case Dest#dest.type of
to -> [add_delivered(XML) | R];
cc -> [add_delivered(XML) | R];
%%% Decide action groups
%%%-------------------------
+-spec decide_action_groups([#group{}]) -> [{routing(), #group{}}].
decide_action_groups(Groups) ->
- [{decide_action_group(Group), Group}
+ [{Group#group.multicast, Group}
|| Group <- Groups].
-decide_action_group(Group) ->
- Server = Group#group.server,
- case Group#group.multicast of
- {cached, local_server} ->
- %% Send a copy of the packet to each local user on Dests
- route_single;
- {cached, not_supported} ->
- %% Send a copy of the packet to each remote user on Dests
- route_single;
- {cached, {multicast_supported, JID, RLimits}} ->
- {route_multicast, JID, RLimits};
- {obsolete,
- {multicast_supported, Old_service, _RLimits}} ->
- {ask, Old_service, renewal};
- {obsolete, not_supported} -> {ask, Server, not_renewal};
- not_cached -> {ask, Server, not_renewal}
- end.
-
%%%-------------------------
%%% Route packet
%%%-------------------------
-route_packet(From, ToDest, Packet, AAttrs, Others, Addresses) ->
+-spec route_packet(jid(), #dest{}, xmpp:stanza(), [addresses()], [addresses()]) -> 'ok'.
+route_packet(From, ToDest, Packet, Others, Addresses) ->
Dests = case ToDest#dest.type of
bcc -> [];
_ -> [ToDest]
end,
route_packet2(From, ToDest#dest.jid_string, Dests,
- Packet, AAttrs, {Others, Addresses}).
+ Packet, {Others, Addresses}).
-route_packet_multicast(From, ToS, Packet, AAttrs, Dests,
+-spec route_packet_multicast(jid(), binary(), xmpp:stanza(), [#dest{}], [address()], #limits{}) -> 'ok'.
+route_packet_multicast(From, ToS, Packet, Dests,
Addresses, Limits) ->
Type_of_stanza = type_of_stanza(Packet),
{_Type, Limit_number} = get_limit_number(Type_of_stanza,
Limits),
Fragmented_dests = fragment_dests(Dests, Limit_number),
- [route_packet2(From, ToS, DFragment, Packet, AAttrs,
- Addresses)
- || DFragment <- Fragmented_dests].
+ lists:foreach(fun(DFragment) ->
+ route_packet2(From, ToS, DFragment, Packet,
+ Addresses)
+ end, Fragmented_dests).
--spec route_packet2(jid(), binary(), [#dest{}], stanza(), list(), [address()]) -> ok.
-route_packet2(From, ToS, Dests, Packet, _AAttrs,
- Addresses) ->
+-spec route_packet2(jid(), binary(), [#dest{}], xmpp:stanza(), {[address()], [address()]} | [address()]) -> 'ok'.
+route_packet2(From, ToS, Dests, Packet, Addresses) ->
Els = case append_dests(Dests, Addresses) of
[] ->
xmpp:get_els(Packet);
-spec append_dests([#dest{}], {[address()], [address()]} | [address()]) -> [address()].
append_dests(_Dests, {Others, Addresses}) ->
- Addresses++Others;
+ Addresses ++ Others;
append_dests([], Addresses) -> Addresses;
append_dests([Dest | Dests], Addresses) ->
- append_dests(Dests, [Dest#dest.full_xml | Addresses]).
+ append_dests(Dests, [Dest#dest.address | Addresses]).
%%%-------------------------
%%% Check relay
%%% Check protocol support: Send request
%%%-------------------------
-send_query_info(RServerS, LServiceS) ->
+-spec send_query_info(binary(), binary(), binary()) -> ok.
+send_query_info(RServerS, LServiceS, ID) ->
case str:str(RServerS, <<"echo.">>) of
- 1 -> false;
- _ -> send_query(RServerS, LServiceS, #disco_info{})
+ 1 -> ok;
+ _ -> send_query(RServerS, LServiceS, ID, #disco_info{})
end.
-send_query_items(RServerS, LServiceS) ->
- send_query(RServerS, LServiceS, #disco_items{}).
+-spec send_query_items(binary(), binary(), binary()) -> ok.
+send_query_items(RServerS, LServiceS, ID) ->
+ send_query(RServerS, LServiceS, ID, #disco_items{}).
--spec send_query(binary(), binary(), [disco_info()|disco_items()]) -> ok.
-send_query(RServerS, LServiceS, SubEl) ->
+-spec send_query(binary(), binary(), binary(), disco_info()|disco_items()) -> ok.
+send_query(RServerS, LServiceS, ID, SubEl) ->
Packet = #iq{from = stj(LServiceS),
to = stj(RServerS),
- id = randoms:get_string(),
+ id = ID,
type = get, sub_els = [SubEl]},
ejabberd_router:route(Packet).
process_iqreply_error(LServiceS, Packet) ->
FromS = jts(xmpp:get_from(Packet)),
- case search_waiter(FromS, LServiceS, info) of
- {found_waiter, Waiter} ->
- received_awaiter(FromS, Waiter, LServiceS);
- _ -> ok
+ ID = Packet#iq.id,
+ case str:tokens(ID, <<"/">>) of
+ [RServer, _] ->
+ case look_server(RServer) of
+ {cached, {_Response, {wait_for_info, ID}}, _TS}
+ when RServer == FromS ->
+ add_response(RServer, not_supported, cached);
+ {cached, {_Response, {wait_for_items, ID}}, _TS}
+ when RServer == FromS ->
+ add_response(RServer, not_supported, cached);
+ {cached, {Response, {wait_for_items_info, ID, Items}},
+ _TS} ->
+ case lists:member(FromS, Items) of
+ true ->
+ received_awaiter(
+ FromS, RServer, Response, ID, Items,
+ LServiceS);
+ false ->
+ ok
+ end;
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
end.
%%%-------------------------
%%%-------------------------
-spec process_iqreply_result(binary(), iq()) -> any().
-process_iqreply_result(LServiceS, #iq{from = From, sub_els = [SubEl]}) ->
+process_iqreply_result(LServiceS, #iq{from = From, id = ID, sub_els = [SubEl]}) ->
case SubEl of
#disco_info{} ->
- process_discoinfo_result(From, LServiceS, SubEl);
+ process_discoinfo_result(From, LServiceS, ID, SubEl);
#disco_items{} ->
- process_discoitems_result(From, LServiceS, SubEl);
+ process_discoitems_result(From, LServiceS, ID, SubEl);
_ ->
ok
end.
%%% Check protocol support: Receive response: Disco Info
%%%-------------------------
-process_discoinfo_result(From, LServiceS, DiscoInfo) ->
+process_discoinfo_result(From, LServiceS, ID, DiscoInfo) ->
FromS = jts(From),
- case search_waiter(FromS, LServiceS, info) of
- {found_waiter, Waiter} ->
- process_discoinfo_result2(From, FromS, LServiceS, DiscoInfo,
- Waiter);
- _ -> ok
+ case str:tokens(ID, <<"/">>) of
+ [RServer, _] ->
+ case look_server(RServer) of
+ {cached, {Response, {wait_for_info, ID} = ST}, _TS}
+ when RServer == FromS ->
+ process_discoinfo_result2(
+ From, FromS, LServiceS, DiscoInfo,
+ RServer, Response, ST);
+ {cached, {Response, {wait_for_items_info, ID, Items} = ST},
+ _TS} ->
+ case lists:member(FromS, Items) of
+ true ->
+ process_discoinfo_result2(
+ From, FromS, LServiceS, DiscoInfo,
+ RServer, Response, ST);
+ false ->
+ ok
+ end;
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
end.
process_discoinfo_result2(From, FromS, LServiceS,
#disco_info{features = Feats} = DiscoInfo,
- Waiter) ->
+ RServer, Response, ST) ->
Multicast_support = lists:member(?NS_ADDRESS, Feats),
- Group = Waiter#waiter.group,
- RServer = Group#group.server,
case Multicast_support of
true ->
SenderT = sender_type(From),
RLimits = get_limits_xml(DiscoInfo, SenderT),
- add_response(RServer, {multicast_supported, FromS, RLimits}),
- FromM = Waiter#waiter.sender,
- DestsM = Group#group.dests,
- PacketM = Waiter#waiter.packet,
- AAttrsM = Waiter#waiter.aattrs,
- AddressesM = Waiter#waiter.addresses,
- RServiceM = FromS,
- route_packet_multicast(FromM, RServiceM, PacketM,
- AAttrsM, DestsM, AddressesM, RLimits),
- delo_waiter(Waiter);
+ add_response(RServer, {multicast_supported, FromS, RLimits}, cached);
false ->
- case FromS of
- RServer ->
- send_query_items(FromS, LServiceS),
- delo_waiter(Waiter),
- add_waiter(Waiter#waiter{awaiting =
- {[FromS], LServiceS, items},
- renewal = false});
- %% We asked a component, and it does not support XEP33
- _ -> received_awaiter(FromS, Waiter, LServiceS)
- end
+ case ST of
+ {wait_for_info, _ID} ->
+ Random = randoms:get_string(),
+ ID = <<RServer/binary, $/, Random/binary>>,
+ send_query_items(FromS, LServiceS, ID),
+ add_response(RServer, Response, {wait_for_items, ID});
+ %% We asked a component, and it does not support XEP33
+ {wait_for_items_info, ID, Items} ->
+ received_awaiter(FromS, RServer, Response, ID, Items, LServiceS)
+ end
end.
get_limits_xml(DiscoInfo, SenderT) ->
%%% Check protocol support: Receive response: Disco Items
%%%-------------------------
-process_discoitems_result(From, LServiceS, #disco_items{items = Items}) ->
+process_discoitems_result(From, LServiceS, ID, #disco_items{items = Items}) ->
FromS = jts(From),
- case search_waiter(FromS, LServiceS, items) of
- {found_waiter, Waiter} ->
- List = lists:flatmap(
- fun(#disco_item{jid = #jid{luser = <<"">>,
- lserver = LServer,
- lresource = <<"">>}}) ->
- [LServer];
- (_) ->
- []
- end, Items),
- case List of
- [] ->
- received_awaiter(FromS, Waiter, LServiceS);
+ case str:tokens(ID, <<"/">>) of
+ [FromS = RServer, _] ->
+ case look_server(RServer) of
+ {cached, {Response, {wait_for_items, ID}}, _TS} ->
+ List = lists:flatmap(
+ fun(#disco_item{jid = #jid{luser = <<"">>,
+ lserver = LServer,
+ lresource = <<"">>}}) ->
+ [LServer];
+ (_) ->
+ []
+ end, Items),
+ case List of
+ [] ->
+ add_response(RServer, not_supported, cached);
+ _ ->
+ Random = randoms:get_string(),
+ ID2 = <<RServer/binary, $/, Random/binary>>,
+ [send_query_info(Item, LServiceS, ID2) || Item <- List],
+ add_response(RServer, Response,
+ {wait_for_items_info, ID2, List})
+ end;
_ ->
- [send_query_info(Item, LServiceS) || Item <- List],
- delo_waiter(Waiter),
- add_waiter(Waiter#waiter{awaiting =
- {List, LServiceS, info},
- renewal = false})
+ ok
end;
_ ->
ok
%%% Check protocol support: Receive response: Received awaiter
%%%-------------------------
-received_awaiter(JID, Waiter, LServiceS) ->
- {JIDs, LServiceS, _} = Waiter#waiter.awaiting,
- delo_waiter(Waiter),
- Group = Waiter#waiter.group,
- RServer = Group#group.server,
+received_awaiter(JID, RServer, Response, ID, JIDs, _LServiceS) ->
case lists:delete(JID, JIDs) of
- [] ->
- case Waiter#waiter.renewal of
- false ->
- add_response(RServer, not_supported),
- From = Waiter#waiter.sender,
- Packet = Waiter#waiter.packet,
- AAttrs = Waiter#waiter.aattrs,
- Others = Group#group.others,
- Addresses = Waiter#waiter.addresses,
- [route_packet(From, ToUser, Packet, AAttrs, Others, Addresses)
- || ToUser <- Group#group.dests];
- true ->
- send_query_info(RServer, LServiceS),
- add_waiter(Waiter#waiter{awaiting =
- {[RServer], LServiceS, info},
- renewal = false})
- end;
- JIDs2 ->
- add_waiter(Waiter#waiter{awaiting =
- {JIDs2, LServiceS, info},
- renewal = false})
+ [] ->
+ add_response(RServer, not_supported, cached);
+ JIDs2 ->
+ add_response(RServer, Response, {wait_for_items_info, ID, JIDs2})
end.
%%%-------------------------
[{ram_copies, [node()]},
{attributes, record_info(fields, multicastc)}]).
-add_response(RServer, Response) ->
+add_response(RServer, Response, State) ->
Secs = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
mnesia:dirty_write(#multicastc{rserver = RServer,
- response = Response, ts = Secs}).
+ response = {Response, State}, ts = Secs}).
-search_server_on_cache(RServer, LServerS, _Maxmins)
+search_server_on_cache(RServer, LServerS, _LServiceS, _Maxmins)
when RServer == LServerS ->
- {cached, local_server};
-search_server_on_cache(RServer, _LServerS, Maxmins) ->
+ route_single;
+search_server_on_cache(RServer, _LServerS, LServiceS, Maxmins) ->
case look_server(RServer) of
- not_cached -> not_cached;
- {cached, Response, Ts} ->
- Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
- case is_obsolete(Response, Ts, Now, Maxmins) of
- false -> {cached, Response};
- true -> {obsolete, Response}
- end
+ not_cached ->
+ query_info(RServer, LServiceS, not_supported),
+ route_single;
+ {cached, {Response, State}, TS} ->
+ Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
+ Response2 =
+ case State of
+ cached ->
+ case is_obsolete(Response, TS, Now, Maxmins) of
+ false -> ok;
+ true ->
+ query_info(RServer, LServiceS, Response)
+ end,
+ Response;
+ _ ->
+ if
+ Now - TS > ?MAXTIME_CACHE_NEGOTIATING ->
+ query_info(RServer, LServiceS, not_supported),
+ not_supported;
+ true ->
+ Response
+ end
+ end,
+ case Response2 of
+ not_supported -> route_single;
+ {multicast_supported, Service, Limits} ->
+ {route_multicast, Service, Limits}
+ end
end.
+query_info(RServer, LServiceS, Response) ->
+ Random = randoms:get_string(),
+ ID = <<RServer/binary, $/, Random/binary>>,
+ send_query_info(RServer, LServiceS, ID),
+ add_response(RServer, Response, {wait_for_info, ID}).
+
look_server(RServer) ->
case mnesia:dirty_read(multicastc, RServer) of
[] -> not_cached;
try_stop -> purge_loop_finished
end.
-%%%-------------------------
-%%% Pool
-%%%-------------------------
-
-create_pool() ->
- catch
- begin
- ets:new(multicastp,
- [duplicate_bag, public, named_table, {keypos, 2}]),
- ets:give_away(multicastp, whereis(ejabberd), ok)
- end.
-
-add_waiter(Waiter) ->
- true = ets:insert(multicastp, Waiter).
-
-delo_waiter(Waiter) ->
- true = ets:delete_object(multicastp, Waiter).
-
--spec search_waiter(binary(), binary(), info | items) ->
- {found_waiter, #waiter{}} | waiter_not_found.
-
-search_waiter(JID, LServiceS, Type) ->
- Rs = ets:foldl(fun (W, Res) ->
- {JIDs, LServiceS1, Type1} = W#waiter.awaiting,
- case lists:member(JID, JIDs) and
- (LServiceS == LServiceS1)
- and (Type1 == Type)
- of
- true -> Res ++ [W];
- false -> Res
- end
- end,
- [], multicastp),
- case Rs of
- [R | _] -> {found_waiter, R};
- [] -> waiter_not_found
- end.
-
%%%-------------------------
%%% Limits: utils
%%%-------------------------
build_remote_limit_record(LimitOpts, SenderT) ->
build_limit_record(LimitOpts, SenderT).
+-spec build_limit_record(any(), local | remote) -> #limits{}.
build_limit_record(LimitOpts, SenderT) ->
Limits = [get_limit_value(Name, Default, LimitOpts)
|| {Name, Default} <- list_of_limits(SenderT)],
list_to_tuple([limits | Limits]).
+-spec get_limit_value(atom(), integer(), any()) -> limit_value().
get_limit_value(Name, Default, LimitOpts) ->
case lists:keysearch(Name, 1, LimitOpts) of
{value, {Name, Number}} -> {custom, Number};
type_of_stanza(Stanza) -> element(1, Stanza).
+-spec get_limit_number(message | presence, #limits{}) -> limit_value().
get_limit_number(message, Limits) ->
Limits#limits.message;
get_limit_number(presence, Limits) ->
Limits#limits.presence.
+-spec get_slimit_group(local | remote, #service_limits{}) -> #limits{}.
get_slimit_group(local, SLimits) ->
SLimits#service_limits.local;
get_slimit_group(remote, SLimits) ->