-export([get_commands_spec/0, delete_old_sessions/1]).
%% API (used by mod_push_keepalive).
--export([notify/2, notify/4, notify/6]).
+-export([notify/2, notify/4, notify/6, is_message_with_body/1]).
%% For IQ callbacks
-export([delete_session/3]).
case p1_queue:len(Queue) of
Len when Len > 0 ->
?DEBUG("Notifying client of unacknowledged stanza(s)", []),
- Pkt = queue_find(fun is_message_with_body/1, Queue),
+ Pkt = mod_stream_mgmt:queue_find(fun is_message_with_body/1, Queue),
notify(State, Pkt),
State;
0 ->
sub_els = [PubSub]},
ejabberd_router:route_iq(IQ, HandleResponse).
+%%--------------------------------------------------------------------
+%% Miscellaneous.
+%%--------------------------------------------------------------------
+-spec is_message_with_body(stanza()) -> boolean().
+is_message_with_body(#message{} = Msg) ->
+ get_body_text(Msg) /= none;
+is_message_with_body(_Stanza) ->
+ false.
+
%%--------------------------------------------------------------------
%% Internal functions.
%%--------------------------------------------------------------------
[Client || {TS, _, _, _} = Client <- Clients,
lists:keyfind(TS, 1, SessIDs) == false].
--spec queue_find(fun((stanza()) -> boolean()), p1_queue:queue())
- -> stanza() | none.
-queue_find(Pred, Queue) ->
- case p1_queue:out(Queue) of
- {{value, {_, _, Pkt}}, Queue1} ->
- case Pred(Pkt) of
- true ->
- Pkt;
- false ->
- queue_find(Pred, Queue1)
- end;
- {empty, _Queue1} ->
- none
- end.
-
-spec make_summary(binary(), xmpp_element() | xmlel() | none)
-> xdata() | undefined.
make_summary(Host, #message{from = From} = Pkt) ->
make_summary(_Host, _Pkt) ->
undefined.
--spec is_message_with_body(stanza()) -> boolean().
-is_message_with_body(#message{} = Msg) ->
- get_body_text(Msg) /= none;
-is_message_with_body(_Stanza) ->
- false.
-
-spec get_body_text(message()) -> binary() | none.
get_body_text(#message{body = Body} = Msg) ->
case xmpp:get_text(Body) of
%%--------------------------------------------------------------------
-spec c2s_stanza(c2s_state(), xmpp_element() | xmlel(), term()) -> c2s_state().
c2s_stanza(#{push_enabled := true, mgmt_state := pending} = State,
- _Pkt, _SendResult) ->
- maybe_restore_resume_timeout(State);
+ Pkt, _SendResult) ->
+ case mod_push:is_message_with_body(Pkt) of
+ true ->
+ maybe_restore_resume_timeout(State);
+ false ->
+ State
+ end;
c2s_stanza(State, _Pkt, _SendResult) ->
State.
-spec c2s_session_pending(c2s_state()) -> c2s_state().
c2s_session_pending(#{push_enabled := true, mgmt_queue := Queue} = State) ->
- case p1_queue:len(Queue) of
- 0 ->
+ case mod_stream_mgmt:queue_find(fun mod_push:is_message_with_body/1,
+ Queue) of
+ none ->
State1 = maybe_adjust_resume_timeout(State),
maybe_start_wakeup_timer(State1);
- _ ->
+ _Msg ->
State
end;
c2s_session_pending(State) ->
c2s_unbinded_packet/2, c2s_closed/2, c2s_terminated/2,
c2s_handle_send/3, c2s_handle_info/2, c2s_handle_call/3,
c2s_handle_recv/3]).
-%% adjust pending session timeout
--export([get_resume_timeout/1, set_resume_timeout/2]).
+%% adjust pending session timeout / access queue
+-export([get_resume_timeout/1, set_resume_timeout/2, queue_find/2]).
-include("ejabberd.hrl").
-include("xmpp.hrl").
State.
%%%===================================================================
-%%% Adjust pending session timeout
+%%% Adjust pending session timeout / access queue
%%%===================================================================
-spec get_resume_timeout(state()) -> non_neg_integer().
get_resume_timeout(#{mgmt_timeout := Timeout}) ->
State1 = restart_pending_timer(State, Timeout),
State1#{mgmt_timeout => Timeout}.
+-spec queue_find(fun((stanza()) -> boolean()), p1_queue:queue())
+ -> stanza() | none.
+queue_find(Pred, Queue) ->
+ case p1_queue:out(Queue) of
+ {{value, {_, _, Pkt}}, Queue1} ->
+ case Pred(Pkt) of
+ true ->
+ Pkt;
+ false ->
+ queue_find(Pred, Queue1)
+ end;
+ {empty, _Queue1} ->
+ none
+ end.
+
%%%===================================================================
%%% Internal functions
%%%===================================================================