-behaviour(gen_server).
%% API
--export([start_link/3, make_bucket/1, put/1, put/2,
+-export([start_link/4, get_proc/1, make_bucket/1, put/1, put/2,
get/1, get/2, get_by_index/3, delete/1, delete/2,
count_by_index/3, get_by_index_range/4,
get_keys/1, get_keys_by_index/3,
%%% API
%%%===================================================================
%% @private
-start_link(Server, Port, _StartInterval) ->
- gen_server:start_link(?MODULE, [Server, Port], []).
+start_link(Num, Server, Port, _StartInterval) ->
+ gen_server:start_link({local, get_proc(Num)}, ?MODULE, [Server, Port], []).
+
+%% @private
+get_proc(I) ->
+ jlib:binary_to_atom(
+ iolist_to_binary(
+ [atom_to_list(?MODULE), $_, integer_to_list(I)])).
-spec make_bucket(atom()) -> binary().
%% @doc Makes a bucket from a table name
true ->
Obj
end,
- riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1).
+ catch riakc_pb_socket:put(get_random_pid(), Obj1).
get_object_raw(Table, Key) ->
Bucket = make_bucket(Table),
- riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
+ catch riakc_pb_socket:get(get_random_pid(), Bucket, Key).
-spec get(atom()) -> {ok, [any()]} | {error, any()}.
%% @doc Returns all objects from table `Table'
get(Table) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- Bucket,
- [{map, {modfun, riak_kv_mapreduce, map_object_value},
- none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ Bucket,
+ [{map, {modfun, riak_kv_mapreduce, map_object_value},
+ none, true}]) of
{ok, [{_, Objs}]} ->
{ok, lists:flatmap(
fun(Obj) ->
end, Objs)};
{error, notfound} ->
{ok, []};
- Error ->
+ {error, _} = Error ->
Error
end.
%% @doc Returns a list of index values
get_keys(Table) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- Bucket,
- [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ Bucket,
+ [{map, {modfun, ?MODULE, map_key}, none, true}]) of
{ok, [{_, Keys}]} ->
{ok, Keys};
- Error ->
+ {error, _} = Error ->
log_error(Error, get_keys, [{table, Table}]),
Error
end.
get_keys_by_index(Table, Index, Key) ->
{NewIndex, NewKey} = encode_index_key(Index, Key),
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- {index, Bucket, NewIndex, NewKey},
- [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ {index, Bucket, NewIndex, NewKey},
+ [{map, {modfun, ?MODULE, map_key}, none, true}]) of
{ok, [{_, Keys}]} ->
{ok, Keys};
- Error ->
+ {error, _} = Error ->
log_error(Error, get_keys_by_index, [{table, Table},
{index, Index},
{key, Key}]),
%% @hidden
get_tables() ->
- riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()).
+ catch riakc_pb_socket:list_buckets(get_random_pid()).
get_by_index_raw(Table, Index, Key) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
+ get_random_pid(),
{index, Bucket, Index, Key},
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
{ok, [{_, Objs}]} ->
{ok, Objs};
- Error ->
+ {error, _} = Error ->
Error
end.
get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- {index, Bucket, Index, FromKey, ToKey},
- [{map, {modfun, riak_kv_mapreduce, map_object_value},
- none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ {index, Bucket, Index, FromKey, ToKey},
+ [{map, {modfun, riak_kv_mapreduce, map_object_value},
+ none, true}]) of
{ok, [{_, Objs}]} ->
{ok, Objs};
- Error ->
+ {error, _} = Error ->
Error
end.
%% @doc Returns the number of objects in the `Table'
count(Table) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- Bucket,
- [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
- none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ Bucket,
+ [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
+ none, true}]) of
{ok, [{_, [Cnt]}]} ->
{ok, Cnt};
- Error ->
+ {error, _} = Error ->
log_error(Error, count, [{table, Table}]),
Error
end.
count_by_index_raw(Table, Index, Key) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- {index, Bucket, Index, Key},
- [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
- none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ {index, Bucket, Index, Key},
+ [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
+ none, true}]) of
{ok, [{_, [Cnt]}]} ->
{ok, Cnt};
- Error ->
+ {error, _} = Error ->
Error
end.
delete_raw(Table, Key) ->
Bucket = make_bucket(Table),
- riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
+ catch riakc_pb_socket:delete(get_random_pid(), Bucket, Key).
-spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}.
%% @doc Deletes objects by index
[auto_reconnect]) of
{ok, Pid} ->
erlang:monitor(process, Pid),
- ejabberd_riak_sup:add_pid(Pid),
{ok, #state{pid = Pid}};
Err ->
{stop, Err}
end.
%% @private
+handle_call(get_pid, _From, #state{pid = Pid} = State) ->
+ {reply, {ok, Pid}, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
{noreply, State}.
%% @private
-terminate(_Reason, State) ->
- ejabberd_riak_sup:remove_pid(State#state.pid),
+terminate(_Reason, _State) ->
ok.
%% @private
make_invalid_object(Val) ->
list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])).
+
+get_random_pid() ->
+ PoolPid = ejabberd_riak_sup:get_random_pid(),
+ case catch gen_server:call(PoolPid, get_pid) of
+ {ok, Pid} ->
+ Pid;
+ {'EXIT', {timeout, _}} ->
+ throw({error, timeout});
+ {'EXIT', Err} ->
+ throw({error, Err})
+ end.
-export([start/0,
start_link/0,
init/1,
- add_pid/1,
- remove_pid/1,
get_pids/0,
- get_random_pid/0
+ transform_options/1,
+ get_random_pid/0,
+ get_random_pid/1
]).
-include("ejabberd.hrl").
+-include("logger.hrl").
-define(DEFAULT_POOL_SIZE, 10).
-define(DEFAULT_RIAK_START_INTERVAL, 30). % 30 seconds
+-define(DEFAULT_RIAK_HOST, "127.0.0.1").
+-define(DEFAULT_RIAK_PORT, 8087).
% time to wait for the supervisor to start its child before returning
% a timeout error to the request
-define(CONNECT_TIMEOUT, 500). % milliseconds
-
--record(riak_pool, {undefined, pid}).
-
start() ->
- StartRiak = ejabberd_config:get_local_option(
- riak_server, fun(_) -> true end, false),
- if
- StartRiak ->
+ case lists:any(
+ fun(Host) ->
+ is_riak_configured(Host)
+ end, ?MYHOSTS) of
+ true ->
+ ejabberd:start_app(riakc),
do_start();
- true ->
- ok
+ false ->
+ ok
end.
+is_riak_configured(Host) ->
+ ServerConfigured = ejabberd_config:get_option(
+ {riak_server, Host},
+ fun(_) -> true end, false),
+ PortConfigured = ejabberd_config:get_option(
+ {riak_port, Host},
+ fun(_) -> true end, false),
+ Modules = ejabberd_config:get_option(
+ {modules, Host},
+ fun(L) when is_list(L) -> L end, []),
+ ModuleWithRiakDBConfigured = lists:any(
+ fun({_Module, Opts}) ->
+ gen_mod:db_type(Opts) == riak
+ end, Modules),
+ ServerConfigured or PortConfigured or ModuleWithRiakDBConfigured.
+
do_start() ->
SupervisorName = ?MODULE,
ChildSpec =
end.
start_link() ->
- mnesia:create_table(riak_pool,
- [{ram_copies, [node()]},
- {type, bag},
- {local_content, true},
- {attributes, record_info(fields, riak_pool)}]),
- mnesia:add_table_copy(riak_pool, node(), ram_copies),
- F = fun() ->
- mnesia:delete({riak_pool, undefined})
- end,
- mnesia:ets(F),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
- PoolSize =
- ejabberd_config:get_local_option(
- riak_pool_size,
- fun(N) when is_integer(N), N >= 1 -> N end,
- ?DEFAULT_POOL_SIZE),
- StartInterval =
- ejabberd_config:get_local_option(
- riak_start_interval,
- fun(N) when is_integer(N), N >= 1 -> N end,
- ?DEFAULT_RIAK_START_INTERVAL),
- {Server, Port} =
- ejabberd_config:get_local_option(
- riak_server,
- fun({S, P}) when is_integer(P), P > 0, P < 65536 ->
- {binary_to_list(iolist_to_binary(S)), P}
- end, {"127.0.0.1", 8081}),
+ PoolSize = get_pool_size(),
+ StartInterval = get_start_interval(),
+ Server = get_riak_server(),
+ Port = get_riak_port(),
{ok, {{one_for_one, PoolSize*10, 1},
lists:map(
fun(I) ->
- {I,
+ {ejabberd_riak:get_proc(I),
{ejabberd_riak, start_link,
- [Server, Port, StartInterval*1000]},
- transient,
- 2000,
- worker,
- [?MODULE]}
+ [I, Server, Port, StartInterval*1000]},
+ transient, 2000, worker, [?MODULE]}
end, lists:seq(1, PoolSize))}}.
+get_start_interval() ->
+ ejabberd_config:get_option(
+ riak_start_interval,
+ fun(N) when is_integer(N), N >= 1 -> N end,
+ ?DEFAULT_RIAK_START_INTERVAL).
+
+get_pool_size() ->
+ ejabberd_config:get_option(
+ riak_pool_size,
+ fun(N) when is_integer(N), N >= 1 -> N end,
+ ?DEFAULT_POOL_SIZE).
+
+get_riak_server() ->
+ ejabberd_config:get_option(
+ riak_server,
+ fun(S) ->
+ binary_to_list(iolist_to_binary(S))
+ end, ?DEFAULT_RIAK_HOST).
+
+get_riak_port() ->
+ ejabberd_config:get_option(
+ riak_port,
+ fun(P) when is_integer(P), P > 0, P < 65536 -> P end,
+ ?DEFAULT_RIAK_PORT).
+
get_pids() ->
- Rs = mnesia:dirty_read(riak_pool, undefined),
- [R#riak_pool.pid || R <- Rs].
+ [ejabberd_riak:get_proc(I) || I <- lists:seq(1, get_pool_size())].
get_random_pid() ->
- Pids = get_pids(),
- lists:nth(erlang:phash(now(), length(Pids)), Pids).
-
-add_pid(Pid) ->
- F = fun() ->
- mnesia:write(
- #riak_pool{pid = Pid})
- end,
- mnesia:ets(F).
-
-remove_pid(Pid) ->
- F = fun() ->
- mnesia:delete_object(
- #riak_pool{pid = Pid})
- end,
- mnesia:ets(F).
+ get_random_pid(now()).
+
+get_random_pid(Term) ->
+ I = erlang:phash2(Term, get_pool_size()) + 1,
+ ejabberd_riak:get_proc(I).
+
+transform_options(Opts) ->
+ lists:foldl(fun transform_options/2, [], Opts).
+
+transform_options({riak_server, {S, P}}, Opts) ->
+ [{riak_server, S}, {riak_port, P}|Opts];
+transform_options(Opt, Opts) ->
+ [Opt|Opts].