--- /dev/null
+%%%----------------------------------------------------------------------
+%%% File : ejabberd_auth_riak.erl
+%%% Author : Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%% Purpose : Authentification via Riak
+%%% Created : 12 Nov 2012 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2012 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+-module(ejabberd_auth_riak).
+
+-author('alexey@process-one.net').
+
+-behaviour(ejabberd_auth).
+
+%% External exports
+-export([start/1, set_password/3, check_password/3,
+ check_password/5, try_register/3,
+ dirty_get_registered_users/0, get_vh_registered_users/1,
+ get_vh_registered_users/2,
+ get_vh_registered_users_number/1,
+ get_vh_registered_users_number/2, get_password/2,
+ get_password_s/2, is_user_exists/2, remove_user/2,
+ remove_user/3, store_type/0, export/1,
+ plain_password_required/0]).
+
+-include("ejabberd.hrl").
+
+-record(passwd, {us = {<<"">>, <<"">>} :: {binary(), binary()} | '$1',
+ password = <<"">> :: binary() | scram() | '_'}).
+
+-define(SALT_LENGTH, 16).
+
+start(_Host) ->
+ ok.
+
+plain_password_required() ->
+ case is_scrammed() of
+ false -> false;
+ true -> true
+ end.
+
+store_type() ->
+ case is_scrammed() of
+ false -> plain; %% allows: PLAIN DIGEST-MD5 SCRAM
+ true -> scram %% allows: PLAIN SCRAM
+ end.
+
+check_password(User, Server, Password) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ case ejabberd_riak:get(passwd, {LUser, LServer}) of
+ {ok, #passwd{password = Password}} when is_binary(Password) ->
+ Password /= <<"">>;
+ {ok, #passwd{password = Scram}} when is_record(Scram, scram) ->
+ is_password_scram_valid(Password, Scram);
+ _ ->
+ false
+ end.
+
+check_password(User, Server, Password, Digest,
+ DigestGen) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ case ejabberd_riak:get(passwd, {LUser, LServer}) of
+ {ok, #passwd{password = Passwd}} when is_binary(Passwd) ->
+ DigRes = if Digest /= <<"">> ->
+ Digest == DigestGen(Passwd);
+ true -> false
+ end,
+ if DigRes -> true;
+ true -> (Passwd == Password) and (Password /= <<"">>)
+ end;
+ {ok, #passwd{password = Scram}}
+ when is_record(Scram, scram) ->
+ Passwd = jlib:decode_base64(Scram#scram.storedkey),
+ DigRes = if Digest /= <<"">> ->
+ Digest == DigestGen(Passwd);
+ true -> false
+ end,
+ if DigRes -> true;
+ true -> (Passwd == Password) and (Password /= <<"">>)
+ end;
+ _ -> false
+ end.
+
+set_password(User, Server, Password) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ US = {LUser, LServer},
+ if (LUser == error) or (LServer == error) ->
+ {error, invalid_jid};
+ true ->
+ Password2 = case is_scrammed() and is_binary(Password)
+ of
+ true -> password_to_scram(Password);
+ false -> Password
+ end,
+ ok = ejabberd_riak:put(#passwd{us = US, password = Password2},
+ [{'2i', [{<<"host">>, LServer}]}])
+ end.
+
+try_register(User, Server, PasswordList) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ Password = iolist_to_binary(PasswordList),
+ US = {LUser, LServer},
+ if (LUser == error) or (LServer == error) ->
+ {error, invalid_jid};
+ true ->
+ case ejabberd_riak:get(passwd, US) of
+ {error, notfound} ->
+ Password2 = case is_scrammed() and
+ is_binary(Password)
+ of
+ true -> password_to_scram(Password);
+ false -> Password
+ end,
+ {atomic, ejabberd_riak:put(
+ #passwd{us = US,
+ password = Password2},
+ [{'2i', [{<<"host">>, LServer}]}])};
+ {ok, _} ->
+ exists;
+ Err ->
+ {atomic, Err}
+ end
+ end.
+
+dirty_get_registered_users() ->
+ lists:flatmap(
+ fun(Server) ->
+ get_vh_registered_users(Server)
+ end, ejabberd_config:get_vh_by_auth_method(riak)).
+
+get_vh_registered_users(Server) ->
+ LServer = jlib:nameprep(Server),
+ case ejabberd_riak:get_keys_by_index(passwd, <<"host">>, LServer) of
+ {ok, Users} ->
+ Users;
+ _ ->
+ []
+ end.
+
+get_vh_registered_users(Server, _) ->
+ get_vh_registered_users(Server).
+
+get_vh_registered_users_number(Server) ->
+ LServer = jlib:nameprep(Server),
+ case ejabberd_riak:count_by_index(passwd, <<"host">>, LServer) of
+ {ok, N} ->
+ N;
+ _ ->
+ 0
+ end.
+
+get_vh_registered_users_number(Server, _) ->
+ get_vh_registered_users_number(Server).
+
+get_password(User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ case ejabberd_riak:get(passwd, {LUser, LServer}) of
+ {ok, #passwd{password = Password}}
+ when is_binary(Password) ->
+ Password;
+ {ok, #passwd{password = Scram}}
+ when is_record(Scram, scram) ->
+ {jlib:decode_base64(Scram#scram.storedkey),
+ jlib:decode_base64(Scram#scram.serverkey),
+ jlib:decode_base64(Scram#scram.salt),
+ Scram#scram.iterationcount};
+ _ -> false
+ end.
+
+get_password_s(User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ case ejabberd_riak:get(passwd, {LUser, LServer}) of
+ {ok, #passwd{password = Password}}
+ when is_binary(Password) ->
+ Password;
+ {ok, #passwd{password = Scram}}
+ when is_record(Scram, scram) ->
+ <<"">>;
+ _ -> <<"">>
+ end.
+
+is_user_exists(User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ case ejabberd_riak:get(passwd, {LUser, LServer}) of
+ {error, notfound} -> false;
+ {ok, _} -> true;
+ Err -> Err
+ end.
+
+remove_user(User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ ejabberd_riak:delete(passwd, {LUser, LServer}),
+ ok.
+
+remove_user(User, Server, Password) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ case ejabberd_riak:get(passwd, {LUser, LServer}) of
+ {ok, #passwd{password = Password}}
+ when is_binary(Password) ->
+ ejabberd_riak:delete(passwd, {LUser, LServer}),
+ ok;
+ {ok, #passwd{password = Scram}}
+ when is_record(Scram, scram) ->
+ case is_password_scram_valid(Password, Scram) of
+ true ->
+ ejabberd_riak:delete(passwd, {LUser, LServer}),
+ ok;
+ false -> not_allowed
+ end;
+ _ -> not_exists
+ end.
+
+%%%
+%%% SCRAM
+%%%
+
+is_scrammed() ->
+ scram ==
+ ejabberd_config:get_local_option({auth_password_format, ?MYNAME},
+ fun(V) -> V end).
+
+password_to_scram(Password) ->
+ password_to_scram(Password,
+ ?SCRAM_DEFAULT_ITERATION_COUNT).
+
+password_to_scram(Password, IterationCount) ->
+ Salt = crypto:rand_bytes(?SALT_LENGTH),
+ SaltedPassword = scram:salted_password(Password, Salt,
+ IterationCount),
+ StoredKey =
+ scram:stored_key(scram:client_key(SaltedPassword)),
+ ServerKey = scram:server_key(SaltedPassword),
+ #scram{storedkey = jlib:encode_base64(StoredKey),
+ serverkey = jlib:encode_base64(ServerKey),
+ salt = jlib:encode_base64(Salt),
+ iterationcount = IterationCount}.
+
+is_password_scram_valid(Password, Scram) ->
+ IterationCount = Scram#scram.iterationcount,
+ Salt = jlib:decode_base64(Scram#scram.salt),
+ SaltedPassword = scram:salted_password(Password, Salt,
+ IterationCount),
+ StoredKey =
+ scram:stored_key(scram:client_key(SaltedPassword)),
+ jlib:decode_base64(Scram#scram.storedkey) == StoredKey.
+
+export(_Server) ->
+ [{passwd,
+ fun(Host, #passwd{us = {LUser, LServer}, password = Password})
+ when LServer == Host ->
+ Username = ejabberd_odbc:escape(LUser),
+ Pass = ejabberd_odbc:escape(Password),
+ [[<<"delete from users where username='">>, Username, <<"';">>],
+ [<<"insert into users(username, password) "
+ "values ('">>, Username, <<"', '">>, Pass, <<"');">>]];
+ (_Host, _R) ->
+ []
+ end}].
-%%%----------------------------------------------------------------------
-%%% File : ejabberd_riak.erl
-%%% Author : Alexey Shchepin <alexey@process-one.net>
-%%% Purpose : Serve Riak connection
+%%%-------------------------------------------------------------------
+%%% @author Alexey Shchepin <alexey@process-one.net>
+%%% @doc
+%%% Interface for Riak database
+%%% @end
%%% Created : 29 Dec 2011 by Alexey Shchepin <alexey@process-one.net>
-%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2011 ProcessOne
+%%% @copyright (C) 2002-2012 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
%%% 02111-1307 USA
%%%
-%%%----------------------------------------------------------------------
-
+%%%-------------------------------------------------------------------
-module(ejabberd_riak).
--author('alexey@process-one.net').
-
-%% External exports
--export([start_link/3,
- put/4,
- put/5,
- get_object/3,
- get/3,
- get_objects_by_index/4,
- get_by_index/4,
- get_keys_by_index/4,
- count_by_index/4,
- delete/3]).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/3, make_bucket/1, put/1, put/2,
+ get/1, get/2, get_by_index/3, delete/1, delete/2,
+ count_by_index/3, get_by_index_range/4,
+ get_keys/1, get_keys_by_index/3,
+ count/1, delete_by_index/3]).
+%% For debugging
+-export([get_tables/0]).
+%% map/reduce exports
+-export([map_key/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
-include("ejabberd.hrl").
-%%%----------------------------------------------------------------------
+-record(state, {pid = self() :: pid()}).
+
+-type index() :: {binary(), any()}.
+
+-type index_info() :: [{i, any()} | {'2i', [index()]}].
+
+%% The `index_info()' is used in put/delete functions:
+%% `i' defines a primary index, `` '2i' '' defines secondary indexes.
+%% There must be only one primary index. If `i' is not specified,
+%% the first element of the record is assumed as a primary index,
+%% i.e. `i' = element(2, Record).
+
+-export_types([index_info/0]).
+
+%%%===================================================================
%%% API
-%%%----------------------------------------------------------------------
-start_link(Server, Port, StartInterval) ->
- {ok, Pid} = riakc_pb_socket:start_link(
- Server, Port,
- [auto_reconnect]),
- ejabberd_riak_sup:add_pid(Pid),
- {ok, Pid}.
-
-make_bucket(Host, Table) ->
- iolist_to_binary([Host, $@, Table]).
-
-put(Host, Table, Key, Value) ->
- Bucket = make_bucket(Host, Table),
- Obj = riakc_obj:new(Bucket, Key, Value),
- riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj).
-
-put(Host, Table, Key, Value, Indexes) ->
- Bucket = make_bucket(Host, Table),
- Obj = riakc_obj:new(Bucket, Key, Value),
- MetaData = dict:store(<<"index">>, Indexes, dict:new()),
- Obj2 = riakc_obj:update_metadata(Obj, MetaData),
- riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj2).
-
-get_object(Host, Table, Key) ->
- Bucket = make_bucket(Host, Table),
+%%%===================================================================
+%% @private
+start_link(Server, Port, _StartInterval) ->
+ gen_server:start_link(?MODULE, [Server, Port], []).
+
+-spec make_bucket(atom()) -> binary().
+%% @doc Makes a bucket from a table name
+%% @private
+make_bucket(Table) ->
+ erlang:atom_to_binary(Table, utf8).
+
+-spec put(tuple()) -> ok | {error, any()}.
+%% @equiv put(Record, [])
+put(Record) ->
+ ?MODULE:put(Record, []).
+
+-spec put(tuple(), index_info()) -> ok | {error, any()}.
+%% @doc Stores a record `Rec' with indexes described in ``IndexInfo''
+put(Rec, IndexInfo) ->
+ Key = encode_key(proplists:get_value(i, IndexInfo, element(2, Rec))),
+ SecIdxs = [encode_index_key(K, V) ||
+ {K, V} <- proplists:get_value('2i', IndexInfo, [])],
+ Table = element(1, Rec),
+ Value = term_to_binary(Rec),
+ case put_raw(Table, Key, Value, SecIdxs) of
+ ok ->
+ ok;
+ Error ->
+ log_error(Error, put, [{record, Rec},
+ {index_info, IndexInfo}]),
+ Error
+ end.
+
+put_raw(Table, Key, Value, Indexes) ->
+ Bucket = make_bucket(Table),
+ Obj = riakc_obj:new(Bucket, Key, Value, "application/x-erlang-term"),
+ Obj1 = if Indexes /= [] ->
+ MetaData = dict:store(<<"index">>, Indexes, dict:new()),
+ riakc_obj:update_metadata(Obj, MetaData);
+ true ->
+ Obj
+ end,
+ riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1).
+
+get_object_raw(Table, Key) ->
+ Bucket = make_bucket(Table),
riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
-get(Host, Table, Key) ->
- case get_object(Host, Table, Key) of
+-spec get(atom()) -> {ok, [any()]} | {error, any()}.
+%% @doc Returns all objects from table `Table'
+get(Table) ->
+ Bucket = make_bucket(Table),
+ case riakc_pb_socket:mapred(
+ ejabberd_riak_sup:get_random_pid(),
+ Bucket,
+ [{map, {modfun, riak_kv_mapreduce, map_object_value},
+ none, true}]) of
+ {ok, [{_, Objs}]} ->
+ {ok, lists:flatmap(
+ fun(Obj) ->
+ case catch binary_to_term(Obj) of
+ {'EXIT', _} ->
+ Error = {error, make_invalid_object(Obj)},
+ log_error(Error, get,
+ [{table, Table}]),
+ [];
+ Term ->
+ [Term]
+ end
+ end, Objs)};
+ {error, notfound} ->
+ {ok, []};
+ Error ->
+ Error
+ end.
+
+-spec get(atom(), any()) -> {ok, any()} | {error, any()}.
+%% @doc Reads record by `Key' from table `Table'
+get(Table, Key) ->
+ case get_raw(Table, encode_key(Key)) of
+ {ok, Val} ->
+ case catch binary_to_term(Val) of
+ {'EXIT', _} ->
+ Error = {error, make_invalid_object(Val)},
+ log_error(Error, get, [{table, Table}, {key, Key}]),
+ {error, notfound};
+ Term ->
+ {ok, Term}
+ end;
+ Error ->
+ log_error(Error, get, [{table, Table},
+ {key, Key}]),
+ Error
+ end.
+
+-spec get_by_index(atom(), binary(), any()) -> {ok, [any()]} | {error, any()}.
+%% @doc Reads records by `Index' and value `Key' from `Table'
+get_by_index(Table, Index, Key) ->
+ {NewIndex, NewKey} = encode_index_key(Index, Key),
+ case get_by_index_raw(Table, NewIndex, NewKey) of
+ {ok, Vals} ->
+ {ok, lists:flatmap(
+ fun(Val) ->
+ case catch binary_to_term(Val) of
+ {'EXIT', _} ->
+ Error = {error, make_invalid_object(Val)},
+ log_error(Error, get_by_index,
+ [{table, Table},
+ {index, Index},
+ {key, Key}]),
+ [];
+ Term ->
+ [Term]
+ end
+ end, Vals)};
+ {error, notfound} ->
+ {ok, []};
+ Error ->
+ log_error(Error, get_by_index,
+ [{table, Table},
+ {index, Index},
+ {key, Key}]),
+ Error
+ end.
+
+-spec get_by_index_range(atom(), binary(), any(), any()) ->
+ {ok, [any()]} | {error, any()}.
+%% @doc Reads records by `Index' in the range `FromKey'..`ToKey' from `Table'
+get_by_index_range(Table, Index, FromKey, ToKey) ->
+ {NewIndex, NewFromKey} = encode_index_key(Index, FromKey),
+ {NewIndex, NewToKey} = encode_index_key(Index, ToKey),
+ case get_by_index_range_raw(Table, NewIndex, NewFromKey, NewToKey) of
+ {ok, Vals} ->
+ {ok, lists:flatmap(
+ fun(Val) ->
+ case catch binary_to_term(Val) of
+ {'EXIT', _} ->
+ Error = {error, make_invalid_object(Val)},
+ log_error(Error, get_by_index_range,
+ [{table, Table},
+ {index, Index},
+ {start_key, FromKey},
+ {end_key, ToKey}]),
+ [];
+ Term ->
+ [Term]
+ end
+ end, Vals)};
+ {error, notfound} ->
+ {ok, []};
+ Error ->
+ log_error(Error, get_by_index_range,
+ [{table, Table}, {index, Index},
+ {start_key, FromKey}, {end_key, ToKey}]),
+ Error
+ end.
+
+get_raw(Table, Key) ->
+ case get_object_raw(Table, Key) of
{ok, Obj} ->
{ok, riakc_obj:get_value(Obj)};
Error ->
Error
end.
-get_objects_by_index(Host, Table, Index, Key) ->
- Bucket = make_bucket(Host, Table),
+-spec get_keys(atom()) -> {ok, [any()]} | {error, any()}.
+%% @doc Returns a list of index values
+get_keys(Table) ->
+ Bucket = make_bucket(Table),
+ case riakc_pb_socket:mapred(
+ ejabberd_riak_sup:get_random_pid(),
+ Bucket,
+ [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+ {ok, [{_, Keys}]} ->
+ {ok, Keys};
+ Error ->
+ log_error(Error, get_keys, [{table, Table}]),
+ Error
+ end.
+
+-spec get_keys_by_index(atom(), binary(),
+ any()) -> {ok, [any()]} | {error, any()}.
+%% @doc Returns a list of primary keys of objects indexed by `Key'.
+get_keys_by_index(Table, Index, Key) ->
+ {NewIndex, NewKey} = encode_index_key(Index, Key),
+ Bucket = make_bucket(Table),
+ case riakc_pb_socket:mapred(
+ ejabberd_riak_sup:get_random_pid(),
+ {index, Bucket, NewIndex, NewKey},
+ [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+ {ok, [{_, Keys}]} ->
+ {ok, Keys};
+ Error ->
+ log_error(Error, get_keys_by_index, [{table, Table},
+ {index, Index},
+ {key, Key}]),
+ Error
+ end.
+
+%% @hidden
+get_tables() ->
+ riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()).
+
+get_by_index_raw(Table, Index, Key) ->
+ Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, Index, Key},
- [{map, {modfun, riak_kv_mapreduce, map_identity}, none, true}]) of
+ [{map, {modfun, riak_kv_mapreduce, map_object_value},
+ none, true}]) of
{ok, [{_, Objs}]} ->
{ok, Objs};
Error ->
Error
end.
-get_by_index(Host, Table, Index, Key) ->
- Bucket = make_bucket(Host, Table),
+get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
+ Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
- {index, Bucket, Index, Key},
+ {index, Bucket, Index, FromKey, ToKey},
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
{ok, [{_, Objs}]} ->
Error
end.
-get_keys_by_index(Host, Table, Index, Key) ->
- Bucket = make_bucket(Host, Table),
+-spec count(atom()) -> {ok, non_neg_integer()} | {error, any()}.
+%% @doc Returns the number of objects in the `Table'
+count(Table) ->
+ Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
- {index, Bucket, Index, Key},
- []) of
- {ok, [{_, Ls}]} ->
- {ok, [K || {_, K} <- Ls]};
+ Bucket,
+ [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
+ none, true}]) of
+ {ok, [{_, [Cnt]}]} ->
+ {ok, Cnt};
Error ->
+ log_error(Error, count, [{table, Table}]),
Error
end.
-count_by_index(Host, Table, Index, Key) ->
- Bucket = make_bucket(Host, Table),
+-spec count_by_index(atom(), binary(), any()) ->
+ {ok, non_neg_integer()} | {error, any()}.
+%% @doc Returns the number of objects in the `Table' by index
+count_by_index(Tab, Index, Key) ->
+ {NewIndex, NewKey} = encode_index_key(Index, Key),
+ case count_by_index_raw(Tab, NewIndex, NewKey) of
+ {ok, Cnt} ->
+ {ok, Cnt};
+ {error, notfound} ->
+ {ok, 0};
+ Error ->
+ log_error(Error, count_by_index,
+ [{table, Tab},
+ {index, Index},
+ {key, Key}]),
+ Error
+ end.
+
+count_by_index_raw(Table, Index, Key) ->
+ Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, Index, Key},
Error
end.
-delete(Host, Table, Key) ->
- Bucket = make_bucket(Host, Table),
+-spec delete(tuple() | atom()) -> ok | {error, any()}.
+%% @doc Same as delete(T, []) when T is record.
+%% Or deletes all elements from table if T is atom.
+delete(Rec) when is_tuple(Rec) ->
+ delete(Rec, []);
+delete(Table) when is_atom(Table) ->
+ try
+ {ok, Keys} = ?MODULE:get_keys(Table),
+ lists:foreach(
+ fun(K) ->
+ ok = delete(Table, K)
+ end, Keys)
+ catch _:{badmatch, Err} ->
+ Err
+ end.
+
+-spec delete(tuple() | atom(), index_info() | any()) -> ok | {error, any()}.
+%% @doc Delete an object
+delete(Rec, Opts) when is_tuple(Rec) ->
+ Table = element(1, Rec),
+ Key = proplists:get_value(i, Opts, element(2, Rec)),
+ delete(Table, Key);
+delete(Table, Key) when is_atom(Table) ->
+ case delete_raw(Table, encode_key(Key)) of
+ ok ->
+ ok;
+ Err ->
+ log_error(Err, delete, [{table, Table}, {key, Key}]),
+ Err
+ end.
+
+delete_raw(Table, Key) ->
+ Bucket = make_bucket(Table),
riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
+-spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}.
+%% @doc Deletes objects by index
+delete_by_index(Table, Index, Key) ->
+ try
+ {ok, Keys} = get_keys_by_index(Table, Index, Key),
+ lists:foreach(
+ fun(K) ->
+ ok = delete(Table, K)
+ end, Keys)
+ catch _:{badmatch, Err} ->
+ Err
+ end.
+
+%%%===================================================================
+%%% map/reduce functions
+%%%===================================================================
+%% @private
+map_key(Obj, _, _) ->
+ [case riak_object:key(Obj) of
+ <<"b_", B/binary>> ->
+ B;
+ <<"i_", B/binary>> ->
+ list_to_integer(binary_to_list(B));
+ B ->
+ erlang:binary_to_term(B)
+ end].
+
+%%%===================================================================
+%%% gen_server API
+%%%===================================================================
+%% @private
+init([Server, Port]) ->
+ case riakc_pb_socket:start(
+ Server, Port,
+ [auto_reconnect]) of
+ {ok, Pid} ->
+ erlang:monitor(process, Pid),
+ ejabberd_riak_sup:add_pid(Pid),
+ {ok, #state{pid = Pid}};
+ Err ->
+ {stop, Err}
+ end.
+
+%% @private
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+%% @private
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%% @private
+handle_info({'DOWN', _MonitorRef, _Type, _Object, _Info}, State) ->
+ {stop, normal, State};
+handle_info(_Info, State) ->
+ ?ERROR_MSG("unexpected info: ~p", [_Info]),
+ {noreply, State}.
+
+%% @private
+terminate(_Reason, State) ->
+ ejabberd_riak_sup:remove_pid(State#state.pid),
+ ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+encode_index_key(Idx, Key) when is_integer(Key) ->
+ {<<Idx/binary, "_int">>, Key};
+encode_index_key(Idx, Key) ->
+ {<<Idx/binary, "_bin">>, encode_key(Key)}.
+
+encode_key(Bin) when is_binary(Bin) ->
+ <<"b_", Bin/binary>>;
+encode_key(Int) when is_integer(Int) ->
+ <<"i_", (list_to_binary(integer_to_list(Int)))/binary>>;
+encode_key(Term) ->
+ erlang:term_to_binary(Term).
+
+log_error({error, notfound}, _, _) ->
+ ok;
+log_error({error, Why} = Err, Function, Opts) ->
+ Txt = lists:map(
+ fun({table, Table}) ->
+ io_lib:fwrite("** Table: ~p~n", [Table]);
+ ({key, Key}) ->
+ io_lib:fwrite("** Key: ~p~n", [Key]);
+ ({index, Index}) ->
+ io_lib:fwrite("** Index = ~p~n", [Index]);
+ ({start_key, Key}) ->
+ io_lib:fwrite("** Start Key: ~p~n", [Key]);
+ ({end_key, Key}) ->
+ io_lib:fwrite("** End Key: ~p~n", [Key]);
+ ({record, Rec}) ->
+ io_lib:fwrite("** Record = ~p~n", [Rec]);
+ ({index_info, IdxInfo}) ->
+ io_lib:fwrite("** Index info = ~p~n", [IdxInfo]);
+ (_) ->
+ ""
+ end, Opts),
+ ErrTxt = if is_binary(Why) ->
+ io_lib:fwrite("** Error: ~s", [Why]);
+ true ->
+ io_lib:fwrite("** Error: ~p", [Err])
+ end,
+ ?ERROR_MSG("database error:~n** Function: ~p~n~s~s",
+ [Function, Txt, ErrTxt]);
+log_error(_, _, _) ->
+ ok.
+
+make_invalid_object(Val) ->
+ list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])).
{Server, Port} =
ejabberd_config:get_local_option(
riak_server,
- fun({S, P}) when is_list(S), is_integer(P), P >= 1 -> {S, P} end,
- {"127.0.0.1", 8081}),
+ fun({S, P}) when is_integer(P), P > 0, P < 65536 ->
+ {binary_to_list(iolist_to_binary(S)), P}
+ end, {"127.0.0.1", 8081}),
{ok, {{one_for_one, PoolSize*10, 1},
lists:map(
fun(I) ->
end, Sessions)
end,
mnesia:transaction(F);
+ riak ->
+ try
+ lists:foreach(
+ fun({U, S, _R}) ->
+ ok = ejabberd_riak:put(#motd_users{us = {U, S}},
+ [{'2i', [{<<"server">>, S}]}])
+ end, Sessions),
+ {atomic, ok}
+ catch _:{badmatch, Err} ->
+ {atomic, Err}
+ end;
odbc ->
F = fun() ->
lists:foreach(
mnesia:write(#motd{server = LServer, packet = Packet})
end,
mnesia:transaction(F);
+ riak ->
+ {atomic, ejabberd_riak:put(#motd{server = LServer,
+ packet = Packet})};
odbc ->
XML = ejabberd_odbc:escape(xml:element_to_binary(Packet)),
F = fun() ->
end, Users)
end,
mnesia:transaction(F);
+ riak ->
+ try
+ ok = ejabberd_riak:delete(motd, LServer),
+ ok = ejabberd_riak:delete_by_index(motd_users,
+ <<"server">>,
+ LServer),
+ {atomic, ok}
+ catch _:{badmatch, Err} ->
+ {atomic, Err}
+ end;
odbc ->
F = fun() ->
ejabberd_odbc:sql_query_t([<<"delete from motd;">>])
_ ->
ok
end;
+send_motd(#jid{luser = LUser, lserver = LServer} = JID, riak) ->
+ case catch ejabberd_riak:get(motd, LServer) of
+ {ok, #motd{packet = Packet}} ->
+ US = {LUser, LServer},
+ case ejabberd_riak:get(motd_users, US) of
+ {ok, #motd_users{}} ->
+ ok;
+ _ ->
+ Local = jlib:make_jid(<<>>, LServer, <<>>),
+ ejabberd_router:route(Local, JID, Packet),
+ {atomic, ejabberd_riak:put(
+ #motd_users{us = US},
+ [{'2i', [{<<"server">>, LServer}]}])}
+ end;
+ _ ->
+ ok
+ end;
send_motd(#jid{luser = LUser, lserver = LServer} = JID, odbc) when LUser /= <<>> ->
case catch ejabberd_odbc:sql_query(
LServer, [<<"select xml from motd where username='';">>]) of
_ ->
error
end;
+get_stored_motd_packet(LServer, riak) ->
+ case ejabberd_riak:get(motd, LServer) of
+ {ok, #motd{packet = Packet}} ->
+ {ok, Packet};
+ _ ->
+ error
+ end;
get_stored_motd_packet(LServer, odbc) ->
case catch ejabberd_odbc:sql_query(
LServer, [<<"select xml from motd where username='';">>]) of
{ok, NewDefault, NewList}
end,
mnesia:transaction(F);
+process_blocklist_block(LUser, LServer, Filter,
+ riak) ->
+ {atomic,
+ begin
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{default = Default, lists = Lists} = P} ->
+ case lists:keysearch(Default, 1, Lists) of
+ {value, {_, List}} ->
+ NewDefault = Default,
+ NewLists1 = lists:keydelete(Default, 1, Lists);
+ false ->
+ NewDefault = <<"Blocked contacts">>,
+ NewLists1 = Lists,
+ List = []
+ end;
+ {error, _} ->
+ P = #privacy{us = {LUser, LServer}},
+ NewDefault = <<"Blocked contacts">>,
+ NewLists1 = [],
+ List = []
+ end,
+ NewList = Filter(List),
+ NewLists = [{NewDefault, NewList} | NewLists1],
+ case ejabberd_riak:put(P#privacy{default = NewDefault,
+ lists = NewLists}) of
+ ok ->
+ {ok, NewDefault, NewList};
+ Err ->
+ Err
+ end
+ end};
process_blocklist_block(LUser, LServer, Filter, odbc) ->
F = fun () ->
Default = case
end
end,
mnesia:transaction(F);
+process_blocklist_unblock_all(LUser, LServer, Filter,
+ riak) ->
+ {atomic,
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{default = Default, lists = Lists} = P} ->
+ case lists:keysearch(Default, 1, Lists) of
+ {value, {_, List}} ->
+ NewList = Filter(List),
+ NewLists1 = lists:keydelete(Default, 1, Lists),
+ NewLists = [{Default, NewList} | NewLists1],
+ case ejabberd_riak:put(P#privacy{lists = NewLists}) of
+ ok ->
+ {ok, Default, NewList};
+ Err ->
+ Err
+ end;
+ false ->
+ %% No default list, nothing to unblock
+ ok
+ end;
+ {error, _} ->
+ %% No lists, nothing to unblock
+ ok
+ end};
process_blocklist_unblock_all(LUser, LServer, Filter,
odbc) ->
F = fun () ->
end
end,
mnesia:transaction(F);
+process_blocklist_unblock(LUser, LServer, Filter,
+ riak) ->
+ {atomic,
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {error, _} ->
+ %% No lists, nothing to unblock
+ ok;
+ {ok, #privacy{default = Default, lists = Lists} = P} ->
+ case lists:keysearch(Default, 1, Lists) of
+ {value, {_, List}} ->
+ NewList = Filter(List),
+ NewLists1 = lists:keydelete(Default, 1, Lists),
+ NewLists = [{Default, NewList} | NewLists1],
+ case ejabberd_riak:put(P#privacy{lists = NewLists}) of
+ ok ->
+ {ok, Default, NewList};
+ Err ->
+ Err
+ end;
+ false ->
+ %% No default list, nothing to unblock
+ ok
+ end
+ end};
process_blocklist_unblock(LUser, LServer, Filter,
odbc) ->
F = fun () ->
_ -> []
end
end;
+process_blocklist_get(LUser, LServer, riak) ->
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{default = Default, lists = Lists}} ->
+ case lists:keysearch(Default, 1, Lists) of
+ {value, {_, List}} -> List;
+ _ -> []
+ end;
+ {error, notfound} ->
+ [];
+ {error, _} ->
+ error
+ end;
process_blocklist_get(LUser, LServer, odbc) ->
case catch
mod_privacy:sql_get_default_privacy_list(LUser, LServer)
[#caps_features{features = Features}] -> {ok, Features};
_ -> error
end
+ end;
+caps_read_fun(_LServer, Node, riak) ->
+ fun() ->
+ case ejabberd_riak:get(caps_features, Node) of
+ {ok, #caps_features{features = Features}} -> {ok, Features};
+ _ -> error
+ end
+ end;
+caps_read_fun(LServer, {Node, SubNode}, odbc) ->
+ fun() ->
+ SNode = ejabberd_odbc:escape(Node),
+ SSubNode = ejabberd_odbc:escape(SubNode),
+ case ejabberd_odbc:sql_query(
+ LServer, [<<"select feature from caps_features where ">>,
+ <<"node='">>, SNode, <<"' and subnode='">>,
+ SSubNode, <<"';">>]) of
+ {selected, [<<"feature">>], [[H]|_] = Fs} ->
+ case catch jlib:binary_to_integer(H) of
+ Int when is_integer(Int), Int>=0 ->
+ {ok, Int};
+ _ ->
+ {ok, lists:flatten(Fs)}
+ end;
+ _ ->
+ error
+ end
end.
caps_write_fun(Node, Features) ->
fun () ->
mnesia:dirty_write(#caps_features{node_pair = Node,
features = Features})
+ end;
+caps_write_fun(_LServer, Node, Features, riak) ->
+ fun () ->
+ ejabberd_riak:put(#caps_features{node_pair = Node,
+ features = Features})
+ end;
+caps_write_fun(LServer, NodePair, Features, odbc) ->
+ fun () ->
+ ejabberd_odbc:sql_transaction(
+ LServer,
+ sql_write_features_t(NodePair, Features))
end.
make_my_disco_hash(Host) ->
[] -> empty;
[#irc_custom{data = Data}] -> Data
end;
+get_data(LServer, Host, From, riak) ->
+ #jid{luser = LUser, lserver = LServer} = From,
+ US = {LUser, LServer},
+ case ejabberd_riak:get(irc_custom, {US, Host}) of
+ {ok, #irc_custom{data = Data}} ->
+ Data;
+ {error, notfound} ->
+ empty;
+ _Err ->
+ error
+ end;
get_data(LServer, Host, From, odbc) ->
SJID =
ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),
data = Data})
end,
mnesia:transaction(F);
+set_data(LServer, Host, From, Data, riak) ->
+ {LUser, LServer, _} = jlib:jid_tolower(From),
+ US = {LUser, LServer},
+ {atomic, ejabberd_riak:put(#irc_custom{us_host = {US, Host},
+ data = Data})};
set_data(LServer, Host, From, Data, odbc) ->
SJID =
ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),
status = Status}] ->
{ok, TimeStamp, Status}
end;
+get_last(LUser, LServer, riak) ->
+ case ejabberd_riak:get(last_activity, {LUser, LServer}) of
+ {ok, #last_activity{timestamp = TimeStamp,
+ status = Status}} ->
+ {ok, TimeStamp, Status};
+ {error, notfound} ->
+ not_found;
+ Err ->
+ Err
+ end;
get_last(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch odbc_queries:get_last(LServer, Username) of
status = Status})
end,
mnesia:transaction(F);
+store_last_info(LUser, LServer, TimeStamp, Status,
+ riak) ->
+ US = {LUser, LServer},
+ {atomic, ejabberd_riak:put(#last_activity{us = US,
+ timestamp = TimeStamp,
+ status = Status})};
store_last_info(LUser, LServer, TimeStamp, Status,
odbc) ->
Username = ejabberd_odbc:escape(LUser),
mnesia:transaction(F);
remove_user(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
- odbc_queries:del_last(LServer, Username).
+ odbc_queries:del_last(LServer, Username);
+remove_user(LUser, LServer, riak) ->
+ {atomic, ejabberd_riak:delete(last_activity, {LUser, LServer})}.
update_table() ->
Fields = record_info(fields, last_activity),
opts = Opts})
end,
mnesia:transaction(F);
+store_room(_LServer, Host, Name, Opts, riak) ->
+ {atomic, ejabberd_riak:put(#muc_room{name_host = {Name, Host},
+ opts = Opts})};
store_room(LServer, Host, Name, Opts, odbc) ->
SName = ejabberd_odbc:escape(Name),
SHost = ejabberd_odbc:escape(Host),
[#muc_room{opts = Opts}] -> Opts;
_ -> error
end;
+restore_room(_LServer, Host, Name, riak) ->
+ case ejabberd_riak:get(muc_room, {Name, Host}) of
+ {ok, #muc_room{opts = Opts}} -> Opts;
+ _ -> error
+ end;
restore_room(LServer, Host, Name, odbc) ->
SName = ejabberd_odbc:escape(Name),
SHost = ejabberd_odbc:escape(Host),
F = fun () -> mnesia:delete({muc_room, {Name, Host}})
end,
mnesia:transaction(F);
+forget_room(_LServer, Host, Name, riak) ->
+ {atomic, ejabberd_riak:delete(muc_room, {Name, Host})};
forget_room(LServer, Host, Name, odbc) ->
SName = ejabberd_odbc:escape(Name),
SHost = ejabberd_odbc:escape(Host),
[] -> true;
[#muc_registered{us_host = {U, _Host}}] -> U == LUS
end;
+can_use_nick(LServer, Host, JID, Nick, riak) ->
+ {LUser, LServer, _} = jlib:jid_tolower(JID),
+ LUS = {LUser, LServer},
+ case ejabberd_riak:get_by_index(muc_registered,
+ <<"nick_host">>, {Nick, Host}) of
+ {ok, []} ->
+ true;
+ {ok, [#muc_registered{us_host = {U, _Host}}]} ->
+ U == LUS;
+ {error, _} ->
+ true
+ end;
can_use_nick(LServer, Host, JID, Nick, odbc) ->
SJID =
jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(JID))),
{'EXIT', Reason} -> ?ERROR_MSG("~p", [Reason]), [];
Rs -> Rs
end;
+get_rooms(_LServer, Host, riak) ->
+ case ejabberd_riak:get(muc_room) of
+ {ok, Rs} ->
+ lists:filter(
+ fun(#muc_room{name_host = {_, H}}) ->
+ Host == H
+ end, Rs);
+ _Err ->
+ []
+ end;
get_rooms(LServer, Host, odbc) ->
SHost = ejabberd_odbc:escape(Host),
case catch ejabberd_odbc:sql_query(LServer,
[] -> error;
[#muc_registered{nick = Nick}] -> Nick
end;
+get_nick(LServer, Host, From, riak) ->
+ {LUser, LServer, _} = jlib:jid_tolower(From),
+ US = {LUser, LServer},
+ case ejabberd_riak:get(muc_registered, {US, Host}) of
+ {ok, #muc_registered{nick = Nick}} -> Nick;
+ {error, _} -> error
+ end;
get_nick(LServer, Host, From, odbc) ->
SJID =
ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),
end
end,
mnesia:transaction(F);
+set_nick(LServer, Host, From, Nick, riak) ->
+ {LUser, LServer, _} = jlib:jid_tolower(From),
+ LUS = {LUser, LServer},
+ {atomic,
+ case Nick of
+ <<"">> ->
+ ejabberd_riak:delete(muc_registered, {LUS, Host});
+ _ ->
+ Allow = case ejabberd_riak:get_by_index(
+ muc_registered,
+ <<"nick_host">>, {Nick, Host}) of
+ {ok, []} ->
+ true;
+ {ok, [#muc_registered{us_host = {U, _Host}}]} ->
+ U == LUS;
+ {error, _} ->
+ false
+ end,
+ if Allow ->
+ ejabberd_riak:put(#muc_registered{us_host = {LUS, Host},
+ nick = Nick},
+ [{'2i', [{<<"nick_host">>,
+ {Nick, Host}}]}]);
+ true ->
+ false
+ end
+ end};
set_nick(LServer, Host, From, Nick, odbc) ->
JID =
jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From))),
discard_warn_sender(Msgs);
true ->
lists:foreach(
- fun(M) ->
- Username = User,
- From = M#offline_msg.from,
- To = M#offline_msg.to,
- #xmlel{name = Name, attrs = Attrs,
- children = Els} =
- M#offline_msg.packet,
- Attrs2 = jlib:replace_from_to_attrs(
- jlib:jid_to_string(From),
- jlib:jid_to_string(To),
- Attrs),
- Packet = #xmlel{name = Name,
- attrs = Attrs2,
- children =
- Els ++
- [jlib:timestamp_to_xml(
- calendar:now_to_universal_time(
- M#offline_msg.timestamp),
- utc,
- jlib:make_jid(<<"">>, Host, <<"">>),
- <<"Offline Storage">>),
- jlib:timestamp_to_xml(
- calendar:now_to_universal_time(
- M#offline_msg.timestamp))]},
- XML = xml:element_to_binary(Packet),
- {MegaSecs, Secs, MicroSecs} =
- M#offline_msg.timestamp,
- TS =
- iolist_to_binary(
- io_lib:format("~6..0w~6..0w.~6..0w",
- [MegaSecs, Secs, MicroSecs])),
- ejabberd_riak:put(
- Host, <<"offline">>,
- undefined, XML,
- [{<<"user_bin">>, Username},
- {<<"timestamp_bin">>, TS}
- ])
+ fun(#offline_msg{us = US,
+ timestamp = TS} = M) ->
+ ejabberd_riak:put(M, [{i, TS}, {'2i', [{<<"us">>, US}]}])
end, Msgs)
end.
case DBType of
mnesia -> Msgs;
odbc -> lists:reverse(Msgs);
- riak -> lists:reverse(Msgs)
+ riak -> Msgs
end
end.
_ -> Ls
end;
pop_offline_messages(Ls, LUser, LServer, riak) ->
- Username = LUser,
- case ejabberd_riak:get_objects_by_index(
- LServer, <<"offline">>, <<"user_bin">>, Username) of
+ case ejabberd_riak:get_by_index(offline_msg,
+ <<"us">>, {LUser, LServer}) of
{ok, Rs} ->
- SortedRs =
- lists:sort(fun(X, Y) ->
- MX = riak_object:get_metadata(X),
- {ok, IX} = dict:find(<<"index">>, MX),
- {value, TSX} = lists:keysearch(
- <<"timestamp_bin">>, 1,
- IX),
- MY = riak_object:get_metadata(Y),
- {ok, IY} = dict:find(<<"index">>, MY),
- {value, TSY} = lists:keysearch(
- <<"timestamp_bin">>, 1,
- IY),
- TSX =< TSY
- end, Rs),
- Ls ++ lists:flatmap(
- fun(R) ->
- Key = riak_object:key(R),
- ejabberd_riak:delete(LServer, <<"offline">>, Key),
- XML = riak_object:get_value(R),
- case xml_stream:parse_element(XML) of
- {error, _Reason} ->
- [];
- El ->
- case offline_msg_to_route(LServer, El) of
- error ->
- [];
- RouteMsg ->
- [RouteMsg]
- end
- end
- end, SortedRs);
+ try
+ lists:foreach(
+ fun(#offline_msg{timestamp = T}) ->
+ ok = ejabberd_riak:delete(offline_msg, T)
+ end, Rs),
+ TS = now(),
+ Ls ++ lists:map(
+ fun (R) ->
+ offline_msg_to_route(LServer, R)
+ end,
+ lists:filter(
+ fun(R) ->
+ case R#offline_msg.expire of
+ never -> true;
+ TimeStamp -> TS < TimeStamp
+ end
+ end,
+ lists:keysort(#offline_msg.timestamp, Rs)))
+ catch _:{badmatch, _} ->
+ Ls
+ end;
_ ->
Ls
end.
Username = ejabberd_odbc:escape(LUser),
odbc_queries:del_spool_msg(LServer, Username);
remove_user(LUser, LServer, riak) ->
- Username = LUser,
- case ejabberd_riak:get_keys_by_index(
- LServer, <<"offline">>, <<"user_bin">>, Username) of
- {ok, Keys} ->
- lists:foreach(
- fun(Key) ->
- ejabberd_riak:delete(LServer, <<"offline">>, Key)
- end, Keys);
- _ ->
- ok
- end.
+ {atomic, ejabberd_riak:delete_by_index(offline_msg,
+ <<"us">>, {LUser, LServer})}.
jid_to_binary(#jid{user = U, server = S, resource = R,
luser = LU, lserver = LS, lresource = LR}) ->
get_offline_els(LUser, LServer, mnesia) ->
Msgs = read_all_msgs(LUser, LServer, mnesia),
+get_offline_els(LUser, LServer, DBType) when DBType == mnesia; DBType == riak ->
+ Msgs = read_all_msgs(LUser, LServer, DBType),
lists:map(
fun(Msg) ->
{route, From, To, Packet} = offline_msg_to_route(LServer, Msg),
US = {LUser, LServer},
lists:keysort(#offline_msg.timestamp,
mnesia:dirty_read({offline_msg, US}));
+read_all_msgs(LUser, LServer, riak) ->
+ case ejabberd_riak:get_by_index(
+ offline_msg, <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ lists:keysort(#offline_msg.timestamp, Rs);
+ _Err ->
+ []
+ end;
read_all_msgs(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch ejabberd_odbc:sql_query(LServer,
_ -> []
end.
-format_user_queue(Msgs, mnesia) ->
+format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak ->
lists:map(fun (#offline_msg{timestamp = TimeStamp,
from = From, to = To,
packet =
ok;
false -> nothing
end;
+user_queue_parse_query(LUser, LServer, Query, riak) ->
+ case lists:keysearch(<<"delete">>, 1, Query) of
+ {value, _} ->
+ Msgs = read_all_msgs(LUser, LServer, riak),
+ lists:foreach(
+ fun (Msg) ->
+ ID = jlib:encode_base64((term_to_binary(Msg))),
+ case lists:member({<<"selected">>, ID}, Query) of
+ true ->
+ ejabberd_riak:delete(offline_msg,
+ Msg#offline_msg.timestamp);
+ false ->
+ ok
+ end
+ end,
+ Msgs),
+ ok;
+ false ->
+ nothing
+ end;
user_queue_parse_query(LUser, LServer, Query, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case lists:keysearch(<<"delete">>, 1, Query) of
get_queue_length(LUser, LServer, mnesia) ->
length(mnesia:dirty_read({offline_msg,
{LUser, LServer}}));
+get_queue_length(LUser, LServer, riak) ->
+ case ejabberd_riak:count_by_index(offline_msg,
+ <<"us">>, {LUser, LServer}) of
+ {ok, N} ->
+ N;
+ _ ->
+ 0
+ end;
get_queue_length(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch ejabberd_odbc:sql_query(LServer,
get_messages_subset2(Max, Length, MsgsAll, _DBType)
when Length =< Max * 2 ->
MsgsAll;
-get_messages_subset2(Max, Length, MsgsAll, mnesia) ->
+get_messages_subset2(Max, Length, MsgsAll, DBType)
+ when DBType == mnesia; DBType == riak ->
FirstN = Max,
{MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
mnesia:dirty_read({offline_msg, US}))
end,
mnesia:transaction(F);
+delete_all_msgs(LUser, LServer, riak) ->
+ Res = ejabberd_riak:delete_by_index(offline_msg,
+ <<"us">>, {LUser, LServer}),
+ {atomic, Res};
delete_all_msgs(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
odbc_queries:del_spool_msg(LServer, Username),
Acc.
%% Returns as integer the number of offline messages for a given user
-count_offline_messages(LUser, LServer) ->
+count_offline_messages(User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ DBType = gen_mod:db_type(LServer, ?MODULE),
+ count_offline_messages(LUser, LServer, DBType).
+
+count_offline_messages(LUser, LServer, mnesia) ->
+ US = {LUser, LServer},
+ F = fun () ->
+ p1_mnesia:count_records(offline_msg,
+ #offline_msg{us = US, _ = '_'})
+ end,
+ case catch mnesia:async_dirty(F) of
+ I when is_integer(I) -> I;
+ _ -> 0
+ end;
+count_offline_messages(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
- case catch odbc_queries:count_records_where(
- LServer, "spool",
- <<"where username='", Username/binary, "'">>) of
- {selected, [_], [[Res]]} ->
- jlib:binary_to_integer(Res);
+ case catch odbc_queries:count_records_where(LServer,
+ <<"spool">>,
+ <<"where username='",
+ Username/binary, "'">>)
+ of
+ {selected, [_], [[Res]]} ->
+ jlib:binary_to_integer(Res);
+ _ -> 0
+ end;
+count_offline_messages(LUser, LServer, riak) ->
+ case ejabberd_riak:count_by_index(
+ offline_msg, <<"us">>, {LUser, LServer}) of
+ {ok, Res} ->
+ Res;
_ ->
0
- end.
+ end;
+count_offline_messages(_Acc, User, Server) ->
+ N = count_offline_messages(User, Server),
+ {stop, N}.
export(_Server) ->
[{offline_msg,
Lists),
{Default, LItems}
end;
+process_lists_get(LUser, LServer, _Active, riak) ->
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{default = Default, lists = Lists}} ->
+ LItems = lists:map(fun ({N, _}) ->
+ #xmlel{name = <<"list">>,
+ attrs = [{<<"name">>, N}],
+ children = []}
+ end,
+ Lists),
+ {Default, LItems};
+ {error, notfound} ->
+ {none, []};
+ {error, _} ->
+ error
+ end;
process_lists_get(LUser, LServer, _Active, odbc) ->
Default = case catch sql_get_default_privacy_list(LUser,
LServer)
_ -> not_found
end
end;
+process_list_get(LUser, LServer, Name, riak) ->
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{lists = Lists}} ->
+ case lists:keysearch(Name, 1, Lists) of
+ {value, {_, List}} -> List;
+ _ -> not_found
+ end;
+ {error, notfound} ->
+ not_found;
+ {error, _} ->
+ error
+ end;
process_list_get(LUser, LServer, Name, odbc) ->
case catch sql_get_privacy_list_id(LUser, LServer, Name)
of
end
end,
mnesia:transaction(F);
+process_default_set(LUser, LServer, {value, Name}, riak) ->
+ {atomic,
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{lists = Lists} = P} ->
+ case lists:keymember(Name, 1, Lists) of
+ true ->
+ ejabberd_riak:put(P#privacy{default = Name,
+ lists = Lists});
+ false ->
+ not_found
+ end;
+ {error, _} ->
+ not_found
+ end};
process_default_set(LUser, LServer, {value, Name},
odbc) ->
F = fun () ->
end
end,
mnesia:transaction(F);
+process_default_set(LUser, LServer, false, riak) ->
+ {atomic,
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, R} ->
+ ejabberd_riak:put(R#privacy{default = none});
+ {error, _} ->
+ ok
+ end};
process_default_set(LUser, LServer, false, odbc) ->
case catch sql_unset_default_privacy_list(LUser,
LServer)
false -> error
end
end;
+process_active_set(LUser, LServer, Name, riak) ->
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{lists = Lists}} ->
+ case lists:keysearch(Name, 1, Lists) of
+ {value, {_, List}} -> List;
+ false -> error
+ end;
+ {error, _} ->
+ error
+ end;
process_active_set(LUser, LServer, Name, odbc) ->
case catch sql_get_privacy_list_id(LUser, LServer, Name)
of
end
end,
mnesia:transaction(F);
+remove_privacy_list(LUser, LServer, Name, riak) ->
+ {atomic,
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{default = Default, lists = Lists} = P} ->
+ if Name == Default ->
+ conflict;
+ true ->
+ NewLists = lists:keydelete(Name, 1, Lists),
+ ejabberd_riak:put(P#privacy{lists = NewLists})
+ end;
+ {error, _} ->
+ ok
+ end};
remove_privacy_list(LUser, LServer, Name, odbc) ->
F = fun () ->
case sql_get_default_privacy_list_t(LUser) of
end
end,
mnesia:transaction(F);
+set_privacy_list(LUser, LServer, Name, List, riak) ->
+ {atomic,
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{lists = Lists} = P} ->
+ NewLists1 = lists:keydelete(Name, 1, Lists),
+ NewLists = [{Name, List} | NewLists1],
+ ejabberd_riak:put(P#privacy{lists = NewLists});
+ {error, _} ->
+ NewLists = [{Name, List}],
+ ejabberd_riak:put(#privacy{us = {LUser, LServer},
+ lists = NewLists})
+ end};
set_privacy_list(LUser, LServer, Name, List, odbc) ->
RItems = lists:map(fun item_to_raw/1, List),
F = fun () ->
end;
_ -> {none, []}
end;
+get_user_list(_, LUser, LServer, riak) ->
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{default = Default, lists = Lists}} ->
+ case Default of
+ none -> {none, []};
+ _ ->
+ case lists:keysearch(Default, 1, Lists) of
+ {value, {_, List}} -> {Default, List};
+ _ -> {none, []}
+ end
+ end;
+ {error, _} ->
+ {none, []}
+ end;
get_user_list(_, LUser, LServer, odbc) ->
case catch sql_get_default_privacy_list(LUser, LServer)
of
_ ->
error
end;
+get_user_lists(LUser, LServer, riak) ->
+ case ejabberd_riak:get(privacy, {LUser, LServer}) of
+ {ok, #privacy{} = P} ->
+ {ok, P};
+ {error, _} ->
+ error
+ end;
get_user_lists(LUser, LServer, odbc) ->
Default = case catch sql_get_default_privacy_list(LUser, LServer) of
{selected, [<<"name">>], []} ->
F = fun () -> mnesia:delete({privacy, {LUser, LServer}})
end,
mnesia:transaction(F);
+remove_user(LUser, LServer, riak) ->
+ {atomic, ejabberd_riak:delete(privacy, {LUser, LServer})};
remove_user(LUser, LServer, odbc) ->
sql_del_privacy_lists(LUser, LServer).
odbc_queries:set_private_data(LServer, Username, LXMLNS,
SData);
set_data(LUser, LServer, {XMLNS, El}, riak) ->
- Username = LUser,
- Key = <<LUser/binary, $@, LServer/binary, $@, XMLNS/binary>>,
- SData = xml:element_to_binary(El),
- ejabberd_riak:put(
- LServer, <<"private">>, Key, SData,
- [{<<"user_bin">>, Username}]),
- ok.
+ ejabberd_riak:put(#private_storage{usns = {LUser, LServer, XMLNS},
+ xml = El},
+ [{'2i', [{<<"us">>, {LUser, LServer}}]}]).
get_data(LUser, LServer, Data) ->
get_data(LUser, LServer,
end;
get_data(LUser, LServer, riak, [{XMLNS, El} | Els],
Res) ->
- Key = <<LUser/binary, $@, LServer/binary, $@, XMLNS/binary>>,
- case ejabberd_riak:get(LServer, <<"private">>, Key) of
- {ok, SData} ->
- case xml_stream:parse_element(SData) of
- Data when element(1, Data) == xmlelement ->
- get_data(LUser, LServer, riak, Els, [Data | Res])
- end;
- _ ->
- get_data(LUser, LServer, riak, Els, [El | Res])
+ case ejabberd_riak:get(private_storage, {LUser, LServer, XMLNS}) of
+ {ok, #private_storage{xml = NewEl}} ->
+ get_data(LUser, LServer, riak, Els, [NewEl|Res]);
+ _ ->
+ get_data(LUser, LServer, riak, Els, [El|Res])
end.
-
get_data(LUser, LServer) ->
get_all_data(LUser, LServer,
gen_mod:db_type(LServer, ?MODULE)).
[]
end;
get_all_data(LUser, LServer, riak) ->
- Username = LUser,
case ejabberd_riak:get_by_index(
- LServer, <<"private">>, <<"user_bin">>, Username) of
+ private_storage, <<"us">>, {LUser, LServer}) of
{ok, Res} ->
- lists:flatmap(
- fun(SData) ->
- case xml_stream:parse_element(SData) of
- #xmlel{} = El ->
- [El];
- _ ->
- []
- end
- end, Res);
+ [El || #private_storage{xml = El} <- Res];
_ ->
[]
end.
odbc_queries:del_user_private_storage(LServer,
Username);
remove_user(LUser, LServer, riak) ->
- Username = LUser,
- case ejabberd_riak:get_keys_by_index(
- LServer, <<"private">>, <<"user_bin">>, Username) of
- {ok, Keys} ->
- lists:foreach(
- fun(Key) ->
- ejabberd_riak:delete(LServer, <<"private">>, Key)
- end, Keys);
- _ ->
- ok
- end.
+ {atomic, ejabberd_riak:delete_by_index(private_storage,
+ <<"us">>, {LUser, LServer})}.
update_table() ->
Fields = record_info(fields, private_storage),
{selected, [<<"version">>], []} -> error
end;
read_roster_version(LServer, LUser, riak) ->
- Username = LUser,
- case ejabberd_riak:get(LServer, <<"roster_version">>,
- Username) of
+ case ejabberd_riak:get(roster_version, {LUser, LServer}) of
{ok, Version} -> Version;
- {error, notfound} -> error
+ _Err -> error
end.
write_roster_version(LUser, LServer) ->
end;
write_roster_version(LUser, LServer, _InTransaction, Ver,
riak) ->
- Username = LUser,
- riak_set_roster_version(LServer, Username, Ver).
+ US = {LUser, LServer},
+ ejabberd_riak:put(#roster_version{us = US, version = Ver}).
%% Load roster from DB only if neccesary.
%% It is neccesary if
Items when is_list(Items)-> Items;
_ -> []
end;
+get_roster(LUser, LServer, riak) ->
+ case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of
+ {ok, Items} -> Items;
+ _Err -> []
+ end;
get_roster(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch odbc_queries:get_roster(LServer, Username) of
Items),
RItems;
_ -> []
- end;
-get_roster(LUser, LServer, riak) ->
- Username = LUser,
- case catch riak_get_roster(LServer, Username) of
- {ok, Items} when is_list(Items) ->
- JIDGroups = case riak_get_roster_jid_groups(LServer, Username) of
- {ok, JGrps} when is_list(JGrps) ->
- JGrps;
- _ ->
- []
- end,
- GroupsDict = dict:from_list(JIDGroups),
- RItems = lists:flatmap(
- fun(I) ->
- case riak_raw_to_record(LServer, I) of
- %% Bad JID in database:
- error ->
- [];
- R ->
- SJID = jlib:jid_to_string(R#roster.jid),
- Groups =
- case dict:find(SJID, GroupsDict) of
- {ok, Gs} -> Gs;
- error -> []
- end,
- [R#roster{groups = Groups}]
- end
- end, Items),
- RItems;
- _ ->
- []
end.
item_to_xml(Item) ->
end
end;
get_roster_by_jid_t(LUser, LServer, LJID, riak) ->
- Username = LUser,
- SJID = jlib:jid_to_string(LJID),
- Res = riak_get_roster_by_jid(LServer, Username, SJID),
- case Res of
- {error, _} ->
- #roster{usj = {LUser, LServer, LJID},
- us = {LUser, LServer},
- jid = LJID};
+ case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
{ok, I} ->
- R = riak_raw_to_record(LServer, I),
- case R of
- %% Bad JID in database:
- error ->
- #roster{usj = {LUser, LServer, LJID},
- us = {LUser, LServer},
- jid = LJID};
- _ ->
- R#roster{
- usj = {LUser, LServer, LJID},
- us = {LUser, LServer},
- jid = LJID,
- name = ""}
- end
+ I#roster{jid = LJID, name = <<"">>, groups = [],
+ xs = []};
+ {error, notfound} ->
+ #roster{usj = {LUser, LServer, LJID},
+ us = {LUser, LServer}, jid = LJID};
+ Err ->
+ exit(Err)
end.
try_process_iq_set(From, To, #iq{sub_el = SubEl} = IQ) ->
_ -> []
end;
get_subscription_lists(_, LUser, LServer, riak) ->
- Username = LUser,
- case catch riak_get_roster(LServer, Username) of
- {ok, Items} when is_list(Items) ->
- lists:map(fun(I) -> riak_raw_to_record(LServer, I) end, Items);
- _ ->
- []
+ case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of
+ {ok, Items} -> Items;
+ _Err -> []
end.
fill_subscription_lists(LServer, [#roster{} = I | Is],
SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
odbc_queries:roster_subscribe(LServer, Username, SJID,
ItemVals);
-roster_subscribe_t(LUser, LServer, LJID, Item, riak) ->
- ItemVals = riak_record_to_string(Item),
- Username = LUser,
- SJID = jlib:jid_to_string(LJID),
- riak_roster_subscribe(LServer, Username, SJID, ItemVals).
+roster_subscribe_t(LUser, LServer, _LJID, Item, riak) ->
+ ejabberd_riak:put(Item,
+ [{'2i', [{<<"us">>, {LUser, LServer}}]}]).
transaction(LServer, F) ->
case gen_mod:db_type(LServer, ?MODULE) of
us = {LUser, LServer}, jid = LJID}
end;
get_roster_by_jid_with_groups_t(LUser, LServer, LJID, riak) ->
- Username = LUser,
- SJID = jlib:jid_to_string(LJID),
- case riak_get_roster_by_jid(LServer, Username, SJID) of
+ case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
{ok, I} ->
- R = riak_raw_to_record(LServer, I),
- Groups =
- case riak_get_roster_groups(LServer, Username, SJID) of
- {ok, JGrps} when is_list(JGrps) ->
- JGrps;
- _ ->
- []
- end,
- R#roster{groups = Groups};
- {error, _} ->
+ I;
+ {error, notfound} ->
#roster{usj = {LUser, LServer, LJID},
- us = {LUser, LServer},
- jid = LJID}
+ us = {LUser, LServer}, jid = LJID};
+ Err ->
+ exit(Err)
end.
process_subscription(Direction, User, Server, JID1,
odbc_queries:del_user_roster_t(LServer, Username),
ok;
remove_user(LUser, LServer, riak) ->
- Username = LUser,
- riak_del_user_roster(LServer, Username),
- ok.
+ {atomic, ejabberd_riak:delete_by_index(roster, <<"us">>, {LUser, LServer})}.
%% For each contact with Subscription:
%% Both or From, send a "unsubscribed" presence stanza;
ItemGroups = groups_to_string(Item),
odbc_queries:update_roster(LServer, Username, SJID, ItemVals,
ItemGroups);
-update_roster_t(LUser, LServer, LJID, Item, riak) ->
- Username = LUser,
- SJID = jlib:jid_to_string(LJID),
- ItemVals = riak_record_to_string(Item),
- ItemGroups = riak_groups_to_binary(Item),
- riak_update_roster(
- LServer, Username, SJID, ItemVals, ItemGroups).
+update_roster_t(LUser, LServer, _LJID, Item, riak) ->
+ ejabberd_riak:put(Item,
+ [{'2i', [{<<"us">>, {LUser, LServer}}]}]).
del_roster_t(LUser, LServer, LJID) ->
DBType = gen_mod:db_type(LServer, ?MODULE),
SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
odbc_queries:del_roster(LServer, Username, SJID);
del_roster_t(LUser, LServer, LJID, riak) ->
- Username = LUser,
- SJID = jlib:jid_to_string(LJID),
- riak_del_roster(LServer, Username, SJID).
+ ejabberd_riak:delete(roster, {LUser, LServer, LJID}).
process_item_set_t(LUser, LServer,
#xmlel{attrs = Attrs, children = Els}) ->
get_in_pending_subscriptions(Ls, User, Server,
gen_mod:db_type(LServer, ?MODULE)).
-get_in_pending_subscriptions(Ls, User, Server,
- mnesia) ->
+get_in_pending_subscriptions(Ls, User, Server, DBType)
+ when DBType == mnesia; DBType == riak ->
JID = jlib:make_jid(User, Server, <<"">>),
- US = {JID#jid.luser, JID#jid.lserver},
- case mnesia:dirty_index_read(roster, US, #roster.us) of
- Result when is_list(Result) ->
- Ls ++
- lists:map(fun (R) ->
- Message = R#roster.askmessage,
- Status = if is_binary(Message) -> (Message);
- true -> <<"">>
- end,
- #xmlel{name = <<"presence">>,
- attrs =
- [{<<"from">>,
- jlib:jid_to_string(R#roster.jid)},
- {<<"to">>, jlib:jid_to_string(JID)},
- {<<"type">>, <<"subscribe">>}],
- children =
- [#xmlel{name = <<"status">>,
- attrs = [],
- children =
- [{xmlcdata, Status}]}]}
- end,
- lists:filter(fun (R) ->
- case R#roster.ask of
- in -> true;
- both -> true;
- _ -> false
- end
- end,
- Result));
- _ -> Ls
- end;
+ Result = get_roster(JID#jid.luser, JID#jid.lserver, DBType),
+ Ls ++ lists:map(fun (R) ->
+ Message = R#roster.askmessage,
+ Status = if is_binary(Message) -> (Message);
+ true -> <<"">>
+ end,
+ #xmlel{name = <<"presence">>,
+ attrs =
+ [{<<"from">>,
+ jlib:jid_to_string(R#roster.jid)},
+ {<<"to">>, jlib:jid_to_string(JID)},
+ {<<"type">>, <<"subscribe">>}],
+ children =
+ [#xmlel{name = <<"status">>,
+ attrs = [],
+ children =
+ [{xmlcdata, Status}]}]}
+ end,
+ lists:filter(fun (R) ->
+ case R#roster.ask of
+ in -> true;
+ both -> true;
+ _ -> false
+ end
+ end,
+ Result));
get_in_pending_subscriptions(Ls, User, Server, odbc) ->
JID = jlib:make_jid(User, Server, <<"">>),
LUser = JID#jid.luser,
end,
Items));
_ -> Ls
- end;
-get_in_pending_subscriptions(Ls, User, Server, riak) ->
- JID = jlib:make_jid(User, Server, <<"">>),
- LUser = JID#jid.luser,
- LServer = JID#jid.lserver,
- Username = LUser,
- case catch riak_get_roster(LServer, Username) of
- {ok, Items} when is_list(Items) ->
- Ls ++ lists:map(
- fun(R) ->
- Message = R#roster.askmessage,
- #xmlel{name = <<"presence">>,
- attrs = [{<<"from">>,
- jlib:jid_to_string(R#roster.jid)},
- {<<"to">>, jlib:jid_to_string(JID)},
- {<<"type">>, <<"subscribe">>}],
- children = [#xmlel{name = <<"status">>,
- attrs = [],
- children =
- [{xmlcdata, Message}]}]}
- end,
- lists:flatmap(
- fun(I) ->
- case riak_raw_to_record(LServer, I) of
- %% Bad JID in database:
- error ->
- [];
- R ->
- case R#roster.ask of
- in -> [R];
- both -> [R];
- _ -> []
- end
- end
- end,
- Items));
- _ ->
- Ls
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
end;
read_subscription_and_groups(LUser, LServer, LJID,
riak) ->
- Username = LUser,
- SJID = jlib:jid_to_string(LJID),
- case catch riak_get_subscription(LServer, Username, SJID) of
- {ok, Subscription} ->
- Groups = case riak_get_roster_jid_groups(LServer, Username) of
- {ok, JGrps} when is_list(JGrps) ->
- JGrps;
- _ ->
- []
- end,
- {Subscription, Groups};
- _ -> error
+ case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
+ {ok, #roster{subscription = Subscription,
+ groups = Groups}} ->
+ {Subscription, Groups};
+ _ ->
+ error
end.
get_jid_info(_, User, Server, JID) ->
mnesia:dirty_select(sr_group,
[{#sr_group{group_host = {'$1', '$2'}, _ = '_'},
[{'==', '$2', Host}], ['$1']}]);
+list_groups(Host, riak) ->
+ case ejabberd_riak:get_keys_by_index(sr_group, <<"host">>, Host) of
+ {ok, Gs} ->
+ [G || {G, _} <- Gs];
+ _ ->
+ []
+ end;
list_groups(Host, odbc) ->
case ejabberd_odbc:sql_query(Host,
[<<"select name from sr_group;">>])
_ = '_'},
[], [['$1', '$2']]}]),
lists:map(fun ([G, O]) -> {G, O} end, Gs);
+groups_with_opts(Host, riak) ->
+ case ejabberd_riak:get_by_index(sr_group, <<"host">>, Host) of
+ {ok, Rs} ->
+ [{G, O} || #sr_group{group_host = {G, _}, opts = O} <- Rs];
+ _ ->
+ []
+ end;
groups_with_opts(Host, odbc) ->
case ejabberd_odbc:sql_query(Host,
[<<"select name, opts from sr_group;">>])
R = #sr_group{group_host = {Group, Host}, opts = Opts},
F = fun () -> mnesia:write(R) end,
mnesia:transaction(F);
+create_group(Host, Group, Opts, riak) ->
+ {atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host},
+ opts = Opts},
+ [{'2i', [{<<"host">>, Host}]}])};
create_group(Host, Group, Opts, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
SOpts = ejabberd_odbc:encode_term(Opts),
Users)
end,
mnesia:transaction(F);
+delete_group(Host, Group, riak) ->
+ try
+ ok = ejabberd_riak:delete(sr_group, {Group, Host}),
+ ok = ejabberd_riak:delete_by_index(sr_user, <<"group_host">>,
+ {Group, Host}),
+ {atomic, ok}
+ catch _:{badmatch, Err} ->
+ {atomic, Err}
+ end;
delete_group(Host, Group, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
F = fun () ->
[#sr_group{opts = Opts}] -> Opts;
_ -> error
end;
+get_group_opts(Host, Group, riak) ->
+ case ejabberd_riak:get(sr_group, {Group, Host}) of
+ {ok, #sr_group{opts = Opts}} -> Opts;
+ _ -> error
+ end;
get_group_opts(Host, Group, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
case catch ejabberd_odbc:sql_query(Host,
R = #sr_group{group_host = {Group, Host}, opts = Opts},
F = fun () -> mnesia:write(R) end,
mnesia:transaction(F);
+set_group_opts(Host, Group, Opts, riak) ->
+ {atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host},
+ opts = Opts},
+ [{'2i', [{<<"host">>, Host}]}])};
set_group_opts(Host, Group, Opts, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
SOpts = ejabberd_odbc:encode_term(Opts),
|| #sr_user{group_host = {Group, H}} <- Rs, H == Host];
_ -> []
end;
+get_user_groups(US, Host, riak) ->
+ case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of
+ {ok, Rs} ->
+ [Group || #sr_user{group_host = {Group, H}} <- Rs, H == Host];
+ _ ->
+ []
+ end;
get_user_groups(US, Host, odbc) ->
SJID = make_jid_s(US),
case catch ejabberd_odbc:sql_query(Host,
Rs when is_list(Rs) -> [R#sr_user.us || R <- Rs];
_ -> []
end;
+get_group_explicit_users(Host, Group, riak) ->
+ case ejabberd_riak:get_by_index(sr_user, <<"group_host">>,
+ {Group, Host}) of
+ {ok, Rs} ->
+ [R#sr_user.us || R <- Rs];
+ _ ->
+ []
+ end;
get_group_explicit_users(Host, Group, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
case catch ejabberd_odbc:sql_query(Host,
H == LServer];
_ -> []
end;
+get_user_displayed_groups(LUser, LServer, GroupsOpts,
+ riak) ->
+ case ejabberd_riak:get_by_index(sr_user,
+ <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ [{Group, proplists:get_value(Group, GroupsOpts, [])}
+ || #sr_user{group_host = {Group, _}} <- Rs];
+ _ ->
+ []
+ end;
get_user_displayed_groups(LUser, LServer, GroupsOpts,
odbc) ->
SJID = make_jid_s(LUser, LServer),
[] -> lists:member(US, get_group_users(Host, Group));
_ -> true
end;
+is_user_in_group(US, Group, Host, riak) ->
+ case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of
+ {ok, Rs} ->
+ case lists:any(
+ fun(#sr_user{group_host = {G, H}}) ->
+ (Group == G) and (Host == H)
+ end, Rs) of
+ false ->
+ lists:member(US, get_group_users(Host, Group));
+ true ->
+ true
+ end;
+ _Err ->
+ false
+ end;
is_user_in_group(US, Group, Host, odbc) ->
SJID = make_jid_s(US),
SGroup = ejabberd_odbc:escape(Group),
R = #sr_user{us = US, group_host = {Group, Host}},
F = fun () -> mnesia:write(R) end,
mnesia:transaction(F);
+add_user_to_group(Host, US, Group, riak) ->
+ {atomic, ejabberd_riak:put(
+ #sr_user{us = US, group_host = {Group, Host}},
+ [{i, {US, {Group, Host}}},
+ {'2i', [{<<"us">>, US},
+ {<<"group_host">>, {Group, Host}}]}])};
add_user_to_group(Host, US, Group, odbc) ->
SJID = make_jid_s(US),
SGroup = ejabberd_odbc:escape(Group),
R = #sr_user{us = US, group_host = {Group, Host}},
F = fun () -> mnesia:delete_object(R) end,
mnesia:transaction(F);
+remove_user_from_group(Host, US, Group, riak) ->
+ {atomic, ejabberd_riak:delete(sr_group, {US, {Group, Host}})};
remove_user_from_group(Host, US, Group, odbc) ->
SJID = make_jid_s(US),
SGroup = ejabberd_odbc:escape(Group),
lbday, ctry, lctry, locality, llocality, email, lemail,
orgname, lorgname, orgunit, lorgunit}).
--record(vcard, {us = {<<"">>, <<"">>} :: {binary(), binary()},
+-record(vcard, {us = {<<"">>, <<"">>} :: {binary(), binary()} | binary(),
vcard = #xmlel{} :: xmlel()}).
-define(PROCNAME, ejabberd_mod_vcard).
_ -> error
end;
get_vcard(LUser, LServer, riak) ->
- Username = LUser,
- case catch ejabberd_riak:get(LServer, <<"vcard">>, Username) of
- {ok, SVCARD} ->
- case xml_stream:parse_element(SVCARD) of
- {error, _Reason} -> error;
- VCARD -> [VCARD]
- end;
+ case ejabberd_riak:get(vcard, {LUser, LServer}) of
+ {ok, R} ->
+ [R#vcard.vcard];
{error, notfound} ->
[];
_ ->
lorgunit = LOrgUnit})
end,
mnesia:transaction(F);
+ riak ->
+ US = {LUser, LServer},
+ ejabberd_riak:put(#vcard{us = US, vcard = VCARD},
+ [{'2i', [{<<"user">>, User},
+ {<<"luser">>, LUser},
+ {<<"fn">>, FN},
+ {<<"lfn">>, LFN},
+ {<<"family">>, Family},
+ {<<"lfamily">>, LFamily},
+ {<<"given">>, Given},
+ {<<"lgiven">>, LGiven},
+ {<<"middle">>, Middle},
+ {<<"lmiddle">>, LMiddle},
+ {<<"nickname">>, Nickname},
+ {<<"lnickname">>, LNickname},
+ {<<"bday">>, BDay},
+ {<<"lbday">>, LBDay},
+ {<<"ctry">>, CTRY},
+ {<<"lctry">>, LCTRY},
+ {<<"locality">>, Locality},
+ {<<"llocality">>, LLocality},
+ {<<"email">>, EMail},
+ {<<"lemail">>, LEMail},
+ {<<"orgname">>, OrgName},
+ {<<"lorgname">>, LOrgName},
+ {<<"orgunit">>, OrgUnit},
+ {<<"lorgunit">>, LOrgUnit}]}]);
odbc ->
Username = ejabberd_odbc:escape(User),
LUsername = ejabberd_odbc:escape(LUser),
SLGiven, SLLocality, SLMiddle,
SLNickname, SLOrgName, SLOrgUnit,
SLocality, SMiddle, SNickname, SOrgName,
- SOrgUnit, SVCARD, Username);
- riak ->
- Username = LUser,
- SVCARD = xml:element_to_binary(VCARD),
-
- ejabberd_riak:put(
- LServer, <<"vcard">>, Username, SVCARD,
- [{<<"bday_bin">>, LBDay},
- {<<"ctry_bin">>, LCTRY},
- {<<"email_bin">>, LEMail},
- {<<"fn_bin">>, LFN},
- {<<"family_bin">>, LFamily},
- {<<"given_bin">>, LGiven},
- {<<"locality_bin">>, LLocality},
- {<<"middle_bin">>, LMiddle},
- {<<"nickname_bin">>, LNickname},
- {<<"orgname_bin">>, LOrgName},
- {<<"orgunit_bin">>, LOrgUnit},
- {<<"user_bin">>, Username}])
+ SOrgUnit, SVCARD, Username)
end,
ejabberd_hooks:run(vcard_set, LServer,
[LUser, LServer, VCARD])
[<<"delete from vcard_search where lusername='">>,
Username, <<"';">>]]);
remove_user(LUser, LServer, riak) ->
- Username = LUser,
- ejabberd_riak:delete(LServer, <<"vcard">>, Username),
- ok.
+ {atomic, ejabberd_riak:delete(vcard, {LUser, LServer})}.
update_tables() ->
update_vcard_table(),
hash = Hash})
end,
mnesia:transaction(F);
+add_xupdate(LUser, LServer, Hash, riak) ->
+ {atomic, ejabberd_riak:put(#vcard_xupdate{us = {LUser, LServer},
+ hash = Hash})};
add_xupdate(LUser, LServer, Hash, odbc) ->
Username = ejabberd_odbc:escape(LUser),
SHash = ejabberd_odbc:escape(Hash),
[#vcard_xupdate{hash = Hash}] -> Hash;
_ -> undefined
end;
+get_xupdate(LUser, LServer, riak) ->
+ case ejabberd_riak:get(vcard_xupdate, {LUser, LServer}) of
+ {ok, #vcard_xupdate{hash = Hash}} -> Hash;
+ _ -> undefined
+ end;
get_xupdate(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case ejabberd_odbc:sql_query(LServer,
mnesia:delete({vcard_xupdate, {LUser, LServer}})
end,
mnesia:transaction(F);
+remove_xupdate(LUser, LServer, riak) ->
+ {atomic, ejabberd_riak:delete(vcard_xupdate, {LUser, LServer})};
remove_xupdate(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
F = fun () ->