CREATE UNIQUE INDEX i_proxy65_sid ON proxy65 (sid);
CREATE INDEX i_proxy65_jid ON proxy65 (jid_i);
+
+CREATE TABLE push_session (
+ username text NOT NULL,
+ timestamp bigint NOT NULL,
+ service text NOT NULL,
+ node text NOT NULL,
+ xml text NOT NULL
+);
+
+CREATE UNIQUE INDEX i_push_usn ON push_session (username, service, node);
+CREATE UNIQUE INDEX i_push_ut ON push_session (username, timestamp);
\r
CREATE INDEX [carboncopy_user] ON [carboncopy] (username)\r
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON);\r
+\r
+CREATE TABLE [dbo].[push_session] (\r
+ [username] [varchar] (255) NOT NULL,\r
+ [timestamp] [bigint] NOT NULL,\r
+ [service] [varchar] (255) NOT NULL,\r
+ [node] [varchar] (255) NOT NULL,\r
+ [xml] [varchar] (255) NOT NULL\r
+);\r
+\r
+CREATE UNIQUE CLUSTERED INDEX [i_push_usn] ON [push_session] (username, service, node)\r
+WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON);\r
+\r
+CREATE UNIQUE CLUSTERED INDEX [i_push_ut] ON [push_session] (username, timestamp)\r
+WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON);\r
CREATE UNIQUE INDEX i_proxy65_sid ON proxy65 (sid(191));
CREATE INDEX i_proxy65_jid ON proxy65 (jid_i(191));
+
+CREATE TABLE push_session (
+ username text NOT NULL,
+ timestamp bigint NOT NULL,
+ service text NOT NULL,
+ node text NOT NULL,
+ xml text NOT NULL
+);
+
+CREATE UNIQUE INDEX i_push_usn ON push_session (username(191), service(191), node(191));
+CREATE UNIQUE INDEX i_push_ut ON push_session (username(191), timestamp);
CREATE UNIQUE INDEX i_proxy65_sid ON proxy65 USING btree (sid);
CREATE INDEX i_proxy65_jid ON proxy65 USING btree (jid_i);
+
+CREATE TABLE push_session (
+ username text NOT NULL,
+ timestamp bigint NOT NULL,
+ service text NOT NULL,
+ node text NOT NULL,
+ xml text NOT NULL
+);
+
+CREATE UNIQUE INDEX i_push_usn ON push_session USING btree (username, service, node);
+CREATE UNIQUE INDEX i_push_ut ON push_session USING btree (username, timestamp);
-> any().
-callback store_session(binary(), binary(), timestamp(), jid(), binary(),
xdata())
- -> {ok, push_session()} | error.
+ -> {ok, push_session()} | {error, any()}.
-callback lookup_session(binary(), binary(), jid(), binary())
- -> {ok, push_session()} | error.
+ -> {ok, push_session()} | error | {error, any()}.
-callback lookup_session(binary(), binary(), timestamp())
- -> {ok, push_session()} | error.
+ -> {ok, push_session()} | error | {error, any()}.
-callback lookup_sessions(binary(), binary(), jid())
- -> {ok, [push_session()]} | error.
+ -> {ok, [push_session()]} | {error, any()}.
-callback lookup_sessions(binary(), binary())
- -> {ok, [push_session()]} | error.
+ -> {ok, [push_session()]} | {error, any()}.
-callback lookup_sessions(binary())
- -> {ok, [push_session()]} | error.
+ -> {ok, [push_session()]} | {error, any()}.
-callback delete_session(binary(), binary(), timestamp())
- -> ok | error.
+ -> ok | {error, any()}.
-callback delete_old_sessions(binary() | global, erlang:timestamp())
- -> any().
+ -> ok | {error, any()}.
-callback use_cache(binary())
-> boolean().
-callback cache_nodes(binary())
xmpp:make_error(IQ, xmpp:err_feature_not_implemented(Txt, Lang));
process_iq(#iq{from = #jid{lserver = LServer} = JID,
to = #jid{lserver = LServer},
+ lang = Lang,
sub_els = [#push_enable{jid = PushJID,
node = Node,
xdata = XData}]} = IQ) ->
case enable(JID, PushJID, Node, XData) of
ok ->
xmpp:make_iq_result(IQ);
- error ->
- xmpp:make_error(IQ, xmpp:err_internal_server_error())
+ {error, db_failure} ->
+ Txt = <<"Database, failure">>,
+ xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang));
+ {error, notfound} ->
+ Txt = <<"User session not found">>,
+ xmpp:make_error(IQ, xmpp:err_item_not_found(Txt, Lang))
end;
process_iq(#iq{from = #jid{lserver = LServer} = JID,
to = #jid{lserver = LServer},
+ lang = Lang,
sub_els = [#push_disable{jid = PushJID,
node = Node}]} = IQ) ->
case disable(JID, PushJID, Node) of
ok ->
xmpp:make_iq_result(IQ);
- error ->
- xmpp:make_error(IQ, xmpp:err_item_not_found())
+ {error, db_failure} ->
+ Txt = <<"Database, failure">>,
+ xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang));
+ {error, notfound} ->
+ Txt = <<"Push record not found">>,
+ xmpp:make_error(IQ, xmpp:err_item_not_found(Txt, Lang))
end;
process_iq(IQ) ->
xmpp:make_error(IQ, xmpp:err_not_allowed()).
--spec enable(jid(), jid(), binary(), xdata()) -> ok | error.
+-spec enable(jid(), jid(), binary(), xdata()) -> ok | {error, notfound | db_failure}.
enable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID,
PushJID, Node, XData) ->
case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of
?INFO_MSG("Enabling push notifications for ~s",
[jid:encode(JID)]),
ejabberd_c2s:cast(PID, push_enable);
- error ->
+ {error, _} ->
?ERROR_MSG("Cannot enable push for ~s: database error",
[jid:encode(JID)]),
- error
+ {error, db_failure}
end;
none ->
?WARNING_MSG("Cannot enable push for ~s: session not found",
[jid:encode(JID)]),
- error
+ {error, notfound}
end.
--spec disable(jid(), jid(), binary() | undefined) -> ok | error.
+-spec disable(jid(), jid(), binary() | undefined) -> ok | {error, notfound | db_failure}.
disable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID,
PushJID, Node) ->
case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of
?WARNING_MSG("Session not found while disabling push for ~s",
[jid:encode(JID)])
end,
- if Node /= undefined ->
+ if Node /= <<>> ->
delete_session(LUser, LServer, PushJID, Node);
true ->
delete_sessions(LUser, LServer, PushJID)
c2s_handle_cast(State, _Msg) ->
State.
--spec remove_user(binary(), binary()) -> ok | error.
+-spec remove_user(binary(), binary()) -> ok | {error, any()}.
remove_user(LUser, LServer) ->
?INFO_MSG("Removing any push sessions of ~s@~s", [LUser, LServer]),
Mod = gen_mod:db_mod(LServer, ?MODULE),
case lookup_session(LUser, LServer, TS) of
{ok, Client} ->
notify(LUser, LServer, [Client]);
- error ->
+ _Err ->
ok
end.
%% Internal functions.
%%--------------------------------------------------------------------
-spec store_session(binary(), binary(), timestamp(), jid(), binary(), xdata())
- -> {ok, push_session()} | error.
+ -> {ok, push_session()} | {error, any()}.
store_session(LUser, LServer, TS, PushJID, Node, XData) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
delete_session(LUser, LServer, PushJID, Node),
end.
-spec lookup_session(binary(), binary(), timestamp())
- -> {ok, push_session()} | error.
+ -> {ok, push_session()} | error | {error, any()}.
lookup_session(LUser, LServer, TS) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
case use_cache(Mod, LServer) of
Mod:lookup_session(LUser, LServer, TS)
end.
--spec lookup_sessions(binary(), binary()) -> {ok, [push_session()]} | error.
+-spec lookup_sessions(binary(), binary()) -> {ok, [push_session()]} | {error, any()}.
lookup_sessions(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
case use_cache(Mod, LServer) of
Mod:lookup_sessions(LUser, LServer)
end.
--spec delete_session(binary(), binary(), timestamp()) -> ok | error.
+-spec delete_session(binary(), binary(), timestamp()) -> ok | {error, db_failure}.
delete_session(LUser, LServer, TS) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
- ok = Mod:delete_session(LUser, LServer, TS),
- case use_cache(Mod, LServer) of
- true ->
- ets_cache:delete(?PUSH_CACHE, {LUser, LServer},
- cache_nodes(Mod, LServer)),
- ets_cache:delete(?PUSH_CACHE, {LUser, LServer, TS},
- cache_nodes(Mod, LServer));
- false ->
- ok
+ case Mod:delete_session(LUser, LServer, TS) of
+ ok ->
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:delete(?PUSH_CACHE, {LUser, LServer},
+ cache_nodes(Mod, LServer)),
+ ets_cache:delete(?PUSH_CACHE, {LUser, LServer, TS},
+ cache_nodes(Mod, LServer));
+ false ->
+ ok
+ end;
+ {error, _} ->
+ {error, db_failure}
end.
--spec delete_session(binary(), binary(), jid(), binary()) -> ok | error.
+-spec delete_session(binary(), binary(), jid(), binary()) -> ok | {error, notfound | db_failure}.
delete_session(LUser, LServer, PushJID, Node) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
case Mod:lookup_session(LUser, LServer, PushJID, Node) of
{ok, {TS, _, _, _}} ->
delete_session(LUser, LServer, TS);
error ->
- error
+ {error, notfound};
+ {error, _} ->
+ {error, db_failure}
end.
--spec delete_sessions(binary(), binary(), jid()) -> ok | error.
+-spec delete_sessions(binary(), binary(), jid()) -> ok | {error, notfound | db_failure}.
delete_sessions(LUser, LServer, PushJID) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
LookupFun = fun() -> Mod:lookup_sessions(LUser, LServer, PushJID) end,
delete_sessions(LUser, LServer, LookupFun, Mod).
--spec delete_sessions(binary(), binary(), fun(() -> ok | error), module())
- -> ok | error.
+-spec delete_sessions(binary(), binary(), fun(() -> any()), module())
+ -> ok | {error, _}.
delete_sessions(LUser, LServer, LookupFun, Mod) ->
case LookupFun() of
+ {ok, []} ->
+ {error, notfound};
{ok, Clients} ->
case use_cache(Mod, LServer) of
true ->
ok
end
end, Clients);
- error ->
- error
+ {error, _} ->
+ {error, db_failure}
end.
-spec drop_online_sessions(binary(), binary(), [push_session()])
{aborted, E} ->
?ERROR_MSG("Cannot store push session for ~s@~s: ~p",
[LUser, LServer, E]),
- error
+ {error, db_failure}
end.
lookup_session(LUser, LServer, PushJID, Node) ->
case mnesia:dirty_select(push_session, MatchSpec) of
[#push_session{timestamp = TS, xdata = XData}] ->
{ok, {TS, PushLJID, Node, XData}};
- _ ->
+ [] ->
?DEBUG("No push session found for ~s@~s (~p, ~s)",
[LUser, LServer, PushJID, Node]),
error
case mnesia:dirty_select(push_session, MatchSpec) of
[#push_session{service = PushLJID, node = Node, xdata = XData}] ->
{ok, {TS, PushLJID, Node, XData}};
- _ ->
+ [] ->
?DEBUG("No push session found for ~s@~s (~p)",
[LUser, LServer, TS]),
error
{aborted, E} ->
?ERROR_MSG("Cannot delete push session of ~s@~s: ~p",
[LUser, LServer, E]),
- error
+ {error, db_failure}
end.
delete_old_sessions(_LServer, Time) ->
ok;
{aborted, E} ->
?ERROR_MSG("Cannot delete old push sessions: ~p", [E]),
- error
+ {error, db_failure}
end.
%%--------------------------------------------------------------------
--- /dev/null
+%%%----------------------------------------------------------------------
+%%% ejabberd, Copyright (C) 2017 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+-module(mod_push_sql).
+-behaviour(mod_push).
+-compile([{parse_transform, ejabberd_sql_pt}]).
+
+%% API
+-export([init/2, store_session/6, lookup_session/4, lookup_session/3,
+ lookup_sessions/3, lookup_sessions/2, lookup_sessions/1,
+ delete_session/3, delete_old_sessions/2]).
+
+-include("xmpp.hrl").
+-include("logger.hrl").
+-include("ejabberd_sql_pt.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ ok.
+
+store_session(LUser, LServer, NowTS, PushJID, Node, XData) ->
+ XML = case XData of
+ undefined -> <<>>;
+ _ -> fxml:element_to_binary(xmpp:encode(XData))
+ end,
+ TS = misc:now_to_usec(NowTS),
+ PushLJID = jid:tolower(PushJID),
+ Service = jid:encode(PushLJID),
+ case ?SQL_UPSERT(LServer, "push_session",
+ ["!username=%(LUser)s",
+ "!timestamp=%(TS)d",
+ "!service=%(Service)s",
+ "!node=%(Node)s",
+ "xml=%(XML)s"]) of
+ ok ->
+ {ok, {NowTS, PushLJID, Node, XData}};
+ Err ->
+ ?ERROR_MSG("Failed to update 'push_session' table: ~p", [Err]),
+ {error, db_failure}
+ end.
+
+lookup_session(LUser, LServer, PushJID, Node) ->
+ PushLJID = jid:tolower(PushJID),
+ Service = jid:encode(PushLJID),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(timestamp)d, @(xml)s from push_session "
+ "where username=%(LUser)s and service=%(Service)s "
+ "and node=%(Node)s")) of
+ {selected, [{TS, XML}]} ->
+ NowTS = misc:usec_to_now(TS),
+ XData = decode_xdata(XML, LUser, LServer),
+ {ok, {NowTS, PushLJID, Node, XData}};
+ {selected, []} ->
+ error;
+ Err ->
+ ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ {error, db_failure}
+ end.
+
+lookup_session(LUser, LServer, NowTS) ->
+ TS = misc:now_to_usec(NowTS),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(service)s, @(node)s, @(xml)s "
+ "from push_session where username=%(LUser)s "
+ "and timestamp=%(TS)d")) of
+ {selected, [{Service, Node, XML}]} ->
+ PushLJID = jid:tolower(jid:decode(Service)),
+ XData = decode_xdata(XML, LUser, LServer),
+ {ok, {NowTS, PushLJID, Node, XData}};
+ {selected, []} ->
+ error;
+ Err ->
+ ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ {error, db_failure}
+ end.
+
+lookup_sessions(LUser, LServer, PushJID) ->
+ PushLJID = jid:tolower(PushJID),
+ Service = jid:encode(PushLJID),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(timestamp)d, @(xml)s, @(node)s from push_session "
+ "where username=%(LUser)s and service=%(Service)s")) of
+ {selected, Rows} ->
+ {ok, lists:map(
+ fun({TS, XML, Node}) ->
+ NowTS = misc:usec_to_now(TS),
+ XData = decode_xdata(XML, LUser, LServer),
+ {NowTS, PushLJID, Node, XData}
+ end, Rows)};
+ Err ->
+ ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ {error, db_failure}
+ end.
+
+lookup_sessions(LUser, LServer) ->
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(timestamp)d, @(xml)s, @(node)s, @(service)s "
+ "from push_session where username=%(LUser)s")) of
+ {selected, Rows} ->
+ {ok, lists:map(
+ fun({TS, XML, Node, Service}) ->
+ NowTS = misc:usec_to_now(TS),
+ XData = decode_xdata(XML, LUser, LServer),
+ PushLJID = jid:tolower(jid:decode(Service)),
+ {NowTS, PushLJID,Node, XData}
+ end, Rows)};
+ Err ->
+ ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ {error, db_failure}
+ end.
+
+lookup_sessions(LServer) ->
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(username)s, @(timestamp)d, @(xml)s, "
+ "@(node)s, @(service)s from push_session")) of
+ {selected, Rows} ->
+ {ok, lists:map(
+ fun({LUser, TS, XML, Node, Service}) ->
+ NowTS = misc:usec_to_now(TS),
+ XData = decode_xdata(XML, LUser, LServer),
+ PushLJID = jid:tolower(jid:decode(Service)),
+ {NowTS, PushLJID, Node, XData}
+ end, Rows)};
+ Err ->
+ ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ {error, db_failure}
+ end.
+
+delete_session(LUser, LServer, NowTS) ->
+ TS = misc:now_to_usec(NowTS),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("delete from push_session where "
+ "username=%(LUser)s and timestamp=%(TS)d")) of
+ {updated, _} ->
+ ok;
+ Err ->
+ ?ERROR_MSG("failed to delete from 'push_session' table: ~p", [Err]),
+ {error, db_failure}
+ end.
+
+delete_old_sessions(LServer, Time) ->
+ TS = misc:now_to_usec(Time),
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("delete from push_session where timestamp<%(TS)d")) of
+ {updated, _} ->
+ ok;
+ Err ->
+ ?ERROR_MSG("failed to delete from 'push_session' table: ~p", [Err]),
+ {error, db_failure}
+ end.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+decode_xdata(<<>>, _LUser, _LServer) ->
+ undefined;
+decode_xdata(XML, LUser, LServer) ->
+ case fxml_stream:parse_element(XML) of
+ #xmlel{} = El ->
+ try xmpp:decode(El)
+ catch _:{xmpp_codec, Why} ->
+ ?ERROR_MSG("Failed to decode ~s for user ~s@~s "
+ "from table 'push_session': ~s",
+ [XML, LUser, LServer, xmpp:format_error(Why)]),
+ undefined
+ end;
+ Err ->
+ ?ERROR_MSG("Failed to decode ~s for user ~s@~s from "
+ "table 'push_session': ~p",
+ [XML, LUser, LServer, Err]),
+ undefined
+ end.
muc_tests:single_cases(),
offline_tests:single_cases(),
mam_tests:single_cases(),
+ push_tests:single_cases(),
test_unregister]},
muc_tests:master_slave_cases(),
privacy_tests:master_slave_cases(),
mam_tests:master_slave_cases(),
vcard_tests:master_slave_cases(),
announce_tests:master_slave_cases(),
- carbons_tests:master_slave_cases()].
+ carbons_tests:master_slave_cases(),
+ push_tests:master_slave_cases()].
ldap_tests() ->
[{ldap_tests, [sequence],
mod_disco: []
mod_ping: []
mod_proxy65: []
+ mod_push:
+ db_type: sql
+ mod_push_keepalive: []
mod_s2s_dialback: []
+ mod_stream_mgmt:
+ resume_timeout: 3
mod_legacy_auth: []
mod_register:
welcome_message:
mod_disco: []
mod_ping: []
mod_proxy65: []
+ mod_push:
+ db_type: sql
+ mod_push_keepalive: []
mod_s2s_dialback: []
+ mod_stream_mgmt:
+ resume_timeout: 3
mod_legacy_auth: []
mod_register:
welcome_message:
mod_disco: []
mod_ping: []
mod_proxy65: []
+ mod_push:
+ db_type: sql
+ mod_push_keepalive: []
mod_s2s_dialback: []
+ mod_stream_mgmt:
+ resume_timeout: 3
mod_legacy_auth: []
mod_register:
welcome_message: