%% API
-export([get_nodes/0, call/4, multicall/3, multicall/4]).
-export([join/1, leave/1]).
+-export([node_id/0, get_node_by_id/1]).
-include("ejabberd.hrl").
-include("logger.hrl").
erlang:halt(0)
end),
ok.
+
+-spec node_id() -> binary().
+node_id() ->
+ integer_to_binary(erlang:phash2(node())).
+
+-spec get_node_by_id(binary()) -> node().
+get_node_by_id(Hash) ->
+ try binary_to_integer(Hash) of
+ I -> match_node_id(I)
+ catch _:_ ->
+ node()
+ end.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec match_node_id(integer()) -> node().
+match_node_id(I) ->
+ match_node_id(I, get_nodes()).
+
+-spec match_node_id(integer(), [node()]) -> node().
+match_node_id(I, [Node|Nodes]) ->
+ case erlang:phash2(Node) of
+ I -> Node;
+ _ -> match_node_id(I, Nodes)
+ end;
+match_node_id(_I, []) ->
+ node().
-define(PROCNAME, ejabberd_mod_proxy65).
+-callback init() -> any().
+-callback register_stream(binary(), pid()) -> ok | {error, any()}.
+-callback unregister_stream(binary()) -> ok | {error, any()}.
+-callback activate_stream(binary(), binary(), pos_integer() | infinity, node()) ->
+ ok | {error, limit | conflict | notfound | term()}.
+
start(Host, Opts) ->
case mod_proxy65_service:add_listener(Host, Opts) of
{error, _} = Err -> erlang:error(Err);
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
transient, infinity, supervisor, [?MODULE]},
- supervisor:start_child(ejabberd_sup, ChildSpec)
+ case supervisor:start_child(ejabberd_sup, ChildSpec) of
+ {error, _} = Err -> erlang:error(Err);
+ _ ->
+ Mod = gen_mod:ram_db_mod(global, ?MODULE),
+ Mod:init()
+ end
end.
stop(Host) ->
ejabberd_mod_proxy65_sup),
mod_proxy65_stream]},
transient, infinity, supervisor, [ejabberd_tmp_sup]},
- StreamManager = {mod_proxy65_sm,
- {mod_proxy65_sm, start_link, [Host, Opts]}, transient,
- 5000, worker, [mod_proxy65_sm]},
{ok,
{{one_for_one, 10, 1},
- [StreamManager, StreamSupervisor, Service]}}.
+ [StreamSupervisor, Service]}}.
depends(_Host, _Opts) ->
[].
fun (I) when is_integer(I), I > 0 -> I;
(infinity) -> infinity
end;
+mod_opt_type(ram_db_type) ->
+ fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
mod_opt_type(_) ->
[auth_type, recbuf, shaper, sndbuf,
access, host, hostname, ip, name, port,
- max_connections].
+ max_connections, ram_db_type].
--- /dev/null
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2017, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 16 Jan 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_proxy65_mnesia).
+-behaviour(gen_server).
+-behaviour(mod_proxy65).
+
+%% API
+-export([init/0, register_stream/2, unregister_stream/1, activate_stream/4]).
+-export([start_link/0]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("logger.hrl").
+
+-record(bytestream,
+ {sha1 = <<"">> :: binary() | '$1',
+ target :: pid() | '_',
+ initiator :: pid() | '_',
+ active = false :: boolean() | '_',
+ jid_i :: undefined | binary() | '_'}).
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init() ->
+ Spec = {?MODULE, {?MODULE, start_link, []}, transient,
+ 5000, worker, [?MODULE]},
+ supervisor:start_child(ejabberd_sup, Spec).
+
+register_stream(SHA1, StreamPid) ->
+ F = fun () ->
+ case mnesia:read(bytestream, SHA1, write) of
+ [] ->
+ mnesia:write(#bytestream{sha1 = SHA1,
+ target = StreamPid});
+ [#bytestream{target = Pid, initiator = undefined} =
+ ByteStream] when is_pid(Pid), Pid /= StreamPid ->
+ mnesia:write(ByteStream#bytestream{
+ initiator = StreamPid})
+ end
+ end,
+ case mnesia:transaction(F) of
+ {atomic, ok} ->
+ ok;
+ {aborted, Reason} ->
+ ?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]),
+ {error, Reason}
+ end.
+
+unregister_stream(SHA1) ->
+ F = fun () -> mnesia:delete({bytestream, SHA1}) end,
+ case mnesia:transaction(F) of
+ {atomic, ok} ->
+ ok;
+ {aborted, Reason} ->
+ ?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]),
+ {error, Reason}
+ end.
+
+activate_stream(SHA1, Initiator, MaxConnections, _Node) ->
+ case gen_server:call(?MODULE,
+ {activate_stream, SHA1, Initiator, MaxConnections}) of
+ {atomic, {ok, IPid, TPid}} ->
+ {ok, IPid, TPid};
+ {atomic, {limit, IPid, TPid}} ->
+ {error, {limit, IPid, TPid}};
+ {atomic, conflict} ->
+ {error, conflict};
+ {atomic, notfound} ->
+ {error, notfound};
+ Err ->
+ {error, Err}
+ end.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init([]) ->
+ ejabberd_mnesia:create(?MODULE, bytestream,
+ [{ram_copies, [node()]},
+ {attributes, record_info(fields, bytestream)}]),
+ mnesia:add_table_copy(bytestream, node(), ram_copies),
+ {ok, #state{}}.
+
+handle_call({activate_stream, SHA1, Initiator, MaxConnections}, _From, State) ->
+ F = fun () ->
+ case mnesia:read(bytestream, SHA1, write) of
+ [#bytestream{target = TPid, initiator = IPid} =
+ ByteStream] when is_pid(TPid), is_pid(IPid) ->
+ ActiveFlag = ByteStream#bytestream.active,
+ if ActiveFlag == false ->
+ ConnsPerJID = mnesia:select(
+ bytestream,
+ [{#bytestream{sha1 = '$1',
+ jid_i = Initiator,
+ _ = '_'},
+ [], ['$1']}]),
+ if length(ConnsPerJID) < MaxConnections ->
+ mnesia:write(
+ ByteStream#bytestream{active = true,
+ jid_i = Initiator}),
+ {ok, IPid, TPid};
+ true ->
+ {limit, IPid, TPid}
+ end;
+ true ->
+ conflict
+ end;
+ _ ->
+ notfound
+ end
+ end,
+ Reply = mnesia:transaction(F),
+ {reply, Reply, State};
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
all),
case acl:match_rule(ServerHost, ACL, InitiatorJID) of
allow ->
+ Node = ejabberd_cluster:get_node_by_id(To#jid.lresource),
Target = jid:to_string(jid:tolower(TargetJID)),
Initiator = jid:to_string(jid:tolower(InitiatorJID)),
SHA1 = p1_sha:sha(<<SID/binary, Initiator/binary, Target/binary>>),
- case mod_proxy65_sm:activate_stream(SHA1, InitiatorJID,
- TargetJID, ServerHost) of
- ok ->
+ Mod = gen_mod:ram_db_mod(global, mod_proxy65),
+ MaxConnections = max_connections(ServerHost),
+ case Mod:activate_stream(SHA1, Initiator, MaxConnections, Node) of
+ {ok, InitiatorPid, TargetPid} ->
+ mod_proxy65_stream:activate(
+ {InitiatorPid, InitiatorJID}, {TargetPid, TargetJID}),
xmpp:make_iq_result(IQ);
- false ->
+ {error, notfound} ->
Txt = <<"Failed to activate bytestream">>,
xmpp:make_error(IQ, xmpp:err_item_not_found(Txt, Lang));
- limit ->
+ {error, {limit, InitiatorPid, TargetPid}} ->
+ mod_proxy65_stream:stop(InitiatorPid),
+ mod_proxy65_stream:stop(TargetPid),
Txt = <<"Too many active bytestreams">>,
xmpp:make_error(IQ, xmpp:err_resource_constraint(Txt, Lang));
- conflict ->
+ {error, conflict} ->
Txt = <<"Bytestream already activated">>,
xmpp:make_error(IQ, xmpp:err_conflict(Txt, Lang));
- Err ->
+ {error, Err} ->
?ERROR_MSG("failed to activate bytestream from ~s to ~s: ~p",
[Initiator, Target, Err]),
- xmpp:make_error(IQ, xmpp:err_internal_server_error())
+ Txt = <<"Database failure">>,
+ xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang))
end;
deny ->
Txt = <<"Denied by ACL">>,
xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang))
end.
+
%%%-------------------------
%%% Auxiliary functions.
%%%-------------------------
HostName = gen_mod:get_module_opt(ServerHost, mod_proxy65, hostname,
fun iolist_to_binary/1,
jlib:ip_to_list(IP)),
- #streamhost{jid = jid:make(Host),
+ Resource = ejabberd_cluster:node_id(),
+ #streamhost{jid = jid:make(<<"">>, Host, Resource),
host = HostName,
port = Port}.
{ok, Addr} -> Addr;
{error, _} -> {127, 0, 0, 1}
end.
+
+max_connections(ServerHost) ->
+ gen_mod:get_module_opt(ServerHost, mod_proxy65, max_connections,
+ fun(I) when is_integer(I), I>0 -> I;
+ (infinity) -> infinity
+ end, infinity).
+++ /dev/null
-%%%----------------------------------------------------------------------
-%%% File : mod_proxy65_sm.erl
-%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
-%%% Purpose : Bytestreams manager.
-%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
-%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
-%%%
-%%% This program is free software; you can redistribute it and/or
-%%% modify it under the terms of the GNU General Public License as
-%%% published by the Free Software Foundation; either version 2 of the
-%%% License, or (at your option) any later version.
-%%%
-%%% This program is distributed in the hope that it will be useful,
-%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
-%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-%%% General Public License for more details.
-%%%
-%%% You should have received a copy of the GNU General Public License along
-%%% with this program; if not, write to the Free Software Foundation, Inc.,
-%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-%%%
-%%%----------------------------------------------------------------------
-
--module(mod_proxy65_sm).
-
--author('xram@jabber.ru').
-
--behaviour(gen_server).
-
-%% gen_server callbacks.
--export([init/1, handle_info/2, handle_call/3,
- handle_cast/2, terminate/2, code_change/3]).
-
--export([start_link/2, register_stream/1,
- unregister_stream/1, activate_stream/4]).
-
--record(state, {max_connections = infinity :: non_neg_integer() | infinity}).
-
--record(bytestream,
- {sha1 = <<"">> :: binary() | '$1',
- target :: pid() | '_',
- initiator :: pid() | '_',
- active = false :: boolean() | '_',
- jid_i = {<<"">>, <<"">>, <<"">>} :: jid:ljid() | '_'}).
-
--define(PROCNAME, ejabberd_mod_proxy65_sm).
-
-%% Unused callbacks.
-handle_cast(_Request, State) -> {noreply, State}.
-
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-
-handle_info(_Info, State) -> {noreply, State}.
-
-%%----------------
-
-start_link(Host, Opts) ->
- Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
- gen_server:start_link({local, Proc}, ?MODULE, [Opts],
- []).
-
-init([Opts]) ->
- ejabberd_mnesia:create(?MODULE, bytestream,
- [{ram_copies, [node()]},
- {attributes, record_info(fields, bytestream)}]),
- mnesia:add_table_copy(bytestream, node(), ram_copies),
- MaxConnections = gen_mod:get_opt(max_connections, Opts,
- fun(I) when is_integer(I), I>0 ->
- I;
- (infinity) ->
- infinity
- end, infinity),
- {ok, #state{max_connections = MaxConnections}}.
-
-terminate(_Reason, _State) -> ok.
-
-handle_call({activate, SHA1, IJid}, _From, State) ->
- MaxConns = State#state.max_connections,
- F = fun () ->
- case mnesia:read(bytestream, SHA1, write) of
- [#bytestream{target = TPid, initiator = IPid} =
- ByteStream]
- when is_pid(TPid), is_pid(IPid) ->
- ActiveFlag = ByteStream#bytestream.active,
- if ActiveFlag == false ->
- ConnsPerJID = mnesia:select(bytestream,
- [{#bytestream{sha1 =
- '$1',
- jid_i =
- IJid,
- _ = '_'},
- [], ['$1']}]),
- if length(ConnsPerJID) < MaxConns ->
- mnesia:write(ByteStream#bytestream{active =
- true,
- jid_i =
- IJid}),
- {ok, IPid, TPid};
- true -> {limit, IPid, TPid}
- end;
- true -> conflict
- end;
- _ -> false
- end
- end,
- Reply = mnesia:transaction(F),
- {reply, Reply, State};
-handle_call(_Request, _From, State) ->
- {reply, ok, State}.
-
-%%%----------------------
-%%% API.
-%%%----------------------
-%%%---------------------------------------------------
-%%% register_stream(SHA1) -> {atomic, ok} |
-%%% {atomic, error} |
-%%% transaction abort
-%%% SHA1 = string()
-%%%---------------------------------------------------
-register_stream(SHA1) when is_binary(SHA1) ->
- StreamPid = self(),
- F = fun () ->
- case mnesia:read(bytestream, SHA1, write) of
- [] ->
- mnesia:write(#bytestream{sha1 = SHA1,
- target = StreamPid});
- [#bytestream{target = Pid, initiator = undefined} =
- ByteStream]
- when is_pid(Pid), Pid /= StreamPid ->
- mnesia:write(ByteStream#bytestream{initiator =
- StreamPid});
- _ -> error
- end
- end,
- mnesia:transaction(F).
-
-%%%----------------------------------------------------
-%%% unregister_stream(SHA1) -> ok | transaction abort
-%%% SHA1 = string()
-%%%----------------------------------------------------
-unregister_stream(SHA1) when is_binary(SHA1) ->
- F = fun () -> mnesia:delete({bytestream, SHA1}) end,
- mnesia:transaction(F).
-
-%%%--------------------------------------------------------
-%%% activate_stream(SHA1, IJid, TJid, Host) -> ok |
-%%% false |
-%%% limit |
-%%% conflict |
-%%% error
-%%% SHA1 = string()
-%%% IJid = TJid = jid()
-%%% Host = string()
-%%%--------------------------------------------------------
-activate_stream(SHA1, IJid, TJid, Host)
- when is_binary(SHA1) ->
- Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
- case catch gen_server:call(Proc, {activate, SHA1, IJid})
- of
- {atomic, {ok, IPid, TPid}} ->
- mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid});
- {atomic, {limit, IPid, TPid}} ->
- mod_proxy65_stream:stop(IPid),
- mod_proxy65_stream:stop(TPid),
- limit;
- {atomic, conflict} -> conflict;
- {atomic, false} -> false;
- _ -> error
- end.
socket = Socket, shaper = Shaper, timer = TRef}}.
terminate(_Reason, StateName, #state{sha1 = SHA1}) ->
- catch mod_proxy65_sm:unregister_stream(SHA1),
+ Mod = gen_mod:ram_db_mod(global, mod_proxy65),
+ Mod:unregister_stream(SHA1),
if StateName == stream_established ->
?INFO_MSG("Bytestream terminated", []);
true -> ok
Request = mod_proxy65_lib:unpack_request(Packet),
case Request of
#s5_request{sha1 = SHA1, cmd = connect} ->
- case catch mod_proxy65_sm:register_stream(SHA1) of
- {atomic, ok} ->
+ Mod = gen_mod:ram_db_mod(global, mod_proxy65),
+ case Mod:register_stream(SHA1, self()) of
+ ok ->
inet:setopts(Socket, [{active, false}]),
gen_tcp:send(Socket,
mod_proxy65_lib:make_reply(Request)),