]> granicus.if.org Git - ejabberd/commitdiff
Improve Riak support
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>
Tue, 13 Nov 2012 12:56:27 +0000 (22:56 +1000)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Thu, 10 Jul 2014 09:26:37 +0000 (13:26 +0400)
16 files changed:
src/ejabberd_auth_riak.erl [new file with mode: 0644]
src/ejabberd_riak.erl
src/ejabberd_riak_sup.erl
src/mod_announce.erl
src/mod_blocking.erl
src/mod_caps.erl
src/mod_irc.erl
src/mod_last.erl
src/mod_muc.erl
src/mod_offline.erl
src/mod_privacy.erl
src/mod_private.erl
src/mod_roster.erl
src/mod_shared_roster.erl
src/mod_vcard.erl
src/mod_vcard_xupdate.erl

diff --git a/src/ejabberd_auth_riak.erl b/src/ejabberd_auth_riak.erl
new file mode 100644 (file)
index 0000000..870aa48
--- /dev/null
@@ -0,0 +1,285 @@
+%%%----------------------------------------------------------------------
+%%% File    : ejabberd_auth_riak.erl
+%%% Author  : Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%% Purpose : Authentification via Riak
+%%% Created : 12 Nov 2012 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2012   ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+-module(ejabberd_auth_riak).
+
+-author('alexey@process-one.net').
+
+-behaviour(ejabberd_auth).
+
+%% External exports
+-export([start/1, set_password/3, check_password/3,
+        check_password/5, try_register/3,
+        dirty_get_registered_users/0, get_vh_registered_users/1,
+        get_vh_registered_users/2,
+        get_vh_registered_users_number/1,
+        get_vh_registered_users_number/2, get_password/2,
+        get_password_s/2, is_user_exists/2, remove_user/2,
+        remove_user/3, store_type/0, export/1,
+        plain_password_required/0]).
+
+-include("ejabberd.hrl").
+
+-record(passwd, {us = {<<"">>, <<"">>} :: {binary(), binary()} | '$1',
+                 password = <<"">> :: binary() | scram() | '_'}).
+
+-define(SALT_LENGTH, 16).
+
+start(_Host) ->
+    ok.
+
+plain_password_required() ->
+    case is_scrammed() of
+      false -> false;
+      true -> true
+    end.
+
+store_type() ->
+    case is_scrammed() of
+      false -> plain; %% allows: PLAIN DIGEST-MD5 SCRAM
+      true -> scram %% allows: PLAIN SCRAM
+    end.
+
+check_password(User, Server, Password) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    case ejabberd_riak:get(passwd, {LUser, LServer}) of
+        {ok, #passwd{password = Password}} when is_binary(Password) ->
+            Password /= <<"">>;
+        {ok, #passwd{password = Scram}} when is_record(Scram, scram) ->
+            is_password_scram_valid(Password, Scram);
+        _ ->
+            false
+    end.
+
+check_password(User, Server, Password, Digest,
+              DigestGen) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    case ejabberd_riak:get(passwd, {LUser, LServer}) of
+      {ok, #passwd{password = Passwd}} when is_binary(Passwd) ->
+         DigRes = if Digest /= <<"">> ->
+                         Digest == DigestGen(Passwd);
+                     true -> false
+                  end,
+         if DigRes -> true;
+            true -> (Passwd == Password) and (Password /= <<"">>)
+         end;
+      {ok, #passwd{password = Scram}}
+         when is_record(Scram, scram) ->
+         Passwd = jlib:decode_base64(Scram#scram.storedkey),
+         DigRes = if Digest /= <<"">> ->
+                         Digest == DigestGen(Passwd);
+                     true -> false
+                  end,
+         if DigRes -> true;
+            true -> (Passwd == Password) and (Password /= <<"">>)
+         end;
+      _ -> false
+    end.
+
+set_password(User, Server, Password) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    US = {LUser, LServer},
+    if (LUser == error) or (LServer == error) ->
+          {error, invalid_jid};
+       true ->
+            Password2 = case is_scrammed() and is_binary(Password)
+                        of
+                            true -> password_to_scram(Password);
+                            false -> Password
+                        end,
+            ok = ejabberd_riak:put(#passwd{us = US, password = Password2},
+                                   [{'2i', [{<<"host">>, LServer}]}])
+    end.
+
+try_register(User, Server, PasswordList) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Password = iolist_to_binary(PasswordList),
+    US = {LUser, LServer},
+    if (LUser == error) or (LServer == error) ->
+          {error, invalid_jid};
+       true ->
+            case ejabberd_riak:get(passwd, US) of
+                {error, notfound} ->
+                    Password2 = case is_scrammed() and
+                                    is_binary(Password)
+                                of
+                                    true -> password_to_scram(Password);
+                                    false -> Password
+                                end,
+                    {atomic, ejabberd_riak:put(
+                               #passwd{us = US,
+                                       password = Password2},
+                               [{'2i', [{<<"host">>, LServer}]}])};
+                {ok, _} ->
+                    exists;
+                Err ->
+                    {atomic, Err}
+            end
+    end.
+
+dirty_get_registered_users() ->
+    lists:flatmap(
+      fun(Server) ->
+              get_vh_registered_users(Server)
+      end, ejabberd_config:get_vh_by_auth_method(riak)).
+
+get_vh_registered_users(Server) ->
+    LServer = jlib:nameprep(Server),
+    case ejabberd_riak:get_keys_by_index(passwd, <<"host">>, LServer) of
+        {ok, Users} ->
+            Users;
+        _ ->
+            []
+    end.
+
+get_vh_registered_users(Server, _) ->
+    get_vh_registered_users(Server).
+
+get_vh_registered_users_number(Server) ->
+    LServer = jlib:nameprep(Server),
+    case ejabberd_riak:count_by_index(passwd, <<"host">>, LServer) of
+        {ok, N} ->
+            N;
+        _ ->
+            0
+    end.
+
+get_vh_registered_users_number(Server, _) ->
+    get_vh_registered_users_number(Server).
+
+get_password(User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    case ejabberd_riak:get(passwd, {LUser, LServer}) of
+      {ok, #passwd{password = Password}}
+         when is_binary(Password) ->
+         Password;
+      {ok, #passwd{password = Scram}}
+         when is_record(Scram, scram) ->
+         {jlib:decode_base64(Scram#scram.storedkey),
+          jlib:decode_base64(Scram#scram.serverkey),
+          jlib:decode_base64(Scram#scram.salt),
+          Scram#scram.iterationcount};
+      _ -> false
+    end.
+
+get_password_s(User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    case ejabberd_riak:get(passwd, {LUser, LServer}) of
+      {ok, #passwd{password = Password}}
+         when is_binary(Password) ->
+         Password;
+      {ok, #passwd{password = Scram}}
+         when is_record(Scram, scram) ->
+         <<"">>;
+      _ -> <<"">>
+    end.
+
+is_user_exists(User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    case ejabberd_riak:get(passwd, {LUser, LServer}) of
+      {error, notfound} -> false;
+      {ok, _} -> true;
+      Err -> Err
+    end.
+
+remove_user(User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    ejabberd_riak:delete(passwd, {LUser, LServer}),
+    ok.
+
+remove_user(User, Server, Password) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    case ejabberd_riak:get(passwd, {LUser, LServer}) of
+        {ok, #passwd{password = Password}}
+          when is_binary(Password) ->
+            ejabberd_riak:delete(passwd, {LUser, LServer}),
+            ok;
+        {ok, #passwd{password = Scram}}
+          when is_record(Scram, scram) ->
+            case is_password_scram_valid(Password, Scram) of
+                true ->
+                    ejabberd_riak:delete(passwd, {LUser, LServer}),
+                    ok;
+                false -> not_allowed
+            end;
+        _ -> not_exists
+    end.
+
+%%%
+%%% SCRAM
+%%%
+
+is_scrammed() ->
+    scram ==
+      ejabberd_config:get_local_option({auth_password_format, ?MYNAME},
+                                       fun(V) -> V end).
+
+password_to_scram(Password) ->
+    password_to_scram(Password,
+                     ?SCRAM_DEFAULT_ITERATION_COUNT).
+
+password_to_scram(Password, IterationCount) ->
+    Salt = crypto:rand_bytes(?SALT_LENGTH),
+    SaltedPassword = scram:salted_password(Password, Salt,
+                                          IterationCount),
+    StoredKey =
+       scram:stored_key(scram:client_key(SaltedPassword)),
+    ServerKey = scram:server_key(SaltedPassword),
+    #scram{storedkey = jlib:encode_base64(StoredKey),
+          serverkey = jlib:encode_base64(ServerKey),
+          salt = jlib:encode_base64(Salt),
+          iterationcount = IterationCount}.
+
+is_password_scram_valid(Password, Scram) ->
+    IterationCount = Scram#scram.iterationcount,
+    Salt = jlib:decode_base64(Scram#scram.salt),
+    SaltedPassword = scram:salted_password(Password, Salt,
+                                          IterationCount),
+    StoredKey =
+       scram:stored_key(scram:client_key(SaltedPassword)),
+    jlib:decode_base64(Scram#scram.storedkey) == StoredKey.
+
+export(_Server) ->
+    [{passwd,
+      fun(Host, #passwd{us = {LUser, LServer}, password = Password})
+            when LServer == Host ->
+              Username = ejabberd_odbc:escape(LUser),
+              Pass = ejabberd_odbc:escape(Password),
+              [[<<"delete from users where username='">>, Username, <<"';">>],
+               [<<"insert into users(username, password) "
+                  "values ('">>, Username, <<"', '">>, Pass, <<"');">>]];
+         (_Host, _R) ->
+              []
+      end}].
index 892e8fd696a699a4ed619611848c9e68b6a8beb1..04ff1ea11503b9668ac9a1b03f91fff5d35e2e6a 100644 (file)
@@ -1,11 +1,10 @@
-%%%----------------------------------------------------------------------
-%%% File    : ejabberd_riak.erl
-%%% Author  : Alexey Shchepin <alexey@process-one.net>
-%%% Purpose : Serve Riak connection
+%%%-------------------------------------------------------------------
+%%% @author Alexey Shchepin <alexey@process-one.net>
+%%% @doc
+%%% Interface for Riak database
+%%% @end
 %%% Created : 29 Dec 2011 by Alexey Shchepin <alexey@process-one.net>
-%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2011   ProcessOne
+%%% @copyright (C) 2002-2012   ProcessOne
 %%%
 %%% This program is free software; you can redistribute it and/or
 %%% modify it under the terms of the GNU General Public License as
 %%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
 %%% 02111-1307 USA
 %%%
-%%%----------------------------------------------------------------------
-
+%%%-------------------------------------------------------------------
 -module(ejabberd_riak).
--author('alexey@process-one.net').
-
-%% External exports
--export([start_link/3,
-         put/4,
-         put/5,
-         get_object/3,
-         get/3,
-         get_objects_by_index/4,
-         get_by_index/4,
-         get_keys_by_index/4,
-         count_by_index/4,
-         delete/3]).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/3, make_bucket/1, put/1, put/2,
+         get/1, get/2, get_by_index/3, delete/1, delete/2,
+         count_by_index/3, get_by_index_range/4,
+         get_keys/1, get_keys_by_index/3,
+         count/1, delete_by_index/3]).
+%% For debugging
+-export([get_tables/0]).
+%% map/reduce exports
+-export([map_key/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
 
 -include("ejabberd.hrl").
 
-%%%----------------------------------------------------------------------
+-record(state, {pid = self() :: pid()}).
+
+-type index() :: {binary(), any()}.
+
+-type index_info() :: [{i, any()} | {'2i', [index()]}].
+
+%% The `index_info()' is used in put/delete functions:
+%% `i' defines a primary index, `` '2i' '' defines secondary indexes.
+%% There must be only one primary index. If `i' is not specified,
+%% the first element of the record is assumed as a primary index,
+%% i.e. `i' = element(2, Record).
+
+-export_types([index_info/0]).
+
+%%%===================================================================
 %%% API
-%%%----------------------------------------------------------------------
-start_link(Server, Port, StartInterval) ->
-    {ok, Pid} = riakc_pb_socket:start_link(
-                  Server, Port,
-                  [auto_reconnect]),
-    ejabberd_riak_sup:add_pid(Pid),
-    {ok, Pid}.
-
-make_bucket(Host, Table) ->
-    iolist_to_binary([Host, $@, Table]).
-
-put(Host, Table, Key, Value) ->
-    Bucket = make_bucket(Host, Table),
-    Obj = riakc_obj:new(Bucket, Key, Value),
-    riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj).
-
-put(Host, Table, Key, Value, Indexes) ->
-    Bucket = make_bucket(Host, Table),
-    Obj = riakc_obj:new(Bucket, Key, Value),
-    MetaData = dict:store(<<"index">>, Indexes, dict:new()),
-    Obj2 = riakc_obj:update_metadata(Obj, MetaData),
-    riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj2).
-
-get_object(Host, Table, Key) ->
-    Bucket = make_bucket(Host, Table),
+%%%===================================================================
+%% @private
+start_link(Server, Port, _StartInterval) ->
+    gen_server:start_link(?MODULE, [Server, Port], []).
+
+-spec make_bucket(atom()) -> binary().
+%% @doc Makes a bucket from a table name
+%% @private
+make_bucket(Table) ->
+    erlang:atom_to_binary(Table, utf8).
+
+-spec put(tuple()) -> ok | {error, any()}.
+%% @equiv put(Record, [])
+put(Record) ->
+    ?MODULE:put(Record, []).
+
+-spec put(tuple(), index_info()) -> ok | {error, any()}.
+%% @doc Stores a record `Rec' with indexes described in ``IndexInfo''
+put(Rec, IndexInfo) ->
+    Key = encode_key(proplists:get_value(i, IndexInfo, element(2, Rec))),
+    SecIdxs = [encode_index_key(K, V) ||
+                  {K, V} <- proplists:get_value('2i', IndexInfo, [])],
+    Table = element(1, Rec),
+    Value = term_to_binary(Rec),
+    case put_raw(Table, Key, Value, SecIdxs) of
+        ok ->
+            ok;
+        Error ->
+            log_error(Error, put, [{record, Rec},
+                                   {index_info, IndexInfo}]),
+            Error
+    end.
+
+put_raw(Table, Key, Value, Indexes) ->
+    Bucket = make_bucket(Table),
+    Obj = riakc_obj:new(Bucket, Key, Value, "application/x-erlang-term"),
+    Obj1 = if Indexes /= [] ->
+                   MetaData = dict:store(<<"index">>, Indexes, dict:new()),
+                   riakc_obj:update_metadata(Obj, MetaData);
+              true ->
+                   Obj
+           end,
+    riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1).
+
+get_object_raw(Table, Key) ->
+    Bucket = make_bucket(Table),
     riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
 
-get(Host, Table, Key) ->
-    case get_object(Host, Table, Key) of
+-spec get(atom()) -> {ok, [any()]} | {error, any()}.
+%% @doc Returns all objects from table `Table'
+get(Table) ->
+    Bucket = make_bucket(Table),
+    case riakc_pb_socket:mapred(
+           ejabberd_riak_sup:get_random_pid(),
+           Bucket,
+           [{map, {modfun, riak_kv_mapreduce, map_object_value},
+             none, true}]) of
+        {ok, [{_, Objs}]} ->
+            {ok, lists:flatmap(
+                   fun(Obj) ->
+                           case catch binary_to_term(Obj) of
+                               {'EXIT', _} ->
+                                   Error = {error, make_invalid_object(Obj)},
+                                   log_error(Error, get,
+                                             [{table, Table}]),
+                                   [];
+                               Term ->
+                                   [Term]
+                           end
+                   end, Objs)};
+        {error, notfound} ->
+            {ok, []};
+        Error ->
+            Error
+    end.
+
+-spec get(atom(), any()) -> {ok, any()} | {error, any()}.
+%% @doc Reads record by `Key' from table `Table'
+get(Table, Key) ->
+    case get_raw(Table, encode_key(Key)) of
+        {ok, Val} ->
+            case catch binary_to_term(Val) of
+                {'EXIT', _} ->
+                    Error = {error, make_invalid_object(Val)},
+                    log_error(Error, get, [{table, Table}, {key, Key}]),
+                    {error, notfound};
+                Term ->
+                    {ok, Term}
+            end;
+        Error ->
+            log_error(Error, get, [{table, Table},
+                                   {key, Key}]),
+            Error
+    end.
+
+-spec get_by_index(atom(), binary(), any()) -> {ok, [any()]} | {error, any()}.
+%% @doc Reads records by `Index' and value `Key' from `Table' 
+get_by_index(Table, Index, Key) ->
+    {NewIndex, NewKey} = encode_index_key(Index, Key),
+    case get_by_index_raw(Table, NewIndex, NewKey) of
+        {ok, Vals} ->
+            {ok, lists:flatmap(
+                   fun(Val) ->
+                           case catch binary_to_term(Val) of
+                               {'EXIT', _} ->
+                                   Error = {error, make_invalid_object(Val)},
+                                   log_error(Error, get_by_index,
+                                             [{table, Table},
+                                              {index, Index},
+                                              {key, Key}]),
+                                   [];
+                               Term ->
+                                   [Term]
+                           end
+                   end, Vals)};
+        {error, notfound} ->
+            {ok, []};
+        Error ->
+            log_error(Error, get_by_index,
+                      [{table, Table},
+                       {index, Index},
+                       {key, Key}]),
+            Error
+    end.
+
+-spec get_by_index_range(atom(), binary(), any(), any()) ->
+                                {ok, [any()]} | {error, any()}.
+%% @doc Reads records by `Index' in the range `FromKey'..`ToKey' from `Table'
+get_by_index_range(Table, Index, FromKey, ToKey) ->
+    {NewIndex, NewFromKey} = encode_index_key(Index, FromKey),
+    {NewIndex, NewToKey} = encode_index_key(Index, ToKey),
+    case get_by_index_range_raw(Table, NewIndex, NewFromKey, NewToKey) of
+        {ok, Vals} ->
+            {ok, lists:flatmap(
+                   fun(Val) ->
+                           case catch binary_to_term(Val) of
+                               {'EXIT', _} ->
+                                   Error = {error, make_invalid_object(Val)},
+                                   log_error(Error, get_by_index_range,
+                                             [{table, Table},
+                                              {index, Index},
+                                              {start_key, FromKey},
+                                              {end_key, ToKey}]),
+                                   [];
+                               Term ->
+                                   [Term]
+                           end
+                   end, Vals)};
+        {error, notfound} ->
+            {ok, []};
+        Error ->
+            log_error(Error, get_by_index_range,
+                      [{table, Table}, {index, Index},
+                       {start_key, FromKey}, {end_key, ToKey}]),
+            Error
+    end.
+
+get_raw(Table, Key) ->
+    case get_object_raw(Table, Key) of
         {ok, Obj} ->
             {ok, riakc_obj:get_value(Obj)};
         Error ->
             Error
     end.
 
-get_objects_by_index(Host, Table, Index, Key) ->
-    Bucket = make_bucket(Host, Table),
+-spec get_keys(atom()) -> {ok, [any()]} | {error, any()}.
+%% @doc Returns a list of index values
+get_keys(Table) ->
+    Bucket = make_bucket(Table),
+    case riakc_pb_socket:mapred(
+           ejabberd_riak_sup:get_random_pid(),
+           Bucket,
+           [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+        {ok, [{_, Keys}]} ->
+            {ok, Keys};
+        Error ->
+            log_error(Error, get_keys, [{table, Table}]),
+            Error
+    end.
+
+-spec get_keys_by_index(atom(), binary(),
+                        any()) -> {ok, [any()]} | {error, any()}.
+%% @doc Returns a list of primary keys of objects indexed by `Key'.
+get_keys_by_index(Table, Index, Key) ->
+    {NewIndex, NewKey} = encode_index_key(Index, Key),
+    Bucket = make_bucket(Table),
+    case riakc_pb_socket:mapred(
+           ejabberd_riak_sup:get_random_pid(),
+           {index, Bucket, NewIndex, NewKey},
+           [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+        {ok, [{_, Keys}]} ->
+            {ok, Keys};
+        Error ->
+            log_error(Error, get_keys_by_index, [{table, Table},
+                                                 {index, Index},
+                                                 {key, Key}]),
+            Error
+    end.
+
+%% @hidden
+get_tables() ->
+    riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()).
+
+get_by_index_raw(Table, Index, Key) ->
+    Bucket = make_bucket(Table),
     case riakc_pb_socket:mapred(
            ejabberd_riak_sup:get_random_pid(),
            {index, Bucket, Index, Key},
-           [{map, {modfun, riak_kv_mapreduce, map_identity}, none, true}]) of
+           [{map, {modfun, riak_kv_mapreduce, map_object_value},
+             none, true}]) of
         {ok, [{_, Objs}]} ->
             {ok, Objs};
         Error ->
             Error
     end.
 
-get_by_index(Host, Table, Index, Key) ->
-    Bucket = make_bucket(Host, Table),
+get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
+    Bucket = make_bucket(Table),
     case riakc_pb_socket:mapred(
            ejabberd_riak_sup:get_random_pid(),
-           {index, Bucket, Index, Key},
+           {index, Bucket, Index, FromKey, ToKey},
            [{map, {modfun, riak_kv_mapreduce, map_object_value},
              none, true}]) of
         {ok, [{_, Objs}]} ->
@@ -103,20 +288,42 @@ get_by_index(Host, Table, Index, Key) ->
             Error
     end.
 
-get_keys_by_index(Host, Table, Index, Key) ->
-    Bucket = make_bucket(Host, Table),
+-spec count(atom()) -> {ok, non_neg_integer()} | {error, any()}.
+%% @doc Returns the number of objects in the `Table'
+count(Table) ->
+    Bucket = make_bucket(Table),
     case riakc_pb_socket:mapred(
            ejabberd_riak_sup:get_random_pid(),
-           {index, Bucket, Index, Key},
-           []) of
-        {ok, [{_, Ls}]} ->
-            {ok, [K || {_, K} <- Ls]};
+           Bucket,
+           [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
+             none, true}]) of
+        {ok, [{_, [Cnt]}]} ->
+            {ok, Cnt};
         Error ->
+            log_error(Error, count, [{table, Table}]),
             Error
     end.
 
-count_by_index(Host, Table, Index, Key) ->
-    Bucket = make_bucket(Host, Table),
+-spec count_by_index(atom(), binary(), any()) ->
+                            {ok, non_neg_integer()} | {error, any()}.
+%% @doc Returns the number of objects in the `Table' by index
+count_by_index(Tab, Index, Key) ->
+    {NewIndex, NewKey} = encode_index_key(Index, Key),
+    case count_by_index_raw(Tab, NewIndex, NewKey) of
+        {ok, Cnt} ->
+            {ok, Cnt};
+        {error, notfound} ->
+            {ok, 0};
+        Error ->
+            log_error(Error, count_by_index,
+                      [{table, Tab},
+                       {index, Index},
+                       {key, Key}]),
+            Error
+    end.
+
+count_by_index_raw(Table, Index, Key) ->
+    Bucket = make_bucket(Table),
     case riakc_pb_socket:mapred(
            ejabberd_riak_sup:get_random_pid(),
            {index, Bucket, Index, Key},
@@ -128,7 +335,154 @@ count_by_index(Host, Table, Index, Key) ->
             Error
     end.
 
-delete(Host, Table, Key) ->
-    Bucket = make_bucket(Host, Table),
+-spec delete(tuple() | atom()) -> ok | {error, any()}.
+%% @doc Same as delete(T, []) when T is record.
+%% Or deletes all elements from table if T is atom.
+delete(Rec) when is_tuple(Rec) ->
+    delete(Rec, []);
+delete(Table) when is_atom(Table) ->
+    try
+        {ok, Keys} = ?MODULE:get_keys(Table),
+        lists:foreach(
+          fun(K) ->
+                  ok = delete(Table, K)
+          end, Keys)
+    catch _:{badmatch, Err} ->
+            Err
+    end.
+
+-spec delete(tuple() | atom(), index_info() | any()) -> ok | {error, any()}.
+%% @doc Delete an object
+delete(Rec, Opts) when is_tuple(Rec) ->
+    Table = element(1, Rec),
+    Key = proplists:get_value(i, Opts, element(2, Rec)),
+    delete(Table, Key);
+delete(Table, Key) when is_atom(Table) ->
+    case delete_raw(Table, encode_key(Key)) of
+        ok ->
+            ok;
+        Err ->
+            log_error(Err, delete, [{table, Table}, {key, Key}]),
+            Err
+    end.
+
+delete_raw(Table, Key) ->
+    Bucket = make_bucket(Table),
     riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
 
+-spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}.
+%% @doc Deletes objects by index
+delete_by_index(Table, Index, Key) ->
+    try
+        {ok, Keys} = get_keys_by_index(Table, Index, Key),
+        lists:foreach(
+          fun(K) ->
+                  ok = delete(Table, K)
+          end, Keys)
+    catch _:{badmatch, Err} ->
+            Err
+    end.
+
+%%%===================================================================
+%%% map/reduce functions
+%%%===================================================================
+%% @private
+map_key(Obj, _, _) ->
+    [case riak_object:key(Obj) of
+         <<"b_", B/binary>> ->
+             B;
+         <<"i_", B/binary>> ->
+             list_to_integer(binary_to_list(B));
+         B ->
+             erlang:binary_to_term(B)
+     end].
+
+%%%===================================================================
+%%% gen_server API
+%%%===================================================================
+%% @private
+init([Server, Port]) ->
+    case riakc_pb_socket:start(
+           Server, Port,
+           [auto_reconnect]) of
+        {ok, Pid} ->
+            erlang:monitor(process, Pid),
+            ejabberd_riak_sup:add_pid(Pid),
+            {ok, #state{pid = Pid}};
+        Err ->
+            {stop, Err}
+    end.
+
+%% @private
+handle_call(_Request, _From, State) ->
+    Reply = ok,
+    {reply, Reply, State}.
+
+%% @private
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% @private
+handle_info({'DOWN', _MonitorRef, _Type, _Object, _Info}, State) ->
+    {stop, normal, State};
+handle_info(_Info, State) ->
+    ?ERROR_MSG("unexpected info: ~p", [_Info]),
+    {noreply, State}.
+
+%% @private
+terminate(_Reason, State) ->
+    ejabberd_riak_sup:remove_pid(State#state.pid),
+    ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+encode_index_key(Idx, Key) when is_integer(Key) ->
+    {<<Idx/binary, "_int">>, Key};
+encode_index_key(Idx, Key) ->
+    {<<Idx/binary, "_bin">>, encode_key(Key)}.
+
+encode_key(Bin) when is_binary(Bin) ->
+    <<"b_", Bin/binary>>;
+encode_key(Int) when is_integer(Int) ->
+    <<"i_", (list_to_binary(integer_to_list(Int)))/binary>>;
+encode_key(Term) ->
+    erlang:term_to_binary(Term).
+
+log_error({error, notfound}, _, _) ->
+    ok;
+log_error({error, Why} = Err, Function, Opts) ->
+    Txt = lists:map(
+            fun({table, Table}) ->
+                    io_lib:fwrite("** Table: ~p~n", [Table]);
+               ({key, Key}) ->
+                    io_lib:fwrite("** Key: ~p~n", [Key]);
+               ({index, Index}) ->
+                    io_lib:fwrite("** Index = ~p~n", [Index]);
+               ({start_key, Key}) ->
+                    io_lib:fwrite("** Start Key: ~p~n", [Key]);
+               ({end_key, Key}) ->
+                    io_lib:fwrite("** End Key: ~p~n", [Key]);
+               ({record, Rec}) ->
+                    io_lib:fwrite("** Record = ~p~n", [Rec]);
+               ({index_info, IdxInfo}) ->
+                    io_lib:fwrite("** Index info = ~p~n", [IdxInfo]);
+               (_) ->
+                    ""
+            end, Opts),
+    ErrTxt = if is_binary(Why) ->
+                     io_lib:fwrite("** Error: ~s", [Why]);
+                true ->
+                     io_lib:fwrite("** Error: ~p", [Err])
+             end,
+    ?ERROR_MSG("database error:~n** Function: ~p~n~s~s",
+               [Function, Txt, ErrTxt]);
+log_error(_, _, _) ->
+    ok.
+
+make_invalid_object(Val) ->
+    list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])).
index 4ad7d4130c50bea1c6888f8b836dffafd1d38af4..d19b9fbe9ff643a89a890e1c1bbd2d90d5314bb7 100644 (file)
@@ -105,8 +105,9 @@ init([]) ->
     {Server, Port} =
         ejabberd_config:get_local_option(
           riak_server,
-          fun({S, P}) when is_list(S), is_integer(P), P >= 1 -> {S, P} end,
-          {"127.0.0.1", 8081}),
+          fun({S, P}) when is_integer(P), P > 0, P < 65536 ->
+                  {binary_to_list(iolist_to_binary(S)), P}
+          end, {"127.0.0.1", 8081}),
     {ok, {{one_for_one, PoolSize*10, 1},
          lists:map(
            fun(I) ->
index fba6d3b81f633273f707d70b340796298e399106..6ccc5990eb76c2db1c08ca214f6e91d932a228dd 100644 (file)
@@ -792,6 +792,17 @@ announce_motd(Host, Packet) ->
                           end, Sessions)
                 end,
             mnesia:transaction(F);
+        riak ->
+            try
+                lists:foreach(
+                  fun({U, S, _R}) ->
+                          ok = ejabberd_riak:put(#motd_users{us = {U, S}},
+                                                 [{'2i', [{<<"server">>, S}]}])
+                  end, Sessions),
+                {atomic, ok}
+            catch _:{badmatch, Err} ->
+                    {atomic, Err}
+            end;
         odbc ->
             F = fun() ->
                         lists:foreach(
@@ -837,6 +848,9 @@ announce_motd_update(LServer, Packet) ->
                         mnesia:write(#motd{server = LServer, packet = Packet})
                 end,
             mnesia:transaction(F);
+        riak ->
+            {atomic, ejabberd_riak:put(#motd{server = LServer,
+                                             packet = Packet})};
         odbc ->
             XML = ejabberd_odbc:escape(xml:element_to_binary(Packet)),
             F = fun() ->
@@ -887,6 +901,16 @@ announce_motd_delete(LServer) ->
                                       end, Users)
                 end,
             mnesia:transaction(F);
+        riak ->
+            try
+                ok = ejabberd_riak:delete(motd, LServer),
+                ok = ejabberd_riak:delete_by_index(motd_users,
+                                                   <<"server">>,
+                                                   LServer),
+                {atomic, ok}
+            catch _:{badmatch, Err} ->
+                    {atomic, Err}
+            end;
         odbc ->
             F = fun() ->
                         ejabberd_odbc:sql_query_t([<<"delete from motd;">>])
@@ -915,6 +939,23 @@ send_motd(#jid{luser = LUser, lserver = LServer} = JID, mnesia) ->
        _ ->
            ok
     end;
+send_motd(#jid{luser = LUser, lserver = LServer} = JID, riak) ->
+    case catch ejabberd_riak:get(motd, LServer) of
+        {ok, #motd{packet = Packet}} ->
+            US = {LUser, LServer},
+            case ejabberd_riak:get(motd_users, US) of
+                {ok, #motd_users{}} ->
+                    ok;
+                _ ->
+                    Local = jlib:make_jid(<<>>, LServer, <<>>),
+                   ejabberd_router:route(Local, JID, Packet),
+                    {atomic, ejabberd_riak:put(
+                               #motd_users{us = US},
+                               [{'2i', [{<<"server">>, LServer}]}])}
+            end;
+        _ ->
+            ok
+    end;
 send_motd(#jid{luser = LUser, lserver = LServer} = JID, odbc) when LUser /= <<>> ->
     case catch ejabberd_odbc:sql_query(
                  LServer, [<<"select xml from motd where username='';">>]) of
@@ -965,6 +1006,13 @@ get_stored_motd_packet(LServer, mnesia) ->
        _ ->
            error
     end;
+get_stored_motd_packet(LServer, riak) ->
+    case ejabberd_riak:get(motd, LServer) of
+        {ok, #motd{packet = Packet}} ->
+            {ok, Packet};
+       _ ->
+           error
+    end;
 get_stored_motd_packet(LServer, odbc) ->
     case catch ejabberd_odbc:sql_query(
                  LServer, [<<"select xml from motd where username='';">>]) of
index 797b7573bddf68b581fcfd22c7a786bc4aa9d9f8..1bd7ae3b5403a380447016b5a9985d89ca40f830 100644 (file)
@@ -181,6 +181,37 @@ process_blocklist_block(LUser, LServer, Filter,
                {ok, NewDefault, NewList}
        end,
     mnesia:transaction(F);
+process_blocklist_block(LUser, LServer, Filter,
+                       riak) ->
+    {atomic,
+     begin
+         case ejabberd_riak:get(privacy, {LUser, LServer}) of
+             {ok, #privacy{default = Default, lists = Lists} = P} ->
+                 case lists:keysearch(Default, 1, Lists) of
+                     {value, {_, List}} ->
+                         NewDefault = Default,
+                         NewLists1 = lists:keydelete(Default, 1, Lists);
+                     false ->
+                         NewDefault = <<"Blocked contacts">>,
+                         NewLists1 = Lists,
+                         List = []
+                 end;
+             {error, _} ->
+                 P = #privacy{us = {LUser, LServer}},
+                 NewDefault = <<"Blocked contacts">>,
+                 NewLists1 = [],
+                 List = []
+         end,
+         NewList = Filter(List),
+         NewLists = [{NewDefault, NewList} | NewLists1],
+         case ejabberd_riak:put(P#privacy{default = NewDefault,
+                                          lists = NewLists}) of
+             ok ->
+                 {ok, NewDefault, NewList};
+             Err ->
+                 Err
+         end
+     end};
 process_blocklist_block(LUser, LServer, Filter, odbc) ->
     F = fun () ->
                Default = case
@@ -256,6 +287,30 @@ process_blocklist_unblock_all(LUser, LServer, Filter,
                end
        end,
     mnesia:transaction(F);
+process_blocklist_unblock_all(LUser, LServer, Filter,
+                              riak) ->
+    {atomic,
+     case ejabberd_riak:get(privacy, {LUser, LServer}) of
+         {ok, #privacy{default = Default, lists = Lists} = P} ->
+             case lists:keysearch(Default, 1, Lists) of
+                 {value, {_, List}} ->
+                     NewList = Filter(List),
+                     NewLists1 = lists:keydelete(Default, 1, Lists),
+                     NewLists = [{Default, NewList} | NewLists1],
+                     case ejabberd_riak:put(P#privacy{lists = NewLists}) of
+                         ok ->
+                             {ok, Default, NewList};
+                         Err ->
+                             Err
+                     end;
+                 false ->
+                     %% No default list, nothing to unblock
+                     ok
+             end;
+         {error, _} ->
+             %% No lists, nothing to unblock
+             ok
+     end};
 process_blocklist_unblock_all(LUser, LServer, Filter,
                              odbc) ->
     F = fun () ->
@@ -331,6 +386,30 @@ process_blocklist_unblock(LUser, LServer, Filter,
                end
        end,
     mnesia:transaction(F);
+process_blocklist_unblock(LUser, LServer, Filter,
+                          riak) ->
+    {atomic,
+     case ejabberd_riak:get(privacy, {LUser, LServer}) of
+         {error, _} ->
+             %% No lists, nothing to unblock
+             ok;
+         {ok, #privacy{default = Default, lists = Lists} = P} ->
+             case lists:keysearch(Default, 1, Lists) of
+                 {value, {_, List}} ->
+                     NewList = Filter(List),
+                     NewLists1 = lists:keydelete(Default, 1, Lists),
+                     NewLists = [{Default, NewList} | NewLists1],
+                     case ejabberd_riak:put(P#privacy{lists = NewLists}) of
+                         ok ->
+                             {ok, Default, NewList};
+                         Err ->
+                             Err
+                     end;
+                 false ->
+                     %% No default list, nothing to unblock
+                     ok
+             end
+     end};
 process_blocklist_unblock(LUser, LServer, Filter,
                          odbc) ->
     F = fun () ->
@@ -409,6 +488,18 @@ process_blocklist_get(LUser, LServer, mnesia) ->
            _ -> []
          end
     end;
+process_blocklist_get(LUser, LServer, riak) ->
+    case ejabberd_riak:get(privacy, {LUser, LServer}) of
+        {ok, #privacy{default = Default, lists = Lists}} ->
+            case lists:keysearch(Default, 1, Lists) of
+                {value, {_, List}} -> List;
+                _ -> []
+            end;
+        {error, notfound} ->
+            [];
+        {error, _} ->
+            error
+    end;
 process_blocklist_get(LUser, LServer, odbc) ->
     case catch
           mod_privacy:sql_get_default_privacy_list(LUser, LServer)
index 5f529bd28db4e10aee46160ec46704509ddeb2fe..df00dbfb1673ef81194f66ade3b11d219a43ccc1 100644 (file)
@@ -430,12 +430,49 @@ caps_read_fun(Node) ->
              [#caps_features{features = Features}] -> {ok, Features};
              _ -> error
            end
+    end;
+caps_read_fun(_LServer, Node, riak) ->
+    fun() ->
+            case ejabberd_riak:get(caps_features, Node) of
+                {ok, #caps_features{features = Features}} -> {ok, Features};
+                _ -> error
+            end
+    end;
+caps_read_fun(LServer, {Node, SubNode}, odbc) ->
+    fun() ->
+            SNode = ejabberd_odbc:escape(Node),
+            SSubNode = ejabberd_odbc:escape(SubNode),
+            case ejabberd_odbc:sql_query(
+                   LServer, [<<"select feature from caps_features where ">>,
+                             <<"node='">>, SNode, <<"' and subnode='">>,
+                             SSubNode, <<"';">>]) of
+                {selected, [<<"feature">>], [[H]|_] = Fs} ->
+                    case catch jlib:binary_to_integer(H) of
+                        Int when is_integer(Int), Int>=0 ->
+                            {ok, Int};
+                        _ ->
+                            {ok, lists:flatten(Fs)}
+                    end;
+                _ ->
+                    error
+            end
     end.
 
 caps_write_fun(Node, Features) ->
     fun () ->
            mnesia:dirty_write(#caps_features{node_pair = Node,
                                              features = Features})
+    end;
+caps_write_fun(_LServer, Node, Features, riak) ->
+    fun () ->
+            ejabberd_riak:put(#caps_features{node_pair = Node,
+                                             features = Features})
+    end;
+caps_write_fun(LServer, NodePair, Features, odbc) ->
+    fun () ->
+            ejabberd_odbc:sql_transaction(
+              LServer,
+              sql_write_features_t(NodePair, Features))
     end.
 
 make_my_disco_hash(Host) ->
index c7dda8303f4db6a63266b3eb6699c409a54ab6fe..0e52b2342a70d5d67c65258ccb2ea1f561175586 100644 (file)
@@ -591,6 +591,17 @@ get_data(_LServer, Host, From, mnesia) ->
       [] -> empty;
       [#irc_custom{data = Data}] -> Data
     end;
+get_data(LServer, Host, From, riak) ->
+    #jid{luser = LUser, lserver = LServer} = From,
+    US = {LUser, LServer},
+    case ejabberd_riak:get(irc_custom, {US, Host}) of
+        {ok, #irc_custom{data = Data}} ->
+            Data;
+        {error, notfound} ->
+            empty;
+        _Err ->
+            error
+    end;
 get_data(LServer, Host, From, odbc) ->
     SJID =
        ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),
@@ -723,6 +734,11 @@ set_data(_LServer, Host, From, Data, mnesia) ->
                                         data = Data})
        end,
     mnesia:transaction(F);
+set_data(LServer, Host, From, Data, riak) ->
+    {LUser, LServer, _} = jlib:jid_tolower(From),
+    US = {LUser, LServer},
+    {atomic, ejabberd_riak:put(#irc_custom{us_host = {US, Host},
+                                           data = Data})};
 set_data(LServer, Host, From, Data, odbc) ->
     SJID =
        ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),
index 6b7a06bed3cd1d138aaacf0738c6b677d597b139..22f9ded9765f7030652024e2325c711574a0113a 100644 (file)
@@ -168,6 +168,16 @@ get_last(LUser, LServer, mnesia) ->
                      status = Status}] ->
          {ok, TimeStamp, Status}
     end;
+get_last(LUser, LServer, riak) ->
+    case ejabberd_riak:get(last_activity, {LUser, LServer}) of
+        {ok, #last_activity{timestamp = TimeStamp,
+                            status = Status}} ->
+            {ok, TimeStamp, Status};
+        {error, notfound} ->
+            not_found;
+        Err ->
+            Err
+    end;
 get_last(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     case catch odbc_queries:get_last(LServer, Username) of
@@ -235,6 +245,12 @@ store_last_info(LUser, LServer, TimeStamp, Status,
                                            status = Status})
        end,
     mnesia:transaction(F);
+store_last_info(LUser, LServer, TimeStamp, Status,
+                riak) ->
+    US = {LUser, LServer},
+    {atomic, ejabberd_riak:put(#last_activity{us = US,
+                                              timestamp = TimeStamp,
+                                              status = Status})};
 store_last_info(LUser, LServer, TimeStamp, Status,
                odbc) ->
     Username = ejabberd_odbc:escape(LUser),
@@ -264,7 +280,9 @@ remove_user(LUser, LServer, mnesia) ->
     mnesia:transaction(F);
 remove_user(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
-    odbc_queries:del_last(LServer, Username).
+    odbc_queries:del_last(LServer, Username);
+remove_user(LUser, LServer, riak) ->
+    {atomic, ejabberd_riak:delete(last_activity, {LUser, LServer})}.
 
 update_table() ->
     Fields = record_info(fields, last_activity),
index 160b9009d7712074ec9f605161f4ac1c607d92c8..408700909d1b6bfa36c9bd8189479e69c8e8f9c8 100644 (file)
@@ -147,6 +147,9 @@ store_room(_LServer, Host, Name, Opts, mnesia) ->
                                       opts = Opts})
        end,
     mnesia:transaction(F);
+store_room(_LServer, Host, Name, Opts, riak) ->
+    {atomic, ejabberd_riak:put(#muc_room{name_host = {Name, Host},
+                                         opts = Opts})};
 store_room(LServer, Host, Name, Opts, odbc) ->
     SName = ejabberd_odbc:escape(Name),
     SHost = ejabberd_odbc:escape(Host),
@@ -170,6 +173,11 @@ restore_room(_LServer, Host, Name, mnesia) ->
       [#muc_room{opts = Opts}] -> Opts;
       _ -> error
     end;
+restore_room(_LServer, Host, Name, riak) ->
+    case ejabberd_riak:get(muc_room, {Name, Host}) of
+        {ok, #muc_room{opts = Opts}} -> Opts;
+        _ -> error
+    end;
 restore_room(LServer, Host, Name, odbc) ->
     SName = ejabberd_odbc:escape(Name),
     SHost = ejabberd_odbc:escape(Host),
@@ -192,6 +200,8 @@ forget_room(_LServer, Host, Name, mnesia) ->
     F = fun () -> mnesia:delete({muc_room, {Name, Host}})
        end,
     mnesia:transaction(F);
+forget_room(_LServer, Host, Name, riak) ->
+    {atomic, ejabberd_riak:delete(muc_room, {Name, Host})};
 forget_room(LServer, Host, Name, odbc) ->
     SName = ejabberd_odbc:escape(Name),
     SHost = ejabberd_odbc:escape(Host),
@@ -231,6 +241,18 @@ can_use_nick(_LServer, Host, JID, Nick, mnesia) ->
       [] -> true;
       [#muc_registered{us_host = {U, _Host}}] -> U == LUS
     end;
+can_use_nick(LServer, Host, JID, Nick, riak) ->
+    {LUser, LServer, _} = jlib:jid_tolower(JID),
+    LUS = {LUser, LServer},
+    case ejabberd_riak:get_by_index(muc_registered,
+                                    <<"nick_host">>, {Nick, Host}) of
+        {ok, []} ->
+            true;
+        {ok, [#muc_registered{us_host = {U, _Host}}]} ->
+            U == LUS;
+        {error, _} ->
+            true
+    end;
 can_use_nick(LServer, Host, JID, Nick, odbc) ->
     SJID =
        jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(JID))),
@@ -617,6 +639,16 @@ get_rooms(_LServer, Host, mnesia) ->
       {'EXIT', Reason} -> ?ERROR_MSG("~p", [Reason]), [];
       Rs -> Rs
     end;
+get_rooms(_LServer, Host, riak) ->
+    case ejabberd_riak:get(muc_room) of
+        {ok, Rs} ->
+            lists:filter(
+              fun(#muc_room{name_host = {_, H}}) ->
+                      Host == H
+              end, Rs);
+        _Err ->
+            []
+    end;
 get_rooms(LServer, Host, odbc) ->
     SHost = ejabberd_odbc:escape(Host),
     case catch ejabberd_odbc:sql_query(LServer,
@@ -839,6 +871,13 @@ get_nick(_LServer, Host, From, mnesia) ->
       [] -> error;
       [#muc_registered{nick = Nick}] -> Nick
     end;
+get_nick(LServer, Host, From, riak) ->
+    {LUser, LServer, _} = jlib:jid_tolower(From),
+    US = {LUser, LServer},
+    case ejabberd_riak:get(muc_registered, {US, Host}) of
+        {ok, #muc_registered{nick = Nick}} -> Nick;
+        {error, _} -> error
+    end;
 get_nick(LServer, Host, From, odbc) ->
     SJID =
        ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),
@@ -922,6 +961,33 @@ set_nick(_LServer, Host, From, Nick, mnesia) ->
                end
        end,
     mnesia:transaction(F);
+set_nick(LServer, Host, From, Nick, riak) ->
+    {LUser, LServer, _} = jlib:jid_tolower(From),
+    LUS = {LUser, LServer},
+    {atomic,
+     case Nick of
+         <<"">> ->
+             ejabberd_riak:delete(muc_registered, {LUS, Host});
+         _ ->
+             Allow = case ejabberd_riak:get_by_index(
+                            muc_registered,
+                            <<"nick_host">>, {Nick, Host}) of
+                         {ok, []} ->
+                             true;
+                         {ok, [#muc_registered{us_host = {U, _Host}}]} ->
+                             U == LUS;
+                         {error, _} ->
+                             false
+                     end,
+             if Allow ->
+                     ejabberd_riak:put(#muc_registered{us_host = {LUS, Host},
+                                                       nick = Nick},
+                                       [{'2i', [{<<"nick_host">>,
+                                                 {Nick, Host}}]}]);
+                true ->
+                     false
+             end
+     end};
 set_nick(LServer, Host, From, Nick, odbc) ->
     JID =
        jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From))),
index 5e2d80aa28075c96db467e6cfa08341bbeeabf63..c5dc305d64a66fab1044ec3f99ee440cbb4af9b9 100644 (file)
@@ -187,43 +187,9 @@ store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs,
             discard_warn_sender(Msgs);
         true ->
             lists:foreach(
-              fun(M) ->
-                      Username = User,
-                      From = M#offline_msg.from,
-                      To = M#offline_msg.to,
-                      #xmlel{name = Name, attrs = Attrs,
-                             children = Els} =
-                          M#offline_msg.packet,
-                      Attrs2 = jlib:replace_from_to_attrs(
-                                 jlib:jid_to_string(From),
-                                 jlib:jid_to_string(To),
-                                 Attrs),
-                      Packet = #xmlel{name = Name,
-                                      attrs = Attrs2,
-                                      children =
-                                      Els ++
-                                      [jlib:timestamp_to_xml(
-                                         calendar:now_to_universal_time(
-                                           M#offline_msg.timestamp),
-                                         utc,
-                                         jlib:make_jid(<<"">>, Host, <<"">>),
-                                         <<"Offline Storage">>),
-                                       jlib:timestamp_to_xml(
-                                         calendar:now_to_universal_time(
-                                           M#offline_msg.timestamp))]},
-                      XML = xml:element_to_binary(Packet),
-                      {MegaSecs, Secs, MicroSecs} =
-                          M#offline_msg.timestamp,
-                      TS =
-                          iolist_to_binary(
-                            io_lib:format("~6..0w~6..0w.~6..0w",
-                                          [MegaSecs, Secs, MicroSecs])),
-                      ejabberd_riak:put(
-                        Host, <<"offline">>,
-                        undefined, XML,
-                        [{<<"user_bin">>, Username},
-                         {<<"timestamp_bin">>, TS}
-                        ])
+              fun(#offline_msg{us = US,
+                               timestamp = TS} = M) ->
+                      ejabberd_riak:put(M, [{i, TS}, {'2i', [{<<"us">>, US}]}])
               end, Msgs)
     end.
 
@@ -244,7 +210,7 @@ receive_all(US, Msgs, DBType) ->
                case DBType of
                  mnesia -> Msgs;
                  odbc -> lists:reverse(Msgs);
-                 riak -> lists:reverse(Msgs)
+                 riak -> Msgs
                end
     end.
 
@@ -474,41 +440,30 @@ pop_offline_messages(Ls, LUser, LServer, odbc) ->
       _ -> Ls
     end;
 pop_offline_messages(Ls, LUser, LServer, riak) ->
-    Username = LUser,
-    case ejabberd_riak:get_objects_by_index(
-           LServer, <<"offline">>, <<"user_bin">>, Username) of
+    case ejabberd_riak:get_by_index(offline_msg,
+                                    <<"us">>, {LUser, LServer}) of
         {ok, Rs} ->
-            SortedRs =
-                lists:sort(fun(X, Y) ->
-                                   MX = riak_object:get_metadata(X),
-                                   {ok, IX} = dict:find(<<"index">>, MX),
-                                   {value, TSX} = lists:keysearch(
-                                                    <<"timestamp_bin">>, 1,
-                                                    IX),
-                                   MY = riak_object:get_metadata(Y),
-                                   {ok, IY} = dict:find(<<"index">>, MY),
-                                   {value, TSY} = lists:keysearch(
-                                                    <<"timestamp_bin">>, 1,
-                                                    IY),
-                                   TSX =< TSY
-                           end, Rs),
-            Ls ++ lists:flatmap(
-                   fun(R) ->
-                            Key = riak_object:key(R),
-                            ejabberd_riak:delete(LServer, <<"offline">>, Key),
-                            XML = riak_object:get_value(R),
-                           case xml_stream:parse_element(XML) of
-                               {error, _Reason} ->
-                                   [];
-                               El ->
-                                    case offline_msg_to_route(LServer, El) of
-                                        error ->
-                                            [];
-                                        RouteMsg ->
-                                            [RouteMsg]
-                                    end
-                           end
-                   end, SortedRs);
+            try
+                lists:foreach(
+                  fun(#offline_msg{timestamp = T}) ->
+                          ok = ejabberd_riak:delete(offline_msg, T)
+                  end, Rs),
+                TS = now(),
+                Ls ++ lists:map(
+                        fun (R) ->
+                                offline_msg_to_route(LServer, R)
+                        end,
+                        lists:filter(
+                          fun(R) ->
+                                  case R#offline_msg.expire of
+                                      never -> true;
+                                      TimeStamp -> TS < TimeStamp
+                                  end
+                          end,
+                          lists:keysort(#offline_msg.timestamp, Rs)))
+            catch _:{badmatch, _} ->
+                    Ls
+            end;
        _ ->
            Ls
     end.
@@ -579,17 +534,8 @@ remove_user(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     odbc_queries:del_spool_msg(LServer, Username);
 remove_user(LUser, LServer, riak) ->
-    Username = LUser,
-    case ejabberd_riak:get_keys_by_index(
-           LServer, <<"offline">>, <<"user_bin">>, Username) of
-        {ok, Keys} ->
-            lists:foreach(
-              fun(Key) ->
-                      ejabberd_riak:delete(LServer, <<"offline">>, Key)
-              end, Keys);
-        _ ->
-            ok
-    end.
+    {atomic, ejabberd_riak:delete_by_index(offline_msg,
+                                           <<"us">>, {LUser, LServer})}.
 
 jid_to_binary(#jid{user = U, server = S, resource = R,
                    luser = LU, lserver = LS, lresource = LR}) ->
@@ -650,6 +596,8 @@ get_offline_els(LUser, LServer) ->
 
 get_offline_els(LUser, LServer, mnesia) ->
     Msgs = read_all_msgs(LUser, LServer, mnesia),
+get_offline_els(LUser, LServer, DBType) when DBType == mnesia; DBType == riak ->
+    Msgs = read_all_msgs(LUser, LServer, DBType),
     lists:map(
       fun(Msg) ->
               {route, From, To, Packet} = offline_msg_to_route(LServer, Msg),
@@ -706,6 +654,14 @@ read_all_msgs(LUser, LServer, mnesia) ->
     US = {LUser, LServer},
     lists:keysort(#offline_msg.timestamp,
                  mnesia:dirty_read({offline_msg, US}));
+read_all_msgs(LUser, LServer, riak) ->
+    case ejabberd_riak:get_by_index(
+           offline_msg, <<"us">>, {LUser, LServer}) of
+        {ok, Rs} ->
+            lists:keysort(#offline_msg.timestamp, Rs);
+        _Err ->
+            []
+    end;
 read_all_msgs(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     case catch ejabberd_odbc:sql_query(LServer,
@@ -723,7 +679,7 @@ read_all_msgs(LUser, LServer, odbc) ->
       _ -> []
     end.
 
-format_user_queue(Msgs, mnesia) ->
+format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak ->
     lists:map(fun (#offline_msg{timestamp = TimeStamp,
                                from = From, to = To,
                                packet =
@@ -831,6 +787,26 @@ user_queue_parse_query(LUser, LServer, Query, mnesia) ->
          ok;
       false -> nothing
     end;
+user_queue_parse_query(LUser, LServer, Query, riak) ->
+    case lists:keysearch(<<"delete">>, 1, Query) of
+        {value, _} ->
+            Msgs = read_all_msgs(LUser, LServer, riak),
+            lists:foreach(
+              fun (Msg) ->
+                      ID = jlib:encode_base64((term_to_binary(Msg))),
+                      case lists:member({<<"selected">>, ID}, Query) of
+                          true ->
+                              ejabberd_riak:delete(offline_msg,
+                                                   Msg#offline_msg.timestamp);
+                          false ->
+                              ok
+                      end
+              end,
+              Msgs),
+            ok;
+        false ->
+            nothing
+    end;
 user_queue_parse_query(LUser, LServer, Query, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     case lists:keysearch(<<"delete">>, 1, Query) of
@@ -889,6 +865,14 @@ get_queue_length(LUser, LServer) ->
 get_queue_length(LUser, LServer, mnesia) ->
     length(mnesia:dirty_read({offline_msg,
                               {LUser, LServer}}));
+get_queue_length(LUser, LServer, riak) ->
+    case ejabberd_riak:count_by_index(offline_msg,
+                                      <<"us">>, {LUser, LServer}) of
+        {ok, N} ->
+            N;
+        _ ->
+            0
+    end;
 get_queue_length(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     case catch ejabberd_odbc:sql_query(LServer,
@@ -917,7 +901,8 @@ get_messages_subset(User, Host, MsgsAll, DBType) ->
 get_messages_subset2(Max, Length, MsgsAll, _DBType)
     when Length =< Max * 2 ->
     MsgsAll;
-get_messages_subset2(Max, Length, MsgsAll, mnesia) ->
+get_messages_subset2(Max, Length, MsgsAll, DBType)
+  when DBType == mnesia; DBType == riak ->
     FirstN = Max,
     {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
     MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
@@ -965,6 +950,10 @@ delete_all_msgs(LUser, LServer, mnesia) ->
                              mnesia:dirty_read({offline_msg, US}))
        end,
     mnesia:transaction(F);
+delete_all_msgs(LUser, LServer, riak) ->
+    Res = ejabberd_riak:delete_by_index(offline_msg,
+                                        <<"us">>, {LUser, LServer}),
+    {atomic, Res};
 delete_all_msgs(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     odbc_queries:del_spool_msg(LServer, Username),
@@ -987,16 +976,44 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
     Acc.
 
 %% Returns as integer the number of offline messages for a given user
-count_offline_messages(LUser, LServer) ->
+count_offline_messages(User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    DBType = gen_mod:db_type(LServer, ?MODULE),
+    count_offline_messages(LUser, LServer, DBType).
+
+count_offline_messages(LUser, LServer, mnesia) ->
+    US = {LUser, LServer},
+    F = fun () ->
+               p1_mnesia:count_records(offline_msg,
+                                       #offline_msg{us = US, _ = '_'})
+       end,
+    case catch mnesia:async_dirty(F) of
+      I when is_integer(I) -> I;
+      _ -> 0
+    end;
+count_offline_messages(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
-    case catch odbc_queries:count_records_where(
-                LServer, "spool",
-                 <<"where username='", Username/binary, "'">>) of
-        {selected, [_], [[Res]]} ->
-            jlib:binary_to_integer(Res);
+    case catch odbc_queries:count_records_where(LServer,
+                                               <<"spool">>,
+                                               <<"where username='",
+                                                 Username/binary, "'">>)
+       of
+      {selected, [_], [[Res]]} ->
+         jlib:binary_to_integer(Res);
+      _ -> 0
+    end;
+count_offline_messages(LUser, LServer, riak) ->
+    case ejabberd_riak:count_by_index(
+           offline_msg, <<"us">>, {LUser, LServer}) of
+        {ok, Res} ->
+            Res;
         _ ->
             0
-    end.
+    end;
+count_offline_messages(_Acc, User, Server) ->
+    N = count_offline_messages(User, Server),
+    {stop, N}.
 
 export(_Server) ->
     [{offline_msg,
index 6b852bb479df3dab3fc9533ad5e758b88e4bed31..2286875e366c2087ca01d23ecb79d5db53e5fc36 100644 (file)
@@ -160,6 +160,21 @@ process_lists_get(LUser, LServer, _Active, mnesia) ->
                             Lists),
          {Default, LItems}
     end;
+process_lists_get(LUser, LServer, _Active, riak) ->
+    case ejabberd_riak:get(privacy, {LUser, LServer}) of
+        {ok, #privacy{default = Default, lists = Lists}} ->
+            LItems = lists:map(fun ({N, _}) ->
+                                       #xmlel{name = <<"list">>,
+                                              attrs = [{<<"name">>, N}],
+                                              children = []}
+                               end,
+                               Lists),
+            {Default, LItems};
+        {error, notfound} ->
+            {none, []};
+        {error, _} ->
+            error
+    end;
 process_lists_get(LUser, LServer, _Active, odbc) ->
     Default = case catch sql_get_default_privacy_list(LUser,
                                                      LServer)
@@ -209,6 +224,18 @@ process_list_get(LUser, LServer, Name, mnesia) ->
            _ -> not_found
          end
     end;
+process_list_get(LUser, LServer, Name, riak) ->
+    case ejabberd_riak:get(privacy, {LUser, LServer}) of
+        {ok, #privacy{lists = Lists}} ->
+            case lists:keysearch(Name, 1, Lists) of
+                {value, {_, List}} -> List;
+                _ -> not_found
+            end;
+        {error, notfound} ->
+            not_found;
+        {error, _} ->
+            error
+    end;
 process_list_get(LUser, LServer, Name, odbc) ->
     case catch sql_get_privacy_list_id(LUser, LServer, Name)
        of
@@ -354,6 +381,20 @@ process_default_set(LUser, LServer, {value, Name},
                end
        end,
     mnesia:transaction(F);
+process_default_set(LUser, LServer, {value, Name}, riak) ->
+    {atomic,
+     case ejabberd_riak:get(privacy, {LUser, LServer}) of
+         {ok, #privacy{lists = Lists} = P} ->
+             case lists:keymember(Name, 1, Lists) of
+                 true ->
+                     ejabberd_riak:put(P#privacy{default = Name,
+                                                 lists = Lists});
+                 false ->
+                     not_found
+             end;
+         {error, _} ->
+             not_found
+     end};
 process_default_set(LUser, LServer, {value, Name},
                    odbc) ->
     F = fun () ->
@@ -375,6 +416,14 @@ process_default_set(LUser, LServer, false, mnesia) ->
                end
        end,
     mnesia:transaction(F);
+process_default_set(LUser, LServer, false, riak) ->
+    {atomic,
+     case ejabberd_riak:get(privacy, {LUser, LServer}) of
+         {ok, R} ->
+             ejabberd_riak:put(R#privacy{default = none});
+         {error, _} ->
+             ok
+     end};
 process_default_set(LUser, LServer, false, odbc) ->
     case catch sql_unset_default_privacy_list(LUser,
                                              LServer)
@@ -407,6 +456,16 @@ process_active_set(LUser, LServer, Name, mnesia) ->
            false -> error
          end
     end;
+process_active_set(LUser, LServer, Name, riak) ->
+    case ejabberd_riak:get(privacy, {LUser, LServer}) of
+        {ok, #privacy{lists = Lists}} ->
+            case lists:keysearch(Name, 1, Lists) of
+                {value, {_, List}} -> List;
+                false -> error
+            end;
+        {error, _} ->
+            error
+    end;
 process_active_set(LUser, LServer, Name, odbc) ->
     case catch sql_get_privacy_list_id(LUser, LServer, Name)
        of
@@ -438,6 +497,19 @@ remove_privacy_list(LUser, LServer, Name, mnesia) ->
                end
        end,
     mnesia:transaction(F);
+remove_privacy_list(LUser, LServer, Name, riak) ->
+    {atomic,
+     case ejabberd_riak:get(privacy, {LUser, LServer}) of
+         {ok, #privacy{default = Default, lists = Lists} = P} ->
+             if Name == Default ->
+                     conflict;
+                true ->
+                     NewLists = lists:keydelete(Name, 1, Lists),
+                     ejabberd_riak:put(P#privacy{lists = NewLists})
+             end;
+         {error, _} ->
+             ok
+     end};
 remove_privacy_list(LUser, LServer, Name, odbc) ->
     F = fun () ->
                case sql_get_default_privacy_list_t(LUser) of
@@ -465,6 +537,18 @@ set_privacy_list(LUser, LServer, Name, List, mnesia) ->
                end
        end,
     mnesia:transaction(F);
+set_privacy_list(LUser, LServer, Name, List, riak) ->
+    {atomic,
+     case ejabberd_riak:get(privacy, {LUser, LServer}) of
+         {ok, #privacy{lists = Lists} = P} ->
+             NewLists1 = lists:keydelete(Name, 1, Lists),
+             NewLists = [{Name, List} | NewLists1],
+             ejabberd_riak:put(P#privacy{lists = NewLists});
+         {error, _} ->
+             NewLists = [{Name, List}],
+             ejabberd_riak:put(#privacy{us = {LUser, LServer},
+                                        lists = NewLists})
+     end};
 set_privacy_list(LUser, LServer, Name, List, odbc) ->
     RItems = lists:map(fun item_to_raw/1, List),
     F = fun () ->
@@ -649,6 +733,20 @@ get_user_list(_, LUser, LServer, mnesia) ->
          end;
       _ -> {none, []}
     end;
+get_user_list(_, LUser, LServer, riak) ->
+    case ejabberd_riak:get(privacy, {LUser, LServer}) of
+        {ok, #privacy{default = Default, lists = Lists}} ->
+            case Default of
+                none -> {none, []};
+                _ ->
+                    case lists:keysearch(Default, 1, Lists) of
+                        {value, {_, List}} -> {Default, List};
+                        _ -> {none, []}
+                    end
+            end;
+        {error, _} ->
+            {none, []}
+    end;
 get_user_list(_, LUser, LServer, odbc) ->
     case catch sql_get_default_privacy_list(LUser, LServer)
        of
@@ -680,6 +778,13 @@ get_user_lists(LUser, LServer, mnesia) ->
         _ ->
             error
     end;
+get_user_lists(LUser, LServer, riak) ->
+    case ejabberd_riak:get(privacy, {LUser, LServer}) of
+        {ok, #privacy{} = P} ->
+            {ok, P};
+        {error, _} ->
+            error
+    end;
 get_user_lists(LUser, LServer, odbc) ->
     Default = case catch sql_get_default_privacy_list(LUser, LServer) of
                   {selected, [<<"name">>], []} ->
@@ -843,6 +948,8 @@ remove_user(LUser, LServer, mnesia) ->
     F = fun () -> mnesia:delete({privacy, {LUser, LServer}})
        end,
     mnesia:transaction(F);
+remove_user(LUser, LServer, riak) ->
+    {atomic, ejabberd_riak:delete(privacy, {LUser, LServer})};
 remove_user(LUser, LServer, odbc) ->
     sql_del_privacy_lists(LUser, LServer).
 
index 301880e97fa684b154580c17a8b0fecd19544014..a925eecdf2d8ed58f7079ed4de8070143b96abe3 100644 (file)
@@ -152,13 +152,9 @@ set_data(LUser, LServer, {XMLNS, El}, odbc) ->
     odbc_queries:set_private_data(LServer, Username, LXMLNS,
                                  SData);
 set_data(LUser, LServer, {XMLNS, El}, riak) ->
-    Username = LUser,
-    Key = <<LUser/binary, $@, LServer/binary, $@, XMLNS/binary>>,
-    SData = xml:element_to_binary(El),
-    ejabberd_riak:put(
-      LServer, <<"private">>, Key, SData,
-      [{<<"user_bin">>, Username}]),
-    ok.
+    ejabberd_riak:put(#private_storage{usns = {LUser, LServer, XMLNS},
+                                       xml = El},
+                      [{'2i', [{<<"us">>, {LUser, LServer}}]}]).
 
 get_data(LUser, LServer, Data) ->
     get_data(LUser, LServer,
@@ -195,18 +191,13 @@ get_data(LUser, LServer, odbc, [{XMLNS, El} | Els],
     end;
 get_data(LUser, LServer, riak, [{XMLNS, El} | Els],
         Res) ->
-    Key = <<LUser/binary, $@, LServer/binary, $@, XMLNS/binary>>,
-    case ejabberd_riak:get(LServer, <<"private">>, Key) of
-        {ok, SData} ->
-            case xml_stream:parse_element(SData) of
-                Data when element(1, Data) == xmlelement ->
-                    get_data(LUser, LServer, riak, Els, [Data | Res])
-            end;
-        _ -> 
-            get_data(LUser, LServer, riak, Els, [El | Res])
+    case ejabberd_riak:get(private_storage, {LUser, LServer, XMLNS}) of
+        {ok, #private_storage{xml = NewEl}} ->
+            get_data(LUser, LServer, riak, Els, [NewEl|Res]);
+        _ ->
+            get_data(LUser, LServer, riak, Els, [El|Res])
     end.
 
-
 get_data(LUser, LServer) ->
     get_all_data(LUser, LServer,
                  gen_mod:db_type(LServer, ?MODULE)).
@@ -234,19 +225,10 @@ get_all_data(LUser, LServer, odbc) ->
             []
     end;
 get_all_data(LUser, LServer, riak) ->
-    Username = LUser,
     case ejabberd_riak:get_by_index(
-           LServer, <<"private">>, <<"user_bin">>, Username) of
+           private_storage, <<"us">>, {LUser, LServer}) of
         {ok, Res} ->
-            lists:flatmap(
-              fun(SData) ->
-                      case xml_stream:parse_element(SData) of
-                          #xmlel{} = El ->
-                              [El];
-                          _ ->
-                              []
-                      end
-              end, Res);
+            [El || #private_storage{xml = El} <- Res];
         _ ->
             []
     end.
@@ -279,17 +261,8 @@ remove_user(LUser, LServer, odbc) ->
     odbc_queries:del_user_private_storage(LServer,
                                          Username);
 remove_user(LUser, LServer, riak) ->
-    Username = LUser,
-    case ejabberd_riak:get_keys_by_index(
-           LServer, <<"private">>, <<"user_bin">>, Username) of
-        {ok, Keys} ->
-            lists:foreach(
-              fun(Key) ->
-                      ejabberd_riak:delete(LServer, <<"private">>, Key)
-              end, Keys);
-        _ ->
-            ok
-    end.
+    {atomic, ejabberd_riak:delete_by_index(private_storage,
+                                           <<"us">>, {LUser, LServer})}.
 
 update_table() ->
     Fields = record_info(fields, private_storage),
index 8ef70eb598540cba8a6222a84b9070613dcbea00..24386e8e86b57428588aed9bf3192d5b2f8fef2e 100644 (file)
@@ -206,11 +206,9 @@ read_roster_version(LUser, LServer, odbc) ->
       {selected, [<<"version">>], []} -> error
     end;
 read_roster_version(LServer, LUser, riak) ->
-    Username = LUser,
-    case ejabberd_riak:get(LServer, <<"roster_version">>,
-                           Username) of
+    case ejabberd_riak:get(roster_version, {LUser, LServer}) of
         {ok, Version} -> Version;
-        {error, notfound} -> error
+        _Err -> error
     end.
 
 write_roster_version(LUser, LServer) ->
@@ -249,8 +247,8 @@ write_roster_version(LUser, LServer, InTransaction, Ver,
     end;
 write_roster_version(LUser, LServer, _InTransaction, Ver,
                     riak) ->
-    Username = LUser,
-    riak_set_roster_version(LServer, Username, Ver).
+    US = {LUser, LServer},
+    ejabberd_riak:put(#roster_version{us = US, version = Ver}).
 
 %% Load roster from DB only if neccesary. 
 %% It is neccesary if
@@ -358,6 +356,11 @@ get_roster(LUser, LServer, mnesia) ->
       Items  when is_list(Items)-> Items;
       _ -> []
     end;
+get_roster(LUser, LServer, riak) ->
+    case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of
+        {ok, Items} -> Items;
+        _Err -> []
+    end;
 get_roster(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     case catch odbc_queries:get_roster(LServer, Username) of
@@ -399,37 +402,6 @@ get_roster(LUser, LServer, odbc) ->
                                 Items),
          RItems;
       _ -> []
-    end;
-get_roster(LUser, LServer, riak) ->
-    Username = LUser,
-    case catch riak_get_roster(LServer, Username) of
-       {ok, Items} when is_list(Items) ->
-           JIDGroups = case riak_get_roster_jid_groups(LServer, Username) of
-                           {ok, JGrps} when is_list(JGrps) ->
-                               JGrps;
-                           _ ->
-                               []
-                       end,
-            GroupsDict = dict:from_list(JIDGroups),
-           RItems = lists:flatmap(
-                      fun(I) ->
-                              case riak_raw_to_record(LServer, I) of
-                                  %% Bad JID in database:
-                                  error ->
-                                      [];
-                                  R ->
-                                      SJID = jlib:jid_to_string(R#roster.jid),
-                                      Groups =
-                                           case dict:find(SJID, GroupsDict) of
-                                               {ok, Gs} -> Gs;
-                                               error -> []
-                                           end,
-                                      [R#roster{groups = Groups}]
-                              end
-                      end, Items),
-           RItems;
-       _ ->
-           []
     end.
 
 item_to_xml(Item) ->
@@ -499,29 +471,15 @@ get_roster_by_jid_t(LUser, LServer, LJID, odbc) ->
          end
     end;
 get_roster_by_jid_t(LUser, LServer, LJID, riak) ->
-    Username = LUser,
-    SJID = jlib:jid_to_string(LJID),
-    Res = riak_get_roster_by_jid(LServer, Username, SJID),
-    case Res of
-        {error, _} ->
-            #roster{usj = {LUser, LServer, LJID},
-                    us = {LUser, LServer},
-                    jid = LJID};
+    case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
         {ok, I} ->
-            R = riak_raw_to_record(LServer, I),
-            case R of
-                %% Bad JID in database:
-                error ->
-                    #roster{usj = {LUser, LServer, LJID},
-                            us = {LUser, LServer},
-                            jid = LJID};
-                _ ->
-                    R#roster{
-                      usj = {LUser, LServer, LJID},
-                      us = {LUser, LServer},
-                      jid = LJID,
-                      name = ""}
-            end
+            I#roster{jid = LJID, name = <<"">>, groups = [],
+                     xs = []};
+        {error, notfound} ->
+            #roster{usj = {LUser, LServer, LJID},
+                    us = {LUser, LServer}, jid = LJID};
+        Err ->
+            exit(Err)
     end.
 
 try_process_iq_set(From, To, #iq{sub_el = SubEl} = IQ) ->
@@ -702,12 +660,9 @@ get_subscription_lists(_, LUser, LServer, odbc) ->
       _ -> []
     end;
 get_subscription_lists(_, LUser, LServer, riak) ->
-    Username = LUser,
-    case catch riak_get_roster(LServer, Username) of
-       {ok, Items} when is_list(Items) ->
-            lists:map(fun(I) -> riak_raw_to_record(LServer, I) end, Items);
-       _ ->
-           []
+    case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of
+        {ok, Items} -> Items;
+        _Err -> []
     end.
 
 fill_subscription_lists(LServer, [#roster{} = I | Is],
@@ -747,11 +702,9 @@ roster_subscribe_t(LUser, LServer, LJID, Item, odbc) ->
     SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
     odbc_queries:roster_subscribe(LServer, Username, SJID,
                                  ItemVals);
-roster_subscribe_t(LUser, LServer, LJID, Item, riak) ->
-    ItemVals = riak_record_to_string(Item),
-    Username = LUser,
-    SJID = jlib:jid_to_string(LJID),
-    riak_roster_subscribe(LServer, Username, SJID, ItemVals).
+roster_subscribe_t(LUser, LServer, _LJID, Item, riak) ->
+    ejabberd_riak:put(Item,
+                      [{'2i', [{<<"us">>, {LUser, LServer}}]}]).
 
 transaction(LServer, F) ->
     case gen_mod:db_type(LServer, ?MODULE) of
@@ -810,23 +763,14 @@ get_roster_by_jid_with_groups_t(LUser, LServer, LJID,
                  us = {LUser, LServer}, jid = LJID}
     end;
 get_roster_by_jid_with_groups_t(LUser, LServer, LJID, riak) ->
-    Username = LUser,
-    SJID = jlib:jid_to_string(LJID),
-    case riak_get_roster_by_jid(LServer, Username, SJID) of
+    case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
         {ok, I} ->
-            R = riak_raw_to_record(LServer, I),
-            Groups =
-                case riak_get_roster_groups(LServer, Username, SJID) of
-                    {ok, JGrps} when is_list(JGrps) ->
-                        JGrps;
-                    _ ->
-                        []
-                end,
-            R#roster{groups = Groups};
-        {error, _} ->
+            I;
+        {error, notfound} ->
             #roster{usj = {LUser, LServer, LJID},
-                    us = {LUser, LServer},
-                    jid = LJID}
+                    us = {LUser, LServer}, jid = LJID};
+        Err ->
+            exit(Err)
     end.
 
 process_subscription(Direction, User, Server, JID1,
@@ -1040,9 +984,7 @@ remove_user(LUser, LServer, odbc) ->
     odbc_queries:del_user_roster_t(LServer, Username),
     ok;
 remove_user(LUser, LServer, riak) ->
-    Username = LUser,
-    riak_del_user_roster(LServer, Username),
-    ok.
+    {atomic, ejabberd_riak:delete_by_index(roster, <<"us">>, {LUser, LServer})}.
 
 %% For each contact with Subscription:
 %% Both or From, send a "unsubscribed" presence stanza;
@@ -1114,13 +1056,9 @@ update_roster_t(LUser, LServer, LJID, Item, odbc) ->
     ItemGroups = groups_to_string(Item),
     odbc_queries:update_roster(LServer, Username, SJID, ItemVals,
                                ItemGroups);
-update_roster_t(LUser, LServer, LJID, Item, riak) ->
-    Username = LUser,
-    SJID = jlib:jid_to_string(LJID),
-    ItemVals = riak_record_to_string(Item),
-    ItemGroups = riak_groups_to_binary(Item),
-    riak_update_roster(
-      LServer, Username, SJID, ItemVals, ItemGroups).
+update_roster_t(LUser, LServer, _LJID, Item, riak) ->
+    ejabberd_riak:put(Item,
+                      [{'2i', [{<<"us">>, {LUser, LServer}}]}]).
 
 del_roster_t(LUser, LServer, LJID) ->
     DBType = gen_mod:db_type(LServer, ?MODULE),
@@ -1133,9 +1071,7 @@ del_roster_t(LUser, LServer, LJID, odbc) ->
     SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
     odbc_queries:del_roster(LServer, Username, SJID);
 del_roster_t(LUser, LServer, LJID, riak) ->
-    Username = LUser,
-    SJID = jlib:jid_to_string(LJID),
-    riak_del_roster(LServer, Username, SJID).
+    ejabberd_riak:delete(roster, {LUser, LServer, LJID}).
 
 process_item_set_t(LUser, LServer,
                   #xmlel{attrs = Attrs, children = Els}) ->
@@ -1201,40 +1137,35 @@ get_in_pending_subscriptions(Ls, User, Server) ->
     get_in_pending_subscriptions(Ls, User, Server,
                                 gen_mod:db_type(LServer, ?MODULE)).
 
-get_in_pending_subscriptions(Ls, User, Server,
-                            mnesia) ->
+get_in_pending_subscriptions(Ls, User, Server, DBType)
+  when DBType == mnesia; DBType == riak ->
     JID = jlib:make_jid(User, Server, <<"">>),
-    US = {JID#jid.luser, JID#jid.lserver},
-    case mnesia:dirty_index_read(roster, US, #roster.us) of
-      Result when is_list(Result) ->
-         Ls ++
-           lists:map(fun (R) ->
-                             Message = R#roster.askmessage,
-                             Status = if is_binary(Message) -> (Message);
-                                         true -> <<"">>
-                                      end,
-                             #xmlel{name = <<"presence">>,
-                                    attrs =
-                                        [{<<"from">>,
-                                          jlib:jid_to_string(R#roster.jid)},
-                                         {<<"to">>, jlib:jid_to_string(JID)},
-                                         {<<"type">>, <<"subscribe">>}],
-                                    children =
-                                        [#xmlel{name = <<"status">>,
-                                                attrs = [],
-                                                children =
-                                                    [{xmlcdata, Status}]}]}
-                     end,
-                     lists:filter(fun (R) ->
-                                          case R#roster.ask of
-                                            in -> true;
-                                            both -> true;
-                                            _ -> false
-                                          end
-                                  end,
-                                  Result));
-      _ -> Ls
-    end;
+    Result = get_roster(JID#jid.luser, JID#jid.lserver, DBType),
+    Ls ++ lists:map(fun (R) ->
+                            Message = R#roster.askmessage,
+                            Status = if is_binary(Message) -> (Message);
+                                        true -> <<"">>
+                                     end,
+                            #xmlel{name = <<"presence">>,
+                                   attrs =
+                                       [{<<"from">>,
+                                         jlib:jid_to_string(R#roster.jid)},
+                                        {<<"to">>, jlib:jid_to_string(JID)},
+                                        {<<"type">>, <<"subscribe">>}],
+                                   children =
+                                       [#xmlel{name = <<"status">>,
+                                               attrs = [],
+                                               children =
+                                                   [{xmlcdata, Status}]}]}
+                    end,
+                    lists:filter(fun (R) ->
+                                         case R#roster.ask of
+                                             in -> true;
+                                             both -> true;
+                                             _ -> false
+                                         end
+                                 end,
+                                 Result));
 get_in_pending_subscriptions(Ls, User, Server, odbc) ->
     JID = jlib:make_jid(User, Server, <<"">>),
     LUser = JID#jid.luser,
@@ -1276,44 +1207,6 @@ get_in_pending_subscriptions(Ls, User, Server, odbc) ->
                                    end,
                                    Items));
       _ -> Ls
-    end;
-get_in_pending_subscriptions(Ls, User, Server, riak) ->
-    JID = jlib:make_jid(User, Server, <<"">>),
-    LUser = JID#jid.luser,
-    LServer = JID#jid.lserver,
-    Username = LUser,
-    case catch riak_get_roster(LServer, Username) of
-       {ok, Items} when is_list(Items) ->
-           Ls ++ lists:map(
-                   fun(R) ->
-                           Message = R#roster.askmessage,
-                            #xmlel{name = <<"presence">>,
-                                   attrs = [{<<"from">>,
-                                             jlib:jid_to_string(R#roster.jid)},
-                                            {<<"to">>, jlib:jid_to_string(JID)},
-                                            {<<"type">>, <<"subscribe">>}],
-                                   children = [#xmlel{name = <<"status">>,
-                                                      attrs = [],
-                                                      children =
-                                                      [{xmlcdata, Message}]}]}
-                   end,
-                   lists:flatmap(
-                     fun(I) ->
-                             case riak_raw_to_record(LServer, I) of
-                                 %% Bad JID in database:
-                                 error ->
-                                     [];
-                                 R ->
-                                     case R#roster.ask of
-                                         in   -> [R];
-                                         both -> [R];
-                                         _ -> []
-                                     end
-                             end
-                     end,
-                     Items));
-       _ ->
-           Ls
     end.
 
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -1361,18 +1254,12 @@ read_subscription_and_groups(LUser, LServer, LJID,
     end;
 read_subscription_and_groups(LUser, LServer, LJID,
                             riak) ->
-    Username = LUser,
-    SJID = jlib:jid_to_string(LJID),
-    case catch riak_get_subscription(LServer, Username, SJID) of
-       {ok, Subscription} ->
-           Groups = case riak_get_roster_jid_groups(LServer, Username) of
-                         {ok, JGrps} when is_list(JGrps) ->
-                             JGrps;
-                         _ ->
-                             []
-                     end,
-         {Subscription, Groups};
-      _ -> error
+    case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
+        {ok, #roster{subscription = Subscription,
+                     groups = Groups}} ->
+            {Subscription, Groups};
+        _ ->
+            error
     end.
 
 get_jid_info(_, User, Server, JID) ->
index 8a1423c765fe37918dbac1f22f3a851133d71ed3..58a79d92073a9ea24e9e1584ab9b9e41d15b51c9 100644 (file)
@@ -400,6 +400,13 @@ list_groups(Host, mnesia) ->
     mnesia:dirty_select(sr_group,
                        [{#sr_group{group_host = {'$1', '$2'}, _ = '_'},
                          [{'==', '$2', Host}], ['$1']}]);
+list_groups(Host, riak) ->
+    case ejabberd_riak:get_keys_by_index(sr_group, <<"host">>, Host) of
+        {ok, Gs} ->
+            [G || {G, _} <- Gs];
+        _ ->
+            []
+    end;
 list_groups(Host, odbc) ->
     case ejabberd_odbc:sql_query(Host,
                                 [<<"select name from sr_group;">>])
@@ -417,6 +424,13 @@ groups_with_opts(Host, mnesia) ->
                                         _ = '_'},
                               [], [['$1', '$2']]}]),
     lists:map(fun ([G, O]) -> {G, O} end, Gs);
+groups_with_opts(Host, riak) ->
+    case ejabberd_riak:get_by_index(sr_group, <<"host">>, Host) of
+        {ok, Rs} ->
+            [{G, O} || #sr_group{group_host = {G, _}, opts = O} <- Rs];
+        _ ->
+            []
+    end;
 groups_with_opts(Host, odbc) ->
     case ejabberd_odbc:sql_query(Host,
                                 [<<"select name, opts from sr_group;">>])
@@ -438,6 +452,10 @@ create_group(Host, Group, Opts, mnesia) ->
     R = #sr_group{group_host = {Group, Host}, opts = Opts},
     F = fun () -> mnesia:write(R) end,
     mnesia:transaction(F);
+create_group(Host, Group, Opts, riak) ->
+    {atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host},
+                                         opts = Opts},
+                               [{'2i', [{<<"host">>, Host}]}])};
 create_group(Host, Group, Opts, odbc) ->
     SGroup = ejabberd_odbc:escape(Group),
     SOpts = ejabberd_odbc:encode_term(Opts),
@@ -464,6 +482,15 @@ delete_group(Host, Group, mnesia) ->
                              Users)
        end,
     mnesia:transaction(F);
+delete_group(Host, Group, riak) ->
+    try
+        ok = ejabberd_riak:delete(sr_group, {Group, Host}),
+        ok = ejabberd_riak:delete_by_index(sr_user, <<"group_host">>,
+                                           {Group, Host}),
+        {atomic, ok}
+    catch _:{badmatch, Err} ->
+            {atomic, Err}
+    end;
 delete_group(Host, Group, odbc) ->
     SGroup = ejabberd_odbc:escape(Group),
     F = fun () ->
@@ -483,6 +510,11 @@ get_group_opts(Host, Group, mnesia) ->
       [#sr_group{opts = Opts}] -> Opts;
       _ -> error
     end;
+get_group_opts(Host, Group, riak) ->
+    case ejabberd_riak:get(sr_group, {Group, Host}) of
+        {ok, #sr_group{opts = Opts}} -> Opts;
+        _ -> error
+    end;
 get_group_opts(Host, Group, odbc) ->
     SGroup = ejabberd_odbc:escape(Group),
     case catch ejabberd_odbc:sql_query(Host,
@@ -502,6 +534,10 @@ set_group_opts(Host, Group, Opts, mnesia) ->
     R = #sr_group{group_host = {Group, Host}, opts = Opts},
     F = fun () -> mnesia:write(R) end,
     mnesia:transaction(F);
+set_group_opts(Host, Group, Opts, riak) ->
+    {atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host},
+                                         opts = Opts},
+                               [{'2i', [{<<"host">>, Host}]}])};
 set_group_opts(Host, Group, Opts, odbc) ->
     SGroup = ejabberd_odbc:escape(Group),
     SOpts = ejabberd_odbc:encode_term(Opts),
@@ -525,6 +561,13 @@ get_user_groups(US, Host, mnesia) ->
           || #sr_user{group_host = {Group, H}} <- Rs, H == Host];
       _ -> []
     end;
+get_user_groups(US, Host, riak) ->
+    case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of
+        {ok, Rs} ->
+            [Group || #sr_user{group_host = {Group, H}} <- Rs, H == Host];
+        _ ->
+            []
+    end;
 get_user_groups(US, Host, odbc) ->
     SJID = make_jid_s(US),
     case catch ejabberd_odbc:sql_query(Host,
@@ -595,6 +638,14 @@ get_group_explicit_users(Host, Group, mnesia) ->
       Rs when is_list(Rs) -> [R#sr_user.us || R <- Rs];
       _ -> []
     end;
+get_group_explicit_users(Host, Group, riak) ->
+    case ejabberd_riak:get_by_index(sr_user, <<"group_host">>,
+                                    {Group, Host}) of
+        {ok, Rs} ->
+            [R#sr_user.us || R <- Rs];
+        _ ->
+            []
+    end;
 get_group_explicit_users(Host, Group, odbc) ->
     SGroup = ejabberd_odbc:escape(Group),
     case catch ejabberd_odbc:sql_query(Host,
@@ -680,6 +731,16 @@ get_user_displayed_groups(LUser, LServer, GroupsOpts,
              H == LServer];
       _ -> []
     end;
+get_user_displayed_groups(LUser, LServer, GroupsOpts,
+                          riak) ->
+    case ejabberd_riak:get_by_index(sr_user,
+                                    <<"us">>, {LUser, LServer}) of
+        {ok, Rs} ->
+            [{Group, proplists:get_value(Group, GroupsOpts, [])}
+             || #sr_user{group_host = {Group, _}} <- Rs];
+        _ ->
+            []
+    end;
 get_user_displayed_groups(LUser, LServer, GroupsOpts,
                          odbc) ->
     SJID = make_jid_s(LUser, LServer),
@@ -726,6 +787,21 @@ is_user_in_group(US, Group, Host, mnesia) ->
       [] -> lists:member(US, get_group_users(Host, Group));
       _ -> true
     end;
+is_user_in_group(US, Group, Host, riak) ->
+    case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of
+        {ok, Rs} ->
+            case lists:any(
+                   fun(#sr_user{group_host = {G, H}}) ->
+                           (Group == G) and (Host == H)
+                   end, Rs) of
+                false ->
+                    lists:member(US, get_group_users(Host, Group));
+                true ->
+                    true
+            end;
+        _Err ->
+            false
+    end;
 is_user_in_group(US, Group, Host, odbc) ->
     SJID = make_jid_s(US),
     SGroup = ejabberd_odbc:escape(Group),
@@ -765,6 +841,12 @@ add_user_to_group(Host, US, Group, mnesia) ->
     R = #sr_user{us = US, group_host = {Group, Host}},
     F = fun () -> mnesia:write(R) end,
     mnesia:transaction(F);
+add_user_to_group(Host, US, Group, riak) ->
+    {atomic, ejabberd_riak:put(
+               #sr_user{us = US, group_host = {Group, Host}},
+               [{i, {US, {Group, Host}}},
+                {'2i', [{<<"us">>, US},
+                        {<<"group_host">>, {Group, Host}}]}])};
 add_user_to_group(Host, US, Group, odbc) ->
     SJID = make_jid_s(US),
     SGroup = ejabberd_odbc:escape(Group),
@@ -816,6 +898,8 @@ remove_user_from_group(Host, US, Group, mnesia) ->
     R = #sr_user{us = US, group_host = {Group, Host}},
     F = fun () -> mnesia:delete_object(R) end,
     mnesia:transaction(F);
+remove_user_from_group(Host, US, Group, riak) ->
+    {atomic, ejabberd_riak:delete(sr_group, {US, {Group, Host}})};
 remove_user_from_group(Host, US, Group, odbc) ->
     SJID = make_jid_s(US),
     SGroup = ejabberd_odbc:escape(Group),
index e7dd77224a5ffad82d28a14ca7ddba5bc23d8123..8ffe6642bc2252de7b7564fcba736abcbab03159 100644 (file)
@@ -46,7 +46,7 @@
         lbday, ctry, lctry, locality, llocality, email, lemail,
         orgname, lorgname, orgunit, lorgunit}).
 
--record(vcard, {us = {<<"">>, <<"">>} :: {binary(), binary()},
+-record(vcard, {us = {<<"">>, <<"">>} :: {binary(), binary()} | binary(),
                 vcard = #xmlel{} :: xmlel()}).
 
 -define(PROCNAME, ejabberd_mod_vcard).
@@ -214,13 +214,9 @@ get_vcard(LUser, LServer, odbc) ->
       _ -> error
     end;
 get_vcard(LUser, LServer, riak) ->
-    Username = LUser,
-    case catch ejabberd_riak:get(LServer, <<"vcard">>, Username) of
-        {ok, SVCARD} ->
-            case xml_stream:parse_element(SVCARD) of
-                {error, _Reason} -> error;
-                VCARD -> [VCARD]
-            end;
+    case ejabberd_riak:get(vcard, {LUser, LServer}) of
+        {ok, R} ->
+            [R#vcard.vcard];
         {error, notfound} ->
             [];
         _ ->
@@ -302,6 +298,33 @@ set_vcard(User, LServer, VCARD) ->
                                                        lorgunit = LOrgUnit})
                     end,
                 mnesia:transaction(F);
+             riak ->
+                 US = {LUser, LServer},
+                 ejabberd_riak:put(#vcard{us = US, vcard = VCARD},
+                                   [{'2i', [{<<"user">>, User},
+                                            {<<"luser">>, LUser},
+                                            {<<"fn">>, FN},
+                                            {<<"lfn">>, LFN},
+                                            {<<"family">>, Family},
+                                            {<<"lfamily">>, LFamily},
+                                            {<<"given">>, Given},
+                                            {<<"lgiven">>, LGiven},
+                                            {<<"middle">>, Middle},
+                                            {<<"lmiddle">>, LMiddle},
+                                            {<<"nickname">>, Nickname},
+                                            {<<"lnickname">>, LNickname},
+                                            {<<"bday">>, BDay},
+                                            {<<"lbday">>, LBDay},
+                                            {<<"ctry">>, CTRY},
+                                            {<<"lctry">>, LCTRY},
+                                            {<<"locality">>, Locality},
+                                            {<<"llocality">>, LLocality},
+                                            {<<"email">>, EMail},
+                                            {<<"lemail">>, LEMail},
+                                            {<<"orgname">>, OrgName},
+                                            {<<"lorgname">>, LOrgName},
+                                            {<<"orgunit">>, OrgUnit},
+                                            {<<"lorgunit">>, LOrgUnit}]}]);
             odbc ->
                 Username = ejabberd_odbc:escape(User),
                 LUsername = ejabberd_odbc:escape(LUser),
@@ -335,25 +358,7 @@ set_vcard(User, LServer, VCARD) ->
                                        SLGiven, SLLocality, SLMiddle,
                                        SLNickname, SLOrgName, SLOrgUnit,
                                        SLocality, SMiddle, SNickname, SOrgName,
-                                       SOrgUnit, SVCARD, Username);
-            riak ->
-                   Username = LUser,
-                   SVCARD = xml:element_to_binary(VCARD),
-
-                   ejabberd_riak:put(
-                     LServer, <<"vcard">>, Username, SVCARD,
-                     [{<<"bday_bin">>, LBDay},
-                      {<<"ctry_bin">>, LCTRY},
-                      {<<"email_bin">>, LEMail},
-                      {<<"fn_bin">>, LFN},
-                      {<<"family_bin">>, LFamily},
-                      {<<"given_bin">>, LGiven},
-                      {<<"locality_bin">>, LLocality},
-                      {<<"middle_bin">>, LMiddle},
-                      {<<"nickname_bin">>, LNickname},
-                      {<<"orgname_bin">>, LOrgName},
-                      {<<"orgunit_bin">>, LOrgUnit},
-                      {<<"user_bin">>, Username}])
+                                       SOrgUnit, SVCARD, Username)
           end,
           ejabberd_hooks:run(vcard_set, LServer,
                              [LUser, LServer, VCARD])
@@ -921,9 +926,7 @@ remove_user(LUser, LServer, odbc) ->
                                   [<<"delete from vcard_search where lusername='">>,
                                    Username, <<"';">>]]);
 remove_user(LUser, LServer, riak) ->
-    Username = LUser,
-    ejabberd_riak:delete(LServer, <<"vcard">>, Username),
-    ok.
+    {atomic, ejabberd_riak:delete(vcard, {LUser, LServer})}.
 
 update_tables() ->
     update_vcard_table(),
index b2ea34419598984642c927b9b9d98704a2de5554..74dd30f27a564475eb39b427cf97d65663fb7f01 100644 (file)
@@ -88,6 +88,9 @@ add_xupdate(LUser, LServer, Hash, mnesia) ->
                                            hash = Hash})
        end,
     mnesia:transaction(F);
+add_xupdate(LUser, LServer, Hash, riak) ->
+    {atomic, ejabberd_riak:put(#vcard_xupdate{us = {LUser, LServer},
+                                              hash = Hash})};
 add_xupdate(LUser, LServer, Hash, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     SHash = ejabberd_odbc:escape(Hash),
@@ -109,6 +112,11 @@ get_xupdate(LUser, LServer, mnesia) ->
       [#vcard_xupdate{hash = Hash}] -> Hash;
       _ -> undefined
     end;
+get_xupdate(LUser, LServer, riak) ->
+    case ejabberd_riak:get(vcard_xupdate, {LUser, LServer}) of
+        {ok, #vcard_xupdate{hash = Hash}} -> Hash;
+        _ -> undefined
+    end;
 get_xupdate(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     case ejabberd_odbc:sql_query(LServer,
@@ -129,6 +137,8 @@ remove_xupdate(LUser, LServer, mnesia) ->
                mnesia:delete({vcard_xupdate, {LUser, LServer}})
        end,
     mnesia:transaction(F);
+remove_xupdate(LUser, LServer, riak) ->
+    {atomic, ejabberd_riak:delete(vcard_xupdate, {LUser, LServer})};
 remove_xupdate(LUser, LServer, odbc) ->
     Username = ejabberd_odbc:escape(LUser),
     F = fun () ->