]> granicus.if.org Git - ejabberd/commitdiff
Improve redis related code
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Sun, 2 Apr 2017 08:56:09 +0000 (11:56 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Sun, 2 Apr 2017 08:56:09 +0000 (11:56 +0300)
src/ejabberd_redis.erl
src/ejabberd_router_redis.erl
src/ejabberd_sm_redis.erl
src/mod_bosh_redis.erl
src/mod_carboncopy_redis.erl

index dbd55e914cbe3891d618fc6445e3228d1f3fd796..ec5d73596ecd083f9a4031c53787ebfe39e6fc60 100644 (file)
@@ -32,7 +32,9 @@
 %% API
 -export([start_link/0, q/1, qp/1, config_reloaded/0, opt_type/1]).
 %% Commands
--export([multi/1, get/1, set/2, del/1, sadd/2, srem/2, smembers/1, scard/1]).
+-export([multi/1, get/1, set/2, del/1,
+        sadd/2, srem/2, smembers/1, sismember/2, scard/1,
+        hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -57,12 +59,14 @@ start_link() ->
 
 q(Command) ->
     try eredis:q(?PROCNAME, Command)
-    catch _:Reason -> {error, Reason}
+    catch _:{noproc, _} -> {error, disconnected};
+         _:{timeout, _} -> {error, timeout}
     end.
 
 qp(Pipeline) ->
     try eredis:qp(?PROCNAME, Pipeline)
-    catch _:Reason -> {error, Reason}
+    catch _:{noproc, _} -> {error, disconnected};
+         _:{timeout, _} -> {error, timeout}
     end.
 
 -spec multi(fun(() -> any())) -> {ok, list()} | redis_error().
@@ -95,6 +99,7 @@ config_reloaded() ->
            ?MODULE ! disconnect
     end.
 
+-spec get(iodata()) -> {ok, undefined | binary()} | redis_error().
 get(Key) ->
     case erlang:get(?TR_STACK) of
        undefined ->
@@ -113,11 +118,12 @@ set(Key, Val) ->
                {error, _} = Err -> Err
            end;
        Stack ->
-           erlang:put(?TR_STACK, [Cmd|Stack]),
-           queued
+           tr_enq(Cmd, Stack)
     end.
 
 -spec del(list()) -> {ok, non_neg_integer()} | redis_error() | queued.
+del([]) ->
+    reply(0);
 del(Keys) ->
     Cmd = [<<"DEL">>|Keys],
     case erlang:get(?TR_STACK) of
@@ -127,11 +133,12 @@ del(Keys) ->
                {error, _} = Err -> Err
            end;
        Stack ->
-           erlang:put(?TR_STACK, [Cmd|Stack]),
-           queued
+           tr_enq(Cmd, Stack)
     end.
 
 -spec sadd(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
+sadd(_Set, []) ->
+    reply(0);
 sadd(Set, Members) ->
     Cmd = [<<"SADD">>, Set|Members],
     case erlang:get(?TR_STACK) of
@@ -141,11 +148,12 @@ sadd(Set, Members) ->
                {error, _} = Err -> Err
            end;
        Stack ->
-           erlang:put(?TR_STACK, [Cmd|Stack]),
-           queued
+           tr_enq(Cmd, Stack)
     end.
 
 -spec srem(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
+srem(_Set, []) ->
+    reply(0);
 srem(Set, Members) ->
     Cmd = [<<"SREM">>, Set|Members],
     case erlang:get(?TR_STACK) of
@@ -155,8 +163,7 @@ srem(Set, Members) ->
                {error, _} = Err -> Err
            end;
        Stack ->
-           erlang:put(?TR_STACK, [Cmd|Stack]),
-           queued
+           tr_enq(Cmd, Stack)
     end.
 
 -spec smembers(iodata()) -> {ok, [binary()]} | redis_error().
@@ -168,6 +175,18 @@ smembers(Set) ->
            {error, transaction_unsupported}
     end.
 
+-spec sismember(iodata(), iodata()) -> boolean() | redis_error().
+sismember(Set, Member) ->
+    case erlang:get(?TR_STACK) of
+       undefined ->
+           case q([<<"SISMEMBER">>, Set, Member]) of
+               {ok, Flag} -> {ok, dec_bool(Flag)};
+               {error, _} = Err -> Err
+           end;
+       _ ->
+           {error, transaction_unsupported}
+    end.
+
 -spec scard(iodata()) -> {ok, non_neg_integer()} | redis_error().
 scard(Set) ->
     case erlang:get(?TR_STACK) of
@@ -182,6 +201,76 @@ scard(Set) ->
            {error, transaction_unsupported}
     end.
 
+-spec hget(iodata(), iodata()) -> {ok, undefined | binary()} | redis_error().
+hget(Key, Field) ->
+    case erlang:get(?TR_STACK) of
+       undefined ->
+           q([<<"HGET">>, Key, Field]);
+       _ ->
+           {error, transaction_unsupported}
+    end.
+
+-spec hset(iodata(), iodata(), iodata()) -> {ok, boolean()} | redis_error() | queued.
+hset(Key, Field, Val) ->
+    Cmd = [<<"HSET">>, Key, Field, Val],
+    case erlang:get(?TR_STACK) of
+       undefined ->
+           case q(Cmd) of
+               {ok, Flag} -> {ok, dec_bool(Flag)};
+               {error, _} = Err -> Err
+           end;
+       Stack ->
+           tr_enq(Cmd, Stack)
+    end.
+
+-spec hdel(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
+hdel(_Key, []) ->
+    reply(0);
+hdel(Key, Fields) ->
+    Cmd = [<<"HDEL">>, Key|Fields],
+    case erlang:get(?TR_STACK) of
+       undefined ->
+           case q(Cmd) of
+               {ok, N} -> {ok, binary_to_integer(N)};
+               {error, _} = Err -> Err
+           end;
+       Stack ->
+           tr_enq(Cmd, Stack)
+    end.
+
+-spec hgetall(iodata()) -> {ok, [{binary(), binary()}]} | redis_error().
+hgetall(Key) ->
+    case erlang:get(?TR_STACK) of
+       undefined ->
+           case q([<<"HGETALL">>, Key]) of
+               {ok, Pairs} -> {ok, decode_pairs(Pairs)};
+               {error, _} = Err -> Err
+           end;
+       _ ->
+           {error, transaction_unsupported}
+    end.
+
+-spec hlen(iodata()) -> {ok, non_neg_integer()} | redis_error().
+hlen(Key) ->
+    case erlang:get(?TR_STACK) of
+       undefined ->
+           case q([<<"HLEN">>, Key]) of
+               {ok, N} -> {ok, binary_to_integer(N)};
+               {error, _} = Err -> Err
+           end;
+       _ ->
+           {error, transaction_unsupported}
+    end.
+
+-spec hkeys(iodata()) -> {ok, [binary()]} | redis_error().
+hkeys(Key) ->
+    case erlang:get(?TR_STACK) of
+       undefined ->
+           q([<<"HKEYS">>, Key]);
+       _ ->
+           {error, transaction_unsupported}
+    end.
+
 %%%===================================================================
 %%% gen_server callbacks
 %%%===================================================================
@@ -325,6 +414,28 @@ get_result([{ok, _} = OK]) ->
 get_result([_|T]) ->
     get_result(T).
 
+-spec tr_enq([iodata()], list()) -> queued.
+tr_enq(Cmd, Stack) ->
+    erlang:put(?TR_STACK, [Cmd|Stack]),
+    queued.
+
+decode_pairs(Pairs) ->
+    decode_pairs(Pairs, []).
+
+decode_pairs([Field, Val|Pairs], Acc) ->
+    decode_pairs(Pairs, [{Field, Val}|Acc]);
+decode_pairs([], Acc) ->
+    lists:reverse(Acc).
+
+dec_bool(<<$1>>) -> true;
+dec_bool(<<$0>>) -> false.
+
+reply(Val) ->
+    case erlang:get(?TR_STACK) of
+       undefined -> {ok, Val};
+       _ -> queued
+    end.
+
 opt_type(redis_connect_timeout) ->
     fun (I) when is_integer(I), I > 0 -> I end;
 opt_type(redis_db) ->
index be8d166b959896d287cee7e26c5522cf98a68e48..cf240a2a086de6738006ab073fa7a348c4d818db 100644 (file)
@@ -44,9 +44,12 @@ register_route(Domain, ServerHost, LocalHint, _, Pid) ->
     DomKey = domain_key(Domain),
     PidKey = term_to_binary(Pid),
     T = term_to_binary({ServerHost, LocalHint}),
-    case ejabberd_redis:qp([["HSET", DomKey, PidKey, T],
-                           ["SADD", ?ROUTES_KEY, Domain]]) of
-       [{ok, _}, {ok, _}] ->
+    case ejabberd_redis:multi(
+          fun() ->
+                  ejabberd_redis:hset(DomKey, PidKey, T),
+                  ejabberd_redis:sadd(?ROUTES_KEY, [Domain])
+          end) of
+       {ok, _} ->
            ok;
        Err ->
            ?ERROR_MSG("failed to register route in redis: ~p", [Err]),
@@ -57,13 +60,20 @@ unregister_route(Domain, _, Pid) ->
     DomKey = domain_key(Domain),
     PidKey = term_to_binary(Pid),
     try
-       {ok, _} = ejabberd_redis:q(["HDEL", DomKey, PidKey]),
-       {ok, Num} = ejabberd_redis:q(["HLEN", DomKey]),
-       case binary_to_integer(Num) of
-           0 ->
-               {ok, _} = ejabberd_redis:q(["SREM", ?ROUTES_KEY, Domain]),
-               ok;
-           _ ->
+       {ok, Num} = ejabberd_redis:hdel(DomKey, [PidKey]),
+       if Num > 0 ->
+               {ok, Len} = ejabberd_redis:hlen(DomKey),
+               if Len == 0 ->
+                       {ok, _} = ejabberd_redis:multi(
+                                   fun() ->
+                                           ejabberd_redis:del([DomKey]),
+                                           ejabberd_redis:srem(?ROUTES_KEY, [Domain])
+                                   end),
+                       ok;
+                  true ->
+                       ok
+               end;
+          true ->
                ok
        end
     catch _:{badmatch, Err} ->
@@ -73,7 +83,7 @@ unregister_route(Domain, _, Pid) ->
 
 find_routes(Domain) ->
     DomKey = domain_key(Domain),
-    case ejabberd_redis:q(["HGETALL", DomKey]) of
+    case ejabberd_redis:hgetall(DomKey) of
        {ok, Vals} ->
            decode_routes(Domain, Vals);
        Err ->
@@ -83,8 +93,8 @@ find_routes(Domain) ->
 
 host_of_route(Domain) ->
     DomKey = domain_key(Domain),
-    case ejabberd_redis:q(["HGETALL", DomKey]) of
-       {ok, [_, Data|_]} ->
+    case ejabberd_redis:hgetall(DomKey) of
+       {ok, [{_Pid, Data}|_]} ->
            {ServerHost, _} = binary_to_term(Data),
            {ok, ServerHost};
        {ok, []} ->
@@ -95,9 +105,9 @@ host_of_route(Domain) ->
     end.
 
 is_my_route(Domain) ->
-    case ejabberd_redis:q(["SISMEMBER", ?ROUTES_KEY, Domain]) of
-       {ok, <<"1">>} -> true;
-       {ok, _} -> false;
+    case ejabberd_redis:sismember(?ROUTES_KEY, Domain) of
+       {ok, Bool} ->
+           Bool;
        Err ->
            ?ERROR_MSG("failed to check route in redis: ~p", [Err]),
            false
@@ -107,7 +117,7 @@ is_my_host(Domain) ->
     {ok, Domain} == host_of_route(Domain).
 
 get_all_routes() ->
-    case ejabberd_redis:q(["SMEMBERS", ?ROUTES_KEY]) of
+    case ejabberd_redis:smembers(?ROUTES_KEY) of
        {ok, Routes} ->
            Routes;
        Err ->
@@ -116,18 +126,7 @@ get_all_routes() ->
     end.
 
 find_routes() ->
-    lists:flatmap(
-      fun(Domain) ->
-             DomKey = domain_key(Domain),
-             case ejabberd_redis:q(["HGETALL", DomKey]) of
-                 {ok, Vals} ->
-                     decode_routes(Domain, Vals);
-                 Err ->
-                     ?ERROR_MSG("failed to fetch routes from redis: ~p",
-                                [Err]),
-                     []
-             end
-      end, get_all_routes()).
+    lists:flatmap(fun find_routes/1, get_all_routes()).
 
 %%%===================================================================
 %%% Internal functions
@@ -143,12 +142,12 @@ clean_table() ->
 domain_key(Domain) ->
     <<"ejabberd:route:", Domain/binary>>.
 
-decode_routes(Domain, [Pid, Data|Vals]) ->
-    {ServerHost, LocalHint} = binary_to_term(Data),
-    [#route{domain = Domain,
-           pid = binary_to_term(Pid),
-           server_host = ServerHost,
-           local_hint = LocalHint}|
-     decode_routes(Domain, Vals)];
-decode_routes(_, []) ->
-    [].
+decode_routes(Domain, Vals) ->
+    lists:map(
+      fun({Pid, Data}) ->
+             {ServerHost, LocalHint} = binary_to_term(Data),
+             #route{domain = Domain,
+                    pid = binary_to_term(Pid),
+                    server_host = ServerHost,
+                    local_hint = LocalHint}
+      end, Vals).
index 689e0ccdbb3f41a8639f22fc88832b3f22fe7b14..8c9dc56d3924c51fda83e9ef846770d22462525e 100644 (file)
@@ -50,9 +50,12 @@ set_session(Session) ->
     SIDKey = sid_to_key(Session#session.sid),
     ServKey = server_to_key(element(2, Session#session.us)),
     USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid),
-    case ejabberd_redis:qp([["HSET", USKey, SIDKey, T],
-                           ["HSET", ServKey, USSIDKey, T]]) of
-       [{ok, _}, {ok, _}] ->
+    case ejabberd_redis:multi(
+          fun() ->
+                  ejabberd_redis:hset(USKey, SIDKey, T),
+                  ejabberd_redis:hset(ServKey, USSIDKey, T)
+          end) of
+       {ok, _} ->
            ok;
        Err ->
            ?ERROR_MSG("failed to set session for redis: ~p", [Err])
@@ -62,7 +65,7 @@ set_session(Session) ->
                            {ok, #session{}} | {error, notfound}.
 delete_session(LUser, LServer, _LResource, SID) ->
     USKey = us_to_key({LUser, LServer}),
-    case ejabberd_redis:q(["HGETALL", USKey]) of
+    case ejabberd_redis:hgetall(USKey) of
        {ok, Vals} ->
            Ss = decode_session_list(Vals),
            case lists:keyfind(SID, #session.sid, Ss) of
@@ -72,8 +75,16 @@ delete_session(LUser, LServer, _LResource, SID) ->
                    SIDKey = sid_to_key(SID),
                    ServKey = server_to_key(element(2, Session#session.us)),
                    USSIDKey = us_sid_to_key(Session#session.us, SID),
-                   ejabberd_redis:qp([["HDEL", USKey, SIDKey],
-                                      ["HDEL", ServKey, USSIDKey]]),
+                   case ejabberd_redis:multi(
+                          fun() ->
+                                  ejabberd_redis:hdel(USKey, [SIDKey]),
+                                  ejabberd_redis:hdel(ServKey, [USSIDKey])
+                          end) of
+                       {ok, _} ->
+                           ok;
+                       Err ->
+                           ?ERROR_MSG("failed to delete session from redis: ~p", [Err])
+                   end,
                    {ok, Session}
            end;
        Err ->
@@ -91,7 +102,7 @@ get_sessions() ->
 -spec get_sessions(binary()) -> [#session{}].
 get_sessions(LServer) ->
     ServKey = server_to_key(LServer),
-    case ejabberd_redis:q(["HGETALL", ServKey]) of
+    case ejabberd_redis:hgetall(ServKey) of
        {ok, Vals} ->
            decode_session_list(Vals);
        Err ->
@@ -102,8 +113,8 @@ get_sessions(LServer) ->
 -spec get_sessions(binary(), binary()) -> [#session{}].
 get_sessions(LUser, LServer) ->
     USKey = us_to_key({LUser, LServer}),
-    case ejabberd_redis:q(["HGETALL", USKey]) of
-       {ok, Vals} when is_list(Vals) ->
+    case ejabberd_redis:hgetall(USKey) of
+       {ok, Vals} ->
            decode_session_list(Vals);
        Err ->
            ?ERROR_MSG("failed to get sessions from redis: ~p", [Err]),
@@ -114,8 +125,8 @@ get_sessions(LUser, LServer) ->
     [#session{}].
 get_sessions(LUser, LServer, LResource) ->
     USKey = us_to_key({LUser, LServer}),
-    case ejabberd_redis:q(["HGETALL", USKey]) of
-       {ok, Vals} when is_list(Vals) ->
+    case ejabberd_redis:hgetall(USKey) of
+       {ok, Vals} ->
            [S || S <- decode_session_list(Vals),
                  element(3, S#session.usr) == LResource];
        Err ->
@@ -141,52 +152,36 @@ us_sid_to_key(US, SID) ->
 sid_to_key(SID) ->
     term_to_binary(SID).
 
-decode_session_list([_, Val|T]) ->
-    [binary_to_term(Val)|decode_session_list(T)];
-decode_session_list([]) ->
-    [].
+decode_session_list(Vals) ->
+  [binary_to_term(Val) || {_, Val} <- Vals].
 
 clean_table() ->
     ?INFO_MSG("Cleaning Redis SM table...", []),
-    lists:foreach(
-      fun(LServer) ->
-             ServKey = server_to_key(LServer),
-             case ejabberd_redis:q(["HKEYS", ServKey]) of
-                 {ok, []} ->
-                     ok;
-                 {ok, Vals} ->
-                     Vals1 = lists:filter(
-                               fun(USSIDKey) ->
-                                       {_, SID} = binary_to_term(USSIDKey),
-                                       node(element(2, SID)) == node()
-                               end, Vals),
-                               Q1 = case Vals1 of
-                                       [] -> [];
-                                       _ -> ["HDEL", ServKey | Vals1]
-                               end,
-                     Q2 = lists:map(
-                            fun(USSIDKey) ->
-                                    {US, SID} = binary_to_term(USSIDKey),
-                                    USKey = us_to_key(US),
-                                    SIDKey = sid_to_key(SID),
-                                    ["HDEL", USKey, SIDKey]
-                            end, Vals1),
-                     Res = ejabberd_redis:qp(lists:delete([], [Q1|Q2])),
-                     case lists:filter(
-                            fun({ok, _}) -> false;
-                               (_) -> true
-                            end, Res) of
-                         [] ->
-                             ok;
-                         Errs ->
-                             ?ERROR_MSG("failed to clean redis table for "
-                                        "server ~s: ~p", [LServer, Errs])
-                     end;
-                 Err ->
-                     ?ERROR_MSG("failed to clean redis table for "
-                                "server ~s: ~p", [LServer, Err])
-             end
-      end, ejabberd_sm:get_vh_by_backend(?MODULE)).
+    try
+       lists:foreach(
+         fun(LServer) ->
+                 ServKey = server_to_key(LServer),
+                 {ok, Vals} = ejabberd_redis:hkeys(ServKey),
+                 {ok, _} =
+                     ejabberd_redis:multi(
+                       fun() ->
+                               lists:foreach(
+                                 fun(USSIDKey) ->
+                                         {US, SID} = binary_to_term(USSIDKey),
+                                         if node(element(2, SID)) == node() ->
+                                                 USKey = us_to_key(US),
+                                                 SIDKey = sid_to_key(SID),
+                                                 ejabberd_redis:hdel(ServKey, [USSIDKey]),
+                                                 ejabberd_redis:hdel(USKey, [SIDKey]);
+                                            true ->
+                                                 ok
+                                         end
+                                 end, Vals)
+                       end)
+         end, ejabberd_sm:get_vh_by_backend(?MODULE))
+    catch _:{badmatch, {error, _} = Err} ->
+           ?ERROR_MSG("failed to clean redis c2s sessions: ~p", [Err])
+    end.
 
 opt_type(redis_connect_timeout) ->
     fun (I) when is_integer(I), I > 0 -> I end;
index 0b61223bfbff4e835c0891d9d22e794ca253f9b7..ca23f72bb3baa545bedf9d2e88119a13575958f1 100644 (file)
@@ -25,7 +25,7 @@ init() ->
 
 open_session(SID, Pid) ->
     PidBin = term_to_binary(Pid),
-    case ejabberd_redis:q(["HSET", ?BOSH_KEY, SID, PidBin]) of
+    case ejabberd_redis:hset(?BOSH_KEY, SID, PidBin) of
        {ok, _} ->
            ok;
        Err ->
@@ -34,7 +34,7 @@ open_session(SID, Pid) ->
     end.
 
 close_session(SID) ->
-    case ejabberd_redis:q(["HDEL", ?BOSH_KEY, SID]) of
+    case ejabberd_redis:hdel(?BOSH_KEY, [SID]) of
        {ok, _} ->
            ok;
        Err ->
@@ -42,9 +42,15 @@ close_session(SID) ->
     end.
 
 find_session(SID) ->
-    case ejabberd_redis:q(["HGET", ?BOSH_KEY, SID]) of
+    case ejabberd_redis:hget(?BOSH_KEY, SID) of
        {ok, Pid} when is_binary(Pid) ->
-           {ok, binary_to_term(Pid)};
+           try
+               {ok, binary_to_term(Pid)}
+           catch _:badarg ->
+                   ?ERROR_MSG("malformed data in redis (key = '~s'): ~p",
+                              [SID, Pid]),
+                   error
+           end;
        {ok, _} ->
            error;
        Err ->
@@ -56,21 +62,23 @@ find_session(SID) ->
 %%% Internal functions
 %%%===================================================================
 clean_table() ->
-    ?INFO_MSG("Cleaning Redis BOSH table...", []),
-    case ejabberd_redis:q(["HGETALL", ?BOSH_KEY]) of
+    ?INFO_MSG("Cleaning Redis BOSH sessions...", []),
+    case ejabberd_redis:hgetall(?BOSH_KEY) of
        {ok, Vals} ->
-           clean_table(Vals);
+           case ejabberd_redis:multi(
+                  fun() ->
+                          lists:foreach(
+                            fun({SID, Pid}) when node(Pid) == node() ->
+                                    ejabberd_redis:hdel(?BOSH_KEY, [SID]);
+                               (_) ->
+                                    ok
+                            end, Vals)
+                  end) of
+               {ok, _} ->
+                   ok;
+               Err ->
+                   ?ERROR_MSG("failed to clean bosh sessions in redis: ~p", [Err])
+           end;
        Err ->
-           ?ERROR_MSG("failed to clean bosh table in redis: ~p", [Err])
+           ?ERROR_MSG("failed to clean bosh sessions in redis: ~p", [Err])
     end.
-
-clean_table([SID, PidBin|Vals]) ->
-    case binary_to_term(PidBin) of
-       Pid when node(Pid) == node() ->
-           close_session(SID);
-       _ ->
-           ok
-    end,
-    clean_table(Vals);
-clean_table([]) ->
-    ok.
index 4485e389633cba2925ecff4c7c3e8af5ac11ea27..4562e68eb3c788c9f9c5ad197246cec3fe453977 100644 (file)
@@ -39,9 +39,12 @@ enable(LUser, LServer, LResource, NS) ->
     USKey = us_key(LUser, LServer),
     NodeKey = node_key(),
     JID = jid:encode({LUser, LServer, LResource}),
-    case ejabberd_redis:qp([["HSET", USKey, LResource, NS],
-                           ["SADD", NodeKey, JID]]) of
-       [{ok, _}, {ok, _}] ->
+    case ejabberd_redis:multi(
+          fun() ->
+                  ejabberd_redis:hset(USKey, LResource, NS),
+                  ejabberd_redis:sadd(NodeKey, [JID])
+          end) of
+       {ok, _} ->
            ok;
        Err ->
            ?ERROR_MSG("failed to write in redis: ~p", [Err]),
@@ -52,9 +55,12 @@ disable(LUser, LServer, LResource) ->
     USKey = us_key(LUser, LServer),
     NodeKey = node_key(),
     JID = jid:encode({LUser, LServer, LResource}),
-    case ejabberd_redis:qp([["HDEL", USKey, LResource],
-                           ["SREM", NodeKey, JID]]) of
-       [{ok, _}, {ok, _}] ->
+    case ejabberd_redis:multi(
+          fun() ->
+                  ejabberd_redis:hdel(USKey, [LResource]),
+                  ejabberd_redis:srem(NodeKey, [JID])
+          end) of
+       {ok, _} ->
            ok;
        Err ->
            ?ERROR_MSG("failed to delete from redis: ~p", [Err]),
@@ -63,9 +69,9 @@ disable(LUser, LServer, LResource) ->
 
 list(LUser, LServer) ->
     USKey = us_key(LUser, LServer),
-    case ejabberd_redis:q(["HGETALL", USKey]) of
+    case ejabberd_redis:hgetall(USKey) of
        {ok, Vals} ->
-           decode_vals(Vals);
+           Vals;
        Err ->
            ?ERROR_MSG("failed to read from redis: ~p", [Err]),
            []
@@ -77,24 +83,26 @@ list(LUser, LServer) ->
 clean_table() ->
     ?INFO_MSG("Cleaning Redis 'carboncopy' table...", []),
     NodeKey = node_key(),
-    case ejabberd_redis:q(["SMEMBERS", NodeKey]) of
+    case ejabberd_redis:smembers(NodeKey) of
        {ok, JIDs} ->
-           lists:foreach(
-             fun(JID) ->
-                     {U, S, R} = jid:split(jid:decode(JID)),
-                     USKey = us_key(U, S),
-                     case ejabberd_redis:q(["HDEL", USKey, R]) of
-                         {ok, _} ->
-                             ok;
-                         Err ->
-                             ?ERROR_MSG("failed to delete from redis: ~p",
-                                        [Err])
-                     end
-             end, JIDs);
+           case ejabberd_redis:multi(
+                  fun() ->
+                          lists:foreach(
+                            fun(JID) ->
+                                    {U, S, R} = jid:split(jid:decode(JID)),
+                                    USKey = us_key(U, S),
+                                    ejabberd_redis:hdel(USKey, [R])
+                            end, JIDs)
+                  end) of
+               {ok, _} ->
+                   ok;
+               Err ->
+                   ?ERROR_MSG("failed to delete from redis: ~p", [Err])
+           end;
        Err ->
            ?ERROR_MSG("failed to read from redis: ~p", [Err])
     end,
-    case ejabberd_redis:q(["DEL", NodeKey]) of
+    case ejabberd_redis:del([NodeKey]) of
        {ok, _} -> ok;
        Error -> ?ERROR_MSG("failed to delete from redis: ~p", [Error])
     end.
@@ -105,8 +113,3 @@ us_key(LUser, LServer) ->
 node_key() ->
     Node = erlang:atom_to_binary(node(), latin1),
     <<"ejabberd:carboncopy:nodes:", Node/binary>>.
-
-decode_vals([Resource, NS|Vals]) ->
-    [{Resource, NS}|decode_vals(Vals)];
-decode_vals([]) ->
-    [].