%% API
-export([start_link/0, q/1, qp/1, config_reloaded/0, opt_type/1]).
%% Commands
--export([multi/1, get/1, set/2, del/1, sadd/2, srem/2, smembers/1, scard/1]).
+-export([multi/1, get/1, set/2, del/1,
+ sadd/2, srem/2, smembers/1, sismember/2, scard/1,
+ hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
q(Command) ->
try eredis:q(?PROCNAME, Command)
- catch _:Reason -> {error, Reason}
+ catch _:{noproc, _} -> {error, disconnected};
+ _:{timeout, _} -> {error, timeout}
end.
qp(Pipeline) ->
try eredis:qp(?PROCNAME, Pipeline)
- catch _:Reason -> {error, Reason}
+ catch _:{noproc, _} -> {error, disconnected};
+ _:{timeout, _} -> {error, timeout}
end.
-spec multi(fun(() -> any())) -> {ok, list()} | redis_error().
?MODULE ! disconnect
end.
+-spec get(iodata()) -> {ok, undefined | binary()} | redis_error().
get(Key) ->
case erlang:get(?TR_STACK) of
undefined ->
{error, _} = Err -> Err
end;
Stack ->
- erlang:put(?TR_STACK, [Cmd|Stack]),
- queued
+ tr_enq(Cmd, Stack)
end.
-spec del(list()) -> {ok, non_neg_integer()} | redis_error() | queued.
+del([]) ->
+ reply(0);
del(Keys) ->
Cmd = [<<"DEL">>|Keys],
case erlang:get(?TR_STACK) of
{error, _} = Err -> Err
end;
Stack ->
- erlang:put(?TR_STACK, [Cmd|Stack]),
- queued
+ tr_enq(Cmd, Stack)
end.
-spec sadd(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
+sadd(_Set, []) ->
+ reply(0);
sadd(Set, Members) ->
Cmd = [<<"SADD">>, Set|Members],
case erlang:get(?TR_STACK) of
{error, _} = Err -> Err
end;
Stack ->
- erlang:put(?TR_STACK, [Cmd|Stack]),
- queued
+ tr_enq(Cmd, Stack)
end.
-spec srem(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
+srem(_Set, []) ->
+ reply(0);
srem(Set, Members) ->
Cmd = [<<"SREM">>, Set|Members],
case erlang:get(?TR_STACK) of
{error, _} = Err -> Err
end;
Stack ->
- erlang:put(?TR_STACK, [Cmd|Stack]),
- queued
+ tr_enq(Cmd, Stack)
end.
-spec smembers(iodata()) -> {ok, [binary()]} | redis_error().
{error, transaction_unsupported}
end.
+-spec sismember(iodata(), iodata()) -> boolean() | redis_error().
+sismember(Set, Member) ->
+ case erlang:get(?TR_STACK) of
+ undefined ->
+ case q([<<"SISMEMBER">>, Set, Member]) of
+ {ok, Flag} -> {ok, dec_bool(Flag)};
+ {error, _} = Err -> Err
+ end;
+ _ ->
+ {error, transaction_unsupported}
+ end.
+
-spec scard(iodata()) -> {ok, non_neg_integer()} | redis_error().
scard(Set) ->
case erlang:get(?TR_STACK) of
{error, transaction_unsupported}
end.
+-spec hget(iodata(), iodata()) -> {ok, undefined | binary()} | redis_error().
+hget(Key, Field) ->
+ case erlang:get(?TR_STACK) of
+ undefined ->
+ q([<<"HGET">>, Key, Field]);
+ _ ->
+ {error, transaction_unsupported}
+ end.
+
+-spec hset(iodata(), iodata(), iodata()) -> {ok, boolean()} | redis_error() | queued.
+hset(Key, Field, Val) ->
+ Cmd = [<<"HSET">>, Key, Field, Val],
+ case erlang:get(?TR_STACK) of
+ undefined ->
+ case q(Cmd) of
+ {ok, Flag} -> {ok, dec_bool(Flag)};
+ {error, _} = Err -> Err
+ end;
+ Stack ->
+ tr_enq(Cmd, Stack)
+ end.
+
+-spec hdel(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
+hdel(_Key, []) ->
+ reply(0);
+hdel(Key, Fields) ->
+ Cmd = [<<"HDEL">>, Key|Fields],
+ case erlang:get(?TR_STACK) of
+ undefined ->
+ case q(Cmd) of
+ {ok, N} -> {ok, binary_to_integer(N)};
+ {error, _} = Err -> Err
+ end;
+ Stack ->
+ tr_enq(Cmd, Stack)
+ end.
+
+-spec hgetall(iodata()) -> {ok, [{binary(), binary()}]} | redis_error().
+hgetall(Key) ->
+ case erlang:get(?TR_STACK) of
+ undefined ->
+ case q([<<"HGETALL">>, Key]) of
+ {ok, Pairs} -> {ok, decode_pairs(Pairs)};
+ {error, _} = Err -> Err
+ end;
+ _ ->
+ {error, transaction_unsupported}
+ end.
+
+-spec hlen(iodata()) -> {ok, non_neg_integer()} | redis_error().
+hlen(Key) ->
+ case erlang:get(?TR_STACK) of
+ undefined ->
+ case q([<<"HLEN">>, Key]) of
+ {ok, N} -> {ok, binary_to_integer(N)};
+ {error, _} = Err -> Err
+ end;
+ _ ->
+ {error, transaction_unsupported}
+ end.
+
+-spec hkeys(iodata()) -> {ok, [binary()]} | redis_error().
+hkeys(Key) ->
+ case erlang:get(?TR_STACK) of
+ undefined ->
+ q([<<"HKEYS">>, Key]);
+ _ ->
+ {error, transaction_unsupported}
+ end.
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
get_result([_|T]) ->
get_result(T).
+-spec tr_enq([iodata()], list()) -> queued.
+tr_enq(Cmd, Stack) ->
+ erlang:put(?TR_STACK, [Cmd|Stack]),
+ queued.
+
+decode_pairs(Pairs) ->
+ decode_pairs(Pairs, []).
+
+decode_pairs([Field, Val|Pairs], Acc) ->
+ decode_pairs(Pairs, [{Field, Val}|Acc]);
+decode_pairs([], Acc) ->
+ lists:reverse(Acc).
+
+dec_bool(<<$1>>) -> true;
+dec_bool(<<$0>>) -> false.
+
+reply(Val) ->
+ case erlang:get(?TR_STACK) of
+ undefined -> {ok, Val};
+ _ -> queued
+ end.
+
opt_type(redis_connect_timeout) ->
fun (I) when is_integer(I), I > 0 -> I end;
opt_type(redis_db) ->
DomKey = domain_key(Domain),
PidKey = term_to_binary(Pid),
T = term_to_binary({ServerHost, LocalHint}),
- case ejabberd_redis:qp([["HSET", DomKey, PidKey, T],
- ["SADD", ?ROUTES_KEY, Domain]]) of
- [{ok, _}, {ok, _}] ->
+ case ejabberd_redis:multi(
+ fun() ->
+ ejabberd_redis:hset(DomKey, PidKey, T),
+ ejabberd_redis:sadd(?ROUTES_KEY, [Domain])
+ end) of
+ {ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to register route in redis: ~p", [Err]),
DomKey = domain_key(Domain),
PidKey = term_to_binary(Pid),
try
- {ok, _} = ejabberd_redis:q(["HDEL", DomKey, PidKey]),
- {ok, Num} = ejabberd_redis:q(["HLEN", DomKey]),
- case binary_to_integer(Num) of
- 0 ->
- {ok, _} = ejabberd_redis:q(["SREM", ?ROUTES_KEY, Domain]),
- ok;
- _ ->
+ {ok, Num} = ejabberd_redis:hdel(DomKey, [PidKey]),
+ if Num > 0 ->
+ {ok, Len} = ejabberd_redis:hlen(DomKey),
+ if Len == 0 ->
+ {ok, _} = ejabberd_redis:multi(
+ fun() ->
+ ejabberd_redis:del([DomKey]),
+ ejabberd_redis:srem(?ROUTES_KEY, [Domain])
+ end),
+ ok;
+ true ->
+ ok
+ end;
+ true ->
ok
end
catch _:{badmatch, Err} ->
find_routes(Domain) ->
DomKey = domain_key(Domain),
- case ejabberd_redis:q(["HGETALL", DomKey]) of
+ case ejabberd_redis:hgetall(DomKey) of
{ok, Vals} ->
decode_routes(Domain, Vals);
Err ->
host_of_route(Domain) ->
DomKey = domain_key(Domain),
- case ejabberd_redis:q(["HGETALL", DomKey]) of
- {ok, [_, Data|_]} ->
+ case ejabberd_redis:hgetall(DomKey) of
+ {ok, [{_Pid, Data}|_]} ->
{ServerHost, _} = binary_to_term(Data),
{ok, ServerHost};
{ok, []} ->
end.
is_my_route(Domain) ->
- case ejabberd_redis:q(["SISMEMBER", ?ROUTES_KEY, Domain]) of
- {ok, <<"1">>} -> true;
- {ok, _} -> false;
+ case ejabberd_redis:sismember(?ROUTES_KEY, Domain) of
+ {ok, Bool} ->
+ Bool;
Err ->
?ERROR_MSG("failed to check route in redis: ~p", [Err]),
false
{ok, Domain} == host_of_route(Domain).
get_all_routes() ->
- case ejabberd_redis:q(["SMEMBERS", ?ROUTES_KEY]) of
+ case ejabberd_redis:smembers(?ROUTES_KEY) of
{ok, Routes} ->
Routes;
Err ->
end.
find_routes() ->
- lists:flatmap(
- fun(Domain) ->
- DomKey = domain_key(Domain),
- case ejabberd_redis:q(["HGETALL", DomKey]) of
- {ok, Vals} ->
- decode_routes(Domain, Vals);
- Err ->
- ?ERROR_MSG("failed to fetch routes from redis: ~p",
- [Err]),
- []
- end
- end, get_all_routes()).
+ lists:flatmap(fun find_routes/1, get_all_routes()).
%%%===================================================================
%%% Internal functions
domain_key(Domain) ->
<<"ejabberd:route:", Domain/binary>>.
-decode_routes(Domain, [Pid, Data|Vals]) ->
- {ServerHost, LocalHint} = binary_to_term(Data),
- [#route{domain = Domain,
- pid = binary_to_term(Pid),
- server_host = ServerHost,
- local_hint = LocalHint}|
- decode_routes(Domain, Vals)];
-decode_routes(_, []) ->
- [].
+decode_routes(Domain, Vals) ->
+ lists:map(
+ fun({Pid, Data}) ->
+ {ServerHost, LocalHint} = binary_to_term(Data),
+ #route{domain = Domain,
+ pid = binary_to_term(Pid),
+ server_host = ServerHost,
+ local_hint = LocalHint}
+ end, Vals).
SIDKey = sid_to_key(Session#session.sid),
ServKey = server_to_key(element(2, Session#session.us)),
USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid),
- case ejabberd_redis:qp([["HSET", USKey, SIDKey, T],
- ["HSET", ServKey, USSIDKey, T]]) of
- [{ok, _}, {ok, _}] ->
+ case ejabberd_redis:multi(
+ fun() ->
+ ejabberd_redis:hset(USKey, SIDKey, T),
+ ejabberd_redis:hset(ServKey, USSIDKey, T)
+ end) of
+ {ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to set session for redis: ~p", [Err])
{ok, #session{}} | {error, notfound}.
delete_session(LUser, LServer, _LResource, SID) ->
USKey = us_to_key({LUser, LServer}),
- case ejabberd_redis:q(["HGETALL", USKey]) of
+ case ejabberd_redis:hgetall(USKey) of
{ok, Vals} ->
Ss = decode_session_list(Vals),
case lists:keyfind(SID, #session.sid, Ss) of
SIDKey = sid_to_key(SID),
ServKey = server_to_key(element(2, Session#session.us)),
USSIDKey = us_sid_to_key(Session#session.us, SID),
- ejabberd_redis:qp([["HDEL", USKey, SIDKey],
- ["HDEL", ServKey, USSIDKey]]),
+ case ejabberd_redis:multi(
+ fun() ->
+ ejabberd_redis:hdel(USKey, [SIDKey]),
+ ejabberd_redis:hdel(ServKey, [USSIDKey])
+ end) of
+ {ok, _} ->
+ ok;
+ Err ->
+ ?ERROR_MSG("failed to delete session from redis: ~p", [Err])
+ end,
{ok, Session}
end;
Err ->
-spec get_sessions(binary()) -> [#session{}].
get_sessions(LServer) ->
ServKey = server_to_key(LServer),
- case ejabberd_redis:q(["HGETALL", ServKey]) of
+ case ejabberd_redis:hgetall(ServKey) of
{ok, Vals} ->
decode_session_list(Vals);
Err ->
-spec get_sessions(binary(), binary()) -> [#session{}].
get_sessions(LUser, LServer) ->
USKey = us_to_key({LUser, LServer}),
- case ejabberd_redis:q(["HGETALL", USKey]) of
- {ok, Vals} when is_list(Vals) ->
+ case ejabberd_redis:hgetall(USKey) of
+ {ok, Vals} ->
decode_session_list(Vals);
Err ->
?ERROR_MSG("failed to get sessions from redis: ~p", [Err]),
[#session{}].
get_sessions(LUser, LServer, LResource) ->
USKey = us_to_key({LUser, LServer}),
- case ejabberd_redis:q(["HGETALL", USKey]) of
- {ok, Vals} when is_list(Vals) ->
+ case ejabberd_redis:hgetall(USKey) of
+ {ok, Vals} ->
[S || S <- decode_session_list(Vals),
element(3, S#session.usr) == LResource];
Err ->
sid_to_key(SID) ->
term_to_binary(SID).
-decode_session_list([_, Val|T]) ->
- [binary_to_term(Val)|decode_session_list(T)];
-decode_session_list([]) ->
- [].
+decode_session_list(Vals) ->
+ [binary_to_term(Val) || {_, Val} <- Vals].
clean_table() ->
?INFO_MSG("Cleaning Redis SM table...", []),
- lists:foreach(
- fun(LServer) ->
- ServKey = server_to_key(LServer),
- case ejabberd_redis:q(["HKEYS", ServKey]) of
- {ok, []} ->
- ok;
- {ok, Vals} ->
- Vals1 = lists:filter(
- fun(USSIDKey) ->
- {_, SID} = binary_to_term(USSIDKey),
- node(element(2, SID)) == node()
- end, Vals),
- Q1 = case Vals1 of
- [] -> [];
- _ -> ["HDEL", ServKey | Vals1]
- end,
- Q2 = lists:map(
- fun(USSIDKey) ->
- {US, SID} = binary_to_term(USSIDKey),
- USKey = us_to_key(US),
- SIDKey = sid_to_key(SID),
- ["HDEL", USKey, SIDKey]
- end, Vals1),
- Res = ejabberd_redis:qp(lists:delete([], [Q1|Q2])),
- case lists:filter(
- fun({ok, _}) -> false;
- (_) -> true
- end, Res) of
- [] ->
- ok;
- Errs ->
- ?ERROR_MSG("failed to clean redis table for "
- "server ~s: ~p", [LServer, Errs])
- end;
- Err ->
- ?ERROR_MSG("failed to clean redis table for "
- "server ~s: ~p", [LServer, Err])
- end
- end, ejabberd_sm:get_vh_by_backend(?MODULE)).
+ try
+ lists:foreach(
+ fun(LServer) ->
+ ServKey = server_to_key(LServer),
+ {ok, Vals} = ejabberd_redis:hkeys(ServKey),
+ {ok, _} =
+ ejabberd_redis:multi(
+ fun() ->
+ lists:foreach(
+ fun(USSIDKey) ->
+ {US, SID} = binary_to_term(USSIDKey),
+ if node(element(2, SID)) == node() ->
+ USKey = us_to_key(US),
+ SIDKey = sid_to_key(SID),
+ ejabberd_redis:hdel(ServKey, [USSIDKey]),
+ ejabberd_redis:hdel(USKey, [SIDKey]);
+ true ->
+ ok
+ end
+ end, Vals)
+ end)
+ end, ejabberd_sm:get_vh_by_backend(?MODULE))
+ catch _:{badmatch, {error, _} = Err} ->
+ ?ERROR_MSG("failed to clean redis c2s sessions: ~p", [Err])
+ end.
opt_type(redis_connect_timeout) ->
fun (I) when is_integer(I), I > 0 -> I end;
open_session(SID, Pid) ->
PidBin = term_to_binary(Pid),
- case ejabberd_redis:q(["HSET", ?BOSH_KEY, SID, PidBin]) of
+ case ejabberd_redis:hset(?BOSH_KEY, SID, PidBin) of
{ok, _} ->
ok;
Err ->
end.
close_session(SID) ->
- case ejabberd_redis:q(["HDEL", ?BOSH_KEY, SID]) of
+ case ejabberd_redis:hdel(?BOSH_KEY, [SID]) of
{ok, _} ->
ok;
Err ->
end.
find_session(SID) ->
- case ejabberd_redis:q(["HGET", ?BOSH_KEY, SID]) of
+ case ejabberd_redis:hget(?BOSH_KEY, SID) of
{ok, Pid} when is_binary(Pid) ->
- {ok, binary_to_term(Pid)};
+ try
+ {ok, binary_to_term(Pid)}
+ catch _:badarg ->
+ ?ERROR_MSG("malformed data in redis (key = '~s'): ~p",
+ [SID, Pid]),
+ error
+ end;
{ok, _} ->
error;
Err ->
%%% Internal functions
%%%===================================================================
clean_table() ->
- ?INFO_MSG("Cleaning Redis BOSH table...", []),
- case ejabberd_redis:q(["HGETALL", ?BOSH_KEY]) of
+ ?INFO_MSG("Cleaning Redis BOSH sessions...", []),
+ case ejabberd_redis:hgetall(?BOSH_KEY) of
{ok, Vals} ->
- clean_table(Vals);
+ case ejabberd_redis:multi(
+ fun() ->
+ lists:foreach(
+ fun({SID, Pid}) when node(Pid) == node() ->
+ ejabberd_redis:hdel(?BOSH_KEY, [SID]);
+ (_) ->
+ ok
+ end, Vals)
+ end) of
+ {ok, _} ->
+ ok;
+ Err ->
+ ?ERROR_MSG("failed to clean bosh sessions in redis: ~p", [Err])
+ end;
Err ->
- ?ERROR_MSG("failed to clean bosh table in redis: ~p", [Err])
+ ?ERROR_MSG("failed to clean bosh sessions in redis: ~p", [Err])
end.
-
-clean_table([SID, PidBin|Vals]) ->
- case binary_to_term(PidBin) of
- Pid when node(Pid) == node() ->
- close_session(SID);
- _ ->
- ok
- end,
- clean_table(Vals);
-clean_table([]) ->
- ok.
USKey = us_key(LUser, LServer),
NodeKey = node_key(),
JID = jid:encode({LUser, LServer, LResource}),
- case ejabberd_redis:qp([["HSET", USKey, LResource, NS],
- ["SADD", NodeKey, JID]]) of
- [{ok, _}, {ok, _}] ->
+ case ejabberd_redis:multi(
+ fun() ->
+ ejabberd_redis:hset(USKey, LResource, NS),
+ ejabberd_redis:sadd(NodeKey, [JID])
+ end) of
+ {ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to write in redis: ~p", [Err]),
USKey = us_key(LUser, LServer),
NodeKey = node_key(),
JID = jid:encode({LUser, LServer, LResource}),
- case ejabberd_redis:qp([["HDEL", USKey, LResource],
- ["SREM", NodeKey, JID]]) of
- [{ok, _}, {ok, _}] ->
+ case ejabberd_redis:multi(
+ fun() ->
+ ejabberd_redis:hdel(USKey, [LResource]),
+ ejabberd_redis:srem(NodeKey, [JID])
+ end) of
+ {ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to delete from redis: ~p", [Err]),
list(LUser, LServer) ->
USKey = us_key(LUser, LServer),
- case ejabberd_redis:q(["HGETALL", USKey]) of
+ case ejabberd_redis:hgetall(USKey) of
{ok, Vals} ->
- decode_vals(Vals);
+ Vals;
Err ->
?ERROR_MSG("failed to read from redis: ~p", [Err]),
[]
clean_table() ->
?INFO_MSG("Cleaning Redis 'carboncopy' table...", []),
NodeKey = node_key(),
- case ejabberd_redis:q(["SMEMBERS", NodeKey]) of
+ case ejabberd_redis:smembers(NodeKey) of
{ok, JIDs} ->
- lists:foreach(
- fun(JID) ->
- {U, S, R} = jid:split(jid:decode(JID)),
- USKey = us_key(U, S),
- case ejabberd_redis:q(["HDEL", USKey, R]) of
- {ok, _} ->
- ok;
- Err ->
- ?ERROR_MSG("failed to delete from redis: ~p",
- [Err])
- end
- end, JIDs);
+ case ejabberd_redis:multi(
+ fun() ->
+ lists:foreach(
+ fun(JID) ->
+ {U, S, R} = jid:split(jid:decode(JID)),
+ USKey = us_key(U, S),
+ ejabberd_redis:hdel(USKey, [R])
+ end, JIDs)
+ end) of
+ {ok, _} ->
+ ok;
+ Err ->
+ ?ERROR_MSG("failed to delete from redis: ~p", [Err])
+ end;
Err ->
?ERROR_MSG("failed to read from redis: ~p", [Err])
end,
- case ejabberd_redis:q(["DEL", NodeKey]) of
+ case ejabberd_redis:del([NodeKey]) of
{ok, _} -> ok;
Error -> ?ERROR_MSG("failed to delete from redis: ~p", [Error])
end.
node_key() ->
Node = erlang:atom_to_binary(node(), latin1),
<<"ejabberd:carboncopy:nodes:", Node/binary>>.
-
-decode_vals([Resource, NS|Vals]) ->
- [{Resource, NS}|decode_vals(Vals)];
-decode_vals([]) ->
- [].