]> granicus.if.org Git - ejabberd/commitdiff
improve send last published items spawning
authorChristophe Romain <christophe.romain@process-one.net>
Wed, 22 Apr 2009 22:19:41 +0000 (22:19 +0000)
committerChristophe Romain <christophe.romain@process-one.net>
Wed, 22 Apr 2009 22:19:41 +0000 (22:19 +0000)
SVN Revision: 2036

ChangeLog
src/mod_pubsub/mod_pubsub.erl

index b95eb85127263f90d8718d3cf682a22b4640f2cf..ec4108c5310c7e3f29b576e0da2ea75367331110 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+2009-04-23  Christophe Romain <christophe.romain@process-one.net>
+
+       * src/mod_pubsub/mod_pubsub.erl: improve send last published items
+       (not spawned as much) and allow to send last PEP items of our offline
+       contacts if configured for (fix discussion issue on standars ML)
+
 2009-04-22  Badlop  <badlop@process-one.net>
 
        * src/ejabberd.cfg.example: Fix English typos. Fix line length:
index 240a40b60482737e85eeea8fb5d2ef3e1a3b39db..1ce3f2cba4cba69a79edcc5659d76041a01669db 100644 (file)
         code_change/3
        ]).
 
+%% calls for parallel sending of last items
+-export([send_loop/1
+       ]).
+
 -define(PROCNAME, ejabberd_mod_pubsub).
 -define(PLUGIN_PREFIX, "node_").
 -define(TREE_PREFIX, "nodetree_").
                host,
                access,
                pep_mapping = [],
+               pep_sendlast_offline,
                nodetree = ?STDTREE,
-               plugins = [?STDNODE]}).
+               plugins = [?STDNODE],
+               send_loop}).
 
 %%====================================================================
 %% API
@@ -157,6 +163,7 @@ init([ServerHost, Opts]) ->
     ?DEBUG("pubsub init ~p ~p",[ServerHost,Opts]),
     Host = gen_mod:get_opt_host(ServerHost, Opts, "pubsub.@HOST@"),
     Access = gen_mod:get_opt(access_createnode, Opts, all),
+    PepOffline = gen_mod:get_opt(pep_sendlast_offline, Opts, false),
     IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
     mod_disco:register_feature(ServerHost, ?NS_PUBSUB),
     ejabberd_hooks:add(disco_sm_identity, ServerHost, ?MODULE, disco_sm_identity, 75),
@@ -188,12 +195,15 @@ init([ServerHost, Opts]) ->
     ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {plugins, Plugins}),
     ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {pep_mapping, PepMapping}),
     init_nodes(Host, ServerHost),
