]> granicus.if.org Git - ejabberd/commitdiff
pubsub: prevent blocking when sending lots of items, send last items to connected...
authorChristophe Romain <christophe.romain@process-one.net>
Wed, 8 Apr 2009 15:53:46 +0000 (15:53 +0000)
committerChristophe Romain <christophe.romain@process-one.net>
Wed, 8 Apr 2009 15:53:46 +0000 (15:53 +0000)
SVN Revision: 2005

src/mod_pubsub/mod_pubsub.erl
src/mod_pubsub/node_flat.erl

index 5310df12bddf20f48e81a8e992383cb83aa0a2de..e946a46a66ffb29cafefa04d321bdf279ba0d925 100644 (file)
@@ -75,6 +75,8 @@
         unsubscribe_node/5,
         publish_item/6,
         delete_item/4,
+        send_items/4,
+        broadcast_stanza/6,
         get_configure/5,
         set_configure/5,
         get_items/3,
@@ -479,27 +481,30 @@ handle_call(stop, _From, State) ->
 handle_cast({presence, JID}, State) ->
     %% A new resource is available. send last published items
     Host = State#state.host,
+    LJID = jlib:jid_tolower(JID),
     %% for each node From is subscribed to
     %% and if the node is so configured, send the last published item to From
-    lists:foreach(fun(Type) ->
-       {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
-       lists:foreach(
-           fun({Node, subscribed, SubJID}) -> 
-               case tree_action(Host, get_node, [Host, Node, JID]) of
-                   #pubsub_node{options = Options} ->
-                       case get_option(Options, send_last_published_item) of
-                           on_sub_and_presence ->
-                               send_last_item(Host, Node, SubJID);
-                           _ ->
-                               ok
-                       end;
-                   _ ->
-                       ok
-               end;
+    spawn(fun() ->
+       lists:foreach(fun(Type) ->
+           {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
+           lists:foreach(
+               fun({Node, subscribed, _SubJID}) -> 
+                   case tree_action(Host, get_node, [Host, Node, JID]) of
+                       #pubsub_node{options = Options} ->
+                           case get_option(Options, send_last_published_item) of
+                               on_sub_and_presence ->
+                                   send_items(Host, Node, LJID, last);
+                               _ ->
+                                   ok
+                           end;
+                       _ ->
+                           ok
+                   end;
                (_) ->
-               ok
-           end, Subscriptions)
-    end, State#state.plugins),
+                   ok
+               end, Subscriptions)
+       end, State#state.plugins)
+    end),
     {noreply, State};
 
 handle_cast({presence, User, Server, Resources, JID}, State) ->
@@ -507,36 +512,38 @@ handle_cast({presence, User, Server, Resources, JID}, State) ->
     Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
     Host = State#state.host,
     ServerHost = State#state.server_host,
-    lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
-       case get_option(Options, send_last_published_item) of
-           on_sub_and_presence ->
-               lists:foreach(fun(Resource) ->
-                   LJID = {User, Server, Resource},
-                   case is_caps_notify(ServerHost, Node, LJID) of
-                       true ->
-                           Subscribed = case get_option(Options, access_model) of
-                                   open -> true;
-                                   presence -> true;
-                                   whitelist -> false; % subscribers are added manually
-                                   authorize -> false; % likewise
-                                   roster ->
-                                       Grps = get_option(Options, roster_groups_allowed, []),
-                                       {OU, OS, _} = Owner,
-                                       element(2, get_roster_info(OU, OS, LJID, Grps))
-                           end,
-                           if Subscribed ->
-                               send_last_item(Owner, Node, LJID);
+    spawn(fun() ->
+       lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
+           case get_option(Options, send_last_published_item) of
+               on_sub_and_presence ->
+                   lists:foreach(fun(Resource) ->
+                       LJID = {User, Server, Resource},
+                       case is_caps_notify(ServerHost, Node, LJID) of
                            true ->
+                               Subscribed = case get_option(Options, access_model) of
+                                       open -> true;
+                                       presence -> true;
+                                       whitelist -> false; % subscribers are added manually
+                                       authorize -> false; % likewise
+                                       roster ->
+                                           Grps = get_option(Options, roster_groups_allowed, []),
+                                           {OU, OS, _} = Owner,
+                                           element(2, get_roster_info(OU, OS, LJID, Grps))
+                               end,
+                               if Subscribed ->
+                                   send_items(Owner, Node, LJID, last);
+                               true ->
+                                   ok
+                               end;
+                           false ->
                                ok
-                           end;
-                       false ->
-                           ok
-                   end
-               end, Resources);
-           _ ->
-               ok
-       end
-    end, tree_action(Host, get_nodes, [Owner, JID])),
+                       end
+                   end, Resources);
+               _ ->
+                   ok
+           end
+       end, tree_action(Host, get_nodes, [Owner, JID]))
+    end),
     {noreply, State};
 
 handle_cast({remove_user, LUser, LServer}, State) ->
@@ -836,10 +843,7 @@ iq_disco_items(Host, Item, From) ->
            transaction(Host, Node, Action, sync_dirty)
     end.
 
-iq_local(From, To, #iq{type = Type,
-                      sub_el = SubEl,
-                      xmlns = XMLNS,
-                      lang = Lang} = IQ) ->
+iq_local(From, To, #iq{type = Type, sub_el = SubEl, xmlns = XMLNS, lang = Lang} = IQ) ->
     ServerHost = To#jid.lserver,
     %% Accept IQs to server only from our own users.
     if
@@ -1091,14 +1095,14 @@ find_authorization_response(Packet) ->
 %% @spec (Host, JID, Node, Subscription) -> void
 %%      Host = mod_pubsub:host()
 %%      JID = jlib:jid()
-%%      Node = string()
+%%      SNode = string()
 %%      Subscription = atom()
 %%      Plugins = [Plugin::string()]
 %% @doc Send a message to JID with the supplied Subscription
-send_authorization_approval(Host, JID, Node, Subscription) ->
+send_authorization_approval(Host, JID, SNode, Subscription) ->
     Stanza = event_stanza(
        [{xmlelement, "subscription",
-        [{"node", Node},
+        [{"node", SNode},
          {"jid", jlib:jid_to_string(JID)},
          {"subscription", subscription_to_string(Subscription)}],
         []}]),
@@ -1450,7 +1454,7 @@ subscribe_node(Host, Node, From, JID) ->
        {error, Error} ->
            {error, Error};
        {result, {Result, subscribed, send_last}} ->
-           send_last_item(Host, Node, Subscriber),
+           send_items(Host, Node, Subscriber, last),
            case Result of
                default -> {result, Reply(subscribed)};
                _ -> {result, Result}
@@ -1539,10 +1543,10 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload) ->
                        PayloadSize > PayloadMaxSize ->
                            %% Entity attempts to publish very large payload
                            {error, extended_error(?ERR_NOT_ACCEPTABLE, "payload-too-big")};
-                       PayloadCount > 1 ->
+                       PayloadCount =/= 1 ->
                            %% Entity attempts to publish item with multiple payload elements
                            {error, extended_error(?ERR_BAD_REQUEST, "invalid-payload")};
-                       Payload == "" ->
+                       Payload == "" -> %% TODO better use PayloadSize == 0 ?
                            %% Publisher attempts to publish to payload node with no payload
                            {error, extended_error(?ERR_BAD_REQUEST, "payload-required")};
                        (DeliverPayloads == 0) and (PersistItems == 0) and (PayloadSize > 0) ->
@@ -1760,18 +1764,9 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIDs) ->
                        end,
                    %% Generate the XML response (Item list), limiting the
                    %% number of items sent to MaxItems:
-                   ItemsEls = lists:map(
-                                   fun(#pubsub_item{itemid = {ItemId, _},
-                                                   payload = Payload}) ->
-                                           ItemAttrs = case ItemId of
-                                                           "" -> [];
-                                                           _ -> [{"id", ItemId}]
-                                                       end,
-                                           {xmlelement, "item", ItemAttrs, Payload}
-                                   end, lists:sublist(SendItems, MaxItems)),
                    {result, [{xmlelement, "pubsub", [{"xmlns", ?NS_PUBSUB}],
                                [{xmlelement, "items", [{"node", node_to_string(Node)}],
-                                   ItemsEls}]}]}
+                                 itemsEls(lists:sublist(SendItems, MaxItems))}]}]}
            end
     end.
 
