+ send_items/4,
+ broadcast_stanza/6,
handle_cast({presence, JID}, State) ->
%% A new resource is available. send last published items
Host = State#state.host,
+ LJID = jlib:jid_tolower(JID),
%% for each node From is subscribed to
%% and if the node is so configured, send the last published item to From
- lists:foreach(fun(Type) ->
- {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
- lists:foreach(
- fun({Node, subscribed, SubJID}) ->
- case tree_action(Host, get_node, [Host, Node, JID]) of
- #pubsub_node{options = Options} ->
- case get_option(Options, send_last_published_item) of
- on_sub_and_presence ->
- send_last_item(Host, Node, SubJID);
- _ ->
- ok
- end;
- _ ->
- ok
- end;
+ spawn(fun() ->
+ lists:foreach(fun(Type) ->
+ {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
+ lists:foreach(
+ fun({Node, subscribed, _SubJID}) ->
+ case tree_action(Host, get_node, [Host, Node, JID]) of
+ #pubsub_node{options = Options} ->
+ case get_option(Options, send_last_published_item) of
+ on_sub_and_presence ->
+ send_items(Host, Node, LJID, last);
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end;
(_) ->
- ok
- end, Subscriptions)
- end, State#state.plugins),
+ ok
+ end, Subscriptions)
+ end, State#state.plugins)
+ end),
{noreply, State};
handle_cast({presence, User, Server, Resources, JID}, State) ->
Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
Host = State#state.host,
ServerHost = State#state.server_host,
- lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
- case get_option(Options, send_last_published_item) of
- on_sub_and_presence ->
- lists:foreach(fun(Resource) ->
- LJID = {User, Server, Resource},
- case is_caps_notify(ServerHost, Node, LJID) of
- true ->
- Subscribed = case get_option(Options, access_model) of
- open -> true;
- presence -> true;
- whitelist -> false; % subscribers are added manually
- authorize -> false; % likewise
- roster ->
- Grps = get_option(Options, roster_groups_allowed, []),
- {OU, OS, _} = Owner,
- element(2, get_roster_info(OU, OS, LJID, Grps))
- end,
- if Subscribed ->
- send_last_item(Owner, Node, LJID);
+ spawn(fun() ->
+ lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
+ case get_option(Options, send_last_published_item) of
+ on_sub_and_presence ->
+ lists:foreach(fun(Resource) ->
+ LJID = {User, Server, Resource},
+ case is_caps_notify(ServerHost, Node, LJID) of
true ->
+ Subscribed = case get_option(Options, access_model) of
+ open -> true;
+ presence -> true;
+ whitelist -> false; % subscribers are added manually
+ authorize -> false; % likewise
+ roster ->
+ Grps = get_option(Options, roster_groups_allowed, []),
+ {OU, OS, _} = Owner,
+ element(2, get_roster_info(OU, OS, LJID, Grps))
+ end,
+ if Subscribed ->
+ send_items(Owner, Node, LJID, last);
+ true ->
+ ok
+ end;
+ false ->
- end;
- false ->
- ok
- end
- end, Resources);
- _ ->
- ok
- end
- end, tree_action(Host, get_nodes, [Owner, JID])),
+ end
+ end, Resources);
+ _ ->
+ ok
+ end
+ end, tree_action(Host, get_nodes, [Owner, JID]))
+ end),
{noreply, State};
handle_cast({remove_user, LUser, LServer}, State) ->
transaction(Host, Node, Action, sync_dirty)
-iq_local(From, To, #iq{type = Type,
- sub_el = SubEl,
- xmlns = XMLNS,
- lang = Lang} = IQ) ->
+iq_local(From, To, #iq{type = Type, sub_el = SubEl, xmlns = XMLNS, lang = Lang} = IQ) ->
ServerHost = To#jid.lserver,
%% Accept IQs to server only from our own users.
%% @spec (Host, JID, Node, Subscription) -> void
%% Host = mod_pubsub:host()
%% JID = jlib:jid()
-%% Node = string()
+%% SNode = string()
%% Subscription = atom()
%% Plugins = [Plugin::string()]
%% @doc Send a message to JID with the supplied Subscription
-send_authorization_approval(Host, JID, Node, Subscription) ->
+send_authorization_approval(Host, JID, SNode, Subscription) ->
Stanza = event_stanza(
[{xmlelement, "subscription",
- [{"node", Node},
+ [{"node", SNode},
{"jid", jlib:jid_to_string(JID)},
{"subscription", subscription_to_string(Subscription)}],
{error, Error} ->
{error, Error};
{result, {Result, subscribed, send_last}} ->
- send_last_item(Host, Node, Subscriber),
+ send_items(Host, Node, Subscriber, last),
case Result of
default -> {result, Reply(subscribed)};
_ -> {result, Result}
PayloadSize > PayloadMaxSize ->
%% Entity attempts to publish very large payload
{error, extended_error(?ERR_NOT_ACCEPTABLE, "payload-too-big")};
- PayloadCount > 1 ->
+ PayloadCount =/= 1 ->
%% Entity attempts to publish item with multiple payload elements
{error, extended_error(?ERR_BAD_REQUEST, "invalid-payload")};
- Payload == "" ->
+ Payload == "" -> %% TODO better use PayloadSize == 0 ?
%% Publisher attempts to publish to payload node with no payload
{error, extended_error(?ERR_BAD_REQUEST, "payload-required")};
(DeliverPayloads == 0) and (PersistItems == 0) and (PayloadSize > 0) ->
%% Generate the XML response (Item list), limiting the
%% number of items sent to MaxItems:
- ItemsEls = lists:map(
- fun(#pubsub_item{itemid = {ItemId, _},
- payload = Payload}) ->
- ItemAttrs = case ItemId of
- "" -> [];
- _ -> [{"id", ItemId}]
- end,
- {xmlelement, "item", ItemAttrs, Payload}
- end, lists:sublist(SendItems, MaxItems)),
{result, [{xmlelement, "pubsub", [{"xmlns", ?NS_PUBSUB}],
[{xmlelement, "items", [{"node", node_to_string(Node)}],
- ItemsEls}]}]}
+ itemsEls(lists:sublist(SendItems, MaxItems))}]}]}
_ -> []
-%% @spec (Host, Node, LJID) -> any()
-%% Host = host()
-%% Node = pubsubNode()
-%% LJID = {U, S, []}
-%% @doc <p>Resend the last item of a node to the user.</p>
-send_last_item(Host, Node, LJID) ->
- send_items(Host, Node, LJID, last).
%% @spec (Host, Node, LJID, Number) -> any()
%% Host = host()
%% Node = pubsubNode()
- ItemsEls = lists:map(
- fun(#pubsub_item{itemid = {ItemId, _}, payload = Payload}) ->
- ItemAttrs = case ItemId of
- "" -> [];
- _ -> [{"id", ItemId}]
- end,
- {xmlelement, "item", ItemAttrs, Payload}
- end, ToSend),
Stanza = event_stanza(
[{xmlelement, "items", [{"node", node_to_string(Node)}],
- ItemsEls}]),
+ itemsEls(ToSend)}]),
ejabberd_router ! {route, service_jid(Host), jlib:make_jid(LJID), Stanza}.
%% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response}
%%%%%% broadcast functions
broadcast_publish_item(Host, Node, ItemId, _From, Payload) ->
+ %broadcast(Host, Node, none, true, "items", ItemEls)
Action =
fun(#pubsub_node{options = Options, type = Type}) ->
case node_call(Type, get_states, [Host, Node]) of
true -> Payload;
false -> []
- ItemAttrs = case ItemId of
- "" -> [];
- _ -> [{"id", ItemId}]
- end,
Stanza = event_stanza(
[{xmlelement, "items", [{"node", node_to_string(Node)}],
- [{xmlelement, "item", ItemAttrs, Content}]}]),
- broadcast_stanza(Host, Options, States, Stanza),
- broadcast_by_caps(Host, Node, Type, Stanza),
+ [{xmlelement, "item", itemAttr(ItemId), Content}]}]),
+ broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
broadcast_retract_items(Host, Node, ItemIds) ->
broadcast_retract_items(Host, Node, ItemIds, false).
broadcast_retract_items(Host, Node, ItemIds, ForceNotify) ->
+ %broadcast(Host, Node, notify_retract, ForceNotify, "retract", RetractEls)
Action =
fun(#pubsub_node{options = Options, type = Type}) ->
case (get_option(Options, notify_retract) or ForceNotify) of
{result, []} ->
{result, false};
{result, States} ->
- RetractEls = lists:map(
- fun(ItemId) ->
- ItemAttrs = case ItemId of
- "" -> [];
- _ -> [{"id", ItemId}]
- end,
- {xmlelement, "retract", ItemAttrs, []}
- end, ItemIds),
+ RetractEls = [{xmlelement, "retract", itemAttr(ItemId), []} || ItemId <- ItemIds],
Stanza = event_stanza(
[{xmlelement, "items", [{"node", node_to_string(Node)}],
- broadcast_stanza(Host, Options, States, Stanza),
- broadcast_by_caps(Host, Node, Type, Stanza),
+ broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
transaction(Host, Node, Action, sync_dirty).
broadcast_purge_node(Host, Node) ->
+ %broadcast(Host, Node, notify_retract, false, "purge", [])
Action =
fun(#pubsub_node{options = Options, type = Type}) ->
case get_option(Options, notify_retract) of
{result, false};
{result, States} ->
Stanza = event_stanza(
- [{xmlelement, "purge", [{"node", node_to_string(Node)}], []}]),
- broadcast_stanza(Host, Options, States, Stanza),
- broadcast_by_caps(Host, Node, Type, Stanza),
+ [{xmlelement, "purge", [{"node", node_to_string(Node)}],
+ []}]),
+ broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
transaction(Host, Node, Action, sync_dirty).
broadcast_removed_node(Host, Node) ->
+ %broadcast(Host, Node, notify_delete, false, "delete", [])
Action =
fun(#pubsub_node{options = Options, type = Type}) ->
case get_option(Options, notify_delete) of
{result, false};
{result, States} ->
Stanza = event_stanza(
- [{xmlelement, "delete", [{"node", node_to_string(Node)}], []}]),
- broadcast_stanza(Host, Options, States, Stanza),
- broadcast_by_caps(Host, Node, Type, Stanza),
+ [{xmlelement, "delete", [{"node", node_to_string(Node)}],
+ []}]),
+ broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
transaction(Host, Node, Action, sync_dirty).
broadcast_config_notification(Host, Node, Lang) ->
+ %broadcast(Host, Node, notify_config, false, "items", ConfigEls)
Action =
fun(#pubsub_node{options = Options, owners = Owners, type = Type}) ->
case get_option(Options, notify_config) of
Stanza = event_stanza(
[{xmlelement, "items", [{"node", node_to_string(Node)}],
- [{xmlelement, "item", [{"id", "configuration"}],
- Content}]}]),
- broadcast_stanza(Host, Options, States, Stanza),
- broadcast_by_caps(Host, Node, Type, Stanza),
+ [{xmlelement, "item", itemAttr("configuration"), Content}]}]),
+ broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
transaction(Host, Node, Action, sync_dirty).
-broadcast_stanza(Host, NodeOpts, States, Stanza) ->
- PresenceDelivery = get_option(NodeOpts, presence_based_delivery),
- BroadcastAll = get_option(NodeOpts, broadcast_all_resources), %% XXX this is not standard
+% TODO: merge broadcast code that way
+%broadcast(Host, Node, Feature, Force, ElName, SubEls) ->
+% Action =
+% fun(#pubsub_node{options = Options, type = Type}) ->
+% case (get_option(Options, Feature) or Force) of
+% true ->
+% case node_call(Type, get_states, [Host, Node]) of
+% {result, []} ->
+% {result, false};
+% {result, States} ->
+% Stanza = event_stanza([{xmlelement, ElName, [{"node", node_to_string(Node)}], SubEls}]),
+% broadcast_stanza(Host, Node, Type, Options, States, Stanza),
+% {result, true};
+% _ ->
+% {result, false}
+% end;
+% _ ->
+% {result, false}
+% end
+% end,
+% transaction(Host, Node, Action, sync_dirty).
+broadcast_stanza(Host, Node, _Type, Options, States, Stanza) ->
+ AccessModel = get_option(Options, access_model),
+ PresenceDelivery = get_option(Options, presence_based_delivery),
+ BroadcastAll = get_option(Options, broadcast_all_resources), %% XXX this is not standard, but usefull
From = service_jid(Host),
+ %% Handles explicit subscriptions
lists:foreach(fun(#pubsub_state{stateid = {LJID, _}, subscription = Subs}) ->
case is_to_deliver(LJID, Subs, PresenceDelivery) of
true ->
- JIDs = case BroadcastAll of
- true -> ejabberd_sm:get_user_resources(element(1, LJID), element(2, LJID));
- false -> [LJID]
+ To = case BroadcastAll of
+ true -> jlib:jid_remove_resource(LJID);
+ false -> LJID
- lists:foreach(fun(JID) ->
- ejabberd_router ! {route, From, jlib:make_jid(JID), Stanza}
- end, JIDs);
+ ejabberd_router ! {route, From, jlib:make_jid(To), Stanza};
false ->
- end, States).
-%% broadcast Stanza to all contacts of the user that are advertising
-%% interest in this kind of Node.
-broadcast_by_caps({LUser, LServer, LResource}, Node, _Type, Stanza) ->
- SenderResource = case LResource of
- [] -> hd(user_resources(LUser, LServer));
- _ -> LResource
- end,
- case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of
- C2SPid when is_pid(C2SPid) ->
- %% set the from address on the notification to the bare JID of the account owner
- %% Also, add "replyto" if entity has presence subscription to the account owner
- %% See XEP-0163 1.1 section 4.3.1
- Sender = jlib:make_jid(LUser, LServer, ""),
- %%ReplyTo = jlib:make_jid(LUser, LServer, SenderResource), % This has to be used
- case catch ejabberd_c2s:get_subscribed(C2SPid) of
- Contacts when is_list(Contacts) ->
- lists:foreach(fun({U, S, _}) ->
- JIDs = lists:foldl(fun(R, Acc) ->
- LJID = {U, S, R},
- case is_caps_notify(LServer, Node, LJID) of
- true -> [LJID | Acc];
- false -> Acc
- end
- end, [], user_resources(U, S)),
- lists:foreach(fun(JID) ->
- ejabberd_router ! {route, Sender, jlib:make_jid(JID), Stanza}
- end, JIDs)
- end, Contacts);
+ end, States),
+ %% Handles implicit presence subscriptions
+ case Host of
+ {LUser, LServer, LResource} ->
+ SenderResource = case LResource of
+ [] ->
+ case user_resources(LUser, LServer) of
+ [Resource|_] -> Resource;
+ _ -> ""
+ end;
_ ->
- ok
+ LResource
- ok;
+ case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of
+ C2SPid when is_pid(C2SPid) ->
+ %% set the from address on the notification to the bare JID of the account owner
+ %% Also, add "replyto" if entity has presence subscription to the account owner
+ %% See XEP-0163 1.1 section 4.3.1
+ Sender = jlib:make_jid(LUser, LServer, ""),
+ %%ReplyTo = jlib:make_jid(LUser, LServer, SenderResource), % This has to be used
+ case catch ejabberd_c2s:get_subscribed(C2SPid) of
+ Contacts when is_list(Contacts) ->
+ lists:foreach(fun({U, S, _}) ->
+ spawn(fun() ->
+ JIDs = lists:foldl(fun(R, Acc) ->
+ LJID = {U, S, R},
+ case is_caps_notify(LServer, Node, LJID) of
+ true -> [LJID | Acc];
+ false -> Acc
+ end
+ end, [], user_resources(U, S)),
+ lists:foreach(fun(JID) ->
+ ejabberd_router ! {route, Sender, jlib:make_jid(JID), Stanza}
+ end, JIDs)
+ end)
+ end, Contacts);
+ _ ->
+ ok
+ end,
+ ok;
+ _ ->
+ ?DEBUG("~p@~p has no session; can't deliver ~p to contacts", [LUser, LServer, Stanza]),
+ ok
+ end;
_ ->
- ?DEBUG("~p@~p has no session; can't deliver ~p to contacts", [LUser, LServer, Stanza]),
- end;
-broadcast_by_caps(_, _, _, _) ->
- ok.
+ end.
%% If we don't know the resource, just pick first if any
%% If no resource available, check if caps anyway (remote online)
{T1, T2, T3} = now(),
lists:flatten(io_lib:fwrite("~.16B~.16B~.16B", [T1, T2, T3])).
+% node attributes %%% TODO to be used
+nodeAttr(Node) ->
+ [{"node", node_to_string(Node)}].
+% item attributes
+itemAttr([]) -> [];
+itemAttr(ItemId) -> [{"id", ItemId}].
+% build item elements from item list
+itemsEls(Items) ->
+ lists:map(fun(#pubsub_item{itemid = {ItemId, _}, payload = Payload}) ->
+ {xmlelement, "item", itemAttr(ItemId), Payload}
+ end, Items).