]> granicus.if.org Git - ejabberd/commitdiff
PubSub: refactor send_last_items remove send_loop
authorChristophe Romain <christophe.romain@process-one.net>
Fri, 11 Aug 2017 08:20:33 +0000 (10:20 +0200)
committerChristophe Romain <christophe.romain@process-one.net>
Fri, 11 Aug 2017 08:20:33 +0000 (10:20 +0200)
src/mod_pubsub.erl

index 194d12444c773b671762bfc8d9b6c2601ff11971..054770829ec4272ced0fb1b74b478e7d8af3b0a8 100644 (file)
@@ -52,7 +52,7 @@
 %% exports for hooks
 -export([presence_probe/3, caps_add/3, caps_update/3,
     in_subscription/6, out_subscription/4,
-    on_user_offline/3, remove_user/2,
+    on_user_online/1, on_user_offline/2, remove_user/2,
     disco_local_identity/5, disco_local_features/5,
     disco_local_items/5, disco_sm_identity/5,
     disco_sm_features/5, disco_sm_items/5,
 %% API and gen_server callbacks
 -export([start/2, stop/1, init/1,
     handle_call/3, handle_cast/2, handle_info/2,
-    terminate/2, code_change/3, depends/2, export/1]).
-
--export([send_loop/1, mod_opt_type/1]).
-
--define(LOOPNAME, ejabberd_mod_pubsub_loop).
+    terminate/2, code_change/3, depends/2, export/1, mod_opt_type/1]).
 
 %%====================================================================
 %% API
@@ -300,7 +296,9 @@ init([ServerHost, Opts]) ->
                                                ?MODULE, process_commands, IQDisc),
                  Plugins
          end, Hosts),
-    ejabberd_hooks:add(sm_remove_connection_hook, ServerHost,
+    ejabberd_hooks:add(c2s_session_opened, ServerHost,
+       ?MODULE, on_user_online, 75),
+    ejabberd_hooks:add(c2s_terminated, ServerHost,
        ?MODULE, on_user_offline, 75),
     ejabberd_hooks:add(disco_local_identity, ServerHost,
        ?MODULE, disco_local_identity, 75),
@@ -337,34 +335,16 @@ init([ServerHost, Opts]) ->
        false ->
            ok
     end,
-    {_, State} = init_send_loop(ServerHost, Hosts),
-    {ok, State}.
-
-init_send_loop(ServerHost, Hosts) ->
     NodeTree = config(ServerHost, nodetree),
     Plugins = config(ServerHost, plugins),
-    LastItemCache = config(ServerHost, last_item_cache),
-    MaxItemsNode = config(ServerHost, max_items_node),
     PepMapping = config(ServerHost, pep_mapping),
-    PepOffline = config(ServerHost, ignore_pep_from_offline),
-    Access = config(ServerHost, access),
     DBType = gen_mod:db_type(ServerHost, ?MODULE),
-    State = #state{hosts = Hosts, server_host = ServerHost,
-           access = Access, pep_mapping = PepMapping,
-           ignore_pep_from_offline = PepOffline,
-           last_item_cache = LastItemCache,
-           max_items_node = MaxItemsNode, nodetree = NodeTree,
-           plugins = Plugins, db_type = DBType},
-    Proc = gen_mod:get_module_proc(ServerHost, ?LOOPNAME),
-    Pid = case whereis(Proc) of
-       undefined ->
-           SendLoop = spawn(?MODULE, send_loop, [State]),
-           register(Proc, SendLoop),
-           SendLoop;
-       Loop ->
-           Loop
-    end,
-    {Pid, State}.
+    {ok, #state{hosts = Hosts, server_host = ServerHost,
+               access = Access, pep_mapping = PepMapping,
+               ignore_pep_from_offline = PepOffline,
+               last_item_cache = LastItemCache,
+               max_items_node = MaxItemsNode, nodetree = NodeTree,
+               plugins = Plugins, db_type = DBType}}.
 
 depends(ServerHost, Opts) ->
     Host = gen_mod:get_opt_host(ServerHost, Opts, <<"pubsub.@HOST@">>),
@@ -414,94 +394,6 @@ terminate_plugins(Host, ServerHost, Plugins, TreePlugin) ->
     TreePlugin:terminate(Host, ServerHost),
     ok.
 
-get_subscribed(User, Server) ->
-    Items = ejabberd_hooks:run_fold(roster_get, Server, [], [{User, Server}]),
-    lists:filtermap(
-      fun(#roster{jid = LJID, subscription = Sub})
-           when Sub == both orelse Sub == from ->
-             {true, LJID};
-        (_) ->
-             false
-      end, Items).
-
-send_loop(State) ->
-    receive
-       {presence, JID, _Pid} ->
-           ServerHost = State#state.server_host,
-           Host = host(State#state.server_host),
-           DBType = State#state.db_type,
-           LJID = jid:tolower(JID),
-           BJID = jid:remove_resource(LJID),
-           lists:foreach(
-               fun(PType) ->
-                       Subs = get_subscriptions_for_send_last(Host, PType, DBType, JID, LJID, BJID),
-                       lists:foreach(
-                           fun({NodeRec, _, _, SubJID}) ->
-                                   {_, Node} = NodeRec#pubsub_node.nodeid,
-                                   Nidx = NodeRec#pubsub_node.id,
-                                   Options = NodeRec#pubsub_node.options,
-                                   [send_items(Host, Node, Nidx, PType, Options, SubJID, last)
-                                    || NodeRec#pubsub_node.type == PType]
-                           end,
-                           lists:usort(Subs))
-               end,
-               State#state.plugins),
-           if not State#state.ignore_pep_from_offline ->
-                   {User, Server, Resource} = LJID,
-                   Contacts = get_subscribed(User, Server),
-                   lists:foreach(
-                     fun({U, S, R}) when S == ServerHost ->
-                             case user_resources(U, S) of
-                                 [] -> %% offline
-                                     PeerJID = jid:make(U, S, R),
-                                     self() !  {presence, User, Server, [Resource], PeerJID};
-                                 _ -> %% online
-                                     %% this is already handled by presence probe
-                                     ok
-                             end;
-                        (_) ->
-                             %% we can not do anything in any cases
-                             ok
-                     end, Contacts);
-               true ->
-                   ok
-           end,
-           send_loop(State);
-       {presence, User, Server, Resources, JID} ->
-           spawn(fun() ->
-                       Host = host(State#state.server_host),
-                       Owner = jid:remove_resource(jid:tolower(JID)),
-                       lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx, options = Options}) ->
-                                   case match_option(Options, send_last_published_item, on_sub_and_presence) of
-                                       true ->
-                                           lists:foreach(fun(Resource) ->
-                                                       LJID = {User, Server, Resource},
-                                                       Subscribed = case get_option(Options, access_model) of
-                                                           open -> true;
-                                                           presence -> true;
-                                                           whitelist -> false; % subscribers are added manually
-                                                           authorize -> false; % likewise
-                                                           roster ->
-                                                               Grps = get_option(Options, roster_groups_allowed, []),
-                                                               {OU, OS, _} = Owner,
-                                                               element(2, get_roster_info(OU, OS, LJID, Grps))
-                                                       end,
-                                                       if Subscribed -> send_items(Owner, Node, Nidx, Type, Options, LJID, last);
-                                                           true -> ok
-                                                       end
-                                               end,
-                                               Resources);
-                                       _ ->
-                                           ok
-                                   end
-                           end,
-                           tree_action(Host, get_nodes, [Owner, JID]))
-               end),
-           send_loop(State);
-       stop ->
-           ok
-    end.
-
 %% -------
 %% disco hooks handling functions
 %%
@@ -660,12 +552,12 @@ disco_items(Host, Node, From) ->
     end.
 
 %% -------
-%% presence hooks handling functions
+%% presence and session hooks handling functions
 %%
 
 -spec caps_add(jid(), jid(), [binary()]) -> ok.
-caps_add(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features)
-       when Host =/= S ->
+caps_add(#jid{lserver = S1} = From, #jid{lserver = S2} = To, _Features)
+  when S1 =/= S2 ->
     %% When a remote contact goes online while the local user is offline, the
     %% remote contact won't receive last items from the local user even if
     %% ignore_pep_from_offline is set to false. To work around this issue a bit,
@@ -675,30 +567,36 @@ caps_add(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID
     %% contact becomes available; the former is also executed when the local
     %% user goes online (because that triggers the contact to send a presence
     %% packet with CAPS).
-    presence(Host, {presence, U, S, [R], JID});
+    send_last_pep(To, From);
 caps_add(_From, _To, _Feature) ->
     ok.
 
 -spec caps_update(jid(), jid(), [binary()]) -> ok.
-caps_update(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features) ->
-    presence(Host, {presence, U, S, [R], JID}).
+caps_update(#jid{lserver = S1} = From, #jid{lserver = S2} = To, _Features)
+  when S1 =/= S2 ->
+    send_last_pep(To, From).
 
 -spec presence_probe(jid(), jid(), pid()) -> ok.
 presence_probe(#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, _Pid) ->
     %% ignore presence_probe from my other ressources
-    %% to not get duplicated last items
     ok;
-presence_probe(#jid{lserver = S} = From, #jid{lserver = S} = To, Pid) ->
-    presence(S, {presence, From, Pid}),
-    presence(S, {presence, From#jid.luser, S, [From#jid.lresource], To});
+presence_probe(#jid{lserver = S} = From, #jid{lserver = S} = To, _Pid) ->
+    send_last_pep(To, From);
 presence_probe(_From, _To, _Pid) ->
-    %% ignore presence_probe from remote contacts,
-    %% those are handled via caps_add
+    %% ignore presence_probe from remote contacts, those are handled via caps_add
     ok.
 
-presence(ServerHost, Presence) ->
-    gen_mod:get_module_proc(ServerHost, ?LOOPNAME) ! Presence,
-    ok.
+-spec on_user_online(ejabberd_c2s:state()) -> ejabberd_c2s:state().
+on_user_online(C2SState) ->
+    JID = maps:get(jid, C2SState),
+    send_last_items(JID),
+    C2SState.
+
+-spec on_user_offline(ejabberd_c2s:state(), atom()) -> ejabberd_c2s:state().
+on_user_offline(C2SState, _Reason) ->
+    JID = maps:get(jid, C2SState),
+    purge_offline(jid:tolower(JID)),
+    C2SState.
 
 %% -------
 %% subscription hooks handling functions
@@ -707,14 +605,8 @@ presence(ServerHost, Presence) ->
 -spec out_subscription(
        binary(), binary(), jid(),
        subscribed | unsubscribed | subscribe | unsubscribe) -> boolean().
-out_subscription(User, Server, JID, subscribed) ->
-    Owner = jid:make(User, Server),
-    {PUser, PServer, PResource} = jid:tolower(JID),
-    PResources = case PResource of
-       <<>> -> user_resources(PUser, PServer);
-       _ -> [PResource]
-    end,
-    presence(Server, {presence, PUser, PServer, PResources, Owner}),
+out_subscription(User, Server, To, subscribed) ->
+    send_last_pep(jid:make(User, Server), To),
     true;
 out_subscription(_, _, _, _) ->
     true.
@@ -883,7 +775,9 @@ terminate(_Reason,
        false ->
            ok
     end,
-    ejabberd_hooks:delete(sm_remove_connection_hook, ServerHost,
+    ejabberd_hooks:delete(c2s_session_opened, ServerHost,
+       ?MODULE, on_user_online, 75),
+    ejabberd_hooks:delete(c2s_terminated, ServerHost,
        ?MODULE, on_user_offline, 75),
     ejabberd_hooks:delete(disco_local_identity, ServerHost,
        ?MODULE, disco_local_identity, 75),
@@ -901,12 +795,6 @@ terminate(_Reason,
        ?MODULE, remove_user, 50),
     ejabberd_hooks:delete(c2s_handle_info, ServerHost,
        ?MODULE, c2s_handle_info, 50),
-    case whereis(gen_mod:get_module_proc(ServerHost, ?LOOPNAME)) of
-       undefined ->
-           ?ERROR_MSG("~s process is dead, pubsub was broken", [?LOOPNAME]);
-       Pid ->
-           Pid ! stop
-    end,
     lists:foreach(
       fun(Host) ->
              gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO),
@@ -1797,7 +1685,7 @@ subscribe_node(Host, Node, From, JID, Configuration) ->
            Nidx = TNode#pubsub_node.id,
            Type = TNode#pubsub_node.type,
            Options = TNode#pubsub_node.options,
-           send_items(Host, Node, Nidx, Type, Options, Subscriber, last),
+           send_items(Host, Node, Nidx, Type, Options, Subscriber, 1),
            ServerHost = serverhost(Host),
            ejabberd_hooks:run(pubsub_subscribe_node, ServerHost,
                [ServerHost, Host, Node, Subscriber, SubId]),
@@ -2069,8 +1957,6 @@ purge_node(Host, Node, Owner) ->
 %% @doc <p>Return the items of a given node.</p>
 %% <p>The number of items to return is limited by MaxItems.</p>
 %% <p>The permission are not checked in this function.</p>
-%% @todo We probably need to check that the user doing the query has the right
-%% to read the items.
 -spec get_items(host(), binary(), jid(), binary(),
                binary(), [binary()], undefined | rsm_set()) ->
                       {result, pubsub()} | {error, stanza_error()}.
@@ -2153,43 +2039,23 @@ get_allowed_items_call(Host, Nidx, From, Type, Options, Owners, RSM) ->
     {PS, RG} = get_presence_and_roster_permissions(Host, From, Owners, AccessModel, AllowedGroups),
     node_call(Host, Type, get_items, [Nidx, From, AccessModel, PS, RG, undefined, RSM]).
 
-get_last_items(Host, Type, Nidx, LJID, Count) ->
+get_last_items(Host, Type, Nidx, LJID, 1) ->
     case get_cached_item(Host, Nidx) of
        undefined ->
-           case node_action(Host, Type, get_last_items, [Nidx, LJID, Count]) of
+           case node_action(Host, Type, get_last_items, [Nidx, LJID, 1]) of
                {result, Items} -> Items;
-             _ -> []
+               _ -> []
            end;
        LastItem ->
            [LastItem]
-    end.
-
-%% @doc <p>Resend the items of a node to the user.</p>
-%% @todo use cache-last-item feature
-send_items(Host, Node, Nidx, Type, Options, LJID, last) ->
-    case get_last_items(Host, Type, Nidx, LJID, 1) of
-       [LastItem] ->
-           Stanza = items_event_stanza(Node, Options, [LastItem]),
-           dispatch_items(Host, LJID, Node, Stanza);
-       _ ->
-           ok
     end;
-send_items(Host, Node, Nidx, Type, Options, LJID, Number) when Number > 0 ->
-    Stanza = items_event_stanza(Node, Options, get_last_items(Host, Type, Nidx, Number, LJID)),
-    dispatch_items(Host, LJID, Node, Stanza);
-send_items(Host, Node, _Nidx, _Type, Options, LJID, _) ->
-    Stanza = items_event_stanza(Node, Options, []),
-    dispatch_items(Host, LJID, Node, Stanza).
-
-dispatch_items({FromU, FromS, FromR}, To, Node, Stanza) ->
-    SenderResource = user_resource(FromU, FromS, FromR),
-    ejabberd_sm:route(jid:make(FromU, FromS, SenderResource),
-                     {send_filtered, {pep_message, <<((Node))/binary, "+notify">>},
-                      jid:make(FromU, FromS), jid:make(To),
-                      Stanza});
-dispatch_items(From, To, _Node, Stanza) ->
-    ejabberd_router:route(
-      xmpp:set_from_to(Stanza, service_jid(From), jid:make(To))).
+get_last_items(Host, Type, Nidx, LJID, Count) when Count > 1 ->
+    case node_action(Host, Type, get_last_items, [Nidx, LJID, Count]) of
+       {result, Items} -> Items;
+       _ -> []
+    end;
+get_last_items(_Host, _Type, _Nidx, _LJID, _Count) ->
+    [].
 
 %% @doc <p>Return the list of affiliations as an XMPP response.</p>
 -spec get_affiliations(host(), binary(), jid(), [binary()]) ->
@@ -2536,14 +2402,15 @@ get_subscriptions_for_send_last(Host, PType, sql, JID, LJID, BJID) ->
     {result, Subs} = node_action(Host, PType,
            get_entity_subscriptions_for_send_last,
            [Host, JID]),
-    [{Node, Sub, SubId, SubJID}
+    [{Node, SubId, SubJID}
        || {Node, Sub, SubId, SubJID} <- Subs,
            Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID)];
+           % sql version already filter result by on_sub_and_presence
 get_subscriptions_for_send_last(Host, PType, _, JID, LJID, BJID) ->
     {result, Subs} = node_action(Host, PType,
            get_entity_subscriptions,
            [Host, JID]),
-    [{Node, Sub, SubId, SubJID}
+    [{Node, SubId, SubJID}
        || {Node, Sub, SubId, SubJID} <- Subs,
            Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID),
            match_option(Node, send_last_published_item, on_sub_and_presence)].
@@ -2932,8 +2799,9 @@ get_options_for_subs(Host, Nidx, Subs, true) ->
 broadcast_stanza(Host, _Node, _Nidx, _Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
     NotificationType = get_option(NodeOptions, notification_type, headline),
     BroadcastAll = get_option(NodeOptions, broadcast_all_resources), %% XXX this is not standard, but usefull
-    From = service_jid(Host),
-    Stanza = add_message_type(BaseStanza, NotificationType),
+    Stanza = add_message_type(
+              xmpp:set_from(BaseStanza, service_jid(Host)),
+              NotificationType),
     %% Handles explicit subscriptions
     SubIDsByJID = subscribed_nodes_by_jid(NotifyType, SubsByDepth),
     lists:foreach(fun ({LJID, _NodeName, SubIDs}) ->
@@ -2956,7 +2824,7 @@ broadcast_stanza(Host, _Node, _Nidx, _Type, NodeOptions, SubsByDepth, NotifyType
                end,
                lists:foreach(fun(To) ->
                            ejabberd_router:route(
-                             xmpp:set_from_to(StanzaToSend, From, jid:make(To)))
+                             xmpp:set_to(StanzaToSend, jid:make(To)))
                    end, LJIDs)
        end, SubIDsByJID).
 
@@ -2965,55 +2833,145 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, Nidx, Type, NodeO
     %% Handles implicit presence subscriptions
     SenderResource = user_resource(LUser, LServer, LResource),
     NotificationType = get_option(NodeOptions, notification_type, headline),
-    Stanza = add_message_type(BaseStanza, NotificationType),
+    Stanza = add_message_type(
+              xmpp:set_from(BaseStanza, jid:make(LUser, LServer)),
+              NotificationType),
     %% set the from address on the notification to the bare JID of the account owner
     %% Also, add "replyto" if entity has presence subscription to the account owner
     %% See XEP-0163 1.1 section 4.3.1
     ejabberd_sm:route(jid:make(LUser, LServer, SenderResource),
                      {pep_message, <<((Node))/binary, "+notify">>,
-                      jid:make(LUser, LServer),
                       add_extended_headers(
                         Stanza, extended_headers([Publisher]))});
 broadcast_stanza(Host, _Publisher, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
     broadcast_stanza(Host, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM).
 
 -spec c2s_handle_info(ejabberd_c2s:state(), term()) -> ejabberd_c2s:state().
-c2s_handle_info(#{server := Server} = C2SState,
-               {pep_message, Feature, From, Packet}) ->
-    LServer = jid:nameprep(Server),
-    lists:foreach(
-      fun({USR, Caps}) ->
-             Features = mod_caps:get_features(LServer, Caps),
-             case lists:member(Feature, Features) of
-                 true ->
-                     To = jid:make(USR),
-                     NewPacket = xmpp:set_from_to(Packet, From, To),
-                     ejabberd_router:route(NewPacket);
-                 false ->
-                     ok
-             end
-      end, mod_caps:list_features(C2SState)),
+c2s_handle_info(#{lserver := LServer} = C2SState,
+               {pep_message, Feature, Packet}) ->
+    [maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet)
+     || {USR, Caps} <- mod_caps:list_features(C2SState)],
     {stop, C2SState};
-c2s_handle_info(#{server := Server} = C2SState,
-               {send_filtered, {pep_message, Feature}, From, To, Packet}) ->
-    LServer = jid:nameprep(Server),
-    case mod_caps:get_user_caps(To, C2SState) of
-       {ok, Caps} ->
-           Features = mod_caps:get_features(LServer, Caps),
-           case lists:member(Feature, Features) of
-               true ->
-                   NewPacket = xmpp:set_from_to(Packet, From, To),
-                   ejabberd_router:route(NewPacket);
-               false ->
-                   ok
-           end;
-       error ->
-           ok
+c2s_handle_info(#{lserver := LServer} = C2SState,
+               {pep_message, Feature, Packet, USR}) ->
+    case mod_caps:get_user_caps(USR, C2SState) of
+       {ok, Caps} -> maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet);
+       error -> ok
     end,
     {stop, C2SState};
 c2s_handle_info(C2SState, _) ->
     C2SState.
 
+send_items(Host, Node, Nidx, Type, Options, LJID, Number) ->
+    send_items(Host, Node, Nidx, Type, Options, Host, LJID, LJID, Number).
+send_items(Host, Node, Nidx, Type, Options, Publisher, SubLJID, ToLJID, Number) ->
+    case get_last_items(Host, Type, Nidx, SubLJID, Number) of
+       [] ->
+           ok;
+       Items ->
+           Stanza = items_event_stanza(Node, Options, Items),
+           send_stanza(Host, Publisher, ToLJID, Node, Stanza)
+    end.
+
+send_stanza({LUser, LServer, LResource}, Publisher, USR, Node, BaseStanza) ->
+    SenderResource = user_resource(LUser, LServer, LResource),
+    Stanza = xmpp:set_from(BaseStanza, jid:make(LUser, LServer)),
+    USRs = case USR of
+              {PUser, PServer, <<>>} ->
+                  [{PUser, PServer, PRessource}
+                   || PRessource <- user_resources(PUser, PServer)];
+              _ ->
+                  [USR]
+          end,
+    [ejabberd_sm:route(jid:make(LUser, LServer, SenderResource),
+                     {pep_message, <<((Node))/binary, "+notify">>,
+                      add_extended_headers(
+                        Stanza, extended_headers([Publisher])),
+                      To}) || To <- USRs];
+send_stanza(Host, _Publisher, USR, _Node, Stanza) ->
+    ejabberd_router:route(
+      xmpp:set_from_to(Stanza, service_jid(Host), jid:make(USR))).
+
+maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet) ->
+    Features = mod_caps:get_features(LServer, Caps),
+    case lists:member(Feature, Features) of
+       true ->
+           ejabberd_router:route(xmpp:set_to(Packet, jid:make(USR)));
+       false ->
+           ok
+    end.
+
+send_last_items(JID) ->
+    ?DEBUG("~s", [jid:to_string(JID)]),
+    ServerHost = JID#jid.lserver,
+    Host = host(ServerHost),
+    DBType = config(ServerHost, db_type),
+    LJID = jid:tolower(JID),
+    BJID = jid:remove_resource(LJID),
+    lists:foreach(
+      fun(PType) ->
+             Subs = get_subscriptions_for_send_last(Host, PType, DBType, JID, LJID, BJID),
+             lists:foreach(
+               fun({#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx,
+                                 options = Options}, _, SubJID})
+                     when Type == PType->
+                       send_items(Host, Node, Nidx, PType, Options, Host, SubJID, LJID, 1);
+                  (_) ->
+                       ok
+               end,
+               lists:usort(Subs))
+      end, config(ServerHost, plugins)).
+% pep_from_offline hack can not work anymore, as sender c2s does not
+% exists when sender is offline, so we can't get match receiver caps
+% does it make sens to send PEP from an offline contact anyway ?
+%    case config(ServerHost, ignore_pep_from_offline) of
+%      false ->
+%          Roster = ejabberd_hooks:run_fold(roster_get, ServerHost, [],
+%                                           [{JID#jid.luser, ServerHost}]),
+%          lists:foreach(
+%            fun(#roster{jid = {U, S, R}, subscription = Sub})
+%                  when Sub == both orelse Sub == from,
+%                       S == ServerHost ->
+%                    case user_resources(U, S) of
+%                        [] -> send_last_pep(jid:make(U, S, R), JID);
+%                        _ -> ok %% this is already handled by presence probe
+%                    end;
+%               (_) ->
+%                    ok %% we can not do anything in any cases
+%            end, Roster);
+%      true ->
+%          ok
+%    end.
+send_last_pep(From, To) ->
+    ?DEBUG("~s -> ~s", [jid:to_string(From), jid:to_string(To)]),
+    ServerHost = From#jid.lserver,
+    Host = host(ServerHost),
+    Publisher = jid:tolower(From),
+    Owner = jid:remove_resource(Publisher),
+    lists:foreach(
+      fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx, options = Options}) ->
+             case match_option(Options, send_last_published_item, on_sub_and_presence) of
+                 true ->
+                     LJID = jid:tolower(To),
+                     Subscribed = case get_option(Options, access_model) of
+                                      open -> true;
+                                      presence -> true;
+                                      whitelist -> false; % subscribers are added manually
+                                      authorize -> false; % likewise
+                                      roster ->
+                                          Grps = get_option(Options, roster_groups_allowed, []),
+                                          {OU, OS, _} = Owner,
+                                          element(2, get_roster_info(OU, OS, LJID, Grps))
+                                  end,
+                     if Subscribed -> send_items(Owner, Node, Nidx, Type, Options, Publisher, LJID, LJID, 1);
+                        true -> ok
+                     end;
+                 _ ->
+                     ok
+             end
+      end,
+      tree_action(Host, get_nodes, [Owner, From])).
+
 subscribed_nodes_by_jid(NotifyType, SubsByDepth) ->
     NodesToDeliver = fun (Depth, Node, Subs, Acc) ->
        NodeName = case Node#pubsub_node.nodeid of
@@ -3186,9 +3144,6 @@ node_owners_call(_Host, _Type, _Nidx, Owners) ->
 %% @doc <p>Return the maximum number of items for a given node.</p>
 %% <p>Unlimited means that there is no limit in the number of items that can
 %% be stored.</p>
-%% @todo In practice, the current data structure means that we cannot manage
-%% millions of items on a given node. This should be addressed in a new
-%% version.
 -spec max_items(host(), [{atom(), any()}]) -> non_neg_integer().
 max_items(Host, Options) ->
     case get_option(Options, persist_items) of
@@ -3792,14 +3747,6 @@ subid_shim(SubIds) ->
 extended_headers(Jids) ->
     [#address{type = replyto, jid = Jid} || Jid <- Jids].
 
--spec on_user_offline(ejabberd_sm:sid(), jid(), ejabberd_sm:info()) -> ok.
-on_user_offline(_, JID, _) ->
-    {User, Server, Resource} = jid:tolower(JID),
-    case user_resources(User, Server) of
-       [] -> purge_offline({User, Server, Resource});
-       _ -> ok
-    end.
-
 -spec purge_offline(ljid()) -> ok.
 purge_offline(LJID) ->
     Host = host(element(2, LJID)),
@@ -3840,7 +3787,7 @@ purge_offline(LJID) ->
                            end
                    end, lists:usort(lists:flatten(Affs)));
        {Error, _} ->
-           ?DEBUG("on_user_offline ~p", [Error])
+           ?ERROR_MSG("can not purge offline: ~p", [Error])
     end.
 
 -spec purge_offline(host(), ljid(), binary()) -> ok | {error, stanza_error()}.
@@ -3852,13 +3799,13 @@ purge_offline(Host, LJID, Node) ->
        {result, {[], _}} ->
            ok;
        {result, {Items, _}} ->
-           {User, Server, _} = LJID,
+           {User, Server, Resource} = LJID,
            PublishModel = get_option(Options, publish_model),
            ForceNotify = get_option(Options, notify_retract),
            {_, NodeId} = Node#pubsub_node.nodeid,
            lists:foreach(fun
-                   (#pubsub_item{itemid = {ItemId, _}, modification = {_, {U, S, _}}})
-                           when (U == User) and (S == Server) ->
+                   (#pubsub_item{itemid = {ItemId, _}, modification = {_, {U, S, R}}})
+                           when (U == User) and (S == Server) and (R == Resource) ->
                        case node_action(Host, Type, delete_item, [Nidx, {U, S, <<>>}, PublishModel, ItemId]) of
                            {result, {_, broadcast}} ->
                                broadcast_retract_items(Host, NodeId, Nidx, Type, Options, [ItemId], ForceNotify),