]> granicus.if.org Git - ejabberd/commitdiff
Add Redis backend for SM
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Wed, 11 Mar 2015 11:46:57 +0000 (14:46 +0300)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Wed, 11 Mar 2015 11:46:57 +0000 (14:46 +0300)
configure.ac
rebar.config.script
src/ejabberd_sm.erl
src/ejabberd_sm_mnesia.erl
src/ejabberd_sm_odbc.erl
src/ejabberd_sm_redis.erl [new file with mode: 0644]
vars.config.in

index edd65747a62f79c1450bd0d6d0a3e28e86114fd3..6bef15864a87d4b4b25e3b4b79fa4e9cd38da4d6 100644 (file)
@@ -106,10 +106,10 @@ AC_ARG_ENABLE(mssql,
 esac],[db_type=generic])
 
 AC_ARG_ENABLE(all,
-[AC_HELP_STRING([--enable-all], [same as --enable-nif --enable-odbc --enable-mysql --enable-pgsql --enable-pam --enable-zlib --enable-riak --enable-json --enable-elixir --enable-iconv --enable-debug --enable-lager --enable-tools (useful for Dialyzer checks, default: no)])],
+[AC_HELP_STRING([--enable-all], [same as --enable-nif --enable-odbc --enable-mysql --enable-pgsql --enable-pam --enable-zlib --enable-riak --enable-redis --enable-json --enable-elixir --enable-iconv --enable-debug --enable-lager --enable-tools (useful for Dialyzer checks, default: no)])],
 [case "${enableval}" in
-  yes) nif=true odbc=true mysql=true pgsql=true pam=true zlib=true riak=true json=true elixir=true iconv=true debug=true lager=true tools=true ;;
-  no) nif=false odbc=false mysql=false pgsql=false pam=false zlib=false riak=false json=false elixir=false iconv=false debug=false lager=false tools=false ;;
+  yes) nif=true odbc=true mysql=true pgsql=true pam=true zlib=true riak=true redis=true json=true elixir=true iconv=true debug=true lager=true tools=true ;;
+  no) nif=false odbc=false mysql=false pgsql=false pam=false zlib=false riak=false redis=false json=false elixir=false iconv=false debug=false lager=false tools=false ;;
   *) AC_MSG_ERROR(bad value ${enableval} for --enable-all) ;;
 esac],[])
 
@@ -177,6 +177,14 @@ AC_ARG_ENABLE(riak,
   *) AC_MSG_ERROR(bad value ${enableval} for --enable-riak) ;;
 esac],[if test "x$riak" = "x"; then riak=false; fi])
 