@@ -1781,14 +1776,6 @@ get_items(Host, Node, From) ->
        _ -> []
     end.
 
-%% @spec (Host, Node, LJID) -> any()
-%%      Host = host()
-%%      Node = pubsubNode()
-%%      LJID = {U, S, []}
-%% @doc <p>Resend the last item of a node to the user.</p>
-send_last_item(Host, Node, LJID) ->
-    send_items(Host, Node, LJID, last).
-
 %% @spec (Host, Node, LJID, Number) -> any()
 %%      Host = host()
 %%      Node = pubsubNode()
@@ -1821,17 +1808,9 @@ send_items(Host, Node, LJID, Number) ->
                    Items
            end
     end,
-    ItemsEls = lists:map(
-               fun(#pubsub_item{itemid = {ItemId, _}, payload = Payload}) ->
-                   ItemAttrs = case ItemId of
-                       "" -> [];
-                       _ -> [{"id", ItemId}]
-                   end,
-                   {xmlelement, "item", ItemAttrs, Payload}
-               end, ToSend),
     Stanza = event_stanza(
        [{xmlelement, "items", [{"node", node_to_string(Node)}],
-        ItemsEls}]),
+         itemsEls(ToSend)}]),
     ejabberd_router ! {route, service_jid(Host), jlib:make_jid(LJID), Stanza}.
 
 %% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response}
