--- /dev/null
+%%%----------------------------------------------------------------------
+%%% File : mod_offline_riak.erl
+%%% Author : Alexey Shchepin <alexey@process-one.net>
+%%% Purpose : Store and manage offline messages in Riak.
+%%% Created : 4 Jan 2012 by Alexey Shchepin <alexey@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2011 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+-module(mod_offline_riak).
+-author('alexey@process-one.net').
+
+-behaviour(gen_mod).
+
+-export([count_offline_messages/2]).
+
+-export([start/2,
+ init/2,
+ stop/1,
+ store_packet/3,
+ pop_offline_messages/3,
+ remove_user/2,
+ webadmin_page/3,
+ webadmin_user/4,
+ webadmin_user_parse_query/5,
+ count_offline_messages/3]).
+
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+-include("web/ejabberd_http.hrl").
+-include("web/ejabberd_web_admin.hrl").
+
+-record(offline_msg, {user, timestamp, expire, from, to, packet}).
+
+-define(PROCNAME, ejabberd_offline).
+-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
+
+start(Host, Opts) ->
+ ejabberd_hooks:add(offline_message_hook, Host,
+ ?MODULE, store_packet, 50),
+ ejabberd_hooks:add(resend_offline_messages_hook, Host,
+ ?MODULE, pop_offline_messages, 50),
+ ejabberd_hooks:add(remove_user, Host,
+ ?MODULE, remove_user, 50),
+ ejabberd_hooks:add(anonymous_purge_hook, Host,
+ ?MODULE, remove_user, 50),
+ ejabberd_hooks:add(webadmin_page_host, Host,
+ ?MODULE, webadmin_page, 50),
+ ejabberd_hooks:add(webadmin_user, Host,
+ ?MODULE, webadmin_user, 50),
+ ejabberd_hooks:add(webadmin_user_parse_query, Host,
+ ?MODULE, webadmin_user_parse_query, 50),
+ ejabberd_hooks:add(count_offline_messages, Host,
+ ?MODULE, count_offline_messages, 50),
+ MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity),
+ register(gen_mod:get_module_proc(Host, ?PROCNAME),
+ spawn(?MODULE, init, [Host, MaxOfflineMsgs])).
+
+%% MaxOfflineMsgs is either infinity of integer > 0
+init(Host, infinity) ->
+ loop(Host, infinity);
+init(Host, MaxOfflineMsgs)
+ when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 ->
+ loop(Host, MaxOfflineMsgs).
+
+loop(Host, MaxOfflineMsgs) ->
+ receive
+ #offline_msg{user = User} = Msg ->
+ Msgs = receive_all(User, [Msg]),
+ Len = length(Msgs),
+
+ %% Only count existing messages if needed:
+ Count = if MaxOfflineMsgs =/= infinity ->
+ Len + count_offline_messages(User, Host);
+ true -> 0
+ end,
+ if
+ Count > MaxOfflineMsgs ->
+ discard_warn_sender(Msgs);
+ true ->
+ lists:foreach(
+ fun(M) ->
+ Username = list_to_binary(User),
+ From = M#offline_msg.from,
+ To = M#offline_msg.to,
+ {xmlelement, Name, Attrs, Els} =
+ M#offline_msg.packet,
+ Attrs2 = jlib:replace_from_to_attrs(
+ jlib:jid_to_string(From),
+ jlib:jid_to_string(To),
+ Attrs),
+ Packet = {xmlelement, Name, Attrs2,
+ Els ++
+ [jlib:timestamp_to_xml(
+ calendar:now_to_universal_time(
+ M#offline_msg.timestamp))]},
+ XML =
+ iolist_to_binary(
+ xml:element_to_string(Packet)),
+ {MegaSecs, Secs, MicroSecs} =
+ M#offline_msg.timestamp,
+ TS =
+ iolist_to_binary(
+ io_lib:format("~6..0w~6..0w.~6..0w",
+ [MegaSecs, Secs, MicroSecs])),
+ ejabberd_riak:put(
+ Host, <<"offline">>,
+ undefined, XML,
+ [{<<"user_bin">>, Username},
+ {<<"timestamp_bin">>, TS}
+ ])
+ end, Msgs)
+ end,
+ loop(Host, MaxOfflineMsgs);
+ _ ->
+ loop(Host, MaxOfflineMsgs)
+ end.
+
+receive_all(Username, Msgs) ->
+ receive
+ #offline_msg{user=Username} = Msg ->
+ receive_all(Username, [Msg | Msgs])
+ after 0 ->
+ lists:reverse(Msgs)
+ end.
+
+
+stop(Host) ->
+ ejabberd_hooks:delete(offline_message_hook, Host,
+ ?MODULE, store_packet, 50),
+ ejabberd_hooks:delete(resend_offline_messages_hook, Host,
+ ?MODULE, pop_offline_messages, 50),
+ ejabberd_hooks:delete(remove_user, Host,
+ ?MODULE, remove_user, 50),
+ ejabberd_hooks:delete(anonymous_purge_hook, Host,
+ ?MODULE, remove_user, 50),
+ ejabberd_hooks:delete(webadmin_page_host, Host,
+ ?MODULE, webadmin_page, 50),
+ ejabberd_hooks:delete(webadmin_user, Host,
+ ?MODULE, webadmin_user, 50),
+ ejabberd_hooks:delete(webadmin_user_parse_query, Host,
+ ?MODULE, webadmin_user_parse_query, 50),
+ Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
+ exit(whereis(Proc), stop),
+ ok.
+
+store_packet(From, To, Packet) ->
+ Type = xml:get_tag_attr_s("type", Packet),
+ if
+ (Type /= "error") and (Type /= "groupchat") and
+ (Type /= "headline") ->
+ case check_event(From, To, Packet) of
+ true ->
+ #jid{luser = LUser} = To,
+ TimeStamp = now(),
+ {xmlelement, _Name, _Attrs, Els} = Packet,
+ Expire = find_x_expire(TimeStamp, Els),
+ gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) !
+ #offline_msg{user = LUser,
+ timestamp = TimeStamp,
+ expire = Expire,
+ from = From,
+ to = To,
+ packet = Packet},
+ stop;
+ _ ->
+ ok
+ end;
+ true ->
+ ok
+ end.
+
+check_event(From, To, Packet) ->
+ {xmlelement, Name, Attrs, Els} = Packet,
+ case find_x_event(Els) of
+ false ->
+ true;
+ El ->
+ case xml:get_subtag(El, "id") of
+ false ->
+ case xml:get_subtag(El, "offline") of
+ false ->
+ true;
+ _ ->
+ ID = case xml:get_tag_attr_s("id", Packet) of
+ "" ->
+ {xmlelement, "id", [], []};
+ S ->
+ {xmlelement, "id", [],
+ [{xmlcdata, S}]}
+ end,
+ ejabberd_router:route(
+ To, From, {xmlelement, Name, Attrs,
+ [{xmlelement, "x",
+ [{"xmlns", ?NS_EVENT}],
+ [ID,
+ {xmlelement, "offline", [], []}]}]
+ }),
+ true
+ end;
+ _ ->
+ false
+ end
+ end.
+
+find_x_event([]) ->
+ false;
+find_x_event([{xmlcdata, _} | Els]) ->
+ find_x_event(Els);
+find_x_event([El | Els]) ->
+ case xml:get_tag_attr_s("xmlns", El) of
+ ?NS_EVENT ->
+ El;
+ _ ->
+ find_x_event(Els)
+ end.
+
+find_x_expire(_, []) ->
+ never;
+find_x_expire(TimeStamp, [{xmlcdata, _} | Els]) ->
+ find_x_expire(TimeStamp, Els);
+find_x_expire(TimeStamp, [El | Els]) ->
+ case xml:get_tag_attr_s("xmlns", El) of
+ ?NS_EXPIRE ->
+ Val = xml:get_tag_attr_s("seconds", El),
+ case catch list_to_integer(Val) of
+ {'EXIT', _} ->
+ never;
+ Int when Int > 0 ->
+ {MegaSecs, Secs, MicroSecs} = TimeStamp,
+ S = MegaSecs * 1000000 + Secs + Int,
+ MegaSecs1 = S div 1000000,
+ Secs1 = S rem 1000000,
+ {MegaSecs1, Secs1, MicroSecs};
+ _ ->
+ never
+ end;
+ _ ->
+ find_x_expire(TimeStamp, Els)
+ end.
+
+
+pop_offline_messages(Ls, User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ Username = list_to_binary(LUser),
+ case ejabberd_riak:get_objects_by_index(
+ LServer, <<"offline">>, <<"user_bin">>, Username) of
+ {ok, Rs} ->
+ SortedRs =
+ lists:sort(fun(X, Y) ->
+ MX = riak_object:get_metadata(X),
+ {ok, IX} = dict:find(<<"index">>, MX),
+ {value, TSX} = lists:keysearch(
+ <<"timestamp_bin">>, 1,
+ IX),
+ MY = riak_object:get_metadata(Y),
+ {ok, IY} = dict:find(<<"index">>, MY),
+ {value, TSY} = lists:keysearch(
+ <<"timestamp_bin">>, 1,
+ IY),
+ TSX =< TSY
+ end, Rs),
+ Ls ++ lists:flatmap(
+ fun(R) ->
+ Key = riak_object:key(R),
+ ejabberd_riak:delete(LServer, <<"offline">>, Key),
+ XML = riak_object:get_value(R),
+ case xml_stream:parse_element(XML) of
+ {error, _Reason} ->
+ [];
+ El ->
+ To = jlib:string_to_jid(
+ xml:get_tag_attr_s("to", El)),
+ From = jlib:string_to_jid(
+ xml:get_tag_attr_s("from", El)),
+ if
+ (To /= error) and
+ (From /= error) ->
+ [{route, From, To, El}];
+ true ->
+ []
+ end
+ end
+ end, SortedRs);
+ _ ->
+ Ls
+ end.
+
+
+remove_user(User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ Username = list_to_binary(LUser),
+ case ejabberd_riak:get_keys_by_index(
+ LServer, <<"offline">>, <<"user_bin">>, Username) of
+ {ok, Keys} ->
+ lists:foreach(
+ fun(Key) ->
+ ejabberd_riak:delete(LServer, <<"offline">>, Key)
+ end, Keys);
+ _ ->
+ ok
+ end.
+
+
+%% Helper functions:
+
+%% TODO: Warning - This function is a duplicate from mod_offline.erl
+%% It is duplicate to stay consistent (many functions are duplicated
+%% in this module). It will be refactored later on.
+%% Warn senders that their messages have been discarded:
+discard_warn_sender(Msgs) ->
+ lists:foreach(
+ fun(#offline_msg{from=From, to=To, packet=Packet}) ->
+ ErrText = "Your contact offline message queue is full. The message has been discarded.",
+ Lang = xml:get_tag_attr_s("xml:lang", Packet),
+ Err = jlib:make_error_reply(
+ Packet, ?ERRT_RESOURCE_CONSTRAINT(Lang, ErrText)),
+ ejabberd_router:route(
+ To,
+ From, Err)
+ end, Msgs).
+
+
+webadmin_page(_, Host,
+ #request{us = _US,
+ path = ["user", U, "queue"],
+ q = Query,
+ lang = Lang} = _Request) ->
+ Res = user_queue(U, Host, Query, Lang),
+ {stop, Res};
+
+webadmin_page(Acc, _, _) -> Acc.
+
+user_queue(User, Server, Query, Lang) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ Username = ejabberd_odbc:escape(LUser),
+ US = {LUser, LServer},
+ Res = user_queue_parse_query(Username, LServer, Query),
+ Msgs = case catch ejabberd_odbc:sql_query(
+ LServer,
+ ["select username, xml from spool"
+ " where username='", Username, "'"
+ " order by seq;"]) of
+ {selected, ["username", "xml"], Rs} ->
+ lists:flatmap(
+ fun({_, XML}) ->
+ case xml_stream:parse_element(XML) of
+ {error, _Reason} ->
+ [];
+ El ->
+ [El]
+ end
+ end, Rs);
+ _ ->
+ []
+ end,
+ FMsgs =
+ lists:map(
+ fun({xmlelement, _Name, _Attrs, _Els} = Msg) ->
+ ID = jlib:encode_base64(binary_to_list(term_to_binary(Msg))),
+ Packet = Msg,
+ FPacket = ejabberd_web_admin:pretty_print_xml(Packet),
+ ?XE("tr",
+ [?XAE("td", [{"class", "valign"}], [?INPUT("checkbox", "selected", ID)]),
+ ?XAE("td", [{"class", "valign"}], [?XC("pre", FPacket)])]
+ )
+ end, Msgs),
+ [?XC("h1", io_lib:format(?T("~s's Offline Messages Queue"),
+ [us_to_list(US)]))] ++
+ case Res of
+ ok -> [?XREST("Submitted")];
+ nothing -> []
+ end ++
+ [?XAE("form", [{"action", ""}, {"method", "post"}],
+ [?XE("table",
+ [?XE("thead",
+ [?XE("tr",
+ [?X("td"),
+ ?XCT("td", "Packet")
+ ])]),
+ ?XE("tbody",
+ if
+ FMsgs == [] ->
+ [?XE("tr",
+ [?XAC("td", [{"colspan", "4"}], " ")]
+ )];
+ true ->
+ FMsgs
+ end
+ )]),
+ ?BR,
+ ?INPUTT("submit", "delete", "Delete Selected")
+ ])].
+
+user_queue_parse_query(Username, LServer, Query) ->
+ case lists:keysearch("delete", 1, Query) of
+ {value, _} ->
+ Msgs = case catch ejabberd_odbc:sql_query(
+ LServer,
+ ["select xml, seq from spool"
+ " where username='", Username, "'"
+ " order by seq;"]) of
+ {selected, ["xml", "seq"], Rs} ->
+ lists:flatmap(
+ fun({XML, Seq}) ->
+ case xml_stream:parse_element(XML) of
+ {error, _Reason} ->
+ [];
+ El ->
+ [{El, Seq}]
+ end
+ end, Rs);
+ _ ->
+ []
+ end,
+ F = fun() ->
+ lists:foreach(
+ fun({Msg, Seq}) ->
+ ID = jlib:encode_base64(
+ binary_to_list(term_to_binary(Msg))),
+ case lists:member({"selected", ID}, Query) of
+ true ->
+ SSeq = ejabberd_odbc:escape(Seq),
+ catch ejabberd_odbc:sql_query(
+ LServer,
+ ["delete from spool"
+ " where username='", Username, "'"
+ " and seq='", SSeq, "';"]);
+ false ->
+ ok
+ end
+ end, Msgs)
+ end,
+ mnesia:transaction(F),
+ ok;
+ false ->
+ nothing
+ end.
+
+us_to_list({User, Server}) ->
+ jlib:jid_to_string({User, Server, ""}).
+
+webadmin_user(Acc, User, Server, Lang) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ Username = ejabberd_odbc:escape(LUser),
+ QueueLen = case catch ejabberd_odbc:sql_query(
+ LServer,
+ ["select count(*) from spool"
+ " where username='", Username, "';"]) of
+ {selected, [_], [{SCount}]} ->
+ SCount;
+ _ ->
+ 0
+ end,
+ FQueueLen = [?AC("queue/", QueueLen)],
+ Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")].
+
+webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) ->
+ case catch odbc_queries:del_spool_msg(Server, User) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("Failed to remove offline messages: ~p", [Reason]),
+ {stop, error};
+ {error, Reason} ->
+ ?ERROR_MSG("Failed to remove offline messages: ~p", [Reason]),
+ {stop, error};
+ _ ->
+ ?INFO_MSG("Removed all offline messages for ~s@~s", [User, Server]),
+ {stop, ok}
+ end;
+webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) ->
+ Acc.
+
+%% ------------------------------------------------
+%% mod_offline: number of messages quota management
+
+%% Returns as integer the number of offline messages for a given user
+count_offline_messages(LUser, LServer) ->
+ Username = list_to_binary([LUser, $@, LServer]),
+ case catch ejabberd_riak:count_by_index(
+ LServer, <<"offline">>, <<"user_bin">>, Username) of
+ {ok, Res} when is_integer(Res) ->
+ Res;
+ _ ->
+ 0
+ end.
+
+count_offline_messages(_Acc, User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ Num = case catch ejabberd_odbc:sql_query(
+ LServer,
+ ["select xml from spool"
+ " where username='", LUser, "';"]) of
+ {selected, ["xml"], Rs} ->
+ lists:foldl(
+ fun({XML}, Acc) ->
+ case xml_stream:parse_element(XML) of
+ {error, _Reason} ->
+ Acc;
+ El ->
+ case xml:get_subtag(El, "body") of
+ false ->
+ Acc;
+ _ ->
+ Acc + 1
+ end
+ end
+ end, 0, Rs);
+ _ ->
+ 0
+ end,
+ {stop, Num}.
--- /dev/null
+%%%----------------------------------------------------------------------
+%%% File : mod_roster_riak.erl
+%%% Author : Alexey Shchepin <alexey@process-one.net>
+%%% Purpose : Roster management
+%%% Created : 6 Jan 2012 by Alexey Shchepin <alexey@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2011 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
+%%% 02111-1307 USA
+%%%
+%%%----------------------------------------------------------------------
+
+%%% @doc Roster management (Mnesia storage).
+%%%
+%%% Includes support for XEP-0237: Roster Versioning.
+%%% The roster versioning follows an all-or-nothing strategy:
+%%% - If the version supplied by the client is the latest, return an empty response.
+%%% - If not, return the entire new roster (with updated version string).
+%%% Roster version is a hash digest of the entire roster.
+%%% No additional data is stored in DB.
+
+-module(mod_roster_riak).
+-author('alexey@process-one.net').
+
+-behaviour(gen_mod).
+
+-export([start/2, stop/1,
+ process_iq/3,
+ process_local_iq/3,
+ get_user_roster/2,
+ get_subscription_lists/3,
+ get_in_pending_subscriptions/3,
+ in_subscription/6,
+ out_subscription/4,
+ set_items/3,
+ remove_user/2,
+ get_jid_info/4,
+ webadmin_page/3,
+ webadmin_user/4,
+ get_versioning_feature/2,
+ roster_versioning_enabled/1]).
+
+-include("ejabberd.hrl").
+-include("jlib.hrl").
+-include("mod_roster.hrl").
+-include("web/ejabberd_http.hrl").
+-include("web/ejabberd_web_admin.hrl").
+
+
+start(Host, _Opts) ->
+ IQDisc = no_queue,
+ ejabberd_hooks:add(roster_get, Host,
+ ?MODULE, get_user_roster, 50),
+ ejabberd_hooks:add(roster_in_subscription, Host,
+ ?MODULE, in_subscription, 50),
+ ejabberd_hooks:add(roster_out_subscription, Host,
+ ?MODULE, out_subscription, 50),
+ ejabberd_hooks:add(roster_get_subscription_lists, Host,
+ ?MODULE, get_subscription_lists, 50),
+ ejabberd_hooks:add(roster_get_jid_info, Host,
+ ?MODULE, get_jid_info, 50),
+ ejabberd_hooks:add(remove_user, Host,
+ ?MODULE, remove_user, 50),
+ ejabberd_hooks:add(anonymous_purge_hook, Host,
+ ?MODULE, remove_user, 50),
+ ejabberd_hooks:add(resend_subscription_requests_hook, Host,
+ ?MODULE, get_in_pending_subscriptions, 50),
+ ejabberd_hooks:add(roster_get_versioning_feature, Host,
+ ?MODULE, get_versioning_feature, 50),
+ ejabberd_hooks:add(webadmin_page_host, Host,
+ ?MODULE, webadmin_page, 50),
+ ejabberd_hooks:add(webadmin_user, Host,
+ ?MODULE, webadmin_user, 50),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_ROSTER,
+ ?MODULE, process_iq, IQDisc).
+
+stop(Host) ->
+ ejabberd_hooks:delete(roster_get, Host,
+ ?MODULE, get_user_roster, 50),
+ ejabberd_hooks:delete(roster_in_subscription, Host,
+ ?MODULE, in_subscription, 50),
+ ejabberd_hooks:delete(roster_out_subscription, Host,
+ ?MODULE, out_subscription, 50),
+ ejabberd_hooks:delete(roster_get_subscription_lists, Host,
+ ?MODULE, get_subscription_lists, 50),
+ ejabberd_hooks:delete(roster_get_jid_info, Host,
+ ?MODULE, get_jid_info, 50),
+ ejabberd_hooks:delete(remove_user, Host,
+ ?MODULE, remove_user, 50),
+ ejabberd_hooks:delete(anonymous_purge_hook, Host,
+ ?MODULE, remove_user, 50),
+ ejabberd_hooks:delete(resend_subscription_requests_hook, Host,
+ ?MODULE, get_in_pending_subscriptions, 50),
+ ejabberd_hooks:delete(roster_get_versioning_feature, Host,
+ ?MODULE, get_versioning_feature, 50),
+ ejabberd_hooks:delete(webadmin_page_host, Host,
+ ?MODULE, webadmin_page, 50),
+ ejabberd_hooks:delete(webadmin_user, Host,
+ ?MODULE, webadmin_user, 50),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_ROSTER).
+
+
+process_iq(From, To, IQ) ->
+ #iq{sub_el = SubEl} = IQ,
+ #jid{lserver = LServer} = From,
+ case lists:member(LServer, ?MYHOSTS) of
+ true ->
+ process_local_iq(From, To, IQ);
+ _ ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_ITEM_NOT_FOUND]}
+ end.
+
+process_local_iq(From, To, #iq{type = Type} = IQ) ->
+ case Type of
+ set ->
+ process_iq_set(From, To, IQ);
+ get ->
+ process_iq_get(From, To, IQ)
+ end.
+
+
+roster_hash(Items) ->
+ sha:sha(term_to_binary(
+ lists:sort(
+ [R#roster{groups = lists:sort(Grs)} ||
+ R = #roster{groups = Grs} <- Items]))).
+
+roster_versioning_enabled(Host) ->
+ gen_mod:get_module_opt(Host, ?MODULE, versioning, false).
+
+roster_version_on_db(Host) ->
+ gen_mod:get_module_opt(Host, ?MODULE, store_current_id, false).
+
+%% Returns a list that may contain an xmlelement with the XEP-237 feature if it's enabled.
+get_versioning_feature(Acc, Host) ->
+ case roster_versioning_enabled(Host) of
+ true ->
+ Feature = {xmlelement,
+ "ver",
+ [{"xmlns", ?NS_ROSTER_VER}],
+ [{xmlelement, "optional", [], []}]},
+ [Feature | Acc];
+ false -> []
+ end.
+
+roster_version(LServer, LUser) ->
+ US = {LUser, LServer},
+ Username = list_to_binary(LUser),
+ case roster_version_on_db(LServer) of
+ true ->
+ case ejabberd_riak:get(LServer, <<"roster_version">>,
+ Username) of
+ {ok, Version} -> Version;
+ {error, notfound} ->
+ %% If for some reason we don't had it on DB. Create a version Id and store it.
+ %% (we did the same on process_iq_get, that is called when client get roster,
+ %% not sure why it can still not be on DB at this point)
+ RosterVersion = sha:sha(term_to_binary(now())),
+ riak_set_roster_version(LServer, Username, RosterVersion),
+ RosterVersion
+ end;
+ false ->
+ roster_hash(ejabberd_hooks:run_fold(roster_get, LServer, [], [US]))
+ end.
+
+%% Load roster from DB only if neccesary.
+%% It is neccesary if
+%% - roster versioning is disabled in server OR
+%% - roster versioning is not used by the client OR
+%% - roster versioning is used by server and client, BUT the server isn't storing versions on db OR
+%% - the roster version from client don't match current version.
+process_iq_get(From, To, #iq{sub_el = SubEl} = IQ) ->
+ LUser = From#jid.luser,
+ LServer = From#jid.lserver,
+ US = {LUser, LServer},
+ Username = list_to_binary(LUser),
+
+ try
+ {ItemsToSend, VersionToSend} =
+ case {xml:get_tag_attr("ver", SubEl),
+ roster_versioning_enabled(LServer),
+ roster_version_on_db(LServer)} of
+ {{value, RequestedVersion}, true, true} ->
+ BRequestedVersion = iolist_to_binary(RequestedVersion),
+ %% Retrieve version from DB. Only load entire roster
+ %% when neccesary.
+ case ejabberd_riak:get(LServer, <<"roster_version">>,
+ Username) of
+ {ok, BRequestedVersion} ->
+ {false, false};
+ {ok, NewVersion} ->
+ {lists:map(fun item_to_xml/1,
+ ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [], [US])), NewVersion};
+ {error, notfound} ->
+ RosterVersion = sha:sha(term_to_binary(now())),
+ ejabberd_riak:put(
+ LServer, <<"roster_version">>,
+ Username, list_to_binary(RosterVersion)),
+ {lists:map(fun item_to_xml/1,
+ ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [], [US])), RosterVersion}
+ end;
+
+ {{value, RequestedVersion}, true, false} ->
+ RosterItems = ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [] , [US]),
+ case roster_hash(RosterItems) of
+ RequestedVersion ->
+ {false, false};
+ New ->
+ {lists:map(fun item_to_xml/1, RosterItems), New}
+ end;
+
+ _ ->
+ {lists:map(fun item_to_xml/1,
+ ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [], [US])), false}
+ end,
+ IQ#iq{type = result,
+ sub_el = case {ItemsToSend, VersionToSend} of
+ {false, false} ->
+ [];
+ {Items, false} ->
+ [{xmlelement, "query",
+ [{"xmlns", ?NS_ROSTER}],
+ Items}];
+ {Items, Version} ->
+ [{xmlelement, "query",
+ [{"xmlns", ?NS_ROSTER},
+ {"ver", Version}],
+ Items}]
+ end}
+ catch
+ _:_ ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]}
+ end.
+
+
+get_user_roster(Acc, {LUser, LServer}) ->
+ Items = get_roster(LUser, LServer),
+ lists:filter(fun(#roster{subscription = none, ask = in}) ->
+ false;
+ (_) ->
+ true
+ end, Items) ++ Acc.
+
+get_roster(LUser, LServer) ->
+ Username = list_to_binary(LUser),
+ case catch riak_get_roster(LServer, Username) of
+ {ok, Items} when is_list(Items) ->
+ JIDGroups = case riak_get_roster_jid_groups(LServer, Username) of
+ {ok, JGrps} when is_list(JGrps) ->
+ JGrps;
+ _ ->
+ []
+ end,
+ GroupsDict = dict:from_list(JIDGroups),
+ RItems = lists:flatmap(
+ fun(I) ->
+ case raw_to_record(LServer, I) of
+ %% Bad JID in database:
+ error ->
+ [];
+ R ->
+ SJID = jlib:jid_to_string(R#roster.jid),
+ Groups =
+ case dict:find(SJID, GroupsDict) of
+ {ok, Gs} -> Gs;
+ error -> []
+ end,
+ [R#roster{groups = Groups}]
+ end
+ end, Items),
+ RItems;
+ _ ->
+ []
+ end.
+
+
+item_to_xml(Item) ->
+ Attrs1 = [{"jid", jlib:jid_to_string(Item#roster.jid)}],
+ Attrs2 = case Item#roster.name of
+ "" ->
+ Attrs1;
+ Name ->
+ [{"name", Name} | Attrs1]
+ end,
+ Attrs3 = case Item#roster.subscription of
+ none ->
+ [{"subscription", "none"} | Attrs2];
+ from ->
+ [{"subscription", "from"} | Attrs2];
+ to ->
+ [{"subscription", "to"} | Attrs2];
+ both ->
+ [{"subscription", "both"} | Attrs2];
+ remove ->
+ [{"subscription", "remove"} | Attrs2]
+ end,
+ Attrs = case ask_to_pending(Item#roster.ask) of
+ out ->
+ [{"ask", "subscribe"} | Attrs3];
+ both ->
+ [{"ask", "subscribe"} | Attrs3];
+ _ ->
+ Attrs3
+ end,
+ SubEls = lists:map(fun(G) ->
+ {xmlelement, "group", [], [{xmlcdata, G}]}
+ end, Item#roster.groups),
+ {xmlelement, "item", Attrs, SubEls}.
+
+
+process_iq_set(From, To, #iq{sub_el = SubEl} = IQ) ->
+ {xmlelement, _Name, _Attrs, Els} = SubEl,
+ lists:foreach(fun(El) -> process_item_set(From, To, El) end, Els),
+ IQ#iq{type = result, sub_el = []}.
+
+process_item_set(From, To, {xmlelement, _Name, Attrs, Els}) ->
+ JID1 = jlib:string_to_jid(xml:get_attr_s("jid", Attrs)),
+ #jid{user = User, luser = LUser, lserver = LServer} = From,
+ case JID1 of
+ error ->
+ ok;
+ _ ->
+ LJID = jlib:jid_tolower(JID1),
+ Username = list_to_binary(LUser),
+ SJID = list_to_binary(jlib:jid_to_string(LJID)),
+ F = fun() ->
+ Res = riak_get_roster_by_jid(LServer, Username, SJID),
+ Item = case Res of
+ {error, _} ->
+ #roster{usj = {LUser, LServer, LJID},
+ us = {LUser, LServer},
+ jid = LJID};
+ {ok, I} ->
+ R = raw_to_record(LServer, I),
+ case R of
+ %% Bad JID in database:
+ error ->
+ #roster{usj = {LUser, LServer, LJID},
+ us = {LUser, LServer},
+ jid = LJID};
+ _ ->
+ R#roster{
+ usj = {LUser, LServer, LJID},
+ us = {LUser, LServer},
+ jid = LJID,
+ name = ""}
+ end
+ end,
+ Item1 = process_item_attrs(Item, Attrs),
+ Item2 = process_item_els(Item1, Els),
+ case Item2#roster.subscription of
+ remove ->
+ riak_del_roster(LServer, Username, SJID);
+ _ ->
+ ItemVals = record_to_string(Item2),
+ ItemGroups = groups_to_binary(Item2),
+ riak_update_roster(LServer, Username, SJID, ItemVals, ItemGroups)
+ end,
+ %% If the item exist in shared roster, take the
+ %% subscription information from there:
+ Item3 = ejabberd_hooks:run_fold(roster_process_item,
+ LServer, Item2, [LServer]),
+ case roster_version_on_db(LServer) of
+ true ->
+ RosterVersion = sha:sha(term_to_binary(now())),
+ riak_set_roster_version(
+ LServer, Username, RosterVersion);
+ false -> ok
+ end,
+ {ok, Item, Item3}
+ end,
+ case catch F() of
+ {ok, OldItem, Item} ->
+ push_item(User, LServer, To, Item),
+ case Item#roster.subscription of
+ remove ->
+ send_unsubscribing_presence(From, OldItem),
+ ok;
+ _ ->
+ ok
+ end;
+ E ->
+ ?ERROR_MSG("ROSTER: roster item set error: ~p~n", [E]),
+ ok
+ end
+ end;
+process_item_set(_From, _To, _) ->
+ ok.
+
+process_item_attrs(Item, [{Attr, Val} | Attrs]) ->
+ case Attr of
+ "jid" ->
+ case jlib:string_to_jid(Val) of
+ error ->
+ process_item_attrs(Item, Attrs);
+ JID1 ->
+ JID = {JID1#jid.luser, JID1#jid.lserver, JID1#jid.lresource},
+ process_item_attrs(Item#roster{jid = JID}, Attrs)
+ end;
+ "name" ->
+ process_item_attrs(Item#roster{name = Val}, Attrs);
+ "subscription" ->
+ case Val of
+ "remove" ->
+ process_item_attrs(Item#roster{subscription = remove},
+ Attrs);
+ _ ->
+ process_item_attrs(Item, Attrs)
+ end;
+ "ask" ->
+ process_item_attrs(Item, Attrs);
+ _ ->
+ process_item_attrs(Item, Attrs)
+ end;
+process_item_attrs(Item, []) ->
+ Item.
+
+
+process_item_els(Item, [{xmlelement, Name, _Attrs, SEls} | Els]) ->
+ case Name of
+ "group" ->
+ Groups = [xml:get_cdata(SEls) | Item#roster.groups],
+ process_item_els(Item#roster{groups = Groups}, Els);
+ _ ->
+ process_item_els(Item, Els)
+ end;
+process_item_els(Item, [{xmlcdata, _} | Els]) ->
+ process_item_els(Item, Els);
+process_item_els(Item, []) ->
+ Item.
+
+
+push_item(User, Server, From, Item) ->
+ ejabberd_sm:route(jlib:make_jid("", "", ""),
+ jlib:make_jid(User, Server, ""),
+ {xmlelement, "broadcast", [],
+ [{item,
+ Item#roster.jid,
+ Item#roster.subscription}]}),
+ case roster_versioning_enabled(Server) of
+ true ->
+ push_item_version(Server, User, From, Item, roster_version(Server, User));
+ false ->
+ lists:foreach(fun(Resource) ->
+ push_item(User, Server, Resource, From, Item)
+ end, ejabberd_sm:get_user_resources(User, Server))
+ end.
+
+% TODO: don't push to those who not load roster
+push_item(User, Server, Resource, From, Item) ->
+ ResIQ = #iq{type = set, xmlns = ?NS_ROSTER,
+ id = "push" ++ randoms:get_string(),
+ sub_el = [{xmlelement, "query",
+ [{"xmlns", ?NS_ROSTER}],
+ [item_to_xml(Item)]}]},
+ ejabberd_router:route(
+ From,
+ jlib:make_jid(User, Server, Resource),
+ jlib:iq_to_xml(ResIQ)).
+
+%% @doc Roster push, calculate and include the version attribute.
+%% TODO: don't push to those who didn't load roster
+push_item_version(Server, User, From, Item, RosterVersion) ->
+ lists:foreach(fun(Resource) ->
+ push_item_version(User, Server, Resource, From, Item, RosterVersion)
+ end, ejabberd_sm:get_user_resources(User, Server)).
+
+push_item_version(User, Server, Resource, From, Item, RosterVersion) ->
+ IQPush = #iq{type = 'set', xmlns = ?NS_ROSTER,
+ id = "push" ++ randoms:get_string(),
+ sub_el = [{xmlelement, "query",
+ [{"xmlns", ?NS_ROSTER},
+ {"ver", RosterVersion}],
+ [item_to_xml(Item)]}]},
+ ejabberd_router:route(
+ From,
+ jlib:make_jid(User, Server, Resource),
+ jlib:iq_to_xml(IQPush)).
+
+get_subscription_lists(_, User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ Username = list_to_binary(LUser),
+ case catch riak_get_roster(LServer, Username) of
+ {ok, Items} when is_list(Items) ->
+ fill_subscription_lists(LServer, Items, [], []);
+ _ ->
+ {[], []}
+ end.
+
+fill_subscription_lists(LServer, [RawI | Is], F, T) ->
+ I = raw_to_record(LServer, RawI),
+ case I of
+ %% Bad JID in database:
+ error ->
+ fill_subscription_lists(LServer, Is, F, T);
+ _ ->
+ J = I#roster.jid,
+ case I#roster.subscription of
+ both ->
+ fill_subscription_lists(LServer, Is, [J | F], [J | T]);
+ from ->
+ fill_subscription_lists(LServer, Is, [J | F], T);
+ to ->
+ fill_subscription_lists(LServer, Is, F, [J | T]);
+ _ ->
+ fill_subscription_lists(LServer, Is, F, T)
+ end
+ end;
+fill_subscription_lists(_LServer, [], F, T) ->
+ {F, T}.
+
+ask_to_pending(subscribe) -> out;
+ask_to_pending(unsubscribe) -> none;
+ask_to_pending(Ask) -> Ask.
+
+
+
+in_subscription(_, User, Server, JID, Type, Reason) ->
+ process_subscription(in, User, Server, JID, Type, Reason).
+
+out_subscription(User, Server, JID, Type) ->
+ process_subscription(out, User, Server, JID, Type, []).
+
+process_subscription(Direction, User, Server, JID1, Type, Reason) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ LJID = jlib:jid_tolower(JID1),
+ Username = list_to_binary(LUser),
+ SJID = list_to_binary(jlib:jid_to_string(LJID)),
+ F = fun() ->
+ Item =
+ case riak_get_roster_by_jid(LServer, Username, SJID) of
+ {ok, I} ->
+ %% raw_to_record can return error, but
+ %% jlib_to_string would fail before this point
+ R = raw_to_record(LServer, I),
+ Groups =
+ case riak_get_roster_groups(LServer, Username, SJID) of
+ {selected, ["grp"], JGrps} when is_list(JGrps) ->
+ [JGrp || {JGrp} <- JGrps];
+ _ ->
+ []
+ end,
+ R#roster{groups = Groups};
+ {error, _} ->
+ #roster{usj = {LUser, LServer, LJID},
+ us = {LUser, LServer},
+ jid = LJID}
+ end,
+ NewState = case Direction of
+ out ->
+ out_state_change(Item#roster.subscription,
+ Item#roster.ask,
+ Type);
+ in ->
+ in_state_change(Item#roster.subscription,
+ Item#roster.ask,
+ Type)
+ end,
+ AutoReply = case Direction of
+ out ->
+ none;
+ in ->
+ in_auto_reply(Item#roster.subscription,
+ Item#roster.ask,
+ Type)
+ end,
+ AskMessage = case NewState of
+ {_, both} -> Reason;
+ {_, in} -> Reason;
+ _ -> ""
+ end,
+ case NewState of
+ none ->
+ {ok, none, AutoReply};
+ {none, none} when Item#roster.subscription == none,
+ Item#roster.ask == in ->
+ riak_del_roster(LServer, Username, SJID),
+ {ok, none, AutoReply};
+ {Subscription, Pending} ->
+ NewItem = Item#roster{subscription = Subscription,
+ ask = Pending,
+ askmessage = AskMessage},
+ ItemVals = record_to_string(NewItem),
+ riak_roster_subscribe(LServer, Username, SJID, ItemVals),
+ case roster_version_on_db(LServer) of
+ true ->
+ riak_set_roster_version(
+ LServer, Username,
+ sha:sha(term_to_binary(now())));
+ false -> ok
+ end,
+ {ok, {push, NewItem}, AutoReply}
+ end
+ end,
+ case catch F() of
+ {ok, Push, AutoReply} ->
+ case AutoReply of
+ none ->
+ ok;
+ _ ->
+ T = case AutoReply of
+ subscribed -> "subscribed";
+ unsubscribed -> "unsubscribed"
+ end,
+ ejabberd_router:route(
+ jlib:make_jid(User, Server, ""), JID1,
+ {xmlelement, "presence", [{"type", T}], []})
+ end,
+ case Push of
+ {push, Item} ->
+ if
+ Item#roster.subscription == none,
+ Item#roster.ask == in ->
+ ok;
+ true ->
+ push_item(User, Server,
+ jlib:make_jid(User, Server, ""), Item)
+ end,
+ true;
+ none ->
+ false
+ end;
+ E ->
+ ?ERROR_MSG("subscription error: ~p~n", [E]),
+ false
+ end.
+
+
+%% in_state_change(Subscription, Pending, Type) -> NewState
+%% NewState = none | {NewSubscription, NewPending}
+-ifdef(ROSTER_GATEWAY_WORKAROUND).
+-define(NNSD, {to, none}).
+-define(NISD, {to, in}).
+-else.
+-define(NNSD, none).
+-define(NISD, none).
+-endif.
+
+in_state_change(none, none, subscribe) -> {none, in};
+in_state_change(none, none, subscribed) -> ?NNSD;
+in_state_change(none, none, unsubscribe) -> none;
+in_state_change(none, none, unsubscribed) -> none;
+in_state_change(none, out, subscribe) -> {none, both};
+in_state_change(none, out, subscribed) -> {to, none};
+in_state_change(none, out, unsubscribe) -> none;
+in_state_change(none, out, unsubscribed) -> {none, none};
+in_state_change(none, in, subscribe) -> none;
+in_state_change(none, in, subscribed) -> ?NISD;
+in_state_change(none, in, unsubscribe) -> {none, none};
+in_state_change(none, in, unsubscribed) -> none;
+in_state_change(none, both, subscribe) -> none;
+in_state_change(none, both, subscribed) -> {to, in};
+in_state_change(none, both, unsubscribe) -> {none, out};
+in_state_change(none, both, unsubscribed) -> {none, in};
+in_state_change(to, none, subscribe) -> {to, in};
+in_state_change(to, none, subscribed) -> none;
+in_state_change(to, none, unsubscribe) -> none;
+in_state_change(to, none, unsubscribed) -> {none, none};
+in_state_change(to, in, subscribe) -> none;
+in_state_change(to, in, subscribed) -> none;
+in_state_change(to, in, unsubscribe) -> {to, none};
+in_state_change(to, in, unsubscribed) -> {none, in};
+in_state_change(from, none, subscribe) -> none;
+in_state_change(from, none, subscribed) -> {both, none};
+in_state_change(from, none, unsubscribe) -> {none, none};
+in_state_change(from, none, unsubscribed) -> none;
+in_state_change(from, out, subscribe) -> none;
+in_state_change(from, out, subscribed) -> {both, none};
+in_state_change(from, out, unsubscribe) -> {none, out};
+in_state_change(from, out, unsubscribed) -> {from, none};
+in_state_change(both, none, subscribe) -> none;
+in_state_change(both, none, subscribed) -> none;
+in_state_change(both, none, unsubscribe) -> {to, none};
+in_state_change(both, none, unsubscribed) -> {from, none}.
+
+out_state_change(none, none, subscribe) -> {none, out};
+out_state_change(none, none, subscribed) -> none;
+out_state_change(none, none, unsubscribe) -> none;
+out_state_change(none, none, unsubscribed) -> none;
+out_state_change(none, out, subscribe) -> {none, out}; %% We need to resend query (RFC3921, section 9.2)
+out_state_change(none, out, subscribed) -> none;
+out_state_change(none, out, unsubscribe) -> {none, none};
+out_state_change(none, out, unsubscribed) -> none;
+out_state_change(none, in, subscribe) -> {none, both};
+out_state_change(none, in, subscribed) -> {from, none};
+out_state_change(none, in, unsubscribe) -> none;
+out_state_change(none, in, unsubscribed) -> {none, none};
+out_state_change(none, both, subscribe) -> none;
+out_state_change(none, both, subscribed) -> {from, out};
+out_state_change(none, both, unsubscribe) -> {none, in};
+out_state_change(none, both, unsubscribed) -> {none, out};
+out_state_change(to, none, subscribe) -> none;
+out_state_change(to, none, subscribed) -> {both, none};
+out_state_change(to, none, unsubscribe) -> {none, none};
+out_state_change(to, none, unsubscribed) -> none;
+out_state_change(to, in, subscribe) -> none;
+out_state_change(to, in, subscribed) -> {both, none};
+out_state_change(to, in, unsubscribe) -> {none, in};
+out_state_change(to, in, unsubscribed) -> {to, none};
+out_state_change(from, none, subscribe) -> {from, out};
+out_state_change(from, none, subscribed) -> none;
+out_state_change(from, none, unsubscribe) -> none;
+out_state_change(from, none, unsubscribed) -> {none, none};
+out_state_change(from, out, subscribe) -> none;
+out_state_change(from, out, subscribed) -> none;
+out_state_change(from, out, unsubscribe) -> {from, none};
+out_state_change(from, out, unsubscribed) -> {none, out};
+out_state_change(both, none, subscribe) -> none;
+out_state_change(both, none, subscribed) -> none;
+out_state_change(both, none, unsubscribe) -> {from, none};
+out_state_change(both, none, unsubscribed) -> {to, none}.
+
+in_auto_reply(from, none, subscribe) -> subscribed;
+in_auto_reply(from, out, subscribe) -> subscribed;
+in_auto_reply(both, none, subscribe) -> subscribed;
+in_auto_reply(none, in, unsubscribe) -> unsubscribed;
+in_auto_reply(none, both, unsubscribe) -> unsubscribed;
+in_auto_reply(to, in, unsubscribe) -> unsubscribed;
+in_auto_reply(from, none, unsubscribe) -> unsubscribed;
+in_auto_reply(from, out, unsubscribe) -> unsubscribed;
+in_auto_reply(both, none, unsubscribe) -> unsubscribed;
+in_auto_reply(_, _, _) -> none.
+
+
+remove_user(User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ Username = list_to_binary(LUser),
+ send_unsubscription_to_rosteritems(LUser, LServer),
+ riak_del_user_roster(LServer, Username),
+ ok.
+
+%% For each contact with Subscription:
+%% Both or From, send a "unsubscribed" presence stanza;
+%% Both or To, send a "unsubscribe" presence stanza.
+send_unsubscription_to_rosteritems(LUser, LServer) ->
+ RosterItems = get_user_roster([], {LUser, LServer}),
+ From = jlib:make_jid({LUser, LServer, ""}),
+ lists:foreach(fun(RosterItem) ->
+ send_unsubscribing_presence(From, RosterItem)
+ end,
+ RosterItems).
+
+%% @spec (From::jid(), Item::roster()) -> ok
+send_unsubscribing_presence(From, Item) ->
+ IsTo = case Item#roster.subscription of
+ both -> true;
+ to -> true;
+ _ -> false
+ end,
+ IsFrom = case Item#roster.subscription of
+ both -> true;
+ from -> true;
+ _ -> false
+ end,
+ if IsTo ->
+ send_presence_type(
+ jlib:jid_remove_resource(From),
+ jlib:make_jid(Item#roster.jid), "unsubscribe");
+ true -> ok
+ end,
+ if IsFrom ->
+ send_presence_type(
+ jlib:jid_remove_resource(From),
+ jlib:make_jid(Item#roster.jid), "unsubscribed");
+ true -> ok
+ end,
+ ok.
+
+send_presence_type(From, To, Type) ->
+ ejabberd_router:route(
+ From, To,
+ {xmlelement, "presence",
+ [{"type", Type}],
+ []}).
+
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+set_items(User, Server, SubEl) ->
+ {xmlelement, _Name, _Attrs, Els} = SubEl,
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ lists:foreach(fun(El) ->
+ process_item_set_t(LUser, LServer, El)
+ end, Els).
+
+process_item_set_t(LUser, LServer, {xmlelement, _Name, Attrs, Els}) ->
+ JID1 = jlib:string_to_jid(xml:get_attr_s("jid", Attrs)),
+ case JID1 of
+ error ->
+ [];
+ _ ->
+ LJID = {JID1#jid.luser, JID1#jid.lserver, JID1#jid.lresource},
+ Username = list_to_binary(LUser),
+ SJID = list_to_binary(jlib:jid_to_string(LJID)),
+ Item = #roster{usj = {LUser, LServer, LJID},
+ us = {LUser, LServer},
+ jid = LJID},
+ Item1 = process_item_attrs_ws(Item, Attrs),
+ Item2 = process_item_els(Item1, Els),
+ case Item2#roster.subscription of
+ remove ->
+ riak_del_roster(LServer, Username, SJID);
+ _ ->
+ ItemVals = record_to_string(Item1),
+ ItemGroups = groups_to_binary(Item2),
+ riak_update_roster(
+ LServer, Username, SJID, ItemVals, ItemGroups)
+ end
+ end;
+process_item_set_t(_LUser, _LServer, _) ->
+ [].
+
+process_item_attrs_ws(Item, [{Attr, Val} | Attrs]) ->
+ case Attr of
+ "jid" ->
+ case jlib:string_to_jid(Val) of
+ error ->
+ process_item_attrs_ws(Item, Attrs);
+ JID1 ->
+ JID = {JID1#jid.luser, JID1#jid.lserver, JID1#jid.lresource},
+ process_item_attrs_ws(Item#roster{jid = JID}, Attrs)
+ end;
+ "name" ->
+ process_item_attrs_ws(Item#roster{name = Val}, Attrs);
+ "subscription" ->
+ case Val of
+ "remove" ->
+ process_item_attrs_ws(Item#roster{subscription = remove},
+ Attrs);
+ "none" ->
+ process_item_attrs_ws(Item#roster{subscription = none},
+ Attrs);
+ "both" ->
+ process_item_attrs_ws(Item#roster{subscription = both},
+ Attrs);
+ "from" ->
+ process_item_attrs_ws(Item#roster{subscription = from},
+ Attrs);
+ "to" ->
+ process_item_attrs_ws(Item#roster{subscription = to},
+ Attrs);
+ _ ->
+ process_item_attrs_ws(Item, Attrs)
+ end;
+ "ask" ->
+ process_item_attrs_ws(Item, Attrs);
+ _ ->
+ process_item_attrs_ws(Item, Attrs)
+ end;
+process_item_attrs_ws(Item, []) ->
+ Item.
+
+get_in_pending_subscriptions(Ls, User, Server) ->
+ JID = jlib:make_jid(User, Server, ""),
+ LUser = JID#jid.luser,
+ LServer = JID#jid.lserver,
+ Username = list_to_binary(LUser),
+ case catch riak_get_roster(LServer, Username) of
+ {ok, Items} when is_list(Items) ->
+ Ls ++ lists:map(
+ fun(R) ->
+ Message = R#roster.askmessage,
+ {xmlelement, "presence",
+ [{"from", jlib:jid_to_string(R#roster.jid)},
+ {"to", jlib:jid_to_string(JID)},
+ {"type", "subscribe"}],
+ [{xmlelement, "status", [],
+ [{xmlcdata, Message}]}]}
+ end,
+ lists:flatmap(
+ fun(I) ->
+ case raw_to_record(LServer, I) of
+ %% Bad JID in database:
+ error ->
+ [];
+ R ->
+ case R#roster.ask of
+ in -> [R];
+ both -> [R];
+ _ -> []
+ end
+ end
+ end,
+ Items));
+ _ ->
+ Ls
+ end.
+
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+get_jid_info(_, User, Server, JID) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ LJID = jlib:jid_tolower(JID),
+ Username = list_to_binary(LUser),
+ SJID = list_to_binary(jlib:jid_to_string(LJID)),
+ case catch riak_get_subscription(LServer, Username, SJID) of
+ {ok, Subscription} ->
+ Groups =
+ case catch riak_get_roster_groups(LServer, Username, SJID) of
+ {ok, JGrps} when is_list(JGrps) ->
+ lists:map(fun binary_to_list/1, JGrps);
+ _ ->
+ []
+ end,
+ {Subscription, Groups};
+ _ ->
+ LRJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)),
+ if
+ LRJID == LJID ->
+ {none, []};
+ true ->
+ SRJID = list_to_binary(jlib:jid_to_string(LRJID)),
+ case catch riak_get_subscription(LServer, Username, SRJID) of
+ {ok, Subscription} ->
+ Groups = case catch riak_get_roster_groups(LServer, Username, SRJID) of
+ {ok, JGrps} when is_list(JGrps) ->
+ lists:map(fun binary_to_list/1,
+ JGrps);
+ _ ->
+ []
+ end,
+ {Subscription, Groups};
+ _ ->
+ {none, []}
+ end
+ end
+ end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+raw_to_record(LServer,
+ <<UsernameLen:16, Username:UsernameLen/binary,
+ SJIDLen:16, SJID:SJIDLen/binary,
+ NickLen:16, Nick:NickLen/binary,
+ SSubscription, SAsk,
+ SAskMessageLen:16, SAskMessage:SAskMessageLen/binary>>) ->
+ User = binary_to_list(Username),
+ case jlib:string_to_jid(binary_to_list(SJID)) of
+ error ->
+ error;
+ JID ->
+ LJID = jlib:jid_tolower(JID),
+ Subscription = case SSubscription of
+ $B -> both;
+ $T -> to;
+ $F -> from;
+ _ -> none
+ end,
+ Ask = case SAsk of
+ $S -> subscribe;
+ $U -> unsubscribe;
+ $B -> both;
+ $O -> out;
+ $I -> in;
+ _ -> none
+ end,
+ #roster{usj = {User, LServer, LJID},
+ us = {User, LServer},
+ jid = LJID,
+ name = binary_to_list(Nick),
+ subscription = Subscription,
+ ask = Ask,
+ askmessage = SAskMessage}
+ end.
+
+record_to_string(#roster{us = {User, _Server},
+ jid = JID,
+ name = Name,
+ subscription = Subscription,
+ ask = Ask,
+ askmessage = AskMessage}) ->
+ Username = list_to_binary(User),
+ UsernameLen = size(Username),
+ SJID = list_to_binary(jlib:jid_to_string(jlib:jid_tolower(JID))),
+ SJIDLen = size(SJID),
+ Nick = list_to_binary(Name),
+ NickLen = size(Nick),
+ SSubscription = case Subscription of
+ both -> $B;
+ to -> $T;
+ from -> $F;
+ none -> $N
+ end,
+ SAsk = case Ask of
+ subscribe -> $S;
+ unsubscribe -> $U;
+ both -> $B;
+ out -> $O;
+ in -> $I;
+ none -> $N
+ end,
+ SAskMessage = iolist_to_binary(AskMessage),
+ SAskMessageLen = size(SAskMessage),
+ <<UsernameLen:16, Username/binary,
+ SJIDLen:16, SJID/binary,
+ NickLen:16, Nick/binary,
+ SSubscription, SAsk,
+ SAskMessageLen:16, SAskMessage/binary>>.
+
+groups_to_binary(#roster{jid = JID, groups = Groups}) ->
+ SJID = list_to_binary(jlib:jid_to_string(jlib:jid_tolower(JID))),
+ SJIDLen = size(SJID),
+ %% Empty groups do not need to be converted to string to be inserted in
+ %% the database
+ lists:foldl(
+ fun([], Acc) ->
+ Acc;
+ (Group, Acc) ->
+ G = list_to_binary(Group),
+ Len = size(G),
+ <<Acc/binary, Len:16, G/binary>>
+ end, <<SJIDLen:16, SJID/binary>>, Groups).
+
+binary_to_groups(<<Len:16, SJID:Len/binary, Rest/binary>>) ->
+ {binary_to_list(SJID), binary_to_groups(Rest, [])}.
+
+binary_to_groups(<<Len:16, G:Len/binary, Rest/binary>>, Res) ->
+ binary_to_groups(Rest, [G | Res]);
+binary_to_groups(_, Res) ->
+ Res.
+
+
+webadmin_page(_, Host,
+ #request{us = _US,
+ path = ["user", U, "roster"],
+ q = Query,
+ lang = Lang} = _Request) ->
+ Res = user_roster(U, Host, Query, Lang),
+ {stop, Res};
+
+webadmin_page(Acc, _, _) -> Acc.
+
+user_roster(User, Server, Query, Lang) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ US = {LUser, LServer},
+ Items1 = get_roster(LUser, LServer),
+ Res = user_roster_parse_query(User, Server, Items1, Query),
+ Items = get_roster(LUser, LServer),
+ SItems = lists:sort(Items),
+ FItems =
+ case SItems of
+ [] ->
+ [?CT("None")];
+ _ ->
+ [?XE("table",
+ [?XE("thead",
+ [?XE("tr",
+ [?XCT("td", "Jabber ID"),
+ ?XCT("td", "Nickname"),
+ ?XCT("td", "Subscription"),
+ ?XCT("td", "Pending"),
+ ?XCT("td", "Groups")
+ ])]),
+ ?XE("tbody",
+ lists:map(
+ fun(R) ->
+ Groups =
+ lists:flatmap(
+ fun(Group) ->
+ [?C(Group), ?BR]
+ end, R#roster.groups),
+ Pending = ask_to_pending(R#roster.ask),
+ TDJID = build_contact_jid_td(R#roster.jid),
+ ?XE("tr",
+ [TDJID,
+ ?XAC("td", [{"class", "valign"}],
+ R#roster.name),
+ ?XAC("td", [{"class", "valign"}],
+ atom_to_list(R#roster.subscription)),
+ ?XAC("td", [{"class", "valign"}],
+ atom_to_list(Pending)),
+ ?XAE("td", [{"class", "valign"}], Groups),
+ if
+ Pending == in ->
+ ?XAE("td", [{"class", "valign"}],
+ [?INPUTT("submit",
+ "validate" ++
+ ejabberd_web_admin:term_to_id(R#roster.jid),
+ "Validate")]);
+ true ->
+ ?X("td")
+ end,
+ ?XAE("td", [{"class", "valign"}],
+ [?INPUTT("submit",
+ "remove" ++
+ ejabberd_web_admin:term_to_id(R#roster.jid),
+ "Remove")])])
+ end, SItems))])]
+ end,
+ [?XC("h1", ?T("Roster of ") ++ us_to_list(US))] ++
+ case Res of
+ ok -> [?XREST("Submitted")];
+ error -> [?XREST("Bad format")];
+ nothing -> []
+ end ++
+ [?XAE("form", [{"action", ""}, {"method", "post"}],
+ FItems ++
+ [?P,
+ ?INPUT("text", "newjid", ""), ?C(" "),
+ ?INPUTT("submit", "addjid", "Add Jabber ID")
+ ])].
+
+build_contact_jid_td(RosterJID) ->
+ %% Convert {U, S, R} into {jid, U, S, R, U, S, R}:
+ ContactJID = jlib:make_jid(RosterJID),
+ JIDURI = case {ContactJID#jid.luser, ContactJID#jid.lserver} of
+ {"", _} -> "";
+ {CUser, CServer} ->
+ case lists:member(CServer, ?MYHOSTS) of
+ false -> "";
+ true -> "/admin/server/" ++ CServer ++ "/user/" ++ CUser ++ "/"
+ end
+ end,
+ case JIDURI of
+ [] ->
+ ?XAC("td", [{"class", "valign"}], jlib:jid_to_string(RosterJID));
+ URI when is_list(URI) ->
+ ?XAE("td", [{"class", "valign"}], [?AC(JIDURI, jlib:jid_to_string(RosterJID))])
+ end.
+
+user_roster_parse_query(User, Server, Items, Query) ->
+ case lists:keysearch("addjid", 1, Query) of
+ {value, _} ->
+ case lists:keysearch("newjid", 1, Query) of
+ {value, {_, undefined}} ->
+ error;
+ {value, {_, SJID}} ->
+ case jlib:string_to_jid(SJID) of
+ JID when is_record(JID, jid) ->
+ user_roster_subscribe_jid(User, Server, JID),
+ ok;
+ error ->
+ error
+ end;
+ false ->
+ error
+ end;
+ false ->
+ case catch user_roster_item_parse_query(
+ User, Server, Items, Query) of
+ submitted ->
+ ok;
+ {'EXIT', _Reason} ->
+ error;
+ _ ->
+ nothing
+ end
+ end.
+
+
+user_roster_subscribe_jid(User, Server, JID) ->
+ out_subscription(User, Server, JID, subscribe),
+ UJID = jlib:make_jid(User, Server, ""),
+ ejabberd_router:route(
+ UJID, JID, {xmlelement, "presence", [{"type", "subscribe"}], []}).
+
+user_roster_item_parse_query(User, Server, Items, Query) ->
+ lists:foreach(
+ fun(R) ->
+ JID = R#roster.jid,
+ case lists:keysearch(
+ "validate" ++ ejabberd_web_admin:term_to_id(JID), 1, Query) of
+ {value, _} ->
+ JID1 = jlib:make_jid(JID),
+ out_subscription(
+ User, Server, JID1, subscribed),
+ UJID = jlib:make_jid(User, Server, ""),
+ ejabberd_router:route(
+ UJID, JID1, {xmlelement, "presence",
+ [{"type", "subscribed"}], []}),
+ throw(submitted);
+ false ->
+ case lists:keysearch(
+ "remove" ++ ejabberd_web_admin:term_to_id(JID), 1, Query) of
+ {value, _} ->
+ UJID = jlib:make_jid(User, Server, ""),
+ process_iq(
+ UJID, UJID,
+ #iq{type = set,
+ sub_el = {xmlelement, "query",
+ [{"xmlns", ?NS_ROSTER}],
+ [{xmlelement, "item",
+ [{"jid", jlib:jid_to_string(JID)},
+ {"subscription", "remove"}],
+ []}]}}),
+ throw(submitted);
+ false ->
+ ok
+ end
+
+ end
+ end, Items),
+ nothing.
+
+us_to_list({User, Server}) ->
+ jlib:jid_to_string({User, Server, ""}).
+
+webadmin_user(Acc, _User, _Server, Lang) ->
+ Acc ++ [?XE("h3", [?ACT("roster/", "Roster")])].
+
+
+riak_get_roster(LServer, Username) ->
+ ejabberd_riak:get_by_index(
+ LServer, <<"roster">>, <<"user_bin">>, Username).
+
+riak_get_roster_jid_groups(LServer, Username) ->
+ case ejabberd_riak:get_by_index(
+ LServer, <<"roster_groups">>, <<"user_bin">>, Username) of
+ {ok, JGs} ->
+ Res = lists:map(fun binary_to_groups/1, JGs),
+ {ok, Res};
+ Error -> Error
+ end.
+
+riak_get_roster_groups(LServer, Username, SJID) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ case ejabberd_riak:get(LServer, <<"roster_groups">>, Key) of
+ {ok, Gs} ->
+ {_, Res} = binary_to_groups(Gs),
+ {ok, Res};
+ {error, notfound} ->
+ {ok, []};
+ Error -> Error
+ end.
+
+riak_get_roster_by_jid(LServer, Username, SJID) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ ejabberd_riak:get(LServer, <<"roster">>, Key).
+
+riak_del_roster(LServer, Username, SJID) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ ejabberd_riak:delete(LServer, <<"roster">>, Key).
+
+riak_update_roster(LServer, Username, SJID, ItemVals, ItemGroups) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ ejabberd_riak:put(
+ LServer, <<"roster">>, Key, ItemVals,
+ [{<<"user_bin">>, Username}]),
+ ejabberd_riak:put(
+ LServer, <<"roster_groups">>, Key, ItemGroups,
+ [{<<"user_bin">>, Username}]).
+
+riak_roster_subscribe(LServer, Username, SJID, ItemVals) ->
+ Key = <<Username/binary, $/, SJID/binary>>,
+ ejabberd_riak:put(
+ LServer, <<"roster">>, Key, ItemVals,
+ [{<<"user_bin">>, Username}]).
+
+riak_get_subscription(LServer, Username, SJID) ->
+ case riak_get_roster_by_jid(LServer, Username, SJID) of
+ {ok, SR} ->
+ case raw_to_record(LServer, SR) of
+ error ->
+ {error, bad_record};
+ R ->
+ {ok, R#roster.subscription}
+ end;
+ Error ->
+ Error
+ end.
+
+riak_set_roster_version(LServer, Username, RosterVersion) ->
+ ejabberd_riak:put(LServer, <<"roster_version">>,
+ Username, list_to_binary(RosterVersion)).
+
+
+riak_del_user_roster(LServer, Username) ->
+ case ejabberd_riak:get_keys_by_index(
+ LServer, <<"roster">>, <<"user_bin">>, Username) of
+ {ok, Keys} ->
+ lists:foreach(
+ fun(Key) ->
+ ejabberd_riak:delete(LServer, <<"roster">>, Key)
+ end, Keys);
+ _ ->
+ ok
+ end,
+ case ejabberd_riak:get_keys_by_index(
+ LServer, <<"roster_groups">>, <<"user_bin">>, Username) of
+ {ok, GKeys} ->
+ lists:foreach(
+ fun(Key) ->
+ ejabberd_riak:delete(LServer, <<"roster_groups">>, Key)
+ end, GKeys);
+ _ ->
+ ok
+ end,
+ ejabberd_riak:delete(LServer, <<"roster_version">>, Username).
+