+AC_ARG_ENABLE(redis,
+[AC_HELP_STRING([--enable-redis], [enable Redis support (default: no)])],
+[case "${enableval}" in
+  yes) redis=true ;;
+  no)  redis=false ;;
+  *) AC_MSG_ERROR(bad value ${enableval} for --enable-redis) ;;
+esac],[if test "x$redis" = "x"; then redis=false; fi])
+
 AC_ARG_ENABLE(json,
 [AC_HELP_STRING([--enable-json], [enable JSON support for mod_bosh (default: no)])],
 [case "${enableval}" in
@@ -249,6 +257,7 @@ AC_SUBST(pgsql)
 AC_SUBST(pam)
 AC_SUBST(zlib)
 AC_SUBST(riak)
+AC_SUBST(redis)
 AC_SUBST(json)
 AC_SUBST(elixir)
 AC_SUBST(iconv)
index f7846c0fa9604c8c9afbd4ec849f804ce5673c58..69964852bb131ad902382e92f8e492d2a3d2aba7 100644 (file)
@@ -106,6 +106,8 @@ CfgDeps = lists:flatmap(
                     [{p1_logger, ".*", {git, "git://github.com/processone/p1_logger"}}];
                ({tools, true}) ->
                     [{meck, "0.*", {git, "https://github.com/eproxus/meck"}}];
+              ({redis, true}) ->
+                   [{eredis, ".*", {git, "git://github.com/wooga/eredis"}}];
                (_) ->
                     []
             end, Cfg),
index 67a82d024c04097846d4e3054332aaa3cd783051..abe15d9ffee6938576bd4084817a4b5add863111 100644 (file)
@@ -78,9 +78,9 @@
 -include("ejabberd_sm.hrl").
 
 -callback init() -> ok | {error, any()}.
--callback get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}.
 -callback set_session(#session{}) -> ok.
--callback delete_session(binary(), sid()) -> ok.
+-callback delete_session(binary(), binary(), binary(), sid()) ->
+    {ok, #session{}} | {error, notfound}.
 -callback get_sessions() -> [#session{}].
 -callback get_sessions(binary()) -> [#session{}].
 -callback get_sessions(binary(), binary()) -> [#session{}].
@@ -137,12 +137,13 @@ open_session(SID, User, Server, Resource, Info) ->
 
 close_session(SID, User, Server, Resource) ->
     Mod = get_sm_backend(),
+    LUser = jlib:nodeprep(User),
     LServer = jlib:nameprep(Server),
-    Info = case Mod:get_session(LServer, SID) of
+    LResource = jlib:resourceprep(Resource),
+    Info = case Mod:delete_session(LUser, LServer, LResource, SID) of
               {ok, #session{info = I}} -> I;
               {error, notfound} -> []
           end,
-    Mod:delete_session(LServer, SID),
     JID = jlib:make_jid(User, Server, Resource),
     ejabberd_hooks:run(sm_remove_connection_hook,
                       JID#jid.lserver, [SID, JID, Info]).
@@ -731,7 +732,8 @@ get_sm_backend() ->
     DBType = ejabberd_config:get_option(sm_db_type,
                                        fun(mnesia) -> mnesia;
                                           (internal) -> mnesia;
-                                          (odbc) -> odbc
+                                          (odbc) -> odbc;
+                                          (redis) -> redis
                                        end, mnesia),
     list_to_atom("ejabberd_sm_" ++ atom_to_list(DBType)).
 
index 59a6c64f69875d6233ffec495e4e186b72ca2ba1..7acc1022dbd348b538930b596b784dfcb834bee0 100644 (file)
@@ -13,9 +13,8 @@
 
 %% API
 -export([init/0,
-        get_session/2,
         set_session/1,
-        delete_session/2,
+        delete_session/4,
         get_sessions/0,
         get_sessions/1,
         get_sessions/2,
@@ -44,22 +43,20 @@ init() ->
            Err
     end.
 
--spec get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}.
-get_session(_LServer, SID) ->
-    case mnesia:dirty_read(session, SID) of
-       [] ->
-           {error, notfound};
-       [Session] ->
-           {ok, Session}
-    end.
-
 -spec set_session(#session{}) -> ok.
 set_session(Session) ->
     mnesia:dirty_write(Session).
 
--spec delete_session(binary(), sid()) -> ok.
-delete_session(_LServer, SID) ->
-    mnesia:dirty_delete(session, SID).
+-spec delete_session(binary(), binary(), binary(), sid()) ->
+                           {ok, #session{}} | {error, notfound}.
+delete_session(_LUser, _LServer, _LResource, SID) ->
+    case mnesia:dirty_read(session, SID) of
+       [Session] ->
+           mnesia:dirty_delete(session, SID),
+           {ok, Session};
+       [] ->
+           {error, notfound}
+    end.
 
 -spec get_sessions() -> [#session{}].
 get_sessions() ->
index 55bbc74fb1f53f642302b0f265ac9fb99a9366b9..946f58ffa6cd1b56d2a3e0e8aea3ce2b76e64e2f 100644 (file)
@@ -12,9 +12,8 @@
 
 %% API
 -export([init/0,
-        get_session/2,
         set_session/1,
-        delete_session/2,
+        delete_session/4,
         get_sessions/0,
         get_sessions/1,
         get_sessions/2,
@@ -31,6 +30,7 @@
 -spec init() -> ok | {error, any()}.
 init() ->
     Node = ejabberd_odbc:escape(jlib:atom_to_binary(node())),
+    ?INFO_MSG("Cleaning SQL SM table...", []),
     lists:foldl(
       fun(Host, ok) ->
              case ejabberd_odbc:sql_query(
@@ -45,23 +45,6 @@ init() ->
              Err
       end, ok, ?MYHOSTS).
 
--spec get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}.
-get_session(LServer, {Now, Pid} = SID) ->
-    Host = ejabberd_odbc:escape(LServer),
-    PidS = list_to_binary(erlang:pid_to_list(Pid)),
-    TS = now_to_timestamp(Now),
-    case ejabberd_odbc:sql_query(
-          Host, [<<"select username, resource, priority, info from sm ">>,
-                 <<"where usec='">>, TS, <<"' and pid='">>, PidS, <<"'">>]) of
-       {selected, _, [[User, Resource, Priority, Info]|_]} ->
-           {ok, #session{sid = SID, us = {User, Resource},
-                         usr = {User, Resource, LServer},
-                         priority = dec_priority(Priority),
-                         info = ejabberd_odbc:decode_term(Info)}};
-       {selected, _, []} ->
-           {error, notfound}
-    end.
-
 set_session(#session{sid = {Now, Pid}, usr = {U, LServer, R},
                     priority = Priority, info = Info}) ->
     Username = ejabberd_odbc:escape(U),
@@ -84,16 +67,23 @@ set_session(#session{sid = {Now, Pid}, usr = {U, LServer, R},
            ?ERROR_MSG("failed to update 'sm' table: ~p", [Err])
     end.
 
-delete_session(LServer, {Now, Pid}) ->
+delete_session(_LUser, LServer, _LResource, {Now, Pid}) ->
     TS = now_to_timestamp(Now),
     PidS = list_to_binary(erlang:pid_to_list(Pid)),
     case ejabberd_odbc:sql_query(
-          LServer, [<<"delete from sm where usec='">>,
-                    TS, <<"' and pid='">>, PidS, <<"'">>]) of
-       {updated, _} ->
-           ok;
+          LServer,
+          [<<"select usec, pid, username, resource, priority, info ">>,
+           <<"from sm where usec='">>, TS, <<"' and pid='">>,PidS, <<"'">>]) of
+       {selected, _, [Row]} ->
+           ejabberd_odbc:sql_query(
+             LServer, [<<"delete from sm where usec='">>,
+                       TS, <<"' and pid='">>, PidS, <<"'">>]),
+           {ok, row_to_session(LServer, Row)};
+       {selected, _, []} ->
+           {error, notfound};
        Err ->
-           ?ERROR_MSG("failed to delete from 'sm' table: ~p", [Err])
+           ?ERROR_MSG("failed to delete from 'sm' table: ~p", [Err]),
+           {error, notfound}
     end.
 
 get_sessions() ->
diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl
new file mode 100644 (file)
index 0000000..7abab18
--- /dev/null
@@ -0,0 +1,208 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2015, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 11 Mar 2015 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(ejabberd_sm_redis).
+
+-behaviour(ejabberd_sm).
+
+%% API
+-export([init/0,
+        set_session/1,
+        delete_session/4,
+        get_sessions/0,
+        get_sessions/1,
+        get_sessions/2,
+        get_sessions/3]).
+
+-include("ejabberd.hrl").
+-include("ejabberd_sm.hrl").
+-include("logger.hrl").
+-include("jlib.hrl").
+
+-define(PROCNAME, 'ejabberd_redis_client').
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+-spec init() -> ok | {error, any()}.
+init() ->
+    Server = ejabberd_config:get_option(redis_server,
+                                       fun iolist_to_list/1,
+                                       "localhost"),
+    Port = ejabberd_config:get_option(redis_port,
+                                     fun(P) when is_integer(P),
+                                                 P>0, P<65536 ->
+                                             P
+                                     end, 6379),
+    DB = ejabberd_config:get_option(redis_db,
+                                   fun(I) when is_integer(I), I >= 0 ->
+                                           I
+                                   end, 0),
+    Pass = ejabberd_config:get_option(redis_password,
+                                     fun iolist_to_list/1,
+                                     ""),
+    ReconnTimeout = timer:seconds(
+                     ejabberd_config:get_option(
+                       redis_reconnect_timeout,
+                       fun(I) when is_integer(I), I>0 -> I end,
+                       1)),
+    ConnTimeout = timer:seconds(
+                   ejabberd_config:get_option(
+                     redis_connect_timeout,
+                     fun(I) when is_integer(I), I>0 -> I end,
+                     1)),
+    case eredis:start_link(Server, Port, DB, Pass,
+                          ReconnTimeout, ConnTimeout) of
+       {ok, Client} ->
+           register(?PROCNAME, Client),
+           clean_table(),
+           ok;
+       {error, _} = Err ->
+           ?ERROR_MSG("failed to start redis client: ~p", [Err]),
+           Err
+    end.
+
+-spec set_session(#session{}) -> ok.
+set_session(Session) ->
+    T = term_to_binary(Session),
+    USKey = us_to_key(Session#session.us),
+    SIDKey = sid_to_key(Session#session.sid),
+    ServKey = server_to_key(element(2, Session#session.us)),
+    USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid),
+    case eredis:qp(?PROCNAME, [["HSET", USKey, SIDKey, T],
+                              ["HSET", ServKey, USSIDKey, T]]) of
+       [{ok, _}, {ok, _}] ->
+           ok;
+       Err ->
+           ?ERROR_MSG("failed to set session for redis: ~p", [Err])
+    end.
+
+-spec delete_session(binary(), binary(), binary(), sid()) -> ok.
+delete_session(LUser, LServer, _LResource, SID) ->
+    USKey = us_to_key({LUser, LServer}),
+    case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
+       {ok, Vals} ->
+           Ss = decode_session_list(Vals),
+           case lists:keyfind(SID, #session.sid, Ss) of
+               false ->
+                   {error, notfound};
+               Session ->
+                   SIDKey = sid_to_key(SID),
+                   ServKey = server_to_key(element(2, Session#session.us)),
+                   USSIDKey = us_sid_to_key(Session#session.us, SID),
+                   eredis:qp(?PROCNAME, [["HDEL", USKey, SIDKey],
+                                         ["HDEL", ServKey, USSIDKey]]),
+                   {ok, Session}
+           end;
+       Err ->
+           ?ERROR_MSG("failed to delete session from redis: ~p", [Err]),
+           {error, notfound}
+    end.
+
+-spec get_sessions() -> [#session{}].
+get_sessions() ->
+    lists:flatmap(
+      fun(LServer) ->
+             get_sessions(LServer)
+      end, ?MYHOSTS).
+
+-spec get_sessions(binary()) -> [#session{}].
+get_sessions(LServer) ->
+    ServKey = server_to_key(LServer),
+    case eredis:q(?PROCNAME, ["HGETALL", ServKey]) of
+       {ok, Vals} ->
+           decode_session_list(Vals);
+       Err ->
+           ?ERROR_MSG("failed to get sessions from redis: ~p", [Err]),
+           []
+    end.
+
+-spec get_sessions(binary(), binary()) -> [#session{}].
+get_sessions(LUser, LServer) ->
+    USKey = us_to_key({LUser, LServer}),
+    case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
+       {ok, Vals} when is_list(Vals) ->
+           decode_session_list(Vals);
+       Err ->
+           ?ERROR_MSG("failed to get sessions from redis: ~p", [Err]),
+           []
+    end.
+
+-spec get_sessions(binary(), binary(), binary()) -> [#session{}].
+get_sessions(LUser, LServer, LResource) ->
+    USKey = us_to_key({LUser, LServer}),
+    case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
+       {ok, Vals} when is_list(Vals) ->
+           [S || S <- decode_session_list(Vals),
+                 element(3, S#session.usr) == LResource];
+       Err ->
+           ?ERROR_MSG("failed to get sessions from redis: ~p", [Err]),
+           []
+    end.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+iolist_to_list(IOList) ->
+    binary_to_list(iolist_to_binary(IOList)).
+
+us_to_key({LUser, LServer}) ->
+    <<"ejabberd:sm:", LUser/binary, "@", LServer/binary>>.
+
+server_to_key(LServer) ->
+    <<"ejabberd:sm:", LServer/binary>>.
+
+us_sid_to_key(US, SID) ->
+    term_to_binary({US, SID}).
+
+sid_to_key(SID) ->
+    term_to_binary(SID).
+
+decode_session_list([_, Val|T]) ->
+    [binary_to_term(Val)|decode_session_list(T)];
+decode_session_list([]) ->
+    [].
+
+clean_table() ->
+    ?INFO_MSG("Cleaning Redis SM table...", []),
+    lists:foreach(
+      fun(LServer) ->
+             ServKey = server_to_key(LServer),
+             case eredis:q(?PROCNAME, ["HKEYS", ServKey]) of
+                 {ok, []} ->
+                     ok;
+                 {ok, Vals} ->
+                     Vals1 = lists:filter(
+                               fun(USSIDKey) ->
+                                       {_, SID} = binary_to_term(USSIDKey),
+                                       node(element(2, SID)) == node()
+                               end, Vals),
+                     Q1 = ["HDEL", ServKey | Vals1],
+                     Q2 = lists:map(
+                            fun(USSIDKey) ->
+                                    {US, SID} = binary_to_term(USSIDKey),
+                                    USKey = us_to_key(US),
+                                    SIDKey = sid_to_key(SID),
+                                    ["HDEL", USKey, SIDKey]
+                            end, Vals1),
+                     Res = eredis:qp(?PROCNAME, [Q1|Q2]),
+                     case lists:filter(
+                            fun({ok, _}) -> false;
+                               (_) -> true
+                            end, Res) of
+                         [] ->
+                             ok;
+                         Errs ->
+                             ?ERROR_MSG("failed to clean redis table for "
+                                        "server ~s: ~p", [LServer, Errs])
+                     end;
+                 Err ->
+                     ?ERROR_MSG("failed to clean redis table for "
+                                "server ~s: ~p", [LServer, Err])
+             end
+      end, ?MYHOSTS).
index 2efa449aece2540d424bee4277fd0f77a31e9109..cb82775aae793b064e1a1737c1d0c15443938af6 100644 (file)
@@ -26,6 +26,7 @@
 {pam, @pam@}.
 {zlib, @zlib@}.
 {riak, @riak@}.
+{redis, @redis@}.
 {json, @json@}.
 {elixir, @elixir@}.
 {lager, @lager@}.