num :: pos_integer(),
pending_q :: p1_queue:queue()}).
--type redis_error() :: {error, binary() | atom()}.
+-type redis_error() :: {error, binary() | timeout | disconnected}.
+-type redis_reply() :: binary() | [binary()].
+-type redis_command() :: [binary()].
+-type redis_pipeline() :: [redis_command()].
+-type state() :: #state{}.
%%%===================================================================
%%% API
iolist_to_binary(
[atom_to_list(?MODULE), "_connection_", integer_to_list(I)])).
+-spec q(redis_command()) -> {ok, redis_reply()} | redis_error().
q(Command) ->
call(get_worker(), {q, Command}, ?MAX_RETRIES).
+-spec qp(redis_pipeline()) -> {ok, [redis_reply()]} | redis_error().
qp(Pipeline) ->
call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES).
--spec multi(fun(() -> any())) -> {ok, list()} | redis_error().
+-spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error().
multi(F) ->
case erlang:get(?TR_STACK) of
undefined ->
erlang:raise(E, R, erlang:get_stacktrace())
end;
_ ->
- {error, nested_transaction}
+ erlang:error(nested_transaction)
end.
-spec get(iodata()) -> {ok, undefined | binary()} | redis_error().
undefined ->
q([<<"GET">>, Key]);
_ ->
- {error, transaction_unsupported}
+ erlang:error(transaction_unsupported)
end.
-spec set(iodata(), iodata()) -> ok | redis_error() | queued.
undefined ->
q([<<"SMEMBERS">>, Set]);
_ ->
- {error, transaction_unsupported}
+ erlang:error(transaction_unsupported)
end.
-spec sismember(iodata(), iodata()) -> boolean() | redis_error().
{error, _} = Err -> Err
end;
_ ->
- {error, transaction_unsupported}
+ erlang:error(transaction_unsupported)
end.
-spec scard(iodata()) -> {ok, non_neg_integer()} | redis_error().
Err
end;
_ ->
- {error, transaction_unsupported}
+ erlang:error(transaction_unsupported)
end.
-spec hget(iodata(), iodata()) -> {ok, undefined | binary()} | redis_error().
undefined ->
q([<<"HGET">>, Key, Field]);
_ ->
- {error, transaction_unsupported}
+ erlang:error(transaction_unsupported)
end.
-spec hset(iodata(), iodata(), iodata()) -> {ok, boolean()} | redis_error() | queued.
{error, _} = Err -> Err
end;
_ ->
- {error, transaction_unsupported}
+ erlang:error(transaction_unsupported)
end.
-spec hlen(iodata()) -> {ok, non_neg_integer()} | redis_error().
{error, _} = Err -> Err
end;
_ ->
- {error, transaction_unsupported}
+ erlang:error(transaction_unsupported)
end.
-spec hkeys(iodata()) -> {ok, [binary()]} | redis_error().
undefined ->
q([<<"HKEYS">>, Key]);
_ ->
- {error, transaction_unsupported}
+ erlang:error(transaction_unsupported)
end.
%%%===================================================================
%%%===================================================================
%%% Internal functions
%%%===================================================================
+-spec connect(state()) -> {ok, pid()} | {error, any()}.
connect(#state{num = Num}) ->
Server = ejabberd_config:get_option(redis_server,
fun iolist_to_list/1,
{error, Reason}
end.
--spec call({atom(), atom()}, {q | qp, list()}, integer()) ->
- {error, disconnected | timeout | binary()} | {ok, iodata()}.
+-spec call({atom(), atom()}, {q, redis_command()}, integer()) ->
+ {ok, redis_reply()} | redis_error();
+ ({atom(), atom()}, {qp, redis_pipeline()}, integer()) ->
+ {ok, [redis_reply()]} | redis_error().
call({Conn, Parent}, {F, Cmd}, Retries) ->
+ ?DEBUG("redis query: ~p", [Cmd]),
Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of
{error, Reason} when is_atom(Reason) ->
try exit(whereis(Conn), kill) catch _:_ -> ok end,
Res
end.
+-spec get_worker() -> {atom(), atom()}.
get_worker() ->
Time = p1_time_compat:system_time(),
I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1,
{get_connection(I), get_proc(I)}.
+-spec get_result([{error, atom() | binary()} | {ok, iodata()}]) ->
+ {ok, [redis_reply()]} | {error, binary()}.
get_result([{error, _} = Err|_]) ->
Err;
get_result([{ok, _} = OK]) ->
erlang:put(?TR_STACK, [Cmd|Stack]),
queued.
+-spec decode_pairs([binary()]) -> [{binary(), binary()}].
decode_pairs(Pairs) ->
decode_pairs(Pairs, []).
+-spec decode_pairs([binary()], [{binary(), binary()}]) -> [{binary(), binary()}].
decode_pairs([Field, Val|Pairs], Acc) ->
decode_pairs(Pairs, [{Field, Val}|Acc]);
decode_pairs([], Acc) ->
dec_bool(<<$1>>) -> true;
dec_bool(<<$0>>) -> false.
+-spec reply(T) -> {ok, T} | queued.
reply(Val) ->
case erlang:get(?TR_STACK) of
undefined -> {ok, Val};
_ -> queued
end.
+-spec iolist_to_list(iodata()) -> string().
iolist_to_list(IOList) ->
binary_to_list(iolist_to_binary(IOList)).
+-spec max_fsm_queue() -> pos_integer().
max_fsm_queue() ->
proplists:get_value(max_queue, fsm_limit_opts(), ?DEFAULT_MAX_QUEUE).
Type
end.
+-spec flush_queue(p1_queue:queue()) -> p1_queue:queue().
flush_queue(Q) ->
CurrTime = p1_time_compat:monotonic_time(milli_seconds),
p1_queue:dropwhile(
true
end, Q).
+-spec clean_queue(p1_queue:queue(), integer()) -> p1_queue:queue().
clean_queue(Q, CurrTime) ->
Q1 = p1_queue:dropwhile(
fun({_From, Time}) ->