]> granicus.if.org Git - ejabberd/commitdiff
Add Redis pool support
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Thu, 6 Apr 2017 14:56:37 +0000 (17:56 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Thu, 6 Apr 2017 14:56:37 +0000 (17:56 +0300)
Fixes #1624

src/ejabberd_redis.erl
src/ejabberd_redis_sup.erl [new file with mode: 0644]
src/ejabberd_sup.erl

index ec5d73596ecd083f9a4031c53787ebfe39e6fc60..8eeb196070cd1abe1f16a80f80a137be4fdfdaaf 100644 (file)
 %%%----------------------------------------------------------------------
 
 -module(ejabberd_redis).
-
--behaviour(gen_server).
--behaviour(ejabberd_config).
+-ifndef(GEN_SERVER).
+-define(GEN_SERVER, gen_server).
+-endif.
+-behaviour(?GEN_SERVER).
 
 -compile({no_auto_import, [get/1, put/2]}).
 
 %% API
--export([start_link/0, q/1, qp/1, config_reloaded/0, opt_type/1]).
+-export([start_link/1, get_proc/1, q/1, qp/1]).
 %% Commands
 -export([multi/1, get/1, set/2, del/1,
         sadd/2, srem/2, smembers/1, sismember/2, scard/1,
 -define(SERVER, ?MODULE).
 -define(PROCNAME, 'ejabberd_redis_client').
 -define(TR_STACK, redis_transaction_stack).
+-define(DEFAULT_MAX_QUEUE, 5000).
+-define(MAX_RETRIES, 1).
+-define(CALL_TIMEOUT, 60*1000). %% 60 seconds
 
 -include("logger.hrl").
 -include("ejabberd.hrl").
 
--record(state, {connection :: {pid(), reference()} | undefined}).
+-record(state, {connection :: pid() | undefined,
+               num :: pos_integer(),
+               pending_q :: p1_queue:queue()}).
 
 -type redis_error() :: {error, binary() | atom()}.
 
 %%%===================================================================
 %%% API
 %%%===================================================================
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(I) ->
+    ?GEN_SERVER:start_link({local, get_proc(I)}, ?MODULE, [I], []).
+
+get_proc(I) ->
+    aux:binary_to_atom(
+      iolist_to_binary(
+       [atom_to_list(?MODULE), $_, integer_to_list(I)])).
+
+get_connection(I) ->
+    aux:binary_to_atom(
+      iolist_to_binary(
+       [atom_to_list(?MODULE), "_connection_", integer_to_list(I)])).
 
 q(Command) ->
-    try eredis:q(?PROCNAME, Command)
-    catch _:{noproc, _} -> {error, disconnected};
-         _:{timeout, _} -> {error, timeout}
-    end.
+    call(get_worker(), {q, Command}, ?MAX_RETRIES).
 
 qp(Pipeline) ->
-    try eredis:qp(?PROCNAME, Pipeline)
-    catch _:{noproc, _} -> {error, disconnected};
-         _:{timeout, _} -> {error, timeout}
-    end.
+    call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES).
 
 -spec multi(fun(() -> any())) -> {ok, list()} | redis_error().
 multi(F) ->
@@ -91,14 +101,6 @@ multi(F) ->
            {error, nested_transaction}
     end.
 
-config_reloaded() ->
-    case is_redis_configured() of
-       true ->
-           ?MODULE ! connect;
-       false ->
-           ?MODULE ! disconnect
-    end.
-
 -spec get(iodata()) -> {ok, undefined | binary()} | redis_error().
 get(Key) ->
     case erlang:get(?TR_STACK) of
@@ -274,56 +276,63 @@ hkeys(Key) ->
 %%%===================================================================
 %%% gen_server callbacks
 %%%===================================================================
-init([]) ->
-    ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 20),
+init([I]) ->
     process_flag(trap_exit, true),
-    {_, State} = handle_info(connect, #state{}),
-    {ok, State}.
-
-handle_call(_Request, _From, State) ->
-    Reply = ok,
-    {reply, Reply, State}.
+    QueueType = get_queue_type(),
+    Limit = max_fsm_queue(),
+    self() ! connect,
+    {ok, #state{num = I, pending_q = p1_queue:new(QueueType, Limit)}}.
+
+handle_call(connect, From, #state{connection = undefined,
+                                 pending_q = Q} = State) ->
+    CurrTime = p1_time_compat:monotonic_time(milli_seconds),
+    Q2 = try p1_queue:in({From, CurrTime}, Q)
+        catch error:full ->
+                Q1 = clean_queue(Q, CurrTime),
+                p1_queue:in({From, CurrTime}, Q1)
+        end,
+    {noreply, State#state{pending_q = Q2}};
+handle_call(connect, From, #state{connection = Pid} = State) ->
+    case is_process_alive(Pid) of
+       true ->
+           {reply, ok, State};
+       false ->
+           self() ! connect,
+           handle_call(connect, From, State#state{connection = undefined})
+    end;
+handle_call(Request, _From, State) ->
+    ?WARNING_MSG("unexepected call: ~p", [Request]),
+    {noreply, State}.
 
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info(connect, #state{connection = undefined} = State) ->
-    NewState = case is_redis_configured() of
-                  true ->
-                      case connect() of
-                          {ok, Connection} ->
-                              State#state{connection = Connection};
-                          {error, _} ->
-                              State
-                      end;
-                  false ->
+    NewState = case connect(State) of
+                  {ok, Connection} ->
+                      Q1 = flush_queue(State#state.pending_q),
+                      State#state{connection = Connection, pending_q = Q1};
+                  {error, _} ->
                       State
               end,
     {noreply, NewState};
 handle_info(connect, State) ->
     %% Already connected
     {noreply, State};
-handle_info(disconnect, #state{connection = {Pid, MRef}} = State) ->
-    ?INFO_MSG("Disconnecting from Redis server", []),
-    erlang:demonitor(MRef, [flush]),
-    eredis:stop(Pid),
-    {noreply, State#state{connection = undefined}};
-handle_info(disconnect, State) ->
-    %% Not connected
-    {noreply, State};
-handle_info({'DOWN', MRef, _Type, Pid, Reason},
-           #state{connection = {Pid, MRef}} = State) ->
-    ?INFO_MSG("Redis connection has failed: ~p", [Reason]),
-    connect(),
-    {noreply, State#state{connection = undefined}};
-handle_info({'EXIT', _, _}, State) ->
-    {noreply, State};
+handle_info({'EXIT', Pid, _}, State) ->
+    case State#state.connection of
+       Pid ->
+           self() ! connect,
+           {noreply, State#state{connection = undefined}};
+       _ ->
+           {noreply, State}
+    end;
 handle_info(Info, State) ->
-    ?INFO_MSG("unexpected info = ~p", [Info]),
+    ?WARNING_MSG("unexpected info = ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, _State) ->
-    ejabberd_hooks:delete(config_reloaded, ?MODULE, config_reloaded, 20).
+    ok.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -331,37 +340,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
-is_redis_configured() ->
-    lists:any(fun is_redis_configured/1, ?MYHOSTS).
-
-is_redis_configured(Host) ->
-    ServerConfigured = ejabberd_config:has_option({redis_server, Host}),
-    PortConfigured = ejabberd_config:has_option({redis_port, Host}),
-    DBConfigured = ejabberd_config:has_option({redis_db, Host}),
-    PassConfigured = ejabberd_config:has_option({redis_password, Host}),
-    ReconnTimeoutConfigured = ejabberd_config:has_option(
-                               {redis_reconnect_timeout, Host}),
-    ConnTimeoutConfigured = ejabberd_config:has_option(
-                             {redis_connect_timeout, Host}),
-    Modules = ejabberd_config:get_option(
-               {modules, Host},
-               fun(L) when is_list(L) -> L end, []),
-    SMConfigured = ejabberd_config:get_option(
-                    {sm_db_type, Host},
-                    fun(V) -> V end) == redis,
-    ModuleWithRedisDBConfigured =
-       lists:any(
-         fun({Module, Opts}) ->
-                 gen_mod:db_type(Host, Opts, Module) == redis
-         end, Modules),
-    ServerConfigured or PortConfigured or DBConfigured or PassConfigured or
-       ReconnTimeoutConfigured or ConnTimeoutConfigured or
-       SMConfigured or ModuleWithRedisDBConfigured.
-
-iolist_to_list(IOList) ->
-    binary_to_list(iolist_to_binary(IOList)).
-
-connect() ->
+connect(#state{num = Num}) ->
     Server = ejabberd_config:get_option(redis_server,
                                        fun iolist_to_list/1,
                                        "localhost"),
@@ -377,36 +356,60 @@ connect() ->
     Pass = ejabberd_config:get_option(redis_password,
                                      fun iolist_to_list/1,
                                      ""),
-    ReconnTimeout = timer:seconds(
-                     ejabberd_config:get_option(
-                       redis_reconnect_timeout,
-                       fun(I) when is_integer(I), I>0 -> I end,
-                       1)),
     ConnTimeout = timer:seconds(
                    ejabberd_config:get_option(
                      redis_connect_timeout,
                      fun(I) when is_integer(I), I>0 -> I end,
                      1)),
     try case eredis:start_link(Server, Port, DB, Pass,
-                              ReconnTimeout, ConnTimeout) of
+                              no_reconnect, ConnTimeout) of
            {ok, Client} ->
-               ?INFO_MSG("Connected to Redis at ~s:~p", [Server, Port]),
-               unlink(Client),
-               MRef = erlang:monitor(process, Client),
-               register(?PROCNAME, Client),
-               {ok, {Client, MRef}};
+               ?DEBUG("Connection #~p established to Redis at ~s:~p",
+                      [Num, Server, Port]),
+               register(get_connection(Num), Client),
+               {ok, Client};
            {error, Why} ->
                erlang:error(Why)
        end
     catch _:Reason ->
-           Timeout = 10,
-           ?ERROR_MSG("Redis connection at ~s:~p has failed: ~p; "
+           Timeout = randoms:uniform(
+                       min(10, ejabberd_redis_sup:get_pool_size())),
+           ?ERROR_MSG("Redis connection #~p at ~s:~p has failed: ~p; "
                       "reconnecting in ~p seconds",
-                      [Server, Port, Reason, Timeout]),
+                      [Num, Server, Port, Reason, Timeout]),
            erlang:send_after(timer:seconds(Timeout), self(), connect),
            {error, Reason}
     end.
 
+-spec call({atom(), atom()}, {q | qp, list()}, integer()) ->
+      {error, disconnected | timeout | binary()} | {ok, iodata()}.
+call({Conn, Parent}, {F, Cmd}, Retries) ->
+    Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of
+             {error, Reason} when is_atom(Reason) ->
+                 try exit(whereis(Conn), kill) catch _:_ -> ok end,
+                 {error, disconnected};
+             Other ->
+                 Other
+         catch exit:{timeout, _} -> {error, timeout};
+               exit:{_, {gen_server, call, _}} -> {error, disconnected}
+         end,
+    case Res of
+       {error, disconnected} when Retries > 0 ->
+           try ?GEN_SERVER:call(Parent, connect, ?CALL_TIMEOUT) of
+               ok -> call({Conn, Parent}, {F, Cmd}, Retries-1);
+               {error, _} = Err -> Err
+           catch exit:{timeout, _} -> {error, timeout};
+                 exit:{_, {?GEN_SERVER, call, _}} -> {error, disconnected}
+           end;
+       _ ->
+           Res
+    end.
+
+get_worker() ->
+    Time = p1_time_compat:system_time(),
+    I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1,
+    {get_connection(I), get_proc(I)}.
+
 get_result([{error, _} = Err|_]) ->
     Err;
 get_result([{ok, _} = OK]) ->
@@ -436,16 +439,51 @@ reply(Val) ->
        _ -> queued
     end.
 
-opt_type(redis_connect_timeout) ->
-    fun (I) when is_integer(I), I > 0 -> I end;
-opt_type(redis_db) ->
-    fun (I) when is_integer(I), I >= 0 -> I end;
-opt_type(redis_password) -> fun iolist_to_list/1;
-opt_type(redis_port) ->
-    fun (P) when is_integer(P), P > 0, P < 65536 -> P end;
-opt_type(redis_reconnect_timeout) ->
-    fun (I) when is_integer(I), I > 0 -> I end;
-opt_type(redis_server) -> fun iolist_to_list/1;
-opt_type(_) ->
-    [redis_connect_timeout, redis_db, redis_password,
-     redis_port, redis_reconnect_timeout, redis_server].
+iolist_to_list(IOList) ->
+    binary_to_list(iolist_to_binary(IOList)).
+
+max_fsm_queue() ->
+    proplists:get_value(max_queue, fsm_limit_opts(), ?DEFAULT_MAX_QUEUE).
+
+fsm_limit_opts() ->
+    ejabberd_config:fsm_limit_opts([]).
+
+get_queue_type() ->
+    case ejabberd_config:get_option(
+          redis_queue_type,
+          ejabberd_redis_sup:opt_type(redis_queue_type)) of
+       undefined ->
+           ejabberd_config:default_queue_type(global);
+       Type ->
+           Type
+    end.
+
+flush_queue(Q) ->
+    CurrTime = p1_time_compat:monotonic_time(milli_seconds),
+    p1_queue:dropwhile(
+      fun({From, Time}) ->
+             if (CurrTime - Time) >= ?CALL_TIMEOUT ->
+                     ok;
+                true ->
+                     ?GEN_SERVER:reply(From, ok)
+             end,
+             true
+      end, Q).
+
+clean_queue(Q, CurrTime) ->
+    Q1 = p1_queue:dropwhile(
+          fun({_From, Time}) ->
+                  (CurrTime - Time) >= ?CALL_TIMEOUT
+          end, Q),
+    Len = p1_queue:len(Q1),
+    Limit = p1_queue:get_limit(Q1),
+    if Len >= Limit ->
+           ?ERROR_MSG("Redis request queue is overloaded", []),
+           p1_queue:dropwhile(
+             fun({From, _Time}) ->
+                     ?GEN_SERVER:reply(From, {error, disconnected}),
+                     true
+             end, Q1);
+       true ->
+           Q1
+    end.
diff --git a/src/ejabberd_redis_sup.erl b/src/ejabberd_redis_sup.erl
new file mode 100644 (file)
index 0000000..23330f8
--- /dev/null
@@ -0,0 +1,159 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% Created :  6 Apr 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2017   ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%-------------------------------------------------------------------
+-module(ejabberd_redis_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0, get_pool_size/0,
+        host_up/1, config_reloaded/0, opt_type/1]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+-include("ejabberd.hrl").
+-include("logger.hrl").
+
+-define(DEFAULT_POOL_SIZE, 10).
+
+%%%===================================================================
+%%% API functions
+%%%===================================================================
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+host_up(Host) ->
+    case is_redis_configured(Host) of
+       true ->
+           ejabberd:start_app(eredis),
+           lists:foreach(
+             fun(Spec) ->
+                     supervisor:start_child(?MODULE, Spec)
+             end, get_specs());
+       false ->
+           ok
+    end.
+
+config_reloaded() ->
+    case is_redis_configured() of
+       true ->
+           ejabberd:start_app(eredis),
+           lists:foreach(
+             fun(Spec) ->
+                     supervisor:start_child(?MODULE, Spec)
+             end, get_specs()),
+           PoolSize = get_pool_size(),
+           lists:foreach(
+             fun({Id, _, _, _}) when Id > PoolSize ->
+                     supervisor:terminate_child(?MODULE, Id),
+                     supervisor:delete_child(?MODULE, Id);
+                (_) ->
+                     ok
+             end, supervisor:which_children(?MODULE));
+       false ->
+           lists:foreach(
+             fun({Id, _, _, _}) ->
+                     supervisor:terminate_child(?MODULE, Id),
+                     supervisor:delete_child(?MODULE, Id)
+             end, supervisor:which_children(?MODULE))
+    end.
+
+%%%===================================================================
+%%% Supervisor callbacks
+%%%===================================================================
+init([]) ->
+    ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 20),
+    ejabberd_hooks:add(host_up, ?MODULE, host_up, 20),
+    Specs = case is_redis_configured() of
+               true ->
+                   ejabberd:start_app(eredis),
+                   get_specs();
+               false ->
+                   []
+           end,
+    {ok, {{one_for_one, 500, 1}, Specs}}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+is_redis_configured() ->
+    lists:any(fun is_redis_configured/1, ?MYHOSTS).
+
+is_redis_configured(Host) ->
+    ServerConfigured = ejabberd_config:has_option({redis_server, Host}),
+    PortConfigured = ejabberd_config:has_option({redis_port, Host}),
+    DBConfigured = ejabberd_config:has_option({redis_db, Host}),
+    PassConfigured = ejabberd_config:has_option({redis_password, Host}),
+    PoolSize = ejabberd_config:has_option({redis_pool_size, Host}),
+    ConnTimeoutConfigured = ejabberd_config:has_option(
+                             {redis_connect_timeout, Host}),
+    Modules = ejabberd_config:get_option(
+               {modules, Host},
+               fun(L) when is_list(L) -> L end, []),
+    SMConfigured = ejabberd_config:get_option(
+                    {sm_db_type, Host},
+                    fun(V) -> V end) == redis,
+    RouterConfigured = ejabberd_config:get_option(
+                        {router_db_type, Host},
+                        fun(V) -> V end) == redis,
+    ModuleWithRedisDBConfigured =
+       lists:any(
+         fun({Module, Opts}) ->
+                 gen_mod:db_type(Host, Opts, Module) == redis
+         end, Modules),
+    ServerConfigured or PortConfigured or DBConfigured or PassConfigured or
+       PoolSize or ConnTimeoutConfigured or
+       SMConfigured or RouterConfigured or ModuleWithRedisDBConfigured.
+
+get_specs() ->
+    lists:map(
+      fun(I) ->
+             {I, {ejabberd_redis, start_link, [I]},
+              transient, 2000, worker, [?MODULE]}
+      end, lists:seq(1, get_pool_size())).
+
+get_pool_size() ->
+    ejabberd_config:get_option(
+      redis_pool_size,
+      fun(N) when is_integer(N), N >= 1 -> N end,
+      ?DEFAULT_POOL_SIZE).
+
+iolist_to_list(IOList) ->
+    binary_to_list(iolist_to_binary(IOList)).
+
+opt_type(redis_connect_timeout) ->
+    fun (I) when is_integer(I), I > 0 -> I end;
+opt_type(redis_db) ->
+    fun (I) when is_integer(I), I >= 0 -> I end;
+opt_type(redis_password) -> fun iolist_to_list/1;
+opt_type(redis_port) ->
+    fun (P) when is_integer(P), P > 0, P < 65536 -> P end;
+opt_type(redis_server) -> fun iolist_to_list/1;
+opt_type(redis_pool_size) ->
+    fun (I) when is_integer(I), I > 0 -> I end;
+opt_type(redis_queue_type) ->
+    fun(ram) -> ram; (file) -> file end;
+opt_type(_) ->
+    [redis_connect_timeout, redis_db, redis_password,
+     redis_port, redis_pool_size, redis_server,
+     redis_pool_size, redis_queue_type].
index 0a33a5c76ca397f9e76f86a7796710ec27b3af0c..26e1f9e2e5e7f18e1478e935499ec9abaa471869 100644 (file)
@@ -115,8 +115,9 @@ init([]) ->
     RiakSupervisor = {ejabberd_riak_sup,
                     {ejabberd_riak_sup, start_link, []},
                      permanent, infinity, supervisor, [ejabberd_riak_sup]},
-    Redis = {ejabberd_redis, {ejabberd_redis, start_link, []},
-            permanent, 5000, worker, [ejabberd_redis]},
+    RedisSupervisor = {ejabberd_redis_sup,
+                      {ejabberd_redis_sup, start_link, []},
+                      permanent, infinity, supervisor, [ejabberd_redis_sup]},
     Router = {ejabberd_router, {ejabberd_router, start_link, []},
              permanent, 5000, worker, [ejabberd_router]},
     RouterMulticast = {ejabberd_router_multicast,
@@ -168,7 +169,7 @@ init([]) ->
           BackendSupervisor,
           SQLSupervisor,
           RiakSupervisor,
-          Redis,
+          RedisSupervisor,
           Router,
           RouterMulticast,
           Local,