]> granicus.if.org Git - ejabberd/commitdiff
Improve Riak pool management
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Wed, 9 Jul 2014 12:40:18 +0000 (16:40 +0400)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Thu, 10 Jul 2014 09:52:29 +0000 (13:52 +0400)
src/ejabberd_riak.erl
src/ejabberd_riak_sup.erl

index 04ff1ea11503b9668ac9a1b03f91fff5d35e2e6a..0b576ca6128c9a6a92ff3695d675ee9b06c3f358 100644 (file)
@@ -27,7 +27,7 @@
 -behaviour(gen_server).
 
 %% API
--export([start_link/3, make_bucket/1, put/1, put/2,
+-export([start_link/4, get_proc/1, make_bucket/1, put/1, put/2,
          get/1, get/2, get_by_index/3, delete/1, delete/2,
          count_by_index/3, get_by_index_range/4,
          get_keys/1, get_keys_by_index/3,
 %%% API
 %%%===================================================================
 %% @private
-start_link(Server, Port, _StartInterval) ->
-    gen_server:start_link(?MODULE, [Server, Port], []).
+start_link(Num, Server, Port, _StartInterval) ->
+    gen_server:start_link({local, get_proc(Num)}, ?MODULE, [Server, Port], []).
+
+%% @private
+get_proc(I) ->
+    jlib:binary_to_atom(
+      iolist_to_binary(
+       [atom_to_list(?MODULE), $_, integer_to_list(I)])).
 
 -spec make_bucket(atom()) -> binary().
 %% @doc Makes a bucket from a table name
@@ -101,21 +107,21 @@ put_raw(Table, Key, Value, Indexes) ->
               true ->
                    Obj
            end,
-    riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1).
+    catch riakc_pb_socket:put(get_random_pid(), Obj1).
 
 get_object_raw(Table, Key) ->
     Bucket = make_bucket(Table),
-    riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
+    catch riakc_pb_socket:get(get_random_pid(), Bucket, Key).
 
 -spec get(atom()) -> {ok, [any()]} | {error, any()}.
 %% @doc Returns all objects from table `Table'
 get(Table) ->
     Bucket = make_bucket(Table),
-    case riakc_pb_socket:mapred(
-           ejabberd_riak_sup:get_random_pid(),
-           Bucket,
-           [{map, {modfun, riak_kv_mapreduce, map_object_value},
-             none, true}]) of
+    case catch riakc_pb_socket:mapred(
+                get_random_pid(),
+                Bucket,
+                [{map, {modfun, riak_kv_mapreduce, map_object_value},
+                  none, true}]) of
         {ok, [{_, Objs}]} ->
             {ok, lists:flatmap(
                    fun(Obj) ->
@@ -131,7 +137,7 @@ get(Table) ->
                    end, Objs)};
         {error, notfound} ->
             {ok, []};
-        Error ->
+        {error, _} = Error ->
             Error
     end.
 
@@ -228,13 +234,13 @@ get_raw(Table, Key) ->
 %% @doc Returns a list of index values
 get_keys(Table) ->
     Bucket = make_bucket(Table),
-    case riakc_pb_socket:mapred(
-           ejabberd_riak_sup:get_random_pid(),
-           Bucket,
-           [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+    case catch riakc_pb_socket:mapred(
+                get_random_pid(),
+                Bucket,
+                [{map, {modfun, ?MODULE, map_key}, none, true}]) of
         {ok, [{_, Keys}]} ->
             {ok, Keys};
-        Error ->
+        {error, _} = Error ->
             log_error(Error, get_keys, [{table, Table}]),
             Error
     end.
@@ -245,13 +251,13 @@ get_keys(Table) ->
 get_keys_by_index(Table, Index, Key) ->
     {NewIndex, NewKey} = encode_index_key(Index, Key),
     Bucket = make_bucket(Table),
-    case riakc_pb_socket:mapred(
-           ejabberd_riak_sup:get_random_pid(),
-           {index, Bucket, NewIndex, NewKey},
-           [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+    case catch riakc_pb_socket:mapred(
+                get_random_pid(),
+                {index, Bucket, NewIndex, NewKey},
+                [{map, {modfun, ?MODULE, map_key}, none, true}]) of
         {ok, [{_, Keys}]} ->
             {ok, Keys};
-        Error ->
+        {error, _} = Error ->
             log_error(Error, get_keys_by_index, [{table, Table},
                                                  {index, Index},
                                                  {key, Key}]),
@@ -260,31 +266,31 @@ get_keys_by_index(Table, Index, Key) ->
 
 %% @hidden
 get_tables() ->
-    riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()).
+    catch riakc_pb_socket:list_buckets(get_random_pid()).
 
 get_by_index_raw(Table, Index, Key) ->
     Bucket = make_bucket(Table),
     case riakc_pb_socket:mapred(
-           ejabberd_riak_sup:get_random_pid(),
+           get_random_pid(),
            {index, Bucket, Index, Key},
            [{map, {modfun, riak_kv_mapreduce, map_object_value},
              none, true}]) of
         {ok, [{_, Objs}]} ->
             {ok, Objs};
-        Error ->
+        {error, _} = Error ->
             Error
     end.
 
 get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
     Bucket = make_bucket(Table),
-    case riakc_pb_socket:mapred(
-           ejabberd_riak_sup:get_random_pid(),
-           {index, Bucket, Index, FromKey, ToKey},
-           [{map, {modfun, riak_kv_mapreduce, map_object_value},
-             none, true}]) of
+    case catch riakc_pb_socket:mapred(
+                get_random_pid(),
+                {index, Bucket, Index, FromKey, ToKey},
+                [{map, {modfun, riak_kv_mapreduce, map_object_value},
+                  none, true}]) of
         {ok, [{_, Objs}]} ->
             {ok, Objs};
-        Error ->
+        {error, _} = Error ->
             Error
     end.
 
@@ -292,14 +298,14 @@ get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
 %% @doc Returns the number of objects in the `Table'
 count(Table) ->
     Bucket = make_bucket(Table),
-    case riakc_pb_socket:mapred(
-           ejabberd_riak_sup:get_random_pid(),
-           Bucket,
-           [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
-             none, true}]) of
+    case catch riakc_pb_socket:mapred(
+                get_random_pid(),
+                Bucket,
+                [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
+                  none, true}]) of
         {ok, [{_, [Cnt]}]} ->
             {ok, Cnt};
-        Error ->
+        {error, _} = Error ->
             log_error(Error, count, [{table, Table}]),
             Error
     end.
@@ -324,14 +330,14 @@ count_by_index(Tab, Index, Key) ->
 
 count_by_index_raw(Table, Index, Key) ->
     Bucket = make_bucket(Table),
-    case riakc_pb_socket:mapred(
-           ejabberd_riak_sup:get_random_pid(),
-           {index, Bucket, Index, Key},
-           [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
-             none, true}]) of
+    case catch riakc_pb_socket:mapred(
+                get_random_pid(),
+                {index, Bucket, Index, Key},
+                [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
+                  none, true}]) of
         {ok, [{_, [Cnt]}]} ->
             {ok, Cnt};