-    {ok, #state{host = Host,
+    State = #state{host = Host,
                server_host = ServerHost,
                access = Access,
                pep_mapping = PepMapping,
+               pep_sendlast_offline = PepOffline,
                nodetree = NodeTree,
-               plugins = Plugins}}.
+               plugins = Plugins},
+    SendLoop = spawn(?MODULE, send_loop, [State]),  %% TODO supervise that process
+    {ok, State#state{send_loop = SendLoop}}.
 
 %% @spec (Host, ServerHost, Opts) -> Plugins
 %%      Host = mod_pubsub:host()   Opts = [{Key,Value}]
@@ -308,6 +318,113 @@ update_database(Host) ->
            ok
     end.
 
+send_loop(State) ->
+    receive
+    {presence, JID, Pid} ->
+       Host = State#state.host,
+       ServerHost = State#state.server_host,
+       LJID = jlib:jid_tolower(JID),
+       BJID = jlib:jid_remove_resource(LJID),
+       %% for each node From is subscribed to
+       %% and if the node is so configured, send the last published item to From
+       lists:foreach(fun(Type) ->
+           {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
+           lists:foreach(
+               fun({Node, subscribed, SubJID}) -> 
+                   if (SubJID == LJID) or (SubJID == BJID) ->
+                       case tree_action(Host, get_node, [Host, Node, JID]) of
+                           #pubsub_node{options = Options} ->
+                               case get_option(Options, send_last_published_item) of
+                                   on_sub_and_presence ->
+                                       send_items(Host, Node, SubJID, last);
+                                   _ ->
+                                       ok
+                               end;
+                           _ ->
+                               ok
+                       end;
+                   true ->
+                       % resource not concerned about that subscription
+                       ok
+                   end;
+                  (_) ->
+                   ok
+               end, Subscriptions)
+       end, State#state.plugins),
+       %% and force send the last PEP events published by its offline and local contacts
+       %% only if pubsub is explicitely configured for that.
+       %% this is a hack in a sense that PEP should only be based on presence
+       %% and is not able to "store" events of remote users (via s2s)
+       %% this makes that hack only work for local domain by now
+       if State#state.pep_sendlast_offline ->
+           case catch ejabberd_c2s:get_subscribed(Pid) of
+           Contacts when is_list(Contacts) ->
+               {User, Server, Resource} = jlib:jid_tolower(JID),
+               lists:foreach(
+                   fun({U, S, R}) ->  %% local contacts
+                       case ejabberd_sm:get_user_resources(U, S) of
+                       [] -> %% offline
+                           case S of
+                           ServerHost -> %% local contact, so we may have pep items
+                               PeerJID = jlib:make_jid(U, S, R),
+                               handle_cast({presence, User, Server, [Resource], PeerJID}, State);
+                           _ -> %% remote contact, no items available
+                               ok
+                           end;
+                       _ -> %% online
+                           % this is already handled by presence probe
+                           ok
+                       end;
+                   (_) ->  %% remote contacts
+                       % we can not do anything in any cases
+                       ok
+               end, Contacts);
+           _ ->
+               ok
+           end;
+       true ->
+           ok
+       end,
+       send_loop(State);
+    {presence, User, Server, Resources, JID} ->
+       Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
+       Host = State#state.host,
+       ServerHost = State#state.server_host,
+       lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
+           case get_option(Options, send_last_published_item) of
+               on_sub_and_presence ->
+                   lists:foreach(fun(Resource) ->
+                       LJID = {User, Server, Resource},
+                       case is_caps_notify(ServerHost, Node, LJID) of
+                           true ->
+                               Subscribed = case get_option(Options, access_model) of
+                                       open -> true;
+                                       presence -> true;
+                                       whitelist -> false; % subscribers are added manually
+                                       authorize -> false; % likewise
+                                       roster ->
+                                           Grps = get_option(Options, roster_groups_allowed, []),
+                                           {OU, OS, _} = Owner,
+                                           element(2, get_roster_info(OU, OS, LJID, Grps))
+                               end,
+                               if Subscribed ->
+                                   send_items(Owner, Node, LJID, last);
+                               true ->
+                                   ok
+                               end;
+                           false ->
+                               ok
+                       end
+                   end, Resources);
+               _ ->
+                   ok
+           end
+       end, tree_action(Host, get_nodes, [Owner, JID])),
+       send_loop(State);
+    stop ->
+       ok
+    end.
+
 %% -------
 %% disco hooks handling functions
 %%
@@ -415,9 +532,9 @@ disco_sm_items(Acc, From, To, Node, _Lang) ->
 %% presence hooks handling functions
 %%
 
-presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, JID, _Pid) ->
+presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, JID, Pid) ->
     Proc = gen_mod:get_module_proc(Server, ?PROCNAME),
-    gen_server:cast(Proc, {presence, JID}),
+    gen_server:cast(Proc, {presence, JID, Pid}),
     gen_server:cast(Proc, {presence, User, Server, [Resource], JID});
 presence_probe(#jid{luser = User, lserver = Server, lresource = Resource}, #jid{lserver = Host} = JID, _Pid) ->
     Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
@@ -478,72 +595,14 @@ handle_call(stop, _From, State) ->
 %% Description: Handling cast messages
 %%--------------------------------------------------------------------
 %% @private
-handle_cast({presence, JID}, State) ->
+handle_cast({presence, JID, Pid}, State) ->
     %% A new resource is available. send last published items
-    Host = State#state.host,
-    LJID = jlib:jid_tolower(JID),
-    %% for each node From is subscribed to
-    %% and if the node is so configured, send the last published item to From
-    spawn(fun() ->
-       lists:foreach(fun(Type) ->
-           {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
-           lists:foreach(
-               fun({Node, subscribed, _SubJID}) -> 
-                   case tree_action(Host, get_node, [Host, Node, JID]) of
-                       #pubsub_node{options = Options} ->
-                           case get_option(Options, send_last_published_item) of
-                               on_sub_and_presence ->
-                                   send_items(Host, Node, LJID, last);
-                               _ ->
-                                   ok
-                           end;
-                       _ ->
-                           ok
-                   end;
-               (_) ->
-                   ok
-               end, Subscriptions)
-       end, State#state.plugins)
-    end),
+    State#state.send_loop ! {presence, JID, Pid},
     {noreply, State};
 
 handle_cast({presence, User, Server, Resources, JID}, State) ->
     %% A new resource is available. send last published PEP items
-    Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
-    Host = State#state.host,
-    ServerHost = State#state.server_host,
-    spawn(fun() ->
-       lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
-           case get_option(Options, send_last_published_item) of
-               on_sub_and_presence ->
-                   lists:foreach(fun(Resource) ->
-                       LJID = {User, Server, Resource},
-                       case is_caps_notify(ServerHost, Node, LJID) of
-                           true ->
-                               Subscribed = case get_option(Options, access_model) of
-                                       open -> true;
-                                       presence -> true;
-                                       whitelist -> false; % subscribers are added manually
-                                       authorize -> false; % likewise
-                                       roster ->
-                                           Grps = get_option(Options, roster_groups_allowed, []),
-                                           {OU, OS, _} = Owner,
-                                           element(2, get_roster_info(OU, OS, LJID, Grps))
-                               end,
-                               if Subscribed ->
-                                   send_items(Owner, Node, LJID, last);
-                               true ->
-                                   ok
-                               end;
-                           false ->
-                               ok
-                       end
-                   end, Resources);
-               _ ->
-                   ok
-           end
-       end, tree_action(Host, get_nodes, [Owner, JID]))
-    end),
+    State#state.send_loop ! {presence, User, Server, Resources, JID},
     {noreply, State};
 
 handle_cast({remove_user, LUser, LServer}, State) ->
@@ -600,7 +659,8 @@ handle_info(_Info, State) ->
 terminate(_Reason, #state{host = Host,
                          server_host = ServerHost,
                          nodetree = TreePlugin,
-                         plugins = Plugins}) ->
+                         plugins = Plugins,
+                         send_loop = SendLoop}) ->
     terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
     ejabberd_router:unregister_route(Host),
     case lists:member(?PEPNODE, Plugins) of
@@ -622,6 +682,7 @@ terminate(_Reason, #state{host = Host,
     gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB),
     gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB_OWNER),
     mod_disco:unregister_feature(ServerHost, ?NS_PUBSUB),
+    SendLoop ! stop,
     ok.
 
 %%--------------------------------------------------------------------
@@ -820,7 +881,7 @@ iq_disco_items(Host, Item, From) ->
            %% Note: Multiple Node Discovery not supported (mask on pubsub#type)
            %% TODO this code is also back-compatible with pubsub v1.8 (for client issue)
            %% TODO make it pubsub v1.12 compliant (breaks client compatibility ?)
-           %% TODO That is, remove name attribute
+           %% TODO That is, remove name attribute (or node?, please check)
            Action =
                fun(#pubsub_node{type = Type}) ->
                        NodeItems = case node_call(Type, get_items, [Host, Node, From]) of
@@ -1563,7 +1624,7 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload) ->
                    end
            end,
     ejabberd_hooks:run(pubsub_publish_item, ServerHost, [ServerHost, Node, Publisher, service_jid(Host), ItemId, Payload]),
-    Reply = [],
+    Reply = [], %% TODO EJAB-909
     case transaction(Host, Node, Action, sync_dirty) of
        {error, ?ERR_ITEM_NOT_FOUND} ->
            %% handles auto-create feature