]> granicus.if.org Git - ejabberd/commitdiff
Implement database backend interface for mod_proxy65
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Mon, 16 Jan 2017 12:28:11 +0000 (15:28 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Mon, 16 Jan 2017 12:28:11 +0000 (15:28 +0300)
src/ejabberd_cluster.erl
src/mod_proxy65.erl
src/mod_proxy65_mnesia.erl [new file with mode: 0644]
src/mod_proxy65_service.erl
src/mod_proxy65_sm.erl [deleted file]
src/mod_proxy65_stream.erl

index 1e3f02a9e781aaff193b74cc97e440180a9c569f..5826d6d31be6bd442cdd0adda657384d371e5e0d 100644 (file)
@@ -28,6 +28,7 @@
 %% API
 -export([get_nodes/0, call/4, multicall/3, multicall/4]).
 -export([join/1, leave/1]).
+-export([node_id/0, get_node_by_id/1]).
 
 -include("ejabberd.hrl").
 -include("logger.hrl").
@@ -102,3 +103,31 @@ leave([Master|_], Node) ->
                 erlang:halt(0)
         end),
     ok.
+
+-spec node_id() -> binary().
+node_id() ->
+    integer_to_binary(erlang:phash2(node())).
+
+-spec get_node_by_id(binary()) -> node().
+get_node_by_id(Hash) ->
+    try binary_to_integer(Hash) of
+       I -> match_node_id(I)
+    catch _:_ ->
+           node()
+    end.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec match_node_id(integer()) -> node().
+match_node_id(I) ->
+    match_node_id(I, get_nodes()).
+
+-spec match_node_id(integer(), [node()]) -> node().
+match_node_id(I, [Node|Nodes]) ->
+    case erlang:phash2(Node) of
+       I -> Node;
+       _ -> match_node_id(I, Nodes)
+    end;
+match_node_id(_I, []) ->
+    node().
index 2d0d9ae0a8b5f02130e946c57d8b6ec924d06008..0c403e23b7aaf46e307087684d7e77b6c38b549b 100644 (file)
 
 -define(PROCNAME, ejabberd_mod_proxy65).
 
+-callback init() -> any().
+-callback register_stream(binary(), pid()) -> ok | {error, any()}.
+-callback unregister_stream(binary()) -> ok | {error, any()}.
+-callback activate_stream(binary(), binary(), pos_integer() | infinity, node()) ->
+    ok | {error, limit | conflict | notfound | term()}.
+
 start(Host, Opts) ->
     case mod_proxy65_service:add_listener(Host, Opts) of
       {error, _} = Err -> erlang:error(Err);
@@ -50,7 +56,12 @@ start(Host, Opts) ->
          Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
          ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
                       transient, infinity, supervisor, [?MODULE]},
-         supervisor:start_child(ejabberd_sup, ChildSpec)
+         case supervisor:start_child(ejabberd_sup, ChildSpec) of
+             {error, _} = Err -> erlang:error(Err);
+             _ ->
+                 Mod = gen_mod:ram_db_mod(global, ?MODULE),
+                 Mod:init()
+         end
     end.
 
 stop(Host) ->
@@ -77,12 +88,9 @@ init([Host, Opts]) ->
                                                  ejabberd_mod_proxy65_sup),
                          mod_proxy65_stream]},
                        transient, infinity, supervisor, [ejabberd_tmp_sup]},
-    StreamManager = {mod_proxy65_sm,
-                    {mod_proxy65_sm, start_link, [Host, Opts]}, transient,
-                    5000, worker, [mod_proxy65_sm]},
     {ok,
      {{one_for_one, 10, 1},
-      [StreamManager, StreamSupervisor, Service]}}.
+      [StreamSupervisor, Service]}}.
 
 depends(_Host, _Opts) ->
     [].
@@ -112,7 +120,9 @@ mod_opt_type(max_connections) ->
     fun (I) when is_integer(I), I > 0 -> I;
        (infinity) -> infinity
     end;
+mod_opt_type(ram_db_type) ->
+    fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
 mod_opt_type(_) ->
     [auth_type, recbuf, shaper, sndbuf,
      access, host, hostname, ip, name, port,
-     max_connections].
+     max_connections, ram_db_type].
diff --git a/src/mod_proxy65_mnesia.erl b/src/mod_proxy65_mnesia.erl
new file mode 100644 (file)
index 0000000..e50b29c
--- /dev/null
@@ -0,0 +1,145 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2017, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 16 Jan 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_proxy65_mnesia).
+-behaviour(gen_server).
+-behaviour(mod_proxy65).
+
+%% API
+-export([init/0, register_stream/2, unregister_stream/1, activate_stream/4]).
+-export([start_link/0]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+        terminate/2, code_change/3]).
+
+-include("logger.hrl").
+
+-record(bytestream,
+       {sha1 = <<"">> :: binary() | '$1',
+         target :: pid() | '_',
+         initiator :: pid() | '_',
+         active = false :: boolean() | '_',
+         jid_i :: undefined | binary() | '_'}).
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init() ->
+    Spec = {?MODULE, {?MODULE, start_link, []}, transient,
+           5000, worker, [?MODULE]},
+    supervisor:start_child(ejabberd_sup, Spec).
+
+register_stream(SHA1, StreamPid) ->
+    F = fun () ->
+               case mnesia:read(bytestream, SHA1, write) of
+                   [] ->
+                       mnesia:write(#bytestream{sha1 = SHA1,
+                                                target = StreamPid});
+                   [#bytestream{target = Pid, initiator = undefined} =
+                        ByteStream] when is_pid(Pid), Pid /= StreamPid ->
+                       mnesia:write(ByteStream#bytestream{
+                                      initiator = StreamPid})
+               end
+       end,
+    case mnesia:transaction(F) of
+       {atomic, ok} ->
+           ok;
+       {aborted, Reason} ->
+           ?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]),
+           {error, Reason}
+    end.
+
+unregister_stream(SHA1) ->
+    F = fun () -> mnesia:delete({bytestream, SHA1}) end,
+    case mnesia:transaction(F) of
+       {atomic, ok} ->
+           ok;
+       {aborted, Reason} ->
+           ?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]),
+           {error, Reason}
+    end.
+
+activate_stream(SHA1, Initiator, MaxConnections, _Node) ->
+    case gen_server:call(?MODULE,
+                        {activate_stream, SHA1, Initiator, MaxConnections}) of
+       {atomic, {ok, IPid, TPid}} ->
+           {ok, IPid, TPid};
+       {atomic, {limit, IPid, TPid}} ->
+           {error, {limit, IPid, TPid}};
+       {atomic, conflict} ->
+           {error, conflict};
+       {atomic, notfound} ->
+           {error, notfound};
+       Err ->
+           {error, Err}
+    end.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init([]) ->
+    ejabberd_mnesia:create(?MODULE, bytestream,
+                          [{ram_copies, [node()]},
+                           {attributes, record_info(fields, bytestream)}]),
+    mnesia:add_table_copy(bytestream, node(), ram_copies),
+    {ok, #state{}}.
+
+handle_call({activate_stream, SHA1, Initiator, MaxConnections}, _From, State) ->
+    F = fun () ->
+               case mnesia:read(bytestream, SHA1, write) of
+                   [#bytestream{target = TPid, initiator = IPid} =
+                        ByteStream] when is_pid(TPid), is_pid(IPid) ->
+                       ActiveFlag = ByteStream#bytestream.active,
+                       if ActiveFlag == false ->
+                               ConnsPerJID = mnesia:select(
+                                               bytestream,
+                                               [{#bytestream{sha1 = '$1',
+                                                             jid_i = Initiator,
+                                                             _ = '_'},
+                                                 [], ['$1']}]),
+                               if length(ConnsPerJID) < MaxConnections ->
+                                       mnesia:write(
+                                         ByteStream#bytestream{active = true,
+                                                               jid_i = Initiator}),
+                                       {ok, IPid, TPid};
+                                  true ->
+                                       {limit, IPid, TPid}
+                               end;
+                          true ->
+                               conflict
+                       end;
+                   _ ->
+                       notfound
+               end
+       end,
+    Reply = mnesia:transaction(F),
+    {reply, Reply, State};
+handle_call(_Request, _From, State) ->
+    Reply = ok,
+    {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
index 0f69086e0c3d8a3d156d2dcf4e5b7f415e73256f..db844cb81beaba4d71b2ab2ddcc6bcab7a20d219 100644 (file)
@@ -175,31 +175,39 @@ process_bytestreams(#iq{type = set, lang = Lang, from = InitiatorJID, to = To,
                                 all),
     case acl:match_rule(ServerHost, ACL, InitiatorJID) of
        allow ->
+           Node = ejabberd_cluster:get_node_by_id(To#jid.lresource),
            Target = jid:to_string(jid:tolower(TargetJID)),
            Initiator = jid:to_string(jid:tolower(InitiatorJID)),
            SHA1 = p1_sha:sha(<<SID/binary, Initiator/binary, Target/binary>>),
-           case mod_proxy65_sm:activate_stream(SHA1, InitiatorJID,
-                                               TargetJID, ServerHost) of
-               ok ->
+           Mod = gen_mod:ram_db_mod(global, mod_proxy65),
+           MaxConnections = max_connections(ServerHost),
+           case Mod:activate_stream(SHA1, Initiator, MaxConnections, Node) of
+               {ok, InitiatorPid, TargetPid} ->
+                   mod_proxy65_stream:activate(
+                     {InitiatorPid, InitiatorJID}, {TargetPid, TargetJID}),
                    xmpp:make_iq_result(IQ);
-               false ->
+               {error, notfound} ->
                    Txt = <<"Failed to activate bytestream">>,
                    xmpp:make_error(IQ, xmpp:err_item_not_found(Txt, Lang));
-               limit ->
+               {error, {limit, InitiatorPid, TargetPid}} ->
+                   mod_proxy65_stream:stop(InitiatorPid),
+                   mod_proxy65_stream:stop(TargetPid),
                    Txt = <<"Too many active bytestreams">>,
                    xmpp:make_error(IQ, xmpp:err_resource_constraint(Txt, Lang));
-               conflict ->
+               {error, conflict} ->
                    Txt = <<"Bytestream already activated">>,
                    xmpp:make_error(IQ, xmpp:err_conflict(Txt, Lang));
-               Err ->
+               {error, Err} ->
                    ?ERROR_MSG("failed to activate bytestream from ~s to ~s: ~p",
                               [Initiator, Target, Err]),
-                   xmpp:make_error(IQ, xmpp:err_internal_server_error())
+                   Txt = <<"Database failure">>,
+                   xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang))
            end;
        deny ->
            Txt = <<"Denied by ACL">>,
            xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang))
     end.
+
 %%%-------------------------
 %%% Auxiliary functions.
 %%%-------------------------
@@ -219,7 +227,8 @@ get_streamhost(Host, ServerHost) ->
     HostName = gen_mod:get_module_opt(ServerHost, mod_proxy65, hostname,
                                      fun iolist_to_binary/1,
                                      jlib:ip_to_list(IP)),
-    #streamhost{jid = jid:make(Host),
+    Resource = ejabberd_cluster:node_id(),
+    #streamhost{jid = jid:make(<<"">>, Host, Resource),
                host = HostName,
                port = Port}.
 
@@ -246,3 +255,9 @@ get_my_ip() ->
       {ok, Addr} -> Addr;
       {error, _} -> {127, 0, 0, 1}
     end.
+
+max_connections(ServerHost) ->
+    gen_mod:get_module_opt(ServerHost, mod_proxy65, max_connections,
+                          fun(I) when is_integer(I), I>0 -> I;
+                             (infinity) -> infinity
+                          end, infinity).
diff --git a/src/mod_proxy65_sm.erl b/src/mod_proxy65_sm.erl
deleted file mode 100644 (file)
index b1d33b5..0000000
+++ /dev/null
@@ -1,171 +0,0 @@
-%%%----------------------------------------------------------------------
-%%% File    : mod_proxy65_sm.erl
-%%% Author  : Evgeniy Khramtsov <xram@jabber.ru>
-%%% Purpose : Bytestreams manager.
-%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
-%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2016   ProcessOne
-%%%
-%%% This program is free software; you can redistribute it and/or
-%%% modify it under the terms of the GNU General Public License as
-%%% published by the Free Software Foundation; either version 2 of the
-%%% License, or (at your option) any later version.
-%%%
-%%% This program is distributed in the hope that it will be useful,
-%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
-%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-%%% General Public License for more details.
-%%%
-%%% You should have received a copy of the GNU General Public License along
-%%% with this program; if not, write to the Free Software Foundation, Inc.,
-%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-%%%
-%%%----------------------------------------------------------------------
-
--module(mod_proxy65_sm).
-
--author('xram@jabber.ru').
-
--behaviour(gen_server).
-
-%% gen_server callbacks.
--export([init/1, handle_info/2, handle_call/3,
-        handle_cast/2, terminate/2, code_change/3]).
-
--export([start_link/2, register_stream/1,
-        unregister_stream/1, activate_stream/4]).
-
--record(state, {max_connections = infinity :: non_neg_integer() | infinity}).
-
--record(bytestream,
-       {sha1 = <<"">> :: binary() | '$1',
-         target :: pid() | '_',
-         initiator :: pid() | '_',
-         active = false :: boolean() | '_',
-         jid_i = {<<"">>, <<"">>, <<"">>} :: jid:ljid() | '_'}).
-
--define(PROCNAME, ejabberd_mod_proxy65_sm).
-
-%% Unused callbacks.
-handle_cast(_Request, State) -> {noreply, State}.
-
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-
-handle_info(_Info, State) -> {noreply, State}.
-
-%%----------------
-
-start_link(Host, Opts) ->
-    Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
-    gen_server:start_link({local, Proc}, ?MODULE, [Opts],
-                         []).
-
-init([Opts]) ->
-    ejabberd_mnesia:create(?MODULE, bytestream,
-                       [{ram_copies, [node()]},
-                        {attributes, record_info(fields, bytestream)}]),
-    mnesia:add_table_copy(bytestream, node(), ram_copies),
-    MaxConnections = gen_mod:get_opt(max_connections, Opts,
-                                     fun(I) when is_integer(I), I>0 ->
-                                             I;
-                                        (infinity) ->
-                                             infinity
-                                     end, infinity),
-    {ok, #state{max_connections = MaxConnections}}.
-
-terminate(_Reason, _State) -> ok.
-
-handle_call({activate, SHA1, IJid}, _From, State) ->
-    MaxConns = State#state.max_connections,
-    F = fun () ->
-               case mnesia:read(bytestream, SHA1, write) of
-                 [#bytestream{target = TPid, initiator = IPid} =
-                      ByteStream]
-                     when is_pid(TPid), is_pid(IPid) ->
-                     ActiveFlag = ByteStream#bytestream.active,
-                     if ActiveFlag == false ->
-                            ConnsPerJID = mnesia:select(bytestream,
-                                                        [{#bytestream{sha1 =
-                                                                          '$1',
-                                                                      jid_i =
-                                                                          IJid,
-                                                                      _ = '_'},
-                                                          [], ['$1']}]),
-                            if length(ConnsPerJID) < MaxConns ->
-                                   mnesia:write(ByteStream#bytestream{active =
-                                                                          true,
-                                                                      jid_i =
-                                                                          IJid}),
-                                   {ok, IPid, TPid};
-                               true -> {limit, IPid, TPid}
-                            end;
-                        true -> conflict
-                     end;
-                 _ -> false
-               end
-       end,
-    Reply = mnesia:transaction(F),
-    {reply, Reply, State};
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
-
-%%%----------------------
-%%% API.
-%%%----------------------
-%%%---------------------------------------------------
-%%% register_stream(SHA1) -> {atomic, ok}      |
-%%%                          {atomic, error}   |
-%%%                          transaction abort
-%%% SHA1 = string()
-%%%---------------------------------------------------
-register_stream(SHA1) when is_binary(SHA1) ->
-    StreamPid = self(),
-    F = fun () ->
-               case mnesia:read(bytestream, SHA1, write) of
-                 [] ->
-                     mnesia:write(#bytestream{sha1 = SHA1,
-                                              target = StreamPid});
-                 [#bytestream{target = Pid, initiator = undefined} =
-                      ByteStream]
-                     when is_pid(Pid), Pid /= StreamPid ->
-                     mnesia:write(ByteStream#bytestream{initiator =
-                                                            StreamPid});
-                 _ -> error
-               end
-       end,
-    mnesia:transaction(F).
-
-%%%----------------------------------------------------
-%%% unregister_stream(SHA1) -> ok | transaction abort
-%%% SHA1 = string()
-%%%----------------------------------------------------
-unregister_stream(SHA1) when is_binary(SHA1) ->
-    F = fun () -> mnesia:delete({bytestream, SHA1}) end,
-    mnesia:transaction(F).
-
-%%%--------------------------------------------------------
-%%% activate_stream(SHA1, IJid, TJid, Host) -> ok       |
-%%%                                            false    |
-%%%                                            limit    |
-%%%                                            conflict |
-%%%                                            error
-%%% SHA1 = string()
-%%% IJid = TJid = jid()
-%%% Host = string()
-%%%--------------------------------------------------------
-activate_stream(SHA1, IJid, TJid, Host)
-    when is_binary(SHA1) ->
-    Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
-    case catch gen_server:call(Proc, {activate, SHA1, IJid})
-       of
-      {atomic, {ok, IPid, TPid}} ->
-         mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid});
-      {atomic, {limit, IPid, TPid}} ->
-         mod_proxy65_stream:stop(IPid),
-         mod_proxy65_stream:stop(TPid),
-         limit;
-      {atomic, conflict} -> conflict;
-      {atomic, false} -> false;
-      _ -> error
-    end.
index e6362d48c2329b5da20d0122e092da199275c844..484f41327da0968b31a9b23433215432700f1b75 100644 (file)
@@ -99,7 +99,8 @@ init([Socket, Host, Opts]) ->
            socket = Socket, shaper = Shaper, timer = TRef}}.
 
 terminate(_Reason, StateName, #state{sha1 = SHA1}) ->
-    catch mod_proxy65_sm:unregister_stream(SHA1),
+    Mod = gen_mod:ram_db_mod(global, mod_proxy65),
+    Mod:unregister_stream(SHA1),
     if StateName == stream_established ->
           ?INFO_MSG("Bytestream terminated", []);
        true -> ok
@@ -168,8 +169,9 @@ wait_for_request(Packet,
     Request = mod_proxy65_lib:unpack_request(Packet),
     case Request of
       #s5_request{sha1 = SHA1, cmd = connect} ->
-         case catch mod_proxy65_sm:register_stream(SHA1) of
-           {atomic, ok} ->
+         Mod = gen_mod:ram_db_mod(global, mod_proxy65),
+         case Mod:register_stream(SHA1, self()) of
+           ok ->
                inet:setopts(Socket, [{active, false}]),
                gen_tcp:send(Socket,
                             mod_proxy65_lib:make_reply(Request)),