@@ -2210,6 +2189,7 @@ event_stanza(Els) ->
 %%%%%% broadcast functions
 
 broadcast_publish_item(Host, Node, ItemId, _From, Payload) ->
+    %broadcast(Host, Node, none, true, "items", ItemEls)
     Action =
        fun(#pubsub_node{options = Options, type = Type}) ->
            case node_call(Type, get_states, [Host, Node]) of
@@ -2220,15 +2200,10 @@ broadcast_publish_item(Host, Node, ItemId, _From, Payload) ->
                        true -> Payload;
                        false -> []
                    end,
-                   ItemAttrs = case ItemId of
-                       "" -> [];
-                       _ -> [{"id", ItemId}]
-                   end,
                    Stanza = event_stanza(
                        [{xmlelement, "items", [{"node", node_to_string(Node)}],
-                        [{xmlelement, "item", ItemAttrs, Content}]}]),
-                   broadcast_stanza(Host, Options, States, Stanza),
-                   broadcast_by_caps(Host, Node, Type, Stanza),
+                        [{xmlelement, "item", itemAttr(ItemId), Content}]}]),
+                   broadcast_stanza(Host, Node, Type, Options, States, Stanza),
                    {result, true};
                _ ->
                    {result, false}
@@ -2239,6 +2214,7 @@ broadcast_publish_item(Host, Node, ItemId, _From, Payload) ->
 broadcast_retract_items(Host, Node, ItemIds) ->
     broadcast_retract_items(Host, Node, ItemIds, false).
 broadcast_retract_items(Host, Node, ItemIds, ForceNotify) ->
+    %broadcast(Host, Node, notify_retract, ForceNotify, "retract", RetractEls)
     Action =
        fun(#pubsub_node{options = Options, type = Type}) ->
            case (get_option(Options, notify_retract) or ForceNotify) of
@@ -2247,19 +2223,11 @@ broadcast_retract_items(Host, Node, ItemIds, ForceNotify) ->
                        {result, []} -> 
                            {result, false};
                        {result, States} ->
-                           RetractEls = lists:map(
-                               fun(ItemId) ->
-                                   ItemAttrs = case ItemId of
-                                       "" -> [];
-                                       _ -> [{"id", ItemId}]
-                                   end,
-                                   {xmlelement, "retract", ItemAttrs, []}
-                               end, ItemIds),
+                           RetractEls = [{xmlelement, "retract", itemAttr(ItemId), []} || ItemId <- ItemIds],
                            Stanza = event_stanza(
                                [{xmlelement, "items", [{"node", node_to_string(Node)}],
                                 RetractEls}]),
-                           broadcast_stanza(Host, Options, States, Stanza),
-                           broadcast_by_caps(Host, Node, Type, Stanza),
+                           broadcast_stanza(Host, Node, Type, Options, States, Stanza),
                            {result, true};
                        _ ->
                            {result, false}
@@ -2271,6 +2239,7 @@ broadcast_retract_items(Host, Node, ItemIds, ForceNotify) ->
     transaction(Host, Node, Action, sync_dirty).
 
 broadcast_purge_node(Host, Node) ->
+    %broadcast(Host, Node, notify_retract, false, "purge", [])
     Action =
        fun(#pubsub_node{options = Options, type = Type}) ->
            case get_option(Options, notify_retract) of
@@ -2280,9 +2249,9 @@ broadcast_purge_node(Host, Node) ->
                            {result, false};
                        {result, States} ->
                            Stanza = event_stanza(
-                               [{xmlelement, "purge", [{"node", node_to_string(Node)}], []}]),
-                           broadcast_stanza(Host, Options, States, Stanza),
-                           broadcast_by_caps(Host, Node, Type, Stanza),
+                               [{xmlelement, "purge", [{"node", node_to_string(Node)}],
+                                []}]),
+                           broadcast_stanza(Host, Node, Type, Options, States, Stanza),
                            {result, true};
                        _ -> 
                            {result, false}
@@ -2294,6 +2263,7 @@ broadcast_purge_node(Host, Node) ->
     transaction(Host, Node, Action, sync_dirty).
 
 broadcast_removed_node(Host, Node) ->
+    %broadcast(Host, Node, notify_delete, false, "delete", [])
     Action =
        fun(#pubsub_node{options = Options, type = Type}) ->
            case get_option(Options, notify_delete) of
@@ -2303,9 +2273,9 @@ broadcast_removed_node(Host, Node) ->
                            {result, false};
                        {result, States} ->
                            Stanza = event_stanza(
-                               [{xmlelement, "delete", [{"node", node_to_string(Node)}], []}]),
-                           broadcast_stanza(Host, Options, States, Stanza),
-                           broadcast_by_caps(Host, Node, Type, Stanza),
+                               [{xmlelement, "delete", [{"node", node_to_string(Node)}],
+                                []}]),
+                           broadcast_stanza(Host, Node, Type, Options, States, Stanza),
                            {result, true};
                        _ ->
                            {result, false}
@@ -2317,6 +2287,7 @@ broadcast_removed_node(Host, Node) ->
     transaction(Host, Node, Action, sync_dirty).
 
 broadcast_config_notification(Host, Node, Lang) ->
+    %broadcast(Host, Node, notify_config, false, "items", ConfigEls)
     Action =
        fun(#pubsub_node{options = Options, owners = Owners, type = Type}) ->
            case get_option(Options, notify_config) of
@@ -2334,10 +2305,8 @@ broadcast_config_notification(Host, Node, Lang) ->
                            end,
                            Stanza = event_stanza(
                                [{xmlelement, "items", [{"node", node_to_string(Node)}],
-                                [{xmlelement, "item", [{"id", "configuration"}],
-                                 Content}]}]),
-                           broadcast_stanza(Host, Options, States, Stanza),
-                           broadcast_by_caps(Host, Node, Type, Stanza),
+                                [{xmlelement, "item", itemAttr("configuration"), Content}]}]),
+                           broadcast_stanza(Host, Node, Type, Options, States, Stanza),
                            {result, true};
                        _ -> 
                            {result, false}
@@ -2348,63 +2317,92 @@ broadcast_config_notification(Host, Node, Lang) ->
        end,
     transaction(Host, Node, Action, sync_dirty).
 
-broadcast_stanza(Host, NodeOpts, States, Stanza) ->
-    PresenceDelivery = get_option(NodeOpts, presence_based_delivery),
-    BroadcastAll = get_option(NodeOpts, broadcast_all_resources), %% XXX this is not standard
+% TODO: merge broadcast code that way
+%broadcast(Host, Node, Feature, Force, ElName, SubEls) ->
+%    Action =
+%      fun(#pubsub_node{options = Options, type = Type}) ->
+%          case (get_option(Options, Feature) or Force) of
+%              true ->
+%                  case node_call(Type, get_states, [Host, Node]) of
+%                      {result, []} -> 
+%                          {result, false};
+%                      {result, States} ->
+%                          Stanza = event_stanza([{xmlelement, ElName, [{"node", node_to_string(Node)}], SubEls}]),
+%                          broadcast_stanza(Host, Node, Type, Options, States, Stanza),
+%                          {result, true};
+%                      _ ->
+%                          {result, false}
+%                  end;
+%              _ ->
+%                  {result, false}
+%          end
+%      end,
+%    transaction(Host, Node, Action, sync_dirty).
+
+broadcast_stanza(Host, Node, _Type, Options, States, Stanza) ->
+    AccessModel = get_option(Options, access_model),
+    PresenceDelivery = get_option(Options, presence_based_delivery),
+    BroadcastAll = get_option(Options, broadcast_all_resources), %% XXX this is not standard, but usefull
     From = service_jid(Host),
+    %% Handles explicit subscriptions
     lists:foreach(fun(#pubsub_state{stateid = {LJID, _}, subscription = Subs}) ->
        case is_to_deliver(LJID, Subs, PresenceDelivery) of
            true ->
-               JIDs = case BroadcastAll of
-                   true -> ejabberd_sm:get_user_resources(element(1, LJID), element(2, LJID));
-                   false -> [LJID]
+               To = case BroadcastAll of
+                   true -> jlib:jid_remove_resource(LJID);
+                   false -> LJID
                end,
-               lists:foreach(fun(JID) ->
-                   ejabberd_router ! {route, From, jlib:make_jid(JID), Stanza}
-               end, JIDs);
+               ejabberd_router ! {route, From, jlib:make_jid(To), Stanza};
            false ->
                ok
        end
-    end, States).
-
-%% broadcast Stanza to all contacts of the user that are advertising
-%% interest in this kind of Node.
-broadcast_by_caps({LUser, LServer, LResource}, Node, _Type, Stanza) ->
-    SenderResource = case LResource of
-       [] -> hd(user_resources(LUser, LServer));
-       _ -> LResource
-    end,
-    case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of
-       C2SPid when is_pid(C2SPid) ->
-           %% set the from address on the notification to the bare JID of the account owner
-           %% Also, add "replyto" if entity has presence subscription to the account owner
-           %% See XEP-0163 1.1 section 4.3.1
-           Sender = jlib:make_jid(LUser, LServer, ""),
-           %%ReplyTo = jlib:make_jid(LUser, LServer, SenderResource),  % This has to be used
-           case catch ejabberd_c2s:get_subscribed(C2SPid) of
-               Contacts when is_list(Contacts) ->
-                   lists:foreach(fun({U, S, _}) ->
-                       JIDs = lists:foldl(fun(R, Acc) ->
-                           LJID = {U, S, R}, 
-                           case is_caps_notify(LServer, Node, LJID) of
-                               true -> [LJID | Acc];
-                               false -> Acc
-                           end
-                       end, [], user_resources(U, S)),
-                       lists:foreach(fun(JID) ->
-                           ejabberd_router ! {route, Sender, jlib:make_jid(JID), Stanza}
-                       end, JIDs)
-                   end, Contacts);
+    end, States),
+    %% Handles implicit presence subscriptions
+    case Host of
+       {LUser, LServer, LResource} ->
+           SenderResource = case LResource of
+               [] -> 
+                   case user_resources(LUser, LServer) of
+                       [Resource|_] -> Resource;
+                       _ -> ""
+                   end;
                _ ->
-                   ok
+                   LResource
            end,
-           ok;
+           case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of
+               C2SPid when is_pid(C2SPid) ->
+                   %% set the from address on the notification to the bare JID of the account owner
+                   %% Also, add "replyto" if entity has presence subscription to the account owner
+                   %% See XEP-0163 1.1 section 4.3.1
+                   Sender = jlib:make_jid(LUser, LServer, ""),
+                   %%ReplyTo = jlib:make_jid(LUser, LServer, SenderResource),  % This has to be used
+                   case catch ejabberd_c2s:get_subscribed(C2SPid) of
+                       Contacts when is_list(Contacts) ->
+                           lists:foreach(fun({U, S, _}) ->
+                               spawn(fun() ->
+                                   JIDs = lists:foldl(fun(R, Acc) ->
+                                       LJID = {U, S, R}, 
+                                       case is_caps_notify(LServer, Node, LJID) of
+                                           true -> [LJID | Acc];
+                                           false -> Acc
+                                       end
+                                   end, [], user_resources(U, S)),
+                                   lists:foreach(fun(JID) ->
+                                       ejabberd_router ! {route, Sender, jlib:make_jid(JID), Stanza}
+                                   end, JIDs)
+                               end)
+                           end, Contacts);
+                       _ ->
+                           ok
+                   end,
+                   ok;
+               _ ->
+                   ?DEBUG("~p@~p has no session; can't deliver ~p to contacts", [LUser, LServer, Stanza]),
+                   ok
+           end;
        _ ->
-           ?DEBUG("~p@~p has no session; can't deliver ~p to contacts", [LUser, LServer, Stanza]),
            ok
-    end;
-broadcast_by_caps(_, _, _, _) ->
-    ok.
+    end.
 
 %% If we don't know the resource, just pick first if any
 %% If no resource available, check if caps anyway (remote online)
@@ -2861,3 +2859,16 @@ uniqid() ->
     {T1, T2, T3} = now(),
     lists:flatten(io_lib:fwrite("~.16B~.16B~.16B", [T1, T2, T3])).
 
+% node attributes %%% TODO to be used
+nodeAttr(Node) ->
+    [{"node", node_to_string(Node)}].
+
+% item attributes
+itemAttr([]) -> [];
+itemAttr(ItemId) -> [{"id", ItemId}].
+
+% build item elements from item list
+itemsEls(Items) ->
+    lists:map(fun(#pubsub_item{itemid = {ItemId, _}, payload = Payload}) ->
+       {xmlelement, "item", itemAttr(ItemId), Payload}
+    end, Items).
index 776ede57466aefddb1a3236d160290955055c45f..a5abb26c1dafe6e4dd17dbe00721b9e01d6d5717 100644 (file)
@@ -16,7 +16,7 @@
 %%% This software is copyright 2006-2009, ProcessOne.
 %%%
 %%% @copyright 2006-2009 ProcessOne
-%%% @author Christophe romain <christophe.romain@process-one.net>
+%%% @author Christophe Romain <christophe.romain@process-one.net>
 %%%   [http://www.process-one.net/]
 %%% @version {@vsn}, {@date} {@time}
 %%% @end
@@ -81,7 +81,7 @@ options() ->
      {roster_groups_allowed, []},
      {publish_model, publishers},
      {max_payload_size, ?MAX_PAYLOAD_SIZE},
-     {send_last_published_item, never},
+     {send_last_published_item, on_sub_and_presence},
      {deliver_notifications, true},
      {presence_based_delivery, false}].
 
@@ -166,7 +166,7 @@ get_items(Host, Node, JID, AccessModel, PresenceSubscription, RosterGroup, SubId
 
 get_item(Host, Node, ItemId) ->
     node_default:get_item(Host, Node, ItemId).
-       
+
 get_item(Host, Node, ItemId, JID, AccessModel, PresenceSubscription, RosterGroup, SubId) ->
     node_default:get_item(Host, Node, ItemId, JID, AccessModel, PresenceSubscription, RosterGroup, SubId).