+%% calls for parallel sending of last items
+ ]).
-define(PROCNAME, ejabberd_mod_pubsub).
-define(PLUGIN_PREFIX, "node_").
-define(TREE_PREFIX, "nodetree_").
pep_mapping = [],
+ pep_sendlast_offline,
nodetree = ?STDTREE,
- plugins = [?STDNODE]}).
+ plugins = [?STDNODE],
+ send_loop}).
%% API
?DEBUG("pubsub init ~p ~p",[ServerHost,Opts]),
Host = gen_mod:get_opt_host(ServerHost, Opts, "pubsub.@HOST@"),
Access = gen_mod:get_opt(access_createnode, Opts, all),
+ PepOffline = gen_mod:get_opt(pep_sendlast_offline, Opts, false),
IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
mod_disco:register_feature(ServerHost, ?NS_PUBSUB),
ejabberd_hooks:add(disco_sm_identity, ServerHost, ?MODULE, disco_sm_identity, 75),
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {plugins, Plugins}),
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {pep_mapping, PepMapping}),
init_nodes(Host, ServerHost),
- {ok, #state{host = Host,
+ State = #state{host = Host,
server_host = ServerHost,
access = Access,
pep_mapping = PepMapping,
+ pep_sendlast_offline = PepOffline,
nodetree = NodeTree,
- plugins = Plugins}}.
+ plugins = Plugins},
+ SendLoop = spawn(?MODULE, send_loop, [State]), %% TODO supervise that process
+ {ok, State#state{send_loop = SendLoop}}.
%% @spec (Host, ServerHost, Opts) -> Plugins
%% Host = mod_pubsub:host() Opts = [{Key,Value}]
+send_loop(State) ->
+ receive
+ {presence, JID, Pid} ->
+ Host =,
+ ServerHost = State#state.server_host,
+ LJID = jlib:jid_tolower(JID),
+ BJID = jlib:jid_remove_resource(LJID),
+ %% for each node From is subscribed to
+ %% and if the node is so configured, send the last published item to From
+ lists:foreach(fun(Type) ->
+ {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
+ lists:foreach(
+ fun({Node, subscribed, SubJID}) ->
+ if (SubJID == LJID) or (SubJID == BJID) ->
+ case tree_action(Host, get_node, [Host, Node, JID]) of
+ #pubsub_node{options = Options} ->
+ case get_option(Options, send_last_published_item) of
+ on_sub_and_presence ->
+ send_items(Host, Node, SubJID, last);
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end;
+ true ->
+ % resource not concerned about that subscription
+ ok
+ end;
+ (_) ->
+ ok
+ end, Subscriptions)
+ end, State#state.plugins),
+ %% and force send the last PEP events published by its offline and local contacts
+ %% only if pubsub is explicitely configured for that.
+ %% this is a hack in a sense that PEP should only be based on presence
+ %% and is not able to "store" events of remote users (via s2s)
+ %% this makes that hack only work for local domain by now
+ if State#state.pep_sendlast_offline ->
+ case catch ejabberd_c2s:get_subscribed(Pid) of
+ Contacts when is_list(Contacts) ->
+ {User, Server, Resource} = jlib:jid_tolower(JID),
+ lists:foreach(
+ fun({U, S, R}) -> %% local contacts
+ case ejabberd_sm:get_user_resources(U, S) of
+ [] -> %% offline
+ case S of
+ ServerHost -> %% local contact, so we may have pep items
+ PeerJID = jlib:make_jid(U, S, R),
+ handle_cast({presence, User, Server, [Resource], PeerJID}, State);
+ _ -> %% remote contact, no items available
+ ok
+ end;
+ _ -> %% online
+ % this is already handled by presence probe
+ ok
+ end;
+ (_) -> %% remote contacts
+ % we can not do anything in any cases
+ ok
+ end, Contacts);
+ _ ->
+ ok
+ end;
+ true ->
+ ok
+ end,
+ send_loop(State);
+ {presence, User, Server, Resources, JID} ->
+ Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
+ Host =,
+ ServerHost = State#state.server_host,
+ lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
+ case get_option(Options, send_last_published_item) of
+ on_sub_and_presence ->
+ lists:foreach(fun(Resource) ->
+ LJID = {User, Server, Resource},
+ case is_caps_notify(ServerHost, Node, LJID) of
+ true ->
+ Subscribed = case get_option(Options, access_model) of
+ open -> true;
+ presence -> true;
+ whitelist -> false; % subscribers are added manually
+ authorize -> false; % likewise
+ roster ->
+ Grps = get_option(Options, roster_groups_allowed, []),
+ {OU, OS, _} = Owner,
+ element(2, get_roster_info(OU, OS, LJID, Grps))
+ end,
+ if Subscribed ->
+ send_items(Owner, Node, LJID, last);
+ true ->
+ ok
+ end;
+ false ->
+ ok
+ end
+ end, Resources);
+ _ ->
+ ok
+ end
+ end, tree_action(Host, get_nodes, [Owner, JID])),
+ send_loop(State);
+ stop ->
+ ok
+ end.
%% -------
%% disco hooks handling functions
%% presence hooks handling functions
-presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, JID, _Pid) ->
+presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, JID, Pid) ->
Proc = gen_mod:get_module_proc(Server, ?PROCNAME),
- gen_server:cast(Proc, {presence, JID}),
+ gen_server:cast(Proc, {presence, JID, Pid}),
gen_server:cast(Proc, {presence, User, Server, [Resource], JID});
presence_probe(#jid{luser = User, lserver = Server, lresource = Resource}, #jid{lserver = Host} = JID, _Pid) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
%% Description: Handling cast messages
%% @private
-handle_cast({presence, JID}, State) ->
+handle_cast({presence, JID, Pid}, State) ->
%% A new resource is available. send last published items
- Host =,
- LJID = jlib:jid_tolower(JID),
- %% for each node From is subscribed to
- %% and if the node is so configured, send the last published item to From
- spawn(fun() ->
- lists:foreach(fun(Type) ->
- {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
- lists:foreach(
- fun({Node, subscribed, _SubJID}) ->
- case tree_action(Host, get_node, [Host, Node, JID]) of
- #pubsub_node{options = Options} ->
- case get_option(Options, send_last_published_item) of
- on_sub_and_presence ->
- send_items(Host, Node, LJID, last);
- _ ->
- ok
- end;
- _ ->
- ok
- end;
- (_) ->
- ok
- end, Subscriptions)
- end, State#state.plugins)
- end),
+ State#state.send_loop ! {presence, JID, Pid},
{noreply, State};
handle_cast({presence, User, Server, Resources, JID}, State) ->
%% A new resource is available. send last published PEP items
- Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
- Host =,
- ServerHost = State#state.server_host,
- spawn(fun() ->
- lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
- case get_option(Options, send_last_published_item) of
- on_sub_and_presence ->
- lists:foreach(fun(Resource) ->
- LJID = {User, Server, Resource},
- case is_caps_notify(ServerHost, Node, LJID) of
- true ->
- Subscribed = case get_option(Options, access_model) of
- open -> true;
- presence -> true;
- whitelist -> false; % subscribers are added manually
- authorize -> false; % likewise
- roster ->
- Grps = get_option(Options, roster_groups_allowed, []),
- {OU, OS, _} = Owner,
- element(2, get_roster_info(OU, OS, LJID, Grps))
- end,
- if Subscribed ->
- send_items(Owner, Node, LJID, last);
- true ->
- ok
- end;
- false ->
- ok
- end
- end, Resources);
- _ ->
- ok
- end
- end, tree_action(Host, get_nodes, [Owner, JID]))
- end),
+ State#state.send_loop ! {presence, User, Server, Resources, JID},
{noreply, State};
handle_cast({remove_user, LUser, LServer}, State) ->
terminate(_Reason, #state{host = Host,
server_host = ServerHost,
nodetree = TreePlugin,
- plugins = Plugins}) ->
+ plugins = Plugins,
+ send_loop = SendLoop}) ->
terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
case lists:member(?PEPNODE, Plugins) of
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB),
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB_OWNER),
mod_disco:unregister_feature(ServerHost, ?NS_PUBSUB),
+ SendLoop ! stop,
%% Note: Multiple Node Discovery not supported (mask on pubsub#type)
%% TODO this code is also back-compatible with pubsub v1.8 (for client issue)
%% TODO make it pubsub v1.12 compliant (breaks client compatibility ?)
- %% TODO That is, remove name attribute
+ %% TODO That is, remove name attribute (or node?, please check)
Action =
fun(#pubsub_node{type = Type}) ->
NodeItems = case node_call(Type, get_items, [Host, Node, From]) of
ejabberd_hooks:run(pubsub_publish_item, ServerHost, [ServerHost, Node, Publisher, service_jid(Host), ItemId, Payload]),
- Reply = [],
+ Reply = [], %% TODO EJAB-909
case transaction(Host, Node, Action, sync_dirty) of
{error, ?ERR_ITEM_NOT_FOUND} ->
%% handles auto-create feature