]> granicus.if.org Git - ejabberd/commitdiff
Preliminary Riak support
authorAlexey Shchepin <alexey@process-one.net>
Wed, 25 Jan 2012 10:02:16 +0000 (12:02 +0200)
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>
Thu, 10 Jul 2014 09:04:39 +0000 (13:04 +0400)
src/ejabberd_riak.erl [new file with mode: 0644]
src/ejabberd_riak_sup.erl [new file with mode: 0644]
src/mod_offline_riak.erl [new file with mode: 0644]
src/mod_private_riak.erl [new file with mode: 0644]
src/mod_roster_riak.erl [new file with mode: 0644]
src/mod_vcard_riak.erl [new file with mode: 0644]

diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl
new file mode 100644 (file)
index 0000000..ca7df68
--- /dev/null
@@ -0,0 +1,134 @@
+%%%----------------------------------------------------------------------
+%%% File    : ejabberd_riak.erl
+%%% Author  : Alexey Shchepin <alexey@process-one.net>
+%%% Purpose : Serve Riak connection
+%%% Created : 29 Dec 2011 by Alexey Shchepin <alexey@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2011   ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+-module(ejabberd_riak).
+-author('alexey@process-one.net').
+
+%% External exports
+-export([start_link/1,
+         put/4,
+         put/5,
+         get_object/3,
+         get/3,
+         get_objects_by_index/4,
+         get_by_index/4,
+         get_keys_by_index/4,
+         count_by_index/4,
+         delete/3]).
+
+-include("ejabberd.hrl").
+
+%%%----------------------------------------------------------------------
+%%% API
+%%%----------------------------------------------------------------------
+start_link(StartInterval) ->
+    {ok, Pid} = riakc_pb_socket:start_link(
+                  "127.0.0.1", 8081,
+                  [auto_reconnect]),
+    ejabberd_riak_sup:add_pid(Pid),
+    {ok, Pid}.
+
+make_bucket(Host, Table) ->
+    iolist_to_binary([Host, $@, Table]).
+
+put(Host, Table, Key, Value) ->
+    Bucket = make_bucket(Host, Table),
+    Obj = riakc_obj:new(Bucket, Key, Value),
+    riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj).
+
+put(Host, Table, Key, Value, Indexes) ->
+    Bucket = make_bucket(Host, Table),
+    Obj = riakc_obj:new(Bucket, Key, Value),
+    MetaData = dict:store(<<"index">>, Indexes, dict:new()),
+    Obj2 = riakc_obj:update_metadata(Obj, MetaData),
+    riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj2).
+
+get_object(Host, Table, Key) ->
+    Bucket = make_bucket(Host, Table),
+    riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
+
+get(Host, Table, Key) ->
+    case get_object(Host, Table, Key) of
+        {ok, Obj} ->
+            {ok, riakc_obj:get_value(Obj)};
+        Error ->
+            Error
+    end.
+
+get_objects_by_index(Host, Table, Index, Key) ->
+    Bucket = make_bucket(Host, Table),
+    case riakc_pb_socket:mapred(
+           ejabberd_riak_sup:get_random_pid(),
+           {index, Bucket, Index, Key},
+           [{map, {modfun, riak_kv_mapreduce, map_identity}, none, true}]) of
+        {ok, [{_, Objs}]} ->
+            {ok, Objs};
+        Error ->
+            Error
+    end.
+
+get_by_index(Host, Table, Index, Key) ->
+    Bucket = make_bucket(Host, Table),
+    case riakc_pb_socket:mapred(
+           ejabberd_riak_sup:get_random_pid(),
+           {index, Bucket, Index, Key},
+           [{map, {modfun, riak_kv_mapreduce, map_object_value},
+             none, true}]) of
+        {ok, [{_, Objs}]} ->
+            {ok, Objs};
+        Error ->
+            Error
+    end.
+
+get_keys_by_index(Host, Table, Index, Key) ->
+    Bucket = make_bucket(Host, Table),
+    case riakc_pb_socket:mapred(
+           ejabberd_riak_sup:get_random_pid(),
+           {index, Bucket, Index, Key},
+           []) of
+        {ok, [{_, Ls}]} ->
+            {ok, [K || {_, K} <- Ls]};
+        Error ->
+            Error
+    end.
+
+count_by_index(Host, Table, Index, Key) ->
+    Bucket = make_bucket(Host, Table),
+    case riakc_pb_socket:mapred(
+           ejabberd_riak_sup:get_random_pid(),
+           {index, Bucket, Index, Key},
+           [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
+             none, true}]) of
+        {ok, [{_, [Cnt]}]} ->
+            {ok, Cnt};
+        Error ->
+            Error
+    end.
+
+delete(Host, Table, Key) ->
+    Bucket = make_bucket(Host, Table),
+    riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
+
diff --git a/src/ejabberd_riak_sup.erl b/src/ejabberd_riak_sup.erl
new file mode 100644 (file)
index 0000000..38fb202
--- /dev/null
@@ -0,0 +1,142 @@
+%%%----------------------------------------------------------------------
+%%% File    : ejabberd_riak_sup.erl
+%%% Author  : Alexey Shchepin <alexey@process-one.net>
+%%% Purpose : Riak connections supervisor
+%%% Created : 29 Dec 2011 by Alexey Shchepin <alexey@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2011   ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+-module(ejabberd_riak_sup).
+-author('alexey@process-one.net').
+
+%% API
+-export([start/0,
+         start_link/0,
+        init/1,
+        add_pid/1,
+        remove_pid/1,
+        get_pids/0,
+        get_random_pid/0
+       ]).
+
+-include("ejabberd.hrl").
+
+-define(DEFAULT_POOL_SIZE, 10).
+-define(DEFAULT_RIAK_START_INTERVAL, 30). % 30 seconds
+
+% time to wait for the supervisor to start its child before returning
+% a timeout error to the request
+-define(CONNECT_TIMEOUT, 500). % milliseconds
+
+
+-record(riak_pool, {undefined, pid}).
+
+start() ->
+    SupervisorName = ?MODULE,
+    ChildSpec =
+       {SupervisorName,
+        {?MODULE, start_link, []},
+        transient,
+        infinity,
+        supervisor,
+        [?MODULE]},
+    case supervisor:start_child(ejabberd_sup, ChildSpec) of
+       {ok, _PID} ->
+           ok;
+       _Error ->
+           ?ERROR_MSG("Start of supervisor ~p failed:~n~p~nRetrying...~n",
+                       [SupervisorName, _Error]),
+            timer:sleep(5000),
+           start()
+    end.
+
+start_link() ->
+    mnesia:create_table(riak_pool,
+                       [{ram_copies, [node()]},
+                        {type, bag},
+                        {local_content, true},
+                        {attributes, record_info(fields, riak_pool)}]),
+    mnesia:add_table_copy(riak_pool, node(), ram_copies),
+    F = fun() ->
+               mnesia:delete({riak_pool, undefined})
+       end,
+    mnesia:ets(F),
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    PoolSize =
+        case ejabberd_config:get_local_option(riak_pool_size) of
+            I when is_integer(I) ->
+                I;
+           undefined ->
+                ?DEFAULT_POOL_SIZE;
+            Other ->
+                ?ERROR_MSG("Wrong riak_pool_size definition '~p' "
+                           "default to ~p~n",
+                           [Other, ?DEFAULT_POOL_SIZE]),
+                ?DEFAULT_POOL_SIZE
+        end,
+    StartInterval =
+        case ejabberd_config:get_local_option(riak_start_interval) of
+            Interval when is_integer(Interval) ->
+                Interval;
+            undefined ->
+                ?DEFAULT_RIAK_START_INTERVAL;
+            _Other2 ->
+                ?ERROR_MSG("Wrong riak_start_interval "
+                           "definition '~p', "
+                           "defaulting to ~p~n",
+                           [_Other2,
+                            ?DEFAULT_RIAK_START_INTERVAL]),
+                ?DEFAULT_RIAK_START_INTERVAL
+        end,
+    {ok, {{one_for_one, PoolSize*10, 1},
+         lists:map(
+           fun(I) ->
+                   {I,
+                    {ejabberd_riak, start_link, [StartInterval*1000]},
+                    transient,
+                     2000,
+                    worker,
+                    [?MODULE]}
+           end, lists:seq(1, PoolSize))}}.
+
+get_pids() ->
+    Rs = mnesia:dirty_read(riak_pool, undefined),
+    [R#riak_pool.pid || R <- Rs].
+
+get_random_pid() ->
+    Pids = get_pids(),
+    lists:nth(erlang:phash(now(), length(Pids)), Pids).
+
+add_pid(Pid) ->
+    F = fun() ->
+               mnesia:write(
+                 #riak_pool{pid = Pid})
+       end,
+    mnesia:ets(F).
+
+remove_pid(Pid) ->
+    F = fun() ->
+               mnesia:delete_object(
+                 #riak_pool{pid = Pid})
+       end,
+    mnesia:ets(F).
diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl
new file mode 100644 (file)
index 0000000..433e583
--- /dev/null
@@ -0,0 +1,533 @@
+%%%----------------------------------------------------------------------
+%%% File    : mod_offline_riak.erl
+%%% Author  : Alexey Shchepin <alexey@process-one.net>
+%%% Purpose : Store and manage offline messages in Riak.
+%%% Created :  4 Jan 2012 by Alexey Shchepin <alexey@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2011   ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+-module(mod_offline_riak).
+-author('alexey@process-one.net').
+
+-behaviour(gen_mod).
+
+-export([count_offline_messages/2]).
+
+-export([start/2,
+        init/2,
+        stop/1,
+        store_packet/3,
+        pop_offline_messages/3,
+        remove_user/2,
+        webadmin_page/3,
+        webadmin_user/4,
+        webadmin_user_parse_query/5,
+        count_offline_messages/3]).
+
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+-include("web/ejabberd_http.hrl").
+-include("web/ejabberd_web_admin.hrl").
+
+-record(offline_msg, {user, timestamp, expire, from, to, packet}).
+
+-define(PROCNAME, ejabberd_offline).
+-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
+
+start(Host, Opts) ->
+    ejabberd_hooks:add(offline_message_hook, Host,
+                      ?MODULE, store_packet, 50),
+    ejabberd_hooks:add(resend_offline_messages_hook, Host,
+                      ?MODULE, pop_offline_messages, 50),
+    ejabberd_hooks:add(remove_user, Host,
+                      ?MODULE, remove_user, 50),
+    ejabberd_hooks:add(anonymous_purge_hook, Host,
+                      ?MODULE, remove_user, 50),
+    ejabberd_hooks:add(webadmin_page_host, Host,
+                      ?MODULE, webadmin_page, 50),
+    ejabberd_hooks:add(webadmin_user, Host,
+                      ?MODULE, webadmin_user, 50),
+    ejabberd_hooks:add(webadmin_user_parse_query, Host,
+                       ?MODULE, webadmin_user_parse_query, 50),
+    ejabberd_hooks:add(count_offline_messages, Host,
+                       ?MODULE, count_offline_messages, 50),
+    MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity),
+    register(gen_mod:get_module_proc(Host, ?PROCNAME),
+            spawn(?MODULE, init, [Host, MaxOfflineMsgs])).
+
+%% MaxOfflineMsgs is either infinity of integer > 0
+init(Host, infinity) ->
+    loop(Host, infinity);
+init(Host, MaxOfflineMsgs) 
+  when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 ->
+    loop(Host, MaxOfflineMsgs).
+
+loop(Host, MaxOfflineMsgs) ->
+    receive
+       #offline_msg{user = User} = Msg ->
+           Msgs = receive_all(User, [Msg]),
+           Len = length(Msgs),
+
+           %% Only count existing messages if needed:
+           Count = if MaxOfflineMsgs =/= infinity ->
+                           Len + count_offline_messages(User, Host);
+                      true -> 0
+                   end,
+           if 
+               Count > MaxOfflineMsgs ->
+                   discard_warn_sender(Msgs);
+               true ->
+                   lists:foreach(
+                      fun(M) ->
+                              Username = list_to_binary(User),
+                              From = M#offline_msg.from,
+                              To = M#offline_msg.to,
+                              {xmlelement, Name, Attrs, Els} =
+                                  M#offline_msg.packet,
+                              Attrs2 = jlib:replace_from_to_attrs(
+                                         jlib:jid_to_string(From),
+                                         jlib:jid_to_string(To),
+                                         Attrs),
+                              Packet = {xmlelement, Name, Attrs2,
+                                        Els ++
+                                        [jlib:timestamp_to_xml(
+                                           calendar:now_to_universal_time(
+                                             M#offline_msg.timestamp))]},
+                              XML =
+                                  iolist_to_binary(
+                                    xml:element_to_string(Packet)),
+                              {MegaSecs, Secs, MicroSecs} =
+                                  M#offline_msg.timestamp,
+                              TS =
+                                  iolist_to_binary(
+                                    io_lib:format("~6..0w~6..0w.~6..0w",
+                                                  [MegaSecs, Secs, MicroSecs])),
+                              ejabberd_riak:put(
+                                Host, <<"offline">>,
+                                undefined, XML,
+                                [{<<"user_bin">>, Username},
+                                 {<<"timestamp_bin">>, TS}
+                                ])
+                      end, Msgs)
+           end,
+           loop(Host, MaxOfflineMsgs);
+       _ ->
+           loop(Host, MaxOfflineMsgs)
+    end.
+
+receive_all(Username, Msgs) ->
+    receive
+       #offline_msg{user=Username} = Msg ->
+           receive_all(Username, [Msg | Msgs])
+    after 0 ->
+           lists:reverse(Msgs)
+    end.
+
+
+stop(Host) ->
+    ejabberd_hooks:delete(offline_message_hook, Host,
+                         ?MODULE, store_packet, 50),
+    ejabberd_hooks:delete(resend_offline_messages_hook, Host,
+                         ?MODULE, pop_offline_messages, 50),
+    ejabberd_hooks:delete(remove_user, Host,
+                         ?MODULE, remove_user, 50),
+    ejabberd_hooks:delete(anonymous_purge_hook, Host,
+                         ?MODULE, remove_user, 50),
+    ejabberd_hooks:delete(webadmin_page_host, Host,
+                         ?MODULE, webadmin_page, 50),
+    ejabberd_hooks:delete(webadmin_user, Host,
+                         ?MODULE, webadmin_user, 50),
+    ejabberd_hooks:delete(webadmin_user_parse_query, Host,
+                          ?MODULE, webadmin_user_parse_query, 50),
+    Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+    exit(whereis(Proc), stop),
+    ok.
+
+store_packet(From, To, Packet) ->
+    Type = xml:get_tag_attr_s("type", Packet),
+    if
+       (Type /= "error") and (Type /= "groupchat") and
+       (Type /= "headline") ->
+           case check_event(From, To, Packet) of
+               true ->
+                   #jid{luser = LUser} = To,
+                   TimeStamp = now(),
+                   {xmlelement, _Name, _Attrs, Els} = Packet,
+                   Expire = find_x_expire(TimeStamp, Els),
+                   gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) !
+                       #offline_msg{user = LUser,
+                                    timestamp = TimeStamp,
+                                    expire = Expire,
+                                    from = From,
+                                    to = To,
+                                    packet = Packet},
+                   stop;
+               _ ->
+                   ok
+           end;
+       true ->
+           ok
+    end.
+
+check_event(From, To, Packet) ->
+    {xmlelement, Name, Attrs, Els} = Packet,
+    case find_x_event(Els) of
+       false ->
+           true;
+       El ->
+           case xml:get_subtag(El, "id") of
+               false ->
+                   case xml:get_subtag(El, "offline") of
+                       false ->
+                           true;
+                       _ ->
+                           ID = case xml:get_tag_attr_s("id", Packet) of
+                                    "" ->
+                                        {xmlelement, "id", [], []};
+                                    S ->
+                                        {xmlelement, "id", [],
+                                         [{xmlcdata, S}]}
+                                end,
+                           ejabberd_router:route(
+                             To, From, {xmlelement, Name, Attrs,
+                                        [{xmlelement, "x",
+                                          [{"xmlns", ?NS_EVENT}],
+                                          [ID,
+                                           {xmlelement, "offline", [], []}]}]
+                                       }),
+                           true
+                       end;
+               _ ->
+                   false
+           end
+    end.
+
+find_x_event([]) ->
+    false;
+find_x_event([{xmlcdata, _} | Els]) ->
+    find_x_event(Els);
+find_x_event([El | Els]) ->
+    case xml:get_tag_attr_s("xmlns", El) of
+       ?NS_EVENT ->
+           El;
+       _ ->
+           find_x_event(Els)
+    end.
+
+find_x_expire(_, []) ->
+    never;
+find_x_expire(TimeStamp, [{xmlcdata, _} | Els]) ->
+    find_x_expire(TimeStamp, Els);
+find_x_expire(TimeStamp, [El | Els]) ->
+    case xml:get_tag_attr_s("xmlns", El) of
+       ?NS_EXPIRE ->
+           Val = xml:get_tag_attr_s("seconds", El),
+           case catch list_to_integer(Val) of
+               {'EXIT', _} ->
+                   never;
+               Int when Int > 0 ->
+                   {MegaSecs, Secs, MicroSecs} = TimeStamp,
+                   S = MegaSecs * 1000000 + Secs + Int,
+                   MegaSecs1 = S div 1000000,
+                   Secs1 = S rem 1000000,
+                   {MegaSecs1, Secs1, MicroSecs};
+               _ ->
+                   never
+           end;
+       _ ->
+           find_x_expire(TimeStamp, Els)
+    end.
+
+
+pop_offline_messages(Ls, User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Username = list_to_binary(LUser),
+    case ejabberd_riak:get_objects_by_index(
+           LServer, <<"offline">>, <<"user_bin">>, Username) of
+        {ok, Rs} ->
+            SortedRs =
+                lists:sort(fun(X, Y) ->
+                                   MX = riak_object:get_metadata(X),
+                                   {ok, IX} = dict:find(<<"index">>, MX),
+                                   {value, TSX} = lists:keysearch(
+                                                    <<"timestamp_bin">>, 1,
+                                                    IX),
+                                   MY = riak_object:get_metadata(Y),
+                                   {ok, IY} = dict:find(<<"index">>, MY),
+                                   {value, TSY} = lists:keysearch(
+                                                    <<"timestamp_bin">>, 1,
+                                                    IY),
+                                   TSX =< TSY
+                           end, Rs),
+            Ls ++ lists:flatmap(
+                   fun(R) ->
+                            Key = riak_object:key(R),
+                            ejabberd_riak:delete(LServer, <<"offline">>, Key),
+                            XML = riak_object:get_value(R),
+                           case xml_stream:parse_element(XML) of
+                               {error, _Reason} ->
+                                   [];
+                               El ->
+                                   To = jlib:string_to_jid(
+                                          xml:get_tag_attr_s("to", El)),
+                                   From = jlib:string_to_jid(
+                                            xml:get_tag_attr_s("from", El)),
+                                   if
+                                       (To /= error) and
+                                       (From /= error) ->
+                                           [{route, From, To, El}];
+                                       true ->
+                                           []
+                                   end
+                           end
+                   end, SortedRs);
+       _ ->
+           Ls
+    end.
+
+
+remove_user(User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Username = list_to_binary(LUser),
+    case ejabberd_riak:get_keys_by_index(
+           LServer, <<"offline">>, <<"user_bin">>, Username) of
+        {ok, Keys} ->
+            lists:foreach(
+              fun(Key) ->
+                      ejabberd_riak:delete(LServer, <<"offline">>, Key)
+              end, Keys);
+        _ ->
+            ok
+    end.
+
+
+%% Helper functions:
+
+%% TODO: Warning - This function is a duplicate from mod_offline.erl
+%% It is duplicate to stay consistent (many functions are duplicated
+%% in this module). It will be refactored later on.
+%% Warn senders that their messages have been discarded:
+discard_warn_sender(Msgs) ->
+    lists:foreach(
+      fun(#offline_msg{from=From, to=To, packet=Packet}) ->
+             ErrText = "Your contact offline message queue is full. The message has been discarded.",
+             Lang = xml:get_tag_attr_s("xml:lang", Packet),
+             Err = jlib:make_error_reply(
+                     Packet, ?ERRT_RESOURCE_CONSTRAINT(Lang, ErrText)),
+             ejabberd_router:route(
+               To,
+               From, Err)
+      end, Msgs).
+
+
+webadmin_page(_, Host,
+             #request{us = _US,
+                      path = ["user", U, "queue"],
+                      q = Query,
+                      lang = Lang} = _Request) ->
+    Res = user_queue(U, Host, Query, Lang),
+    {stop, Res};
+
+webadmin_page(Acc, _, _) -> Acc.
+
+user_queue(User, Server, Query, Lang) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Username = ejabberd_odbc:escape(LUser),
+    US = {LUser, LServer},
+    Res = user_queue_parse_query(Username, LServer, Query),
+    Msgs = case catch ejabberd_odbc:sql_query(
+                       LServer,
+                       ["select username, xml from spool"
+                        "  where username='", Username, "'"
+                        "  order by seq;"]) of
+              {selected, ["username", "xml"], Rs} ->
+                  lists:flatmap(
+                    fun({_, XML}) ->
+                            case xml_stream:parse_element(XML) of
+                                {error, _Reason} ->
+                                    [];
+                                El ->
+                                    [El]
+                            end
+                    end, Rs);
+              _ ->
+                  []
+          end,
+    FMsgs =
+       lists:map(
+         fun({xmlelement, _Name, _Attrs, _Els} = Msg) ->
+                 ID = jlib:encode_base64(binary_to_list(term_to_binary(Msg))),
+                 Packet = Msg,
+                 FPacket = ejabberd_web_admin:pretty_print_xml(Packet),
+                 ?XE("tr",
+                     [?XAE("td", [{"class", "valign"}], [?INPUT("checkbox", "selected", ID)]),
+                      ?XAE("td", [{"class", "valign"}], [?XC("pre", FPacket)])]
+                    )
+         end, Msgs),
+    [?XC("h1", io_lib:format(?T("~s's Offline Messages Queue"),
+                            [us_to_list(US)]))] ++
+       case Res of
+           ok -> [?XREST("Submitted")];
+           nothing -> []
+       end ++
+       [?XAE("form", [{"action", ""}, {"method", "post"}],
+             [?XE("table",
+                  [?XE("thead",
+                       [?XE("tr",
+                            [?X("td"),
+                             ?XCT("td", "Packet")
+                            ])]),
+                   ?XE("tbody",
+                       if
+                           FMsgs == [] ->
+                               [?XE("tr",
+                                    [?XAC("td", [{"colspan", "4"}], " ")]
+                                   )];
+                           true ->
+                               FMsgs
+                       end
+                      )]),
+              ?BR,
+              ?INPUTT("submit", "delete", "Delete Selected")
+             ])].
+
+user_queue_parse_query(Username, LServer, Query) ->
+    case lists:keysearch("delete", 1, Query) of
+       {value, _} ->
+           Msgs = case catch ejabberd_odbc:sql_query(
+                               LServer,
+                               ["select xml, seq from spool"
+                                "  where username='", Username, "'"
+                                "  order by seq;"]) of
+                      {selected, ["xml", "seq"], Rs} ->
+                          lists:flatmap(
+                            fun({XML, Seq}) ->
+                                    case xml_stream:parse_element(XML) of
+                                        {error, _Reason} ->
+                                            [];
+                                        El ->
+                                            [{El, Seq}]
+                                    end
+                            end, Rs);
+                      _ ->
+                          []
+                  end,
+           F = fun() ->
+                       lists:foreach(
+                         fun({Msg, Seq}) ->
+                                 ID = jlib:encode_base64(
+                                        binary_to_list(term_to_binary(Msg))),
+                                 case lists:member({"selected", ID}, Query) of
+                                     true ->
+                                         SSeq = ejabberd_odbc:escape(Seq),
+                                         catch ejabberd_odbc:sql_query(
+                                                 LServer,
+                                                 ["delete from spool"
+                                                  "  where username='", Username, "'"
+                                                  "  and seq='", SSeq, "';"]);
+                                     false ->
+                                         ok
+                                 end
+                         end, Msgs)
+               end,
+           mnesia:transaction(F),
+           ok;
+       false ->
+           nothing
+    end.
+
+us_to_list({User, Server}) ->
+    jlib:jid_to_string({User, Server, ""}).
+
+webadmin_user(Acc, User, Server, Lang) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Username = ejabberd_odbc:escape(LUser),
+    QueueLen = case catch ejabberd_odbc:sql_query(
+                           LServer,
+                           ["select count(*) from spool"
+                            "  where username='", Username, "';"]) of
+                  {selected, [_], [{SCount}]} ->
+                      SCount;
+                  _ ->
+                      0
+              end,
+    FQueueLen = [?AC("queue/", QueueLen)],
+    Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")].
+
+webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) ->
+    case catch odbc_queries:del_spool_msg(Server, User) of
+         {'EXIT', Reason} ->
+            ?ERROR_MSG("Failed to remove offline messages: ~p", [Reason]),
+            {stop, error};
+         {error, Reason} ->
+            ?ERROR_MSG("Failed to remove offline messages: ~p", [Reason]),
+            {stop, error};
+         _ ->
+            ?INFO_MSG("Removed all offline messages for ~s@~s", [User, Server]),
+            {stop, ok}
+    end;
+webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) ->
+    Acc.
+
+%% ------------------------------------------------
+%% mod_offline: number of messages quota management
+
+%% Returns as integer the number of offline messages for a given user
+count_offline_messages(LUser, LServer) ->
+    Username = list_to_binary([LUser, $@, LServer]),
+    case catch ejabberd_riak:count_by_index(
+                 LServer, <<"offline">>, <<"user_bin">>, Username) of
+        {ok, Res} when is_integer(Res) ->
+            Res;
+        _ ->
+            0
+    end.
+
+count_offline_messages(_Acc, User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Num = case catch ejabberd_odbc:sql_query(
+                      LServer,
+                      ["select xml from spool"
+                       "  where username='", LUser, "';"]) of
+             {selected, ["xml"], Rs} ->
+                 lists:foldl(
+                   fun({XML}, Acc) ->
+                           case xml_stream:parse_element(XML) of
+                               {error, _Reason} ->
+                                   Acc;
+                               El ->
+                                   case xml:get_subtag(El, "body") of
+                                       false ->
+                                           Acc;
+                                       _ ->
+                                           Acc + 1
+                                   end
+                           end
+                   end, 0, Rs);
+             _ ->
+                 0
+         end,
+    {stop, Num}.
diff --git a/src/mod_private_riak.erl b/src/mod_private_riak.erl
new file mode 100644 (file)
index 0000000..e1b4f89
--- /dev/null
@@ -0,0 +1,139 @@
+%%%----------------------------------------------------------------------
+%%% File    : mod_private_riak.erl
+%%% Author  : Alexey Shchepin <alexey@process-one.net>
+%%% Purpose : Private storage support
+%%% Created :  6 Jan 2012 by Alexey Shchepin <alexey@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2011   ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+-module(mod_private_riak).
+-author('alexey@process-one.net').
+
+-behaviour(gen_mod).
+
+-export([start/2,
+        stop/1,
+        process_sm_iq/3,
+        remove_user/2]).
+
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+
+start(Host, Opts) ->
+    IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
+    ejabberd_hooks:add(remove_user, Host,
+                      ?MODULE, remove_user, 50),
+    gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PRIVATE,
+                                 ?MODULE, process_sm_iq, IQDisc).
+
+stop(Host) ->
+    ejabberd_hooks:delete(remove_user, Host,
+                         ?MODULE, remove_user, 50),
+    gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PRIVATE).
+
+
+process_sm_iq(From, _To, #iq{type = Type, sub_el = SubEl} = IQ) ->
+    #jid{luser = LUser, lserver = LServer} = From,
+    case lists:member(LServer, ?MYHOSTS) of
+       true ->
+           {xmlelement, Name, Attrs, Els} = SubEl,
+           case Type of
+               set ->
+                    lists:foreach(
+                      fun(El) ->
+                              set_data(LUser, LServer, El)
+                      end, Els),
+                   IQ#iq{type = result,
+                         sub_el = [{xmlelement, Name, Attrs, []}]};
+               get ->
+                   case catch get_data(LUser, LServer, Els) of
+                       {'EXIT', _Reason} ->
+                           IQ#iq{type = error,
+                                 sub_el = [SubEl,
+                                           ?ERR_INTERNAL_SERVER_ERROR]};
+                       Res ->
+                           IQ#iq{type = result,
+                                 sub_el = [{xmlelement, Name, Attrs, Res}]}
+                   end
+           end;
+       false ->
+           IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]}
+    end.
+
+set_data(LUser, LServer, El) ->
+    case El of
+       {xmlelement, _Name, Attrs, _Els} ->
+           XMLNS = xml:get_attr_s("xmlns", Attrs),
+           case XMLNS of
+               "" ->
+                   ignore;
+               _ ->
+                    Username = list_to_binary(LUser),
+                    Key = list_to_binary([LUser, $@, LServer, $@, XMLNS]),
+                   SData = xml:element_to_binary(El),
+                    ejabberd_riak:put(
+                      LServer, <<"private">>, Key, SData,
+                      [{<<"user_bin">>, Username}]),
+                    ok
+           end;
+       _ ->
+           ignore
+    end.
+
+get_data(LUser, LServer, Els) ->
+    get_data(LUser, LServer, Els, []).
+
+get_data(_LUser, _LServer, [], Res) ->
+    lists:reverse(Res);
+get_data(LUser, LServer, [El | Els], Res) ->
+    case El of
+       {xmlelement, _Name, Attrs, _} ->
+           XMLNS = xml:get_attr_s("xmlns", Attrs),
+            Key = list_to_binary([LUser, $@, LServer, $@, XMLNS]),
+           case ejabberd_riak:get(LServer, <<"private">>, Key) of
+               {ok, SData} ->
+                   case xml_stream:parse_element(SData) of
+                       Data when element(1, Data) == xmlelement ->
+                           get_data(LUser, LServer, Els,
+                                    [Data | Res])
+                   end;
+                _ -> 
+                    get_data(LUser, LServer, Els, [El | Res])
+           end;
+       _ ->
+           get_data(LUser, LServer, Els, Res)
+    end.
+
+
+remove_user(User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Username = list_to_binary(LUser),
+    case ejabberd_riak:get_keys_by_index(
+           LServer, <<"private">>, <<"user_bin">>, Username) of
+        {ok, Keys} ->
+            lists:foreach(
+              fun(Key) ->
+                      ejabberd_riak:delete(LServer, <<"private">>, Key)
+              end, Keys);
+        _ ->
+            ok
+    end.
diff --git a/src/mod_roster_riak.erl b/src/mod_roster_riak.erl
new file mode 100644 (file)
index 0000000..e66800b
--- /dev/null
@@ -0,0 +1,1310 @@
+%%%----------------------------------------------------------------------
+%%% File    : mod_roster_riak.erl
+%%% Author  : Alexey Shchepin <alexey@process-one.net>
+%%% Purpose : Roster management
+%%% Created :  6 Jan 2012 by Alexey Shchepin <alexey@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2011   ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+%%% @doc Roster management (Mnesia storage).
+%%%
+%%% Includes support for XEP-0237: Roster Versioning.
+%%% The roster versioning follows an all-or-nothing strategy:
+%%%  - If the version supplied by the client is the latest, return an empty response.
+%%%  - If not, return the entire new roster (with updated version string).
+%%% Roster version is a hash digest of the entire roster.
+%%% No additional data is stored in DB.
+
+-module(mod_roster_riak).
+-author('alexey@process-one.net').
+
+-behaviour(gen_mod).
+
+-export([start/2, stop/1,
+        process_iq/3,
+        process_local_iq/3,
+        get_user_roster/2,
+        get_subscription_lists/3,
+        get_in_pending_subscriptions/3,
+        in_subscription/6,
+        out_subscription/4,
+        set_items/3,
+        remove_user/2,
+        get_jid_info/4,
+        webadmin_page/3,
+        webadmin_user/4,
+        get_versioning_feature/2,
+        roster_versioning_enabled/1]).
+
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+-include("mod_roster.hrl").
+-include("web/ejabberd_http.hrl").
+-include("web/ejabberd_web_admin.hrl").
+
+
+start(Host, _Opts) ->
+    IQDisc = no_queue,
+    ejabberd_hooks:add(roster_get, Host,
+                      ?MODULE, get_user_roster, 50),
+    ejabberd_hooks:add(roster_in_subscription, Host,
+                      ?MODULE, in_subscription, 50),
+    ejabberd_hooks:add(roster_out_subscription, Host,
+                      ?MODULE, out_subscription, 50),
+    ejabberd_hooks:add(roster_get_subscription_lists, Host,
+                      ?MODULE, get_subscription_lists, 50),
+    ejabberd_hooks:add(roster_get_jid_info, Host,
+                      ?MODULE, get_jid_info, 50),
+    ejabberd_hooks:add(remove_user, Host,
+                      ?MODULE, remove_user, 50),
+    ejabberd_hooks:add(anonymous_purge_hook, Host,
+                      ?MODULE, remove_user, 50),
+    ejabberd_hooks:add(resend_subscription_requests_hook, Host,
+                      ?MODULE, get_in_pending_subscriptions, 50),
+    ejabberd_hooks:add(roster_get_versioning_feature, Host,
+                      ?MODULE, get_versioning_feature, 50),
+    ejabberd_hooks:add(webadmin_page_host, Host,
+                      ?MODULE, webadmin_page, 50),
+    ejabberd_hooks:add(webadmin_user, Host,
+                      ?MODULE, webadmin_user, 50),
+    gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_ROSTER,
+                                 ?MODULE, process_iq, IQDisc).
+
+stop(Host) ->
+    ejabberd_hooks:delete(roster_get, Host,
+                         ?MODULE, get_user_roster, 50),
+    ejabberd_hooks:delete(roster_in_subscription, Host,
+                         ?MODULE, in_subscription, 50),
+    ejabberd_hooks:delete(roster_out_subscription, Host,
+                         ?MODULE, out_subscription, 50),
+    ejabberd_hooks:delete(roster_get_subscription_lists, Host,
+                         ?MODULE, get_subscription_lists, 50),
+    ejabberd_hooks:delete(roster_get_jid_info, Host,
+                         ?MODULE, get_jid_info, 50),
+    ejabberd_hooks:delete(remove_user, Host,
+                         ?MODULE, remove_user, 50),
+    ejabberd_hooks:delete(anonymous_purge_hook, Host,
+                         ?MODULE, remove_user, 50),
+    ejabberd_hooks:delete(resend_subscription_requests_hook, Host,
+                      ?MODULE, get_in_pending_subscriptions, 50),
+    ejabberd_hooks:delete(roster_get_versioning_feature, Host,
+                         ?MODULE, get_versioning_feature, 50),
+    ejabberd_hooks:delete(webadmin_page_host, Host,
+                         ?MODULE, webadmin_page, 50),
+    ejabberd_hooks:delete(webadmin_user, Host,
+                         ?MODULE, webadmin_user, 50),
+    gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_ROSTER).
+
+
+process_iq(From, To, IQ) ->
+    #iq{sub_el = SubEl} = IQ,
+    #jid{lserver = LServer} = From,
+    case lists:member(LServer, ?MYHOSTS) of
+       true ->
+           process_local_iq(From, To, IQ);
+       _ ->
+           IQ#iq{type = error, sub_el = [SubEl, ?ERR_ITEM_NOT_FOUND]}
+    end.
+
+process_local_iq(From, To, #iq{type = Type} = IQ) ->
+    case Type of
+       set ->
+           process_iq_set(From, To, IQ);
+       get ->
+           process_iq_get(From, To, IQ)
+    end.
+
+
+roster_hash(Items) ->
+    sha:sha(term_to_binary(
+              lists:sort(
+                [R#roster{groups = lists:sort(Grs)} || 
+                    R = #roster{groups = Grs} <- Items]))).
+
+roster_versioning_enabled(Host) ->
+    gen_mod:get_module_opt(Host, ?MODULE, versioning, false).
+
+roster_version_on_db(Host) ->
+    gen_mod:get_module_opt(Host, ?MODULE, store_current_id, false).
+
+%% Returns a list that may contain an xmlelement with the XEP-237 feature if it's enabled.
+get_versioning_feature(Acc, Host) ->
+    case roster_versioning_enabled(Host) of
+       true ->
+           Feature = {xmlelement,
+                      "ver",
+                      [{"xmlns", ?NS_ROSTER_VER}],
+                      [{xmlelement, "optional", [], []}]},
+           [Feature | Acc];
+       false -> []
+    end.
+
+roster_version(LServer, LUser) ->
+    US = {LUser, LServer},
+    Username = list_to_binary(LUser),
+    case roster_version_on_db(LServer) of
+        true ->
+            case ejabberd_riak:get(LServer, <<"roster_version">>,
+                                   Username) of
+                {ok, Version} -> Version;
+                {error, notfound} ->
+                    %% If for some reason we don't had it on DB. Create a version Id and store it.
+                    %% (we did the same on process_iq_get, that is called when client get roster,
+                    %%  not sure why it can still not be on DB at this point)
+                    RosterVersion = sha:sha(term_to_binary(now())),
+                    riak_set_roster_version(LServer, Username, RosterVersion),
+                    RosterVersion
+            end;
+        false ->
+            roster_hash(ejabberd_hooks:run_fold(roster_get, LServer, [], [US]))
+    end.
+
+%% Load roster from DB only if neccesary. 
+%% It is neccesary if
+%%     - roster versioning is disabled in server OR
+%%     - roster versioning is not used by the client OR
+%%     - roster versioning is used by server and client, BUT the server isn't storing versions on db OR
+%%     - the roster version from client don't match current version.
+process_iq_get(From, To, #iq{sub_el = SubEl} = IQ) ->
+    LUser = From#jid.luser,
+    LServer = From#jid.lserver,
+    US = {LUser, LServer},
+    Username = list_to_binary(LUser),
+
+    try
+        {ItemsToSend, VersionToSend} = 
+        case {xml:get_tag_attr("ver", SubEl), 
+              roster_versioning_enabled(LServer),
+              roster_version_on_db(LServer)} of
+            {{value, RequestedVersion}, true, true} ->
+                BRequestedVersion = iolist_to_binary(RequestedVersion),
+                %% Retrieve version from DB. Only load entire roster
+                %% when neccesary.
+                case ejabberd_riak:get(LServer, <<"roster_version">>,
+                                       Username) of
+                    {ok, BRequestedVersion} ->
+                        {false, false};
+                    {ok, NewVersion} ->
+                        {lists:map(fun item_to_xml/1, 
+                                   ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [], [US])), NewVersion};
+                    {error, notfound} ->
+                        RosterVersion = sha:sha(term_to_binary(now())),
+                        ejabberd_riak:put(
+                          LServer, <<"roster_version">>,
+                          Username, list_to_binary(RosterVersion)),
+                        {lists:map(fun item_to_xml/1,
+                                   ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [], [US])), RosterVersion}
+                end;
+
+            {{value, RequestedVersion}, true, false} ->
+                RosterItems = ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [] , [US]),
+                case roster_hash(RosterItems) of
+                    RequestedVersion ->
+                        {false, false};
+                    New ->
+                        {lists:map(fun item_to_xml/1, RosterItems), New}
+                end;
+
+            _ ->
+                {lists:map(fun item_to_xml/1, 
+                           ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [], [US])), false}
+        end,
+        IQ#iq{type = result,
+              sub_el = case {ItemsToSend, VersionToSend} of
+                           {false, false} ->
+                               [];
+                           {Items, false} ->
+                               [{xmlelement, "query",
+                                 [{"xmlns", ?NS_ROSTER}],
+                                 Items}];
+                           {Items, Version} ->
+                               [{xmlelement, "query",
+                                 [{"xmlns", ?NS_ROSTER},
+                                  {"ver", Version}],
+                                 Items}]
+                       end}
+    catch
+       _:_ ->  
+            IQ#iq{type = error, sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]}
+    end.
+
+
+get_user_roster(Acc, {LUser, LServer}) ->
+    Items = get_roster(LUser, LServer),
+    lists:filter(fun(#roster{subscription = none, ask = in}) ->
+                        false;
+                   (_) ->
+                        true
+                end, Items) ++ Acc.
+
+get_roster(LUser, LServer) ->
+    Username = list_to_binary(LUser),
+    case catch riak_get_roster(LServer, Username) of
+       {ok, Items} when is_list(Items) ->
+           JIDGroups = case riak_get_roster_jid_groups(LServer, Username) of
+                           {ok, JGrps} when is_list(JGrps) ->
+                               JGrps;
+                           _ ->
+                               []
+                       end,
+            GroupsDict = dict:from_list(JIDGroups),
+           RItems = lists:flatmap(
+                      fun(I) ->
+                              case raw_to_record(LServer, I) of
+                                  %% Bad JID in database:
+                                  error ->
+                                      [];
+                                  R ->
+                                      SJID = jlib:jid_to_string(R#roster.jid),
+                                      Groups =
+                                           case dict:find(SJID, GroupsDict) of
+                                               {ok, Gs} -> Gs;
+                                               error -> []
+                                           end,
+                                      [R#roster{groups = Groups}]
+                              end
+                      end, Items),
+           RItems;
+       _ ->
+           []
+    end.
+
+
+item_to_xml(Item) ->
+    Attrs1 = [{"jid", jlib:jid_to_string(Item#roster.jid)}],
+    Attrs2 = case Item#roster.name of
+                "" ->
+                    Attrs1;
+                Name ->
+                    [{"name", Name} | Attrs1]
+            end,
+    Attrs3 = case Item#roster.subscription of
+                none ->
+                    [{"subscription", "none"} | Attrs2];
+                from ->
+                    [{"subscription", "from"} | Attrs2];
+                to ->
+                    [{"subscription", "to"} | Attrs2];
+                both ->
+                    [{"subscription", "both"} | Attrs2];
+                remove ->
+                    [{"subscription", "remove"} | Attrs2]
+            end,
+    Attrs = case ask_to_pending(Item#roster.ask) of
+               out ->
+                   [{"ask", "subscribe"} | Attrs3];
+               both ->
+                   [{"ask", "subscribe"} | Attrs3];
+               _ ->
+                   Attrs3
+           end,
+    SubEls = lists:map(fun(G) ->
+                              {xmlelement, "group", [], [{xmlcdata, G}]}
+                      end, Item#roster.groups),
+    {xmlelement, "item", Attrs, SubEls}.
+
+
+process_iq_set(From, To, #iq{sub_el = SubEl} = IQ) ->
+    {xmlelement, _Name, _Attrs, Els} = SubEl,
+    lists:foreach(fun(El) -> process_item_set(From, To, El) end, Els),
+    IQ#iq{type = result, sub_el = []}.
+
+process_item_set(From, To, {xmlelement, _Name, Attrs, Els}) ->
+    JID1 = jlib:string_to_jid(xml:get_attr_s("jid", Attrs)),
+    #jid{user = User, luser = LUser, lserver = LServer} = From,
+    case JID1 of
+       error ->
+           ok;
+       _ ->
+           LJID = jlib:jid_tolower(JID1),
+           Username = list_to_binary(LUser),
+           SJID = list_to_binary(jlib:jid_to_string(LJID)),
+           F = fun() ->
+                       Res = riak_get_roster_by_jid(LServer, Username, SJID),
+                       Item = case Res of
+                                  {error, _} ->
+                                      #roster{usj = {LUser, LServer, LJID},
+                                              us = {LUser, LServer},
+                                              jid = LJID};
+                                  {ok, I} ->
+                                      R = raw_to_record(LServer, I),
+                                      case R of
+                                          %% Bad JID in database:
+                                          error ->
+                                              #roster{usj = {LUser, LServer, LJID},
+                                                      us = {LUser, LServer},
+                                                      jid = LJID};
+                                          _ ->
+                                              R#roster{
+                                                usj = {LUser, LServer, LJID},
+                                                us = {LUser, LServer},
+                                                jid = LJID,
+                                                name = ""}
+                                      end
+                              end,
+                       Item1 = process_item_attrs(Item, Attrs),
+                       Item2 = process_item_els(Item1, Els),
+                       case Item2#roster.subscription of
+                           remove ->
+                                riak_del_roster(LServer, Username, SJID);
+                           _ ->
+                               ItemVals = record_to_string(Item2),
+                               ItemGroups = groups_to_binary(Item2),
+                               riak_update_roster(LServer, Username, SJID, ItemVals, ItemGroups)
+                       end,
+                       %% If the item exist in shared roster, take the
+                       %% subscription information from there:
+                       Item3 = ejabberd_hooks:run_fold(roster_process_item,
+                                                       LServer, Item2, [LServer]),
+                       case roster_version_on_db(LServer) of
+                            true ->
+                                RosterVersion = sha:sha(term_to_binary(now())),
+                                riak_set_roster_version(
+                                  LServer, Username, RosterVersion);
+                            false -> ok
+                       end,
+                       {ok, Item, Item3}
+               end,
+           case catch F() of
+               {ok, OldItem, Item} ->
+                   push_item(User, LServer, To, Item),
+                   case Item#roster.subscription of
+                       remove ->
+                           send_unsubscribing_presence(From, OldItem),
+                           ok;
+                       _ ->
+                           ok
+                   end;
+               E ->
+                   ?ERROR_MSG("ROSTER: roster item set error: ~p~n", [E]),
+                   ok
+           end
+    end;
+process_item_set(_From, _To, _) ->
+    ok.
+
+process_item_attrs(Item, [{Attr, Val} | Attrs]) ->
+    case Attr of
+       "jid" ->
+           case jlib:string_to_jid(Val) of
+               error ->
+                   process_item_attrs(Item, Attrs);
+               JID1 ->
+                   JID = {JID1#jid.luser, JID1#jid.lserver, JID1#jid.lresource},
+                   process_item_attrs(Item#roster{jid = JID}, Attrs)
+           end;
+       "name" ->
+           process_item_attrs(Item#roster{name = Val}, Attrs);
+       "subscription" ->
+           case Val of
+               "remove" ->
+                   process_item_attrs(Item#roster{subscription = remove},
+                                      Attrs);
+               _ ->
+                   process_item_attrs(Item, Attrs)
+           end;
+       "ask" ->
+           process_item_attrs(Item, Attrs);
+       _ ->
+           process_item_attrs(Item, Attrs)
+    end;
+process_item_attrs(Item, []) ->
+    Item.
+
+
+process_item_els(Item, [{xmlelement, Name, _Attrs, SEls} | Els]) ->
+    case Name of
+       "group" ->
+           Groups = [xml:get_cdata(SEls) | Item#roster.groups],
+           process_item_els(Item#roster{groups = Groups}, Els);
+       _ ->
+           process_item_els(Item, Els)
+    end;
+process_item_els(Item, [{xmlcdata, _} | Els]) ->
+    process_item_els(Item, Els);
+process_item_els(Item, []) ->
+    Item.
+
+
+push_item(User, Server, From, Item) ->
+    ejabberd_sm:route(jlib:make_jid("", "", ""),
+                     jlib:make_jid(User, Server, ""),
+                     {xmlelement, "broadcast", [],
+                      [{item,
+                        Item#roster.jid,
+                        Item#roster.subscription}]}),
+    case roster_versioning_enabled(Server) of
+       true ->
+               push_item_version(Server, User, From, Item, roster_version(Server, User));
+       false ->
+           lists:foreach(fun(Resource) ->
+                         push_item(User, Server, Resource, From, Item)
+                 end, ejabberd_sm:get_user_resources(User, Server))
+    end.
+
+% TODO: don't push to those who not load roster
+push_item(User, Server, Resource, From, Item) ->
+    ResIQ = #iq{type = set, xmlns = ?NS_ROSTER,
+               id = "push" ++ randoms:get_string(),
+               sub_el = [{xmlelement, "query",
+                          [{"xmlns", ?NS_ROSTER}],
+                          [item_to_xml(Item)]}]},
+    ejabberd_router:route(
+      From,
+      jlib:make_jid(User, Server, Resource),
+      jlib:iq_to_xml(ResIQ)).
+
+%% @doc Roster push, calculate and include the version attribute.
+%% TODO: don't push to those who didn't load roster
+push_item_version(Server, User, From, Item, RosterVersion)  ->
+    lists:foreach(fun(Resource) ->
+                         push_item_version(User, Server, Resource, From, Item, RosterVersion)
+               end, ejabberd_sm:get_user_resources(User, Server)).
+
+push_item_version(User, Server, Resource, From, Item, RosterVersion) ->
+    IQPush = #iq{type = 'set', xmlns = ?NS_ROSTER,
+                id = "push" ++ randoms:get_string(),
+                sub_el = [{xmlelement, "query",
+                           [{"xmlns", ?NS_ROSTER},
+                            {"ver", RosterVersion}],
+                           [item_to_xml(Item)]}]},
+    ejabberd_router:route(
+      From,
+      jlib:make_jid(User, Server, Resource),
+      jlib:iq_to_xml(IQPush)).
+
+get_subscription_lists(_, User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Username = list_to_binary(LUser),
+    case catch riak_get_roster(LServer, Username) of
+       {ok, Items} when is_list(Items) ->
+           fill_subscription_lists(LServer, Items, [], []);
+       _ ->
+           {[], []}
+    end.
+
+fill_subscription_lists(LServer, [RawI | Is], F, T) ->
+    I = raw_to_record(LServer, RawI),
+    case I of
+       %% Bad JID in database:
+       error ->
+           fill_subscription_lists(LServer, Is, F, T);
+       _ ->
+           J = I#roster.jid,
+           case I#roster.subscription of
+               both ->
+                   fill_subscription_lists(LServer, Is, [J | F], [J | T]);
+               from ->
+                   fill_subscription_lists(LServer, Is, [J | F], T);
+               to ->
+                   fill_subscription_lists(LServer, Is, F, [J | T]);
+               _ ->
+                   fill_subscription_lists(LServer, Is, F, T)
+           end
+    end;
+fill_subscription_lists(_LServer, [], F, T) ->
+    {F, T}.
+
+ask_to_pending(subscribe) -> out;
+ask_to_pending(unsubscribe) -> none;
+ask_to_pending(Ask) -> Ask.
+
+
+
+in_subscription(_, User, Server, JID, Type, Reason) ->
+    process_subscription(in, User, Server, JID, Type, Reason).
+
+out_subscription(User, Server, JID, Type) ->
+    process_subscription(out, User, Server, JID, Type, []).
+
+process_subscription(Direction, User, Server, JID1, Type, Reason) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    LJID = jlib:jid_tolower(JID1),
+    Username = list_to_binary(LUser),
+    SJID = list_to_binary(jlib:jid_to_string(LJID)),
+    F = fun() ->
+               Item =
+                   case riak_get_roster_by_jid(LServer, Username, SJID) of
+                       {ok, I} ->
+                           %% raw_to_record can return error, but
+                           %% jlib_to_string would fail before this point
+                           R = raw_to_record(LServer, I),
+                           Groups =
+                               case riak_get_roster_groups(LServer, Username, SJID) of
+                                   {selected, ["grp"], JGrps} when is_list(JGrps) ->
+                                       [JGrp || {JGrp} <- JGrps];
+                                   _ ->
+                                       []
+                               end,
+                           R#roster{groups = Groups};
+                       {error, _} ->
+                           #roster{usj = {LUser, LServer, LJID},
+                                   us = {LUser, LServer},
+                                   jid = LJID}
+                   end,
+               NewState = case Direction of
+                              out ->
+                                  out_state_change(Item#roster.subscription,
+                                                   Item#roster.ask,
+                                                   Type);
+                              in ->
+                                  in_state_change(Item#roster.subscription,
+                                                  Item#roster.ask,
+                                                  Type)
+                          end,
+               AutoReply = case Direction of
+                               out ->
+                                   none;
+                               in ->
+                                   in_auto_reply(Item#roster.subscription,
+                                                 Item#roster.ask,
+                                                 Type)
+                           end,
+               AskMessage = case NewState of
+                                {_, both} -> Reason;
+                                {_, in}   -> Reason;
+                                _         -> ""
+                            end,
+               case NewState of
+                   none ->
+                       {ok, none, AutoReply};
+                   {none, none} when Item#roster.subscription == none,
+                                     Item#roster.ask == in ->
+                       riak_del_roster(LServer, Username, SJID),
+                       {ok, none, AutoReply};
+                   {Subscription, Pending} ->
+                       NewItem = Item#roster{subscription = Subscription,
+                                             ask = Pending,
+                                             askmessage = AskMessage},
+                       ItemVals = record_to_string(NewItem),
+                       riak_roster_subscribe(LServer, Username, SJID, ItemVals),
+                       case roster_version_on_db(LServer) of
+                            true ->
+                                riak_set_roster_version(
+                                  LServer, Username,
+                                  sha:sha(term_to_binary(now())));
+                            false -> ok
+                       end,
+                       {ok, {push, NewItem}, AutoReply}
+               end
+       end,
+    case catch F() of
+       {ok, Push, AutoReply} ->
+           case AutoReply of
+               none ->
+                   ok;
+               _ ->
+                   T = case AutoReply of
+                           subscribed -> "subscribed";
+                           unsubscribed -> "unsubscribed"
+                       end,
+                   ejabberd_router:route(
+                     jlib:make_jid(User, Server, ""), JID1,
+                     {xmlelement, "presence", [{"type", T}], []})
+           end,
+           case Push of
+               {push, Item} ->
+                   if
+                       Item#roster.subscription == none,
+                       Item#roster.ask == in ->
+                           ok;
+                       true ->
+                           push_item(User, Server,
+                                     jlib:make_jid(User, Server, ""), Item)
+                   end,
+                   true;
+               none ->
+                   false
+           end;
+       E ->
+            ?ERROR_MSG("subscription error: ~p~n", [E]),
+           false
+    end.
+
+
+%% in_state_change(Subscription, Pending, Type) -> NewState
+%% NewState = none | {NewSubscription, NewPending}
+-ifdef(ROSTER_GATEWAY_WORKAROUND).
+-define(NNSD, {to, none}).
+-define(NISD, {to, in}).
+-else.
+-define(NNSD, none).
+-define(NISD, none).
+-endif.
+
+in_state_change(none, none, subscribe)    -> {none, in};
+in_state_change(none, none, subscribed)   -> ?NNSD;
+in_state_change(none, none, unsubscribe)  -> none;
+in_state_change(none, none, unsubscribed) -> none;
+in_state_change(none, out,  subscribe)    -> {none, both};
+in_state_change(none, out,  subscribed)   -> {to, none};
+in_state_change(none, out,  unsubscribe)  -> none;
+in_state_change(none, out,  unsubscribed) -> {none, none};
+in_state_change(none, in,   subscribe)    -> none;
+in_state_change(none, in,   subscribed)   -> ?NISD;
+in_state_change(none, in,   unsubscribe)  -> {none, none};
+in_state_change(none, in,   unsubscribed) -> none;
+in_state_change(none, both, subscribe)    -> none;
+in_state_change(none, both, subscribed)   -> {to, in};
+in_state_change(none, both, unsubscribe)  -> {none, out};
+in_state_change(none, both, unsubscribed) -> {none, in};
+in_state_change(to,   none, subscribe)    -> {to, in};
+in_state_change(to,   none, subscribed)   -> none;
+in_state_change(to,   none, unsubscribe)  -> none;
+in_state_change(to,   none, unsubscribed) -> {none, none};
+in_state_change(to,   in,   subscribe)    -> none;
+in_state_change(to,   in,   subscribed)   -> none;
+in_state_change(to,   in,   unsubscribe)  -> {to, none};
+in_state_change(to,   in,   unsubscribed) -> {none, in};
+in_state_change(from, none, subscribe)    -> none;
+in_state_change(from, none, subscribed)   -> {both, none};
+in_state_change(from, none, unsubscribe)  -> {none, none};
+in_state_change(from, none, unsubscribed) -> none;
+in_state_change(from, out,  subscribe)    -> none;
+in_state_change(from, out,  subscribed)   -> {both, none};
+in_state_change(from, out,  unsubscribe)  -> {none, out};
+in_state_change(from, out,  unsubscribed) -> {from, none};
+in_state_change(both, none, subscribe)    -> none;
+in_state_change(both, none, subscribed)   -> none;
+in_state_change(both, none, unsubscribe)  -> {to, none};
+in_state_change(both, none, unsubscribed) -> {from, none}.
+
+out_state_change(none, none, subscribe)    -> {none, out};
+out_state_change(none, none, subscribed)   -> none;
+out_state_change(none, none, unsubscribe)  -> none;
+out_state_change(none, none, unsubscribed) -> none;
+out_state_change(none, out,  subscribe)    -> {none, out}; %% We need to resend query (RFC3921, section 9.2)
+out_state_change(none, out,  subscribed)   -> none;
+out_state_change(none, out,  unsubscribe)  -> {none, none};
+out_state_change(none, out,  unsubscribed) -> none;
+out_state_change(none, in,   subscribe)    -> {none, both};
+out_state_change(none, in,   subscribed)   -> {from, none};
+out_state_change(none, in,   unsubscribe)  -> none;
+out_state_change(none, in,   unsubscribed) -> {none, none};
+out_state_change(none, both, subscribe)    -> none;
+out_state_change(none, both, subscribed)   -> {from, out};
+out_state_change(none, both, unsubscribe)  -> {none, in};
+out_state_change(none, both, unsubscribed) -> {none, out};
+out_state_change(to,   none, subscribe)    -> none;
+out_state_change(to,   none, subscribed)   -> {both, none};
+out_state_change(to,   none, unsubscribe)  -> {none, none};
+out_state_change(to,   none, unsubscribed) -> none;
+out_state_change(to,   in,   subscribe)    -> none;
+out_state_change(to,   in,   subscribed)   -> {both, none};
+out_state_change(to,   in,   unsubscribe)  -> {none, in};
+out_state_change(to,   in,   unsubscribed) -> {to, none};
+out_state_change(from, none, subscribe)    -> {from, out};
+out_state_change(from, none, subscribed)   -> none;
+out_state_change(from, none, unsubscribe)  -> none;
+out_state_change(from, none, unsubscribed) -> {none, none};
+out_state_change(from, out,  subscribe)    -> none;
+out_state_change(from, out,  subscribed)   -> none;
+out_state_change(from, out,  unsubscribe)  -> {from, none};
+out_state_change(from, out,  unsubscribed) -> {none, out};
+out_state_change(both, none, subscribe)    -> none;
+out_state_change(both, none, subscribed)   -> none;
+out_state_change(both, none, unsubscribe)  -> {from, none};
+out_state_change(both, none, unsubscribed) -> {to, none}.
+
+in_auto_reply(from, none, subscribe)    -> subscribed;
+in_auto_reply(from, out,  subscribe)    -> subscribed;
+in_auto_reply(both, none, subscribe)    -> subscribed;
+in_auto_reply(none, in,   unsubscribe)  -> unsubscribed;
+in_auto_reply(none, both, unsubscribe)  -> unsubscribed;
+in_auto_reply(to,   in,   unsubscribe)  -> unsubscribed;
+in_auto_reply(from, none, unsubscribe)  -> unsubscribed;
+in_auto_reply(from, out,  unsubscribe)  -> unsubscribed;
+in_auto_reply(both, none, unsubscribe)  -> unsubscribed;
+in_auto_reply(_,    _,    _)  ->           none.
+
+
+remove_user(User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Username = list_to_binary(LUser),
+    send_unsubscription_to_rosteritems(LUser, LServer),
+    riak_del_user_roster(LServer, Username),
+    ok.
+
+%% For each contact with Subscription:
+%% Both or From, send a "unsubscribed" presence stanza;
+%% Both or To, send a "unsubscribe" presence stanza.
+send_unsubscription_to_rosteritems(LUser, LServer) ->
+    RosterItems = get_user_roster([], {LUser, LServer}),
+    From = jlib:make_jid({LUser, LServer, ""}),
+    lists:foreach(fun(RosterItem) ->
+                         send_unsubscribing_presence(From, RosterItem)
+                 end,
+                 RosterItems).
+
+%% @spec (From::jid(), Item::roster()) -> ok
+send_unsubscribing_presence(From, Item) ->
+    IsTo = case Item#roster.subscription of
+              both -> true;
+              to -> true;
+              _ -> false
+          end,
+    IsFrom = case Item#roster.subscription of
+                both -> true;
+                from -> true;
+                _ -> false
+            end,
+    if IsTo ->
+           send_presence_type(
+             jlib:jid_remove_resource(From),
+             jlib:make_jid(Item#roster.jid), "unsubscribe");
+       true -> ok
+    end,
+    if IsFrom ->
+           send_presence_type(
+             jlib:jid_remove_resource(From),
+             jlib:make_jid(Item#roster.jid), "unsubscribed");
+       true -> ok
+    end,
+    ok.
+
+send_presence_type(From, To, Type) ->
+    ejabberd_router:route(
+      From, To,
+      {xmlelement, "presence",
+       [{"type", Type}],
+       []}).
+
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+set_items(User, Server, SubEl) ->
+    {xmlelement, _Name, _Attrs, Els} = SubEl,
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    lists:foreach(fun(El) ->
+                          process_item_set_t(LUser, LServer, El)
+                  end, Els).
+
+process_item_set_t(LUser, LServer, {xmlelement, _Name, Attrs, Els}) ->
+    JID1 = jlib:string_to_jid(xml:get_attr_s("jid", Attrs)),
+    case JID1 of
+       error ->
+           [];
+       _ ->
+           LJID = {JID1#jid.luser, JID1#jid.lserver, JID1#jid.lresource},
+           Username = list_to_binary(LUser),
+           SJID = list_to_binary(jlib:jid_to_string(LJID)),
+           Item = #roster{usj = {LUser, LServer, LJID},
+                          us = {LUser, LServer},
+                          jid = LJID},
+           Item1 = process_item_attrs_ws(Item, Attrs),
+           Item2 = process_item_els(Item1, Els),
+           case Item2#roster.subscription of
+               remove ->
+                   riak_del_roster(LServer, Username, SJID);
+               _ ->
+                   ItemVals = record_to_string(Item1),
+                   ItemGroups = groups_to_binary(Item2),
+                   riak_update_roster(
+                      LServer, Username, SJID, ItemVals, ItemGroups)
+           end
+    end;
+process_item_set_t(_LUser, _LServer, _) ->
+    [].
+
+process_item_attrs_ws(Item, [{Attr, Val} | Attrs]) ->
+    case Attr of
+       "jid" ->
+           case jlib:string_to_jid(Val) of
+               error ->
+                   process_item_attrs_ws(Item, Attrs);
+               JID1 ->
+                   JID = {JID1#jid.luser, JID1#jid.lserver, JID1#jid.lresource},
+                   process_item_attrs_ws(Item#roster{jid = JID}, Attrs)
+           end;
+       "name" ->
+           process_item_attrs_ws(Item#roster{name = Val}, Attrs);
+       "subscription" ->
+           case Val of
+               "remove" ->
+                   process_item_attrs_ws(Item#roster{subscription = remove},
+                                         Attrs);
+               "none" ->
+                   process_item_attrs_ws(Item#roster{subscription = none},
+                                         Attrs);
+               "both" ->
+                   process_item_attrs_ws(Item#roster{subscription = both},
+                                         Attrs);
+               "from" ->
+                   process_item_attrs_ws(Item#roster{subscription = from},
+                                         Attrs);
+               "to" ->
+                   process_item_attrs_ws(Item#roster{subscription = to},
+                                         Attrs);
+               _ ->
+                   process_item_attrs_ws(Item, Attrs)
+           end;
+       "ask" ->
+           process_item_attrs_ws(Item, Attrs);
+       _ ->
+           process_item_attrs_ws(Item, Attrs)
+    end;
+process_item_attrs_ws(Item, []) ->
+    Item.
+
+get_in_pending_subscriptions(Ls, User, Server) ->
+    JID = jlib:make_jid(User, Server, ""),
+    LUser = JID#jid.luser,
+    LServer = JID#jid.lserver,
+    Username = list_to_binary(LUser),
+    case catch riak_get_roster(LServer, Username) of
+       {ok, Items} when is_list(Items) ->
+           Ls ++ lists:map(
+                   fun(R) ->
+                           Message = R#roster.askmessage,
+                           {xmlelement, "presence",
+                            [{"from", jlib:jid_to_string(R#roster.jid)},
+                             {"to", jlib:jid_to_string(JID)},
+                             {"type", "subscribe"}],
+                            [{xmlelement, "status", [],
+                              [{xmlcdata, Message}]}]}
+                   end,
+                   lists:flatmap(
+                     fun(I) ->
+                             case raw_to_record(LServer, I) of
+                                 %% Bad JID in database:
+                                 error ->
+                                     [];
+                                 R ->
+                                     case R#roster.ask of
+                                         in   -> [R];
+                                         both -> [R];
+                                         _ -> []
+                                     end
+                             end
+                     end,
+                     Items));
+       _ ->
+           Ls
+    end.
+
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+get_jid_info(_, User, Server, JID) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    LJID = jlib:jid_tolower(JID),
+    Username = list_to_binary(LUser),
+    SJID = list_to_binary(jlib:jid_to_string(LJID)),
+    case catch riak_get_subscription(LServer, Username, SJID) of
+       {ok, Subscription} ->
+           Groups =
+                case catch riak_get_roster_groups(LServer, Username, SJID) of
+                    {ok, JGrps} when is_list(JGrps) ->
+                        lists:map(fun binary_to_list/1, JGrps);
+                    _ ->
+                        []
+                end,
+           {Subscription, Groups};
+       _ ->
+           LRJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)),
+           if
+               LRJID == LJID ->
+                   {none, []};
+               true ->
+                   SRJID = list_to_binary(jlib:jid_to_string(LRJID)),
+                   case catch riak_get_subscription(LServer, Username, SRJID) of
+                       {ok, Subscription} ->
+                           Groups = case catch riak_get_roster_groups(LServer, Username, SRJID) of
+                                         {ok, JGrps} when is_list(JGrps) ->
+                                             lists:map(fun binary_to_list/1,
+                                                       JGrps);
+                                        _ ->
+                                            []
+                                    end,
+                           {Subscription, Groups};
+                       _ ->
+                           {none, []}
+                   end
+           end
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+raw_to_record(LServer,
+              <<UsernameLen:16, Username:UsernameLen/binary,
+               SJIDLen:16, SJID:SJIDLen/binary,
+               NickLen:16, Nick:NickLen/binary,
+               SSubscription, SAsk,
+               SAskMessageLen:16, SAskMessage:SAskMessageLen/binary>>) ->
+    User = binary_to_list(Username),
+    case jlib:string_to_jid(binary_to_list(SJID)) of
+       error ->
+           error;
+       JID ->
+           LJID = jlib:jid_tolower(JID),
+           Subscription = case SSubscription of
+                              $B -> both;
+                              $T -> to;
+                              $F -> from;
+                              _ -> none
+                          end,
+           Ask = case SAsk of
+                     $S -> subscribe;
+                     $U -> unsubscribe;
+                     $B -> both;
+                     $O -> out;
+                     $I -> in;
+                     _ -> none
+                 end,
+           #roster{usj = {User, LServer, LJID},
+                   us = {User, LServer},
+                   jid = LJID,
+                   name = binary_to_list(Nick),
+                   subscription = Subscription,
+                   ask = Ask,
+                   askmessage = SAskMessage}
+    end.
+
+record_to_string(#roster{us = {User, _Server},
+                        jid = JID,
+                        name = Name,
+                        subscription = Subscription,
+                        ask = Ask,
+                        askmessage = AskMessage}) ->
+    Username = list_to_binary(User),
+    UsernameLen = size(Username),
+    SJID = list_to_binary(jlib:jid_to_string(jlib:jid_tolower(JID))),
+    SJIDLen = size(SJID),
+    Nick = list_to_binary(Name),
+    NickLen = size(Nick),
+    SSubscription = case Subscription of
+                       both -> $B;
+                       to   -> $T;
+                       from -> $F;
+                       none -> $N
+                   end,
+    SAsk = case Ask of
+              subscribe   -> $S;
+              unsubscribe -> $U;
+              both        -> $B;
+              out         -> $O;
+              in          -> $I;
+              none        -> $N
+          end,
+    SAskMessage = iolist_to_binary(AskMessage),
+    SAskMessageLen = size(SAskMessage),
+    <<UsernameLen:16, Username/binary,
+     SJIDLen:16, SJID/binary,
+     NickLen:16, Nick/binary,
+     SSubscription, SAsk,
+     SAskMessageLen:16, SAskMessage/binary>>.
+
+groups_to_binary(#roster{jid = JID, groups = Groups}) ->
+    SJID = list_to_binary(jlib:jid_to_string(jlib:jid_tolower(JID))),
+    SJIDLen = size(SJID),
+    %% Empty groups do not need to be converted to string to be inserted in
+    %% the database
+    lists:foldl(
+      fun([], Acc) ->
+              Acc;
+        (Group, Acc) ->
+             G = list_to_binary(Group),
+              Len = size(G),
+             <<Acc/binary, Len:16, G/binary>>
+      end, <<SJIDLen:16, SJID/binary>>, Groups).
+
+binary_to_groups(<<Len:16, SJID:Len/binary, Rest/binary>>) ->
+    {binary_to_list(SJID), binary_to_groups(Rest, [])}.
+
+binary_to_groups(<<Len:16, G:Len/binary, Rest/binary>>, Res) ->
+    binary_to_groups(Rest, [G | Res]);
+binary_to_groups(_, Res) ->
+    Res.
+
+
+webadmin_page(_, Host,
+             #request{us = _US,
+                      path = ["user", U, "roster"],
+                      q = Query,
+                      lang = Lang} = _Request) ->
+    Res = user_roster(U, Host, Query, Lang),
+    {stop, Res};
+
+webadmin_page(Acc, _, _) -> Acc.
+
+user_roster(User, Server, Query, Lang) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    US = {LUser, LServer},
+    Items1 = get_roster(LUser, LServer),
+    Res = user_roster_parse_query(User, Server, Items1, Query),
+    Items = get_roster(LUser, LServer),
+    SItems = lists:sort(Items),
+    FItems =
+       case SItems of
+           [] ->
+               [?CT("None")];
+           _ ->
+               [?XE("table",
+                    [?XE("thead",
+                         [?XE("tr",
+                              [?XCT("td", "Jabber ID"),
+                               ?XCT("td", "Nickname"),
+                               ?XCT("td", "Subscription"),
+                               ?XCT("td", "Pending"),
+                               ?XCT("td", "Groups")
+                              ])]),
+                     ?XE("tbody",
+                         lists:map(
+                           fun(R) ->
+                                   Groups =
+                                       lists:flatmap(
+                                         fun(Group) ->
+                                                 [?C(Group), ?BR]
+                                         end, R#roster.groups),
+                                   Pending = ask_to_pending(R#roster.ask),
+                                   TDJID = build_contact_jid_td(R#roster.jid),
+                                   ?XE("tr",
+                                       [TDJID,
+                                        ?XAC("td", [{"class", "valign"}],
+                                             R#roster.name),
+                                        ?XAC("td", [{"class", "valign"}],
+                                             atom_to_list(R#roster.subscription)),
+                                        ?XAC("td", [{"class", "valign"}],
+                                             atom_to_list(Pending)),
+                                        ?XAE("td", [{"class", "valign"}], Groups),
+                                        if
+                                            Pending == in ->
+                                                ?XAE("td", [{"class", "valign"}],
+                                                     [?INPUTT("submit",
+                                                              "validate" ++
+                                                              ejabberd_web_admin:term_to_id(R#roster.jid),
+                                                              "Validate")]);
+                                            true ->
+                                                ?X("td")
+                                        end,
+                                        ?XAE("td", [{"class", "valign"}],
+                                             [?INPUTT("submit",
+                                                      "remove" ++
+                                                      ejabberd_web_admin:term_to_id(R#roster.jid),
+                                                      "Remove")])])
+                           end, SItems))])]
+       end,
+    [?XC("h1", ?T("Roster of ") ++ us_to_list(US))] ++
+       case Res of
+           ok -> [?XREST("Submitted")];
+           error -> [?XREST("Bad format")];
+           nothing -> []
+       end ++
+       [?XAE("form", [{"action", ""}, {"method", "post"}],
+             FItems ++
+             [?P,
+              ?INPUT("text", "newjid", ""), ?C(" "),
+              ?INPUTT("submit", "addjid", "Add Jabber ID")
+             ])].
+
+build_contact_jid_td(RosterJID) ->
+    %% Convert {U, S, R} into {jid, U, S, R, U, S, R}:
+    ContactJID = jlib:make_jid(RosterJID),
+    JIDURI = case {ContactJID#jid.luser, ContactJID#jid.lserver} of
+                {"", _} -> "";
+                {CUser, CServer} ->
+                    case lists:member(CServer, ?MYHOSTS) of
+                        false -> "";
+                        true -> "/admin/server/" ++ CServer ++ "/user/" ++ CUser ++ "/"
+                    end
+            end,
+    case JIDURI of
+       [] ->
+           ?XAC("td", [{"class", "valign"}], jlib:jid_to_string(RosterJID));
+       URI when is_list(URI) ->
+           ?XAE("td", [{"class", "valign"}], [?AC(JIDURI, jlib:jid_to_string(RosterJID))])
+    end.
+
+user_roster_parse_query(User, Server, Items, Query) ->
+    case lists:keysearch("addjid", 1, Query) of
+       {value, _} ->
+           case lists:keysearch("newjid", 1, Query) of
+               {value, {_, undefined}} ->
+                   error;
+               {value, {_, SJID}} ->
+                   case jlib:string_to_jid(SJID) of
+                       JID when is_record(JID, jid) ->
+                           user_roster_subscribe_jid(User, Server, JID),
+                           ok;
+                       error ->
+                           error
+                   end;
+               false ->
+                   error
+           end;
+       false ->
+           case catch user_roster_item_parse_query(
+                        User, Server, Items, Query) of
+               submitted ->
+                   ok;
+               {'EXIT', _Reason} ->
+                   error;
+               _ ->
+                   nothing
+           end
+    end.
+
+
+user_roster_subscribe_jid(User, Server, JID) ->
+    out_subscription(User, Server, JID, subscribe),
+    UJID = jlib:make_jid(User, Server, ""),
+    ejabberd_router:route(
+      UJID, JID, {xmlelement, "presence", [{"type", "subscribe"}], []}).
+
+user_roster_item_parse_query(User, Server, Items, Query) ->
+    lists:foreach(
+      fun(R) ->
+             JID = R#roster.jid,
+             case lists:keysearch(
+                    "validate" ++ ejabberd_web_admin:term_to_id(JID), 1, Query) of
+                 {value, _} ->
+                     JID1 = jlib:make_jid(JID),
+                     out_subscription(
+                       User, Server, JID1, subscribed),
+                     UJID = jlib:make_jid(User, Server, ""),
+                     ejabberd_router:route(
+                       UJID, JID1, {xmlelement, "presence",
+                                    [{"type", "subscribed"}], []}),
+                     throw(submitted);
+                 false ->
+                     case lists:keysearch(
+                            "remove" ++ ejabberd_web_admin:term_to_id(JID), 1, Query) of
+                         {value, _} ->
+                             UJID = jlib:make_jid(User, Server, ""),
+                             process_iq(
+                               UJID, UJID,
+                               #iq{type = set,
+                                   sub_el = {xmlelement, "query",
+                                             [{"xmlns", ?NS_ROSTER}],
+                                             [{xmlelement, "item",
+                                               [{"jid", jlib:jid_to_string(JID)},
+                                                {"subscription", "remove"}],
+                                               []}]}}),
+                             throw(submitted);
+                         false ->
+                             ok
+                     end
+
+             end
+      end, Items),
+    nothing.
+
+us_to_list({User, Server}) ->
+    jlib:jid_to_string({User, Server, ""}).
+
+webadmin_user(Acc, _User, _Server, Lang) ->
+    Acc ++ [?XE("h3", [?ACT("roster/", "Roster")])].
+
+
+riak_get_roster(LServer, Username) ->
+    ejabberd_riak:get_by_index(
+      LServer, <<"roster">>, <<"user_bin">>, Username).
+
+riak_get_roster_jid_groups(LServer, Username) ->
+    case ejabberd_riak:get_by_index(
+           LServer, <<"roster_groups">>, <<"user_bin">>, Username) of
+        {ok, JGs} ->
+            Res = lists:map(fun binary_to_groups/1, JGs),
+            {ok, Res};
+        Error -> Error
+    end.
+
+riak_get_roster_groups(LServer, Username, SJID) ->
+    Key = <<Username/binary, $/, SJID/binary>>,
+    case ejabberd_riak:get(LServer, <<"roster_groups">>, Key) of
+        {ok, Gs} ->
+            {_, Res} = binary_to_groups(Gs),
+            {ok, Res};
+        {error, notfound} ->
+            {ok, []};
+        Error -> Error
+    end.
+
+riak_get_roster_by_jid(LServer, Username, SJID) ->
+    Key = <<Username/binary, $/, SJID/binary>>,
+    ejabberd_riak:get(LServer, <<"roster">>, Key).
+
+riak_del_roster(LServer, Username, SJID) ->
+    Key = <<Username/binary, $/, SJID/binary>>,
+    ejabberd_riak:delete(LServer, <<"roster">>, Key).
+
+riak_update_roster(LServer, Username, SJID, ItemVals, ItemGroups) ->
+    Key = <<Username/binary, $/, SJID/binary>>,
+    ejabberd_riak:put(
+      LServer, <<"roster">>, Key, ItemVals,
+      [{<<"user_bin">>, Username}]),
+    ejabberd_riak:put(
+      LServer, <<"roster_groups">>, Key, ItemGroups,
+      [{<<"user_bin">>, Username}]).
+
+riak_roster_subscribe(LServer, Username, SJID, ItemVals) ->
+    Key = <<Username/binary, $/, SJID/binary>>,
+    ejabberd_riak:put(
+      LServer, <<"roster">>, Key, ItemVals,
+      [{<<"user_bin">>, Username}]).
+
+riak_get_subscription(LServer, Username, SJID) ->
+    case riak_get_roster_by_jid(LServer, Username, SJID) of
+        {ok, SR} ->
+            case raw_to_record(LServer, SR) of
+                error ->
+                    {error, bad_record};
+                R ->
+                    {ok, R#roster.subscription}
+            end;
+        Error ->
+            Error
+    end.
+
+riak_set_roster_version(LServer, Username, RosterVersion) ->
+    ejabberd_riak:put(LServer, <<"roster_version">>,
+                      Username, list_to_binary(RosterVersion)).
+
+
+riak_del_user_roster(LServer, Username) ->
+    case ejabberd_riak:get_keys_by_index(
+           LServer, <<"roster">>, <<"user_bin">>, Username) of
+        {ok, Keys} ->
+            lists:foreach(
+              fun(Key) ->
+                      ejabberd_riak:delete(LServer, <<"roster">>, Key)
+              end, Keys);
+        _ ->
+            ok
+    end,
+    case ejabberd_riak:get_keys_by_index(
+           LServer, <<"roster_groups">>, <<"user_bin">>, Username) of
+        {ok, GKeys} ->
+            lists:foreach(
+              fun(Key) ->
+                      ejabberd_riak:delete(LServer, <<"roster_groups">>, Key)
+              end, GKeys);
+        _ ->
+            ok
+    end,
+    ejabberd_riak:delete(LServer, <<"roster_version">>, Username).
+
diff --git a/src/mod_vcard_riak.erl b/src/mod_vcard_riak.erl
new file mode 100644 (file)
index 0000000..44ce35f
--- /dev/null
@@ -0,0 +1,209 @@
+%%%----------------------------------------------------------------------
+%%% File    : mod_vcard_riak.erl
+%%% Author  : Alexey Shchepin <alexey@process-one.net>
+%%% Purpose : vCard support via Riak
+%%% Created :  6 Jan 2012 by Alexey Shchepin <alexey@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2011   ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+-module(mod_vcard_riak).
+-author('alexey@process-one.net').
+
+-behaviour(gen_mod).
+
+-export([start/2, stop/1,
+        get_sm_features/5,
+        process_local_iq/3,
+        process_sm_iq/3,
+        remove_user/2]).
+
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+
+
+-define(JUD_MATCHES, 30).
+-define(PROCNAME, ejabberd_mod_vcard).
+
+start(Host, Opts) ->
+    ejabberd_hooks:add(remove_user, Host,
+                      ?MODULE, remove_user, 50),
+    IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
+    gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_VCARD,
+                                 ?MODULE, process_local_iq, IQDisc),
+    gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_VCARD,
+                                 ?MODULE, process_sm_iq, IQDisc),
+    ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, get_sm_features, 50),
+    ok.
+
+stop(Host) ->
+    ejabberd_hooks:delete(remove_user, Host,
+                         ?MODULE, remove_user, 50),
+    gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_VCARD),
+    gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_VCARD),
+    ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50),
+    ok.
+
+get_sm_features({error, _Error} = Acc, _From, _To, _Node, _Lang) ->
+    Acc;
+
+get_sm_features(Acc, _From, _To, Node, _Lang) ->
+    case Node of
+       [] ->
+           case Acc of
+               {result, Features} ->
+                   {result, [?NS_VCARD | Features]};
+               empty ->
+                   {result, [?NS_VCARD]}
+           end;
+       _ ->
+           Acc
+     end.
+
+process_local_iq(_From, _To, #iq{type = Type, lang = Lang, sub_el = SubEl} = IQ) ->
+    case Type of
+       set ->
+           IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]};
+       get ->
+           IQ#iq{type = result,
+                 sub_el = [{xmlelement, "vCard",
+                            [{"xmlns", ?NS_VCARD}],
+                            [{xmlelement, "FN", [],
+                              [{xmlcdata, "ejabberd"}]},
+                             {xmlelement, "URL", [],
+                              [{xmlcdata, ?EJABBERD_URI}]},
+                             {xmlelement, "DESC", [],
+                              [{xmlcdata,
+                                translate:translate(
+                                  Lang,
+                                  "Erlang Jabber Server") ++
+                                  "\nCopyright (c) 2002-2011 ProcessOne"}]},
+                             {xmlelement, "BDAY", [],
+                              [{xmlcdata, "2002-11-16"}]}
+                            ]}]}
+    end.
+
+
+process_sm_iq(From, To, #iq{type = Type, sub_el = SubEl} = IQ) ->
+    case Type of
+       set ->
+           #jid{user = User, lserver = LServer} = From,
+           case lists:member(LServer, ?MYHOSTS) of
+               true ->
+                   set_vcard(User, LServer, SubEl),
+                   IQ#iq{type = result, sub_el = []};
+               false ->
+                   IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]}
+           end;
+       get ->
+           #jid{luser = LUser, lserver = LServer} = To,
+            Username = list_to_binary(LUser),
+           case catch ejabberd_riak:get(LServer, <<"vcard">>, Username) of
+               {ok, SVCARD} ->
+                   case xml_stream:parse_element(SVCARD) of
+                       {error, _Reason} ->
+                           IQ#iq{type = error,
+                                 sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]};
+                       VCARD ->
+                           IQ#iq{type = result, sub_el = [VCARD]}
+                   end;
+               {error, notfound} ->
+                   IQ#iq{type = result, sub_el = []};
+               _ ->
+                   IQ#iq{type = error,
+                         sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]}
+           end
+    end.
+
+set_vcard(User, LServer, VCARD) ->
+    FN       = xml:get_path_s(VCARD, [{elem, "FN"},                     cdata]),
+    Family   = xml:get_path_s(VCARD, [{elem, "N"}, {elem, "FAMILY"},    cdata]),
+    Given    = xml:get_path_s(VCARD, [{elem, "N"}, {elem, "GIVEN"},     cdata]),
+    Middle   = xml:get_path_s(VCARD, [{elem, "N"}, {elem, "MIDDLE"},    cdata]),
+    Nickname = xml:get_path_s(VCARD, [{elem, "NICKNAME"},               cdata]),
+    BDay     = xml:get_path_s(VCARD, [{elem, "BDAY"},                   cdata]),
+    CTRY     = xml:get_path_s(VCARD, [{elem, "ADR"}, {elem, "CTRY"},    cdata]),
+    Locality = xml:get_path_s(VCARD, [{elem, "ADR"}, {elem, "LOCALITY"},cdata]),
+    EMail1   = xml:get_path_s(VCARD, [{elem, "EMAIL"}, {elem, "USERID"},cdata]),
+    EMail2   = xml:get_path_s(VCARD, [{elem, "EMAIL"},                  cdata]),
+    OrgName  = xml:get_path_s(VCARD, [{elem, "ORG"}, {elem, "ORGNAME"}, cdata]),
+    OrgUnit  = xml:get_path_s(VCARD, [{elem, "ORG"}, {elem, "ORGUNIT"}, cdata]),
+    EMail = case EMail1 of
+               "" ->
+                   EMail2;
+               _ ->
+                   EMail1
+           end,
+
+    LUser     = jlib:nodeprep(User),
+    LFN       = stringprep:tolower(FN),
+    LFamily   = stringprep:tolower(Family),
+    LGiven    = stringprep:tolower(Given),
+    LMiddle   = stringprep:tolower(Middle),
+    LNickname = stringprep:tolower(Nickname),
+    LBDay     = stringprep:tolower(BDay),
+    LCTRY     = stringprep:tolower(CTRY),
+    LLocality = stringprep:tolower(Locality),
+    LEMail    = stringprep:tolower(EMail),
+    LOrgName  = stringprep:tolower(OrgName),
+    LOrgUnit  = stringprep:tolower(OrgUnit),
+
+    if
+       (LUser     == error) or
+       (LFN       == error) or
+       (LFamily   == error) or
+       (LGiven    == error) or
+       (LMiddle   == error) or
+       (LNickname == error) or
+       (LBDay     == error) or
+       (LCTRY     == error) or
+       (LLocality == error) or
+       (LEMail    == error) or
+       (LOrgName  == error) or
+       (LOrgUnit  == error) ->
+           {error, badarg};
+       true ->
+            Username = list_to_binary(LUser),
+           SVCARD = xml:element_to_binary(VCARD),
+
+            ejabberd_riak:put(
+              LServer, <<"vcard">>, Username, SVCARD,
+              [{<<"bday_bin">>, list_to_binary(LBDay)},
+               {<<"ctry_bin">>, list_to_binary(LCTRY)},
+               {<<"email_bin">>, list_to_binary(LEMail)},
+               {<<"fn_bin">>, list_to_binary(LFN)},
+               {<<"family_bin">>, list_to_binary(LFamily)},
+               {<<"given_bin">>, list_to_binary(LGiven)},
+               {<<"locality_bin">>, list_to_binary(LLocality)},
+               {<<"middle_bin">>, list_to_binary(LMiddle)},
+               {<<"nickname_bin">>, list_to_binary(LNickname)},
+               {<<"orgname_bin">>, list_to_binary(LOrgName)},
+               {<<"orgunit_bin">>, list_to_binary(LOrgUnit)},
+               {<<"user_bin">>, Username}]),
+
+           ejabberd_hooks:run(vcard_set, LServer, [LUser, LServer, VCARD])
+    end.
+
+remove_user(User, Server) ->
+    LUser = jlib:nodeprep(User),
+    LServer = jlib:nameprep(Server),
+    Username = list_to_binary(LUser),
+    ejabberd_riak:delete(LServer, <<"vcard">>, Username),
+    ok.