-        Error ->
+        {error, _} = Error ->
             Error
     end.
 
@@ -368,7 +374,7 @@ delete(Table, Key) when is_atom(Table) ->
 
 delete_raw(Table, Key) ->
     Bucket = make_bucket(Table),
-    riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
+    catch riakc_pb_socket:delete(get_random_pid(), Bucket, Key).
 
 -spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}.
 %% @doc Deletes objects by index
@@ -407,13 +413,14 @@ init([Server, Port]) ->
            [auto_reconnect]) of
         {ok, Pid} ->
             erlang:monitor(process, Pid),
-            ejabberd_riak_sup:add_pid(Pid),
             {ok, #state{pid = Pid}};
         Err ->
             {stop, Err}
     end.
 
 %% @private
+handle_call(get_pid, _From, #state{pid = Pid} = State) ->
+    {reply, {ok, Pid}, State};
 handle_call(_Request, _From, State) ->
     Reply = ok,
     {reply, Reply, State}.
@@ -430,8 +437,7 @@ handle_info(_Info, State) ->
     {noreply, State}.
 
 %% @private
-terminate(_Reason, State) ->
-    ejabberd_riak_sup:remove_pid(State#state.pid),
+terminate(_Reason, _State) ->
     ok.
 
 %% @private
@@ -486,3 +492,14 @@ log_error(_, _, _) ->
 
 make_invalid_object(Val) ->
     list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])).
+
+get_random_pid() ->
+    PoolPid = ejabberd_riak_sup:get_random_pid(),
+    case catch gen_server:call(PoolPid, get_pid) of
+       {ok, Pid} ->
+           Pid;
+       {'EXIT', {timeout, _}} ->
+           throw({error, timeout});
+       {'EXIT', Err} ->
+           throw({error, Err})
+    end.
index d19b9fbe9ff643a89a890e1c1bbd2d90d5314bb7..a066a3c8cc57cd36cb53c68e820d524a18e4c960 100644 (file)
 -export([start/0,
          start_link/0,
         init/1,
-        add_pid/1,
-        remove_pid/1,
         get_pids/0,
-        get_random_pid/0
+         transform_options/1,
+        get_random_pid/0,
+        get_random_pid/1
        ]).
 
 -include("ejabberd.hrl").
+-include("logger.hrl").
 
 -define(DEFAULT_POOL_SIZE, 10).
 -define(DEFAULT_RIAK_START_INTERVAL, 30). % 30 seconds
+-define(DEFAULT_RIAK_HOST, "127.0.0.1").
+-define(DEFAULT_RIAK_PORT, 8087).
 
 % time to wait for the supervisor to start its child before returning
 % a timeout error to the request
 -define(CONNECT_TIMEOUT, 500). % milliseconds
 
-
--record(riak_pool, {undefined, pid}).
-
 start() ->
-    StartRiak = ejabberd_config:get_local_option(
-                  riak_server, fun(_) -> true end, false),
-    if
-        StartRiak ->
+    case lists:any(
+          fun(Host) ->
+                  is_riak_configured(Host)
+          end, ?MYHOSTS) of
+       true ->
+           ejabberd:start_app(riakc),
             do_start();
-        true ->
-            ok
+       false ->
+           ok
     end.
 
+is_riak_configured(Host) ->
+    ServerConfigured = ejabberd_config:get_option(
+                        {riak_server, Host},
+                        fun(_) -> true end, false),
+    PortConfigured = ejabberd_config:get_option(
+                      {riak_port, Host},
+                      fun(_) -> true end, false),
+    Modules = ejabberd_config:get_option(
+               {modules, Host},
+               fun(L) when is_list(L) -> L end, []),
+    ModuleWithRiakDBConfigured = lists:any(
+                                  fun({_Module, Opts}) ->
+                                          gen_mod:db_type(Opts) == riak
+                                  end, Modules),
+    ServerConfigured or PortConfigured or ModuleWithRiakDBConfigured.
+
 do_start() ->
     SupervisorName = ?MODULE,
     ChildSpec =
@@ -79,65 +97,61 @@ do_start() ->
     end.
 
 start_link() ->
-    mnesia:create_table(riak_pool,
-                       [{ram_copies, [node()]},
-                        {type, bag},
-                        {local_content, true},
-                        {attributes, record_info(fields, riak_pool)}]),
-    mnesia:add_table_copy(riak_pool, node(), ram_copies),
-    F = fun() ->
-               mnesia:delete({riak_pool, undefined})
-       end,
-    mnesia:ets(F),
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    PoolSize =
-        ejabberd_config:get_local_option(
-          riak_pool_size,
-          fun(N) when is_integer(N), N >= 1 -> N end,
-          ?DEFAULT_POOL_SIZE),
-    StartInterval =
-        ejabberd_config:get_local_option(
-          riak_start_interval,
-          fun(N) when is_integer(N), N >= 1 -> N end,
-          ?DEFAULT_RIAK_START_INTERVAL),
-    {Server, Port} =
-        ejabberd_config:get_local_option(
-          riak_server,
-          fun({S, P}) when is_integer(P), P > 0, P < 65536 ->
-                  {binary_to_list(iolist_to_binary(S)), P}
-          end, {"127.0.0.1", 8081}),
+    PoolSize = get_pool_size(),
+    StartInterval = get_start_interval(),
+    Server = get_riak_server(),
+    Port = get_riak_port(),
     {ok, {{one_for_one, PoolSize*10, 1},
          lists:map(
            fun(I) ->
-                   {I,
+                   {ejabberd_riak:get_proc(I),
                     {ejabberd_riak, start_link,
-                      [Server, Port, StartInterval*1000]},
-                    transient,
-                     2000,
-                    worker,
-                    [?MODULE]}
+                      [I, Server, Port, StartInterval*1000]},
+                    transient, 2000, worker, [?MODULE]}
            end, lists:seq(1, PoolSize))}}.
 
+get_start_interval() ->
+    ejabberd_config:get_option(
+      riak_start_interval,
+      fun(N) when is_integer(N), N >= 1 -> N end,
+      ?DEFAULT_RIAK_START_INTERVAL).
+
+get_pool_size() ->
+    ejabberd_config:get_option(
+      riak_pool_size,
+      fun(N) when is_integer(N), N >= 1 -> N end,
+      ?DEFAULT_POOL_SIZE).
+
+get_riak_server() ->
+    ejabberd_config:get_option(
+      riak_server,
+      fun(S) ->
+             binary_to_list(iolist_to_binary(S))
+      end, ?DEFAULT_RIAK_HOST).
+
+get_riak_port() ->
+    ejabberd_config:get_option(
+      riak_port,
+      fun(P) when is_integer(P), P > 0, P < 65536 -> P end,
+      ?DEFAULT_RIAK_PORT).
+
 get_pids() ->
-    Rs = mnesia:dirty_read(riak_pool, undefined),
-    [R#riak_pool.pid || R <- Rs].
+    [ejabberd_riak:get_proc(I) || I <- lists:seq(1, get_pool_size())].
 
 get_random_pid() ->
-    Pids = get_pids(),
-    lists:nth(erlang:phash(now(), length(Pids)), Pids).
-
-add_pid(Pid) ->
-    F = fun() ->
-               mnesia:write(
-                 #riak_pool{pid = Pid})
-       end,
-    mnesia:ets(F).
-
-remove_pid(Pid) ->
-    F = fun() ->
-               mnesia:delete_object(
-                 #riak_pool{pid = Pid})
-       end,
-    mnesia:ets(F).
+    get_random_pid(now()).
+
+get_random_pid(Term) ->
+    I = erlang:phash2(Term, get_pool_size()) + 1,
+    ejabberd_riak:get_proc(I).
+
+transform_options(Opts) ->
+    lists:foldl(fun transform_options/2, [], Opts).
+
+transform_options({riak_server, {S, P}}, Opts) ->
+    [{riak_server, S}, {riak_port, P}|Opts];
+transform_options(Opt, Opts) ->
+    [Opt|Opts].