-export([get_commands_spec/0, delete_old_sessions/1]).
%% API (used by mod_push_keepalive).
--export([notify/2, notify/4, notify/6, is_message_with_body/1]).
+-export([notify/3, notify/5, notify/7, is_incoming_chat_msg/1]).
%% For IQ callbacks
-export([delete_session/3]).
-type timestamp() :: erlang:timestamp().
-type push_session() :: {timestamp(), ljid(), binary(), xdata()}.
-type err_reason() :: notfound | db_failure.
+-type direction() :: send | recv | undefined.
-callback init(binary(), gen_mod:opts())
-> any().
c2s_stanza(#{push_enabled := true, mgmt_state := pending} = State,
Pkt, _SendResult) ->
?DEBUG("Notifying client of stanza", []),
- notify(State, xmpp_util:unwrap_carbon(Pkt)),
+ notify(State, unwrap_carbon(Pkt), get_direction(Pkt)),
State;
c2s_stanza(State, _Pkt, _SendResult) ->
State.
-spec mam_message(message() | drop, binary(), binary(), jid(),
chat | groupchat, recv | send) -> message().
-mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, _Dir) ->
+mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, Dir) ->
case lookup_sessions(LUser, LServer) of
{ok, [_|_] = Clients} ->
case drop_online_sessions(LUser, LServer, Clients) of
[_|_] = Clients1 ->
?DEBUG("Notifying ~s@~s of MAM message", [LUser, LServer]),
- notify(LUser, LServer, Clients1, Pkt);
+ notify(LUser, LServer, Clients1, Pkt, Dir);
[] ->
ok
end;
case lookup_sessions(LUser, LServer) of
{ok, [_|_] = Clients} ->
?DEBUG("Notifying ~s@~s of offline message", [LUser, LServer]),
- notify(LUser, LServer, Clients, Pkt);
+ notify(LUser, LServer, Clients, Pkt, recv);
_ ->
ok
end,
case p1_queue:len(Queue) of
Len when Len > 0 ->
?DEBUG("Notifying client of unacknowledged stanza(s)", []),
- Pkt = mod_stream_mgmt:queue_find(fun is_message_with_body/1, Queue),
- notify(State, xmpp_util:unwrap_carbon(Pkt)),
+ {Pkt, Dir} = case mod_stream_mgmt:queue_find(
+ fun is_incoming_chat_msg/1, Queue) of
+ none -> {none, undefined};
+ Pkt0 -> {unwrap_carbon(Pkt0), get_direction(Pkt0)}
+ end,
+ notify(State, Pkt, Dir),
State;
0 ->
State
%%--------------------------------------------------------------------
%% Generate push notifications.
%%--------------------------------------------------------------------
--spec notify(c2s_state(), xmpp_element() | xmlel() | none) -> ok.
-notify(#{jid := #jid{luser = LUser, lserver = LServer}, sid := {TS, _}}, Pkt) ->
+-spec notify(c2s_state(), xmpp_element() | xmlel() | none, direction()) -> ok.
+notify(#{jid := #jid{luser = LUser, lserver = LServer},
+ sid := {TS, _}},
+ Pkt, Dir) ->
case lookup_session(LUser, LServer, TS) of
{ok, Client} ->
- notify(LUser, LServer, [Client], Pkt);
+ notify(LUser, LServer, [Client], Pkt, Dir);
_Err ->
ok
end.
-spec notify(binary(), binary(), [push_session()],
- xmpp_element() | xmlel() | none) -> ok.
-notify(LUser, LServer, Clients, Pkt) ->
+ xmpp_element() | xmlel() | none, direction()) -> ok.
+notify(LUser, LServer, Clients, Pkt, Dir) ->
lists:foreach(
fun({TS, PushLJID, Node, XData}) ->
HandleResponse = fun(#iq{type = result}) ->
(timeout) ->
ok % Hmm.
end,
- notify(LServer, PushLJID, Node, XData, Pkt, HandleResponse)
+ notify(LServer, PushLJID, Node, XData, Pkt, Dir, HandleResponse)
end, Clients).
-spec notify(binary(), ljid(), binary(), xdata(),
- xmpp_element() | xmlel() | none,
+ xmpp_element() | xmlel() | none, direction(),
fun((iq() | timeout) -> any())) -> ok.
-notify(LServer, PushLJID, Node, XData, Pkt, HandleResponse) ->
+notify(LServer, PushLJID, Node, XData, Pkt, Dir, HandleResponse) ->
From = jid:make(LServer),
- Summary = make_summary(LServer, Pkt),
+ Summary = make_summary(LServer, Pkt, Dir),
Item = #ps_item{sub_els = [#push_notification{xdata = Summary}]},
PubSub = #pubsub{publish = #ps_publish{node = Node, items = [Item]},
publish_options = XData},
%%--------------------------------------------------------------------
%% Miscellaneous.
%%--------------------------------------------------------------------
--spec is_message_with_body(stanza()) -> boolean().
-is_message_with_body(#message{} = Msg) ->
- Msg1 = xmpp_util:unwrap_carbon(Msg),
- get_body_text(Msg1) /= none;
-is_message_with_body(_Stanza) ->
+-spec is_incoming_chat_msg(stanza()) -> boolean().
+is_incoming_chat_msg(#message{} = Msg) ->
+ case get_direction(Msg) of
+ recv -> get_body_text(unwrap_carbon(Msg)) /= none;
+ send -> false
+ end;
+is_incoming_chat_msg(_Stanza) ->
false.
%%--------------------------------------------------------------------
[Client || {TS, _, _, _} = Client <- Clients,
lists:keyfind(TS, 1, SessIDs) == false].
--spec make_summary(binary(), xmpp_element() | xmlel() | none)
+-spec make_summary(binary(), xmpp_element() | xmlel() | none, direction())
-> xdata() | undefined.
-make_summary(Host, #message{from = From} = Pkt) ->
+make_summary(Host, #message{from = From} = Pkt, recv) ->
case {gen_mod:get_module_opt(Host, ?MODULE, include_sender),
gen_mod:get_module_opt(Host, ?MODULE, include_body)} of
{false, false} ->
#xdata{type = submit, fields = push_summary:encode(Fields2)}
end
end;
-make_summary(_Host, _Pkt) ->
+make_summary(_Host, _Pkt, _Dir) ->
+ undefined.
+
+-spec unwrap_carbon(stanza()) -> stanza().
+unwrap_carbon(#message{meta = #{carbon_copy := true}} = Msg) ->
+ xmpp_util:unwrap_carbon(Msg);
+unwrap_carbon(Stanza) ->
+ Stanza.
+
+-spec get_direction(stanza()) -> direction().
+get_direction(#message{meta = #{carbon_copy := true},
+ from = #jid{luser = U, lserver = S},
+ to = #jid{luser = U, lserver = S}}) ->
+ send;
+get_direction(#message{}) ->
+ recv;
+get_direction(_Stanza) ->
undefined.
-spec get_body_text(message()) -> binary() | none.
-spec c2s_stanza(c2s_state(), xmpp_element() | xmlel(), term()) -> c2s_state().
c2s_stanza(#{push_enabled := true, mgmt_state := pending} = State,
Pkt, _SendResult) ->
- case mod_push:is_message_with_body(Pkt) of
+ case mod_push:is_incoming_chat_msg(Pkt) of
true ->
maybe_restore_resume_timeout(State);
false ->
-spec c2s_session_pending(c2s_state()) -> c2s_state().
c2s_session_pending(#{push_enabled := true, mgmt_queue := Queue} = State) ->
- case mod_stream_mgmt:queue_find(fun mod_push:is_message_with_body/1,
+ case mod_stream_mgmt:queue_find(fun mod_push:is_incoming_chat_msg/1,
Queue) of
none ->
State1 = maybe_adjust_resume_timeout(State),
c2s_handle_info(#{push_enabled := true, mgmt_state := pending,
jid := JID} = State, {timeout, _, push_keepalive}) ->
?INFO_MSG("Waking ~s before session times out", [jid:encode(JID)]),
- mod_push:notify(State, none),
+ mod_push:notify(State, none, undefined),
{stop, State};
c2s_handle_info(State, _) ->
State.
IgnoreResponse = fun(_) -> ok end,
lists:foreach(fun({_, PushLJID, Node, XData}) ->
mod_push:notify(LServer, PushLJID, Node,
- XData, none, IgnoreResponse)
+ XData, none, undefined,
+ IgnoreResponse)
end, Sessions);
error ->
error