]> granicus.if.org Git - ejabberd/commitdiff
Distribute routing of MUC messages accross all CPU cores
authorEvgeny Khramtsov <ekhramtsov@process-one.net>
Fri, 5 Jul 2019 07:35:31 +0000 (10:35 +0300)
committerEvgeny Khramtsov <ekhramtsov@process-one.net>
Fri, 5 Jul 2019 07:35:31 +0000 (10:35 +0300)
Also relay as less stanzas as possible through mod_muc workers

src/mod_muc.erl
src/mod_muc_opt.erl
src/mod_muc_room.erl

index ef9a904b83d7fff040819560b4cfce92777f372b..c696812616c9c106fbd3cf0fef9d6b41a74cd127 100644 (file)
 %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 %%%
 %%%----------------------------------------------------------------------
-
 -module(mod_muc).
-
 -author('alexey@process-one.net').
-
 -protocol({xep, 45, '1.25'}).
-
--behaviour(gen_server).
-
+-ifndef(GEN_SERVER).
+-define(GEN_SERVER, gen_server).
+-endif.
+-behaviour(?GEN_SERVER).
 -behaviour(gen_mod).
 
 %% API
 -export([start/2,
         stop/1,
+        start_link/3,
         reload/3,
         room_destroyed/4,
         store_room/4,
@@ -66,7 +65,9 @@
         count_online_rooms_by_user/3,
         get_online_rooms_by_user/3,
         can_use_nick/4,
-        get_subscribed_rooms/2]).
+        get_subscribed_rooms/2,
+        procname/2,
+        route/1]).
 
 -export([init/1, handle_call/3, handle_cast/2,
         handle_info/2, terminate/2, code_change/3,
 -include("xmpp.hrl").
 -include("mod_muc.hrl").
 -include("translate.hrl").
+-include("ejabberd_stacktrace.hrl").
 
--record(state,
-       {hosts = [] :: [binary()],
-         server_host = <<"">> :: binary(),
-         access = {none, none, none, none} :: {atom(), atom(), atom(), atom(), atom()},
-         history_size = 20 :: non_neg_integer(),
-         max_rooms_discoitems = 100 :: non_neg_integer(),
-        queue_type = ram :: ram | file,
-         default_room_opts = [] :: list(),
-         room_shaper = none :: ejabberd_shaper:shaper()}).
+-record(state, {hosts :: [binary()],
+               server_host :: binary(),
+               worker :: pos_integer()}).
 
+-type access() :: {acl:acl(), acl:acl(), acl:acl(), acl:acl(), acl:acl()}.
 -type muc_room_opts() :: [{atom(), any()}].
+-export_type([access/0]).
 -callback init(binary(), gen_mod:opts()) -> any().
 -callback import(binary(), binary(), [binary()]) -> ok.
 -callback store_room(binary(), binary(), binary(), list(), list()|undefined) -> {atomic, any()}.
 %% API
 %%====================================================================
 start(Host, Opts) ->
-    gen_mod:start_child(?MODULE, Host, Opts).
+    case mod_muc_sup:start(Host, Opts) of
+       {ok, _} ->
+           MyHosts = gen_mod:get_opt_hosts(Opts),
+           Mod = gen_mod:db_mod(Opts, ?MODULE),
+           RMod = gen_mod:ram_db_mod(Opts, ?MODULE),
+           Mod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)),
+           RMod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)),
+           load_permanent_rooms(MyHosts, Host, Opts);
+       Err ->
+           Err
+    end.
 
 stop(Host) ->
+    Proc = mod_muc_sup:procname(Host),
     Rooms = shutdown_rooms(Host),
-    gen_mod:stop_child(?MODULE, Host),
+    supervisor:terminate_child(ejabberd_gen_mod_sup, Proc),
+    supervisor:delete_child(ejabberd_gen_mod_sup, Proc),
     {wait, Rooms}.
 
-reload(Host, NewOpts, OldOpts) ->
-    Proc = gen_mod:get_module_proc(Host, ?MODULE),
-    gen_server:cast(Proc, {reload, Host, NewOpts, OldOpts}).
+-spec reload(binary(), gen_mod:opts(), gen_mod:opts()) -> ok.
+reload(ServerHost, NewOpts, OldOpts) ->
+    NewMod = gen_mod:db_mod(NewOpts, ?MODULE),
+    NewRMod = gen_mod:ram_db_mod(NewOpts, ?MODULE),
+    OldMod = gen_mod:db_mod(OldOpts, ?MODULE),
+    OldRMod = gen_mod:ram_db_mod(OldOpts, ?MODULE),
+    NewHosts = gen_mod:get_opt_hosts(NewOpts),
+    OldHosts = gen_mod:get_opt_hosts(OldOpts),
+    AddHosts = NewHosts -- OldHosts,
+    DelHosts = OldHosts -- NewHosts,
+    if NewMod /= OldMod ->
+           NewMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts));
+       true ->
+           ok
+    end,
+    if NewRMod /= OldRMod ->
+           NewRMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts));
+       true ->
+           ok
+    end,
+    lists:foreach(
+      fun(I) ->
+             ?GEN_SERVER:cast(procname(ServerHost, I),
+                              {reload, AddHosts, DelHosts, NewHosts})
+      end, lists:seq(1, erlang:system_info(logical_processors))),
+    load_permanent_rooms(AddHosts, ServerHost, NewOpts),
+    shutdown_rooms(ServerHost, DelHosts, OldRMod),
+    lists:foreach(
+      fun(Host) ->
+             lists:foreach(
+               fun({_, _, Pid}) when node(Pid) == node() ->
+                       Pid ! config_reloaded;
+                  (_) ->
+                       ok
+               end, get_online_rooms(ServerHost, Host))
+      end, misc:intersection(NewHosts, OldHosts)).
 
 depends(_Host, _Opts) ->
     [{mod_mam, soft}].
 
+start_link(Host, Opts, I) ->
+    Proc = procname(Host, I),
+    ?GEN_SERVER:start_link({local, Proc}, ?MODULE, [Host, Opts, I],
+                          ejabberd_config:fsm_limit_opts([])).
+
+-spec procname(binary(), pos_integer() | {binary(), binary()}) -> atom().
+procname(Host, I) when is_integer(I) ->
+    binary_to_atom(
+      <<(atom_to_binary(?MODULE, latin1))/binary, "_", Host/binary,
+       "_", (integer_to_binary(I))/binary>>, utf8);
+procname(Host, RoomHost) ->
+    Cores = erlang:system_info(logical_processors),
+    I = erlang:phash2(RoomHost, Cores) + 1,
+    procname(Host, I).
+
+-spec route(stanza()) -> ok.
+route(Pkt) ->
+    To = xmpp:get_to(Pkt),
+    ServerHost = ejabberd_router:host_of_route(To#jid.lserver),
+    route(Pkt, ServerHost).
+
+-spec route(stanza(), binary()) -> ok.
+route(Pkt, ServerHost) ->
+    From = xmpp:get_from(Pkt),
+    To = xmpp:get_to(Pkt),
+    Host = To#jid.lserver,
+    Access = mod_muc_opt:access(ServerHost),
+    case acl:match_rule(ServerHost, Access, From) of
+       allow ->
+           route(Pkt, Host, ServerHost);
+       deny ->
+           Lang = xmpp:get_lang(Pkt),
+            ErrText = ?T("Access denied by service policy"),
+            Err = xmpp:err_forbidden(ErrText, Lang),
+            ejabberd_router:route_error(Pkt, Err)
+    end.
+
+-spec route(stanza(), binary(), binary()) -> ok.
+route(#iq{to = #jid{luser = <<"">>, lresource = <<"">>}} = IQ, _, _) ->
+    ejabberd_router:process_iq(IQ);
+route(#message{lang = Lang, body = Body, type = Type, from = From,
+              to = #jid{luser = <<"">>, lresource = <<"">>}} = Pkt,
+      Host, ServerHost) ->
+    if Type == error ->
+            ok;
+       true ->
+           AccessAdmin = mod_muc_opt:access_admin(ServerHost),
+            case acl:match_rule(ServerHost, AccessAdmin, From) of
+                allow ->
+                    Msg = xmpp:get_text(Body),
+                    broadcast_service_message(ServerHost, Host, Msg);
+                deny ->
+                    ErrText = ?T("Only service administrators are allowed "
+                                 "to send service messages"),
+                    Err = xmpp:err_forbidden(ErrText, Lang),
+                    ejabberd_router:route_error(Pkt, Err)
+            end
+    end;
+route(Pkt, Host, ServerHost) ->
+    {Room, _, _} = jid:tolower(xmpp:get_to(Pkt)),
+    case Room of
+       <<"">> ->
+           Txt = ?T("No module is handling this query"),
+           Err = xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)),
+           ejabberd_router:route_error(Pkt, Err);
+       _ ->
+           RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
+           case RMod:find_online_room(ServerHost, Room, Host) of
+               error ->
+                   Proc = procname(ServerHost, {Room, Host}),
+                   case whereis(Proc) of
+                       Pid when Pid == self() ->
+                           route_to_room(Pkt, ServerHost);
+                       Pid when is_pid(Pid) ->
+                           ?DEBUG("Routing to MUC worker ~p:~n~s", [Proc, xmpp:pp(Pkt)]),
+                           ?GEN_SERVER:cast(Pid, {route_to_room, Pkt});
+                       undefined ->
+                           ?DEBUG("MUC worker ~p is dead", [Proc]),
+                           Err = xmpp:err_internal_server_error(),
+                           ejabberd_router:route_error(Pkt, Err)
+                   end;
+               {ok, Pid} ->
+                   mod_muc_room:route(Pid, Pkt)
+           end
+    end.
+
+-spec shutdown_rooms(binary()) -> [pid()].
 shutdown_rooms(ServerHost) ->
     RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
     Hosts = gen_mod:get_module_opt_hosts(ServerHost, mod_muc),
+    shutdown_rooms(ServerHost, Hosts, RMod).
+
+-spec shutdown_rooms(binary(), [binary()], module()) -> [pid()].
+shutdown_rooms(ServerHost, Hosts, RMod) ->
     Rooms = [RMod:get_online_rooms(ServerHost, Host, undefined)
             || Host <- Hosts],
     lists:flatmap(
@@ -149,18 +283,18 @@ shutdown_rooms(ServerHost) ->
 %% C) mod_muc:stop was called, and each room is being terminated
 %%    In this case, the mod_muc process died before the room processes
 %%    So the message sending must be catched
+-spec room_destroyed(binary(), binary(), pid(), binary()) -> ok.
 room_destroyed(Host, Room, Pid, ServerHost) ->
-    catch gen_mod:get_module_proc(ServerHost, ?MODULE) !
-           {room_destroyed, {Room, Host}, Pid},
-    ok.
+    Proc = procname(ServerHost, {Room, Host}),
+    ?GEN_SERVER:cast(Proc, {room_destroyed, {Room, Host}, Pid}).
 
 %% @doc Create a room.
 %% If Opts = default, the default room options are used.
 %% Else use the passed options as defined in mod_muc_room.
 create_room(Host, Name, From, Nick, Opts) ->
     ServerHost = ejabberd_router:host_of_route(Host),
-    Proc = gen_mod:get_module_proc(ServerHost, ?MODULE),
-    gen_server:call(Proc, {create, Name, Host, From, Nick, Opts}).
+    Proc = procname(ServerHost, {Name, Host}),
+    ?GEN_SERVER:call(Proc, {create, Name, Host, From, Nick, Opts}).
 
 store_room(ServerHost, Host, Name, Opts) ->
     store_room(ServerHost, Host, Name, Opts, undefined).
@@ -232,228 +366,158 @@ get_online_rooms_by_user(ServerHost, LUser, LServer) ->
 %%====================================================================
 %% gen_server callbacks
 %%====================================================================
-
-init([Host, Opts]) ->
+init([Host, Opts, Worker]) ->
     process_flag(trap_exit, true),
-    #state{access = Access, hosts = MyHosts,
-          history_size = HistorySize, queue_type = QueueType,
-          room_shaper = RoomShaper} = State = init_state(Host, Opts),
-    Mod = gen_mod:db_mod(Opts, ?MODULE),
-    RMod = gen_mod:ram_db_mod(Opts, ?MODULE),
-    Mod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)),
-    RMod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)),
-    lists:foreach(
-      fun(MyHost) ->
-             register_iq_handlers(MyHost),
-             ejabberd_router:register_route(MyHost, Host),
-             load_permanent_rooms(MyHost, Host, Access, HistorySize,
-                                  RoomShaper, QueueType)
-      end, MyHosts),
-    {ok, State}.
+    MyHosts = gen_mod:get_opt_hosts(Opts),
+    register_routes(Host, MyHosts, Worker),
+    register_iq_handlers(MyHosts, Worker),
+    {ok, #state{server_host = Host, hosts = MyHosts, worker = Worker}}.
 
 handle_call(stop, _From, State) ->
     {stop, normal, ok, State};
 handle_call({create, Room, Host, From, Nick, Opts}, _From,
-           #state{server_host = ServerHost,
-                  access = Access, default_room_opts = DefOpts,
-                  history_size = HistorySize, queue_type = QueueType,
-                  room_shaper = RoomShaper} = State) ->
+           #state{server_host = ServerHost} = State) ->
     ?DEBUG("MUC: create new room '~s'~n", [Room]),
     NewOpts = case Opts of
-               default -> DefOpts;
-               _ -> Opts
+                 default -> mod_muc_opt:default_room_options(ServerHost);
+                 _ -> Opts
              end,
-    {ok, Pid} = mod_muc_room:start(
-                 Host, ServerHost, Access,
-                 Room, HistorySize,
-                 RoomShaper, From,
-                 Nick, NewOpts, QueueType),
     RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
-    RMod:register_online_room(ServerHost, Room, Host, Pid),
-    ejabberd_hooks:run(create_room, ServerHost, [ServerHost, Room, Host]),
-    {reply, ok, State}.
+    case start_room(RMod, Host, ServerHost, Room, NewOpts, From, Nick) of
+       {ok, _} ->
+           ejabberd_hooks:run(create_room, ServerHost, [ServerHost, Room, Host]),
+           {reply, ok, State};
+       Err ->
+           {reply, Err, State}
+    end.
 
-handle_cast({reload, ServerHost, NewOpts, OldOpts}, #state{hosts = OldHosts}) ->
-    NewMod = gen_mod:db_mod(NewOpts, ?MODULE),
-    NewRMod = gen_mod:ram_db_mod(NewOpts, ?MODULE),
-    OldMod = gen_mod:db_mod(OldOpts, ?MODULE),
-    OldRMod = gen_mod:ram_db_mod(OldOpts, ?MODULE),
-    #state{hosts = NewHosts} = NewState = init_state(ServerHost, NewOpts),
-    if NewMod /= OldMod ->
-           NewMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts));
-       true ->
-           ok
+handle_cast({route_to_room, Packet}, #state{server_host = ServerHost} = State) ->
+    try route_to_room(Packet, ServerHost)
+    catch ?EX_RULE(E, R, St) ->
+            StackTrace = ?EX_STACK(St),
+            ?ERROR_MSG("Failed to route packet:~n~s~nReason = ~p",
+                       [xmpp:pp(Packet), {E, {R, StackTrace}}])
     end,
-    if NewRMod /= OldRMod ->
-           NewRMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts));
-       true ->
-           ok
-    end,
-    lists:foreach(
-      fun(NewHost) ->
-             ejabberd_router:register_route(NewHost, ServerHost),
-             register_iq_handlers(NewHost)
-      end, NewHosts -- OldHosts),
-    lists:foreach(
-      fun(OldHost) ->
-             ejabberd_router:unregister_route(OldHost),
-             unregister_iq_handlers(OldHost)
-      end, OldHosts -- NewHosts),
-    lists:foreach(
-      fun(Host) ->
-             lists:foreach(
-               fun({_, _, Pid}) when node(Pid) == node() ->
-                       Pid ! config_reloaded;
-                  (_) ->
-                       ok
-               end, get_online_rooms(ServerHost, Host))
-      end, misc:intersection(NewHosts, OldHosts)),
-    {noreply, NewState};
+    {noreply, State};
+handle_cast({room_destroyed, {Room, Host}, Pid}, State) ->
+    ServerHost = State#state.server_host,
+    RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
+    RMod:unregister_online_room(ServerHost, Room, Host, Pid),
+    {noreply, State};
+handle_cast({reload, AddHosts, DelHosts, NewHosts},
+           #state{server_host = ServerHost, worker = Worker} = State) ->
+    register_routes(ServerHost, AddHosts, Worker),
+    register_iq_handlers(AddHosts, Worker),
+    unregister_routes(DelHosts, Worker),
+    unregister_iq_handlers(DelHosts, Worker),
+    {noreply, State#state{hosts = NewHosts}};
 handle_cast(Msg, State) ->
     ?WARNING_MSG("Unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
-handle_info({route, Packet},
-           #state{server_host = ServerHost,
-                  access = Access, default_room_opts = DefRoomOpts,
-                  history_size = HistorySize, queue_type = QueueType,
-                  max_rooms_discoitems = MaxRoomsDiscoItems,
-                  room_shaper = RoomShaper} = State) ->
-    From = xmpp:get_from(Packet),
-    To = xmpp:get_to(Packet),
-    Host = To#jid.lserver,
-    case catch do_route(Host, ServerHost, Access, HistorySize, RoomShaper,
-                       From, To, Packet, DefRoomOpts, MaxRoomsDiscoItems,
-                       QueueType) of
-       {'EXIT', Reason} ->
-           ?ERROR_MSG("~p", [Reason]);
-       _ ->
-           ok
+handle_info({route, Packet}, State) ->
+    %% We can only receive the packet here from other nodes
+    %% where mod_muc is not loaded. Such configuration
+    %% is *highly* discouraged
+    try route(Packet, State#state.server_host)
+    catch ?EX_RULE(E, R, St) ->
+            StackTrace = ?EX_STACK(St),
+            ?ERROR_MSG("Failed to route packet:~n~s~nReason = ~p",
+                       [xmpp:pp(Packet), {E, {R, StackTrace}}])
     end,
     {noreply, State};
 handle_info({room_destroyed, {Room, Host}, Pid}, State) ->
-    ServerHost = State#state.server_host,
-    RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
-    RMod:unregister_online_room(ServerHost, Room, Host, Pid),
-    {noreply, State};
+    %% For backward compat
+    handle_cast({room_destroyed, {Room, Host}, Pid}, State);
 handle_info(Info, State) ->
     ?ERROR_MSG("Unexpected info: ~p", [Info]),
     {noreply, State}.
 
-terminate(_Reason, #state{hosts = MyHosts}) ->
-    lists:foreach(
-      fun(MyHost) ->
-             ejabberd_router:unregister_route(MyHost),
-             unregister_iq_handlers(MyHost)
-      end, MyHosts).
+terminate(_Reason, #state{hosts = Hosts, worker = Worker}) ->
+    unregister_routes(Hosts, Worker),
+    unregister_iq_handlers(Hosts, Worker).
 
 code_change(_OldVsn, State, _Extra) -> {ok, State}.
 
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
-init_state(Host, Opts) ->
-    MyHosts = gen_mod:get_opt_hosts(Opts),
-    Access = mod_muc_opt:access(Opts),
-    AccessCreate = mod_muc_opt:access_create(Opts),
-    AccessAdmin = mod_muc_opt:access_admin(Opts),
-    AccessPersistent = mod_muc_opt:access_persistent(Opts),
-    AccessMam = mod_muc_opt:access_mam(Opts),
-    HistorySize = mod_muc_opt:history_size(Opts),
-    MaxRoomsDiscoItems = mod_muc_opt:max_rooms_discoitems(Opts),
-    DefRoomOpts = mod_muc_opt:default_room_options(Opts),
-    QueueType = mod_muc_opt:queue_type(Opts),
-    RoomShaper = mod_muc_opt:room_shaper(Opts),
-    #state{hosts = MyHosts,
-          server_host = Host,
-          access = {Access, AccessCreate, AccessAdmin, AccessPersistent, AccessMam},
-          default_room_opts = DefRoomOpts,
-          queue_type = QueueType,
-          history_size = HistorySize,
-          max_rooms_discoitems = MaxRoomsDiscoItems,
-          room_shaper = RoomShaper}.
-
-register_iq_handlers(Host) ->
-    gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_REGISTER,
-                                 ?MODULE, process_register),
-    gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_VCARD,
-                                 ?MODULE, process_vcard),
-    gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUCSUB,
-                                 ?MODULE, process_mucsub),
-    gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE,
-                                 ?MODULE, process_muc_unique),
-    gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO,
-                                 ?MODULE, process_disco_info),
-    gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS,
-                                 ?MODULE, process_disco_items).
-
-unregister_iq_handlers(Host) ->
-    gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_REGISTER),
-    gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_VCARD),
-    gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUCSUB),
-    gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE),
-    gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO),
-    gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS).
-
-do_route(Host, ServerHost, Access, HistorySize, RoomShaper,
-        From, To, Packet, DefRoomOpts, _MaxRoomsDiscoItems, QueueType) ->
-    {AccessRoute, _AccessCreate, _AccessAdmin, _AccessPersistent, _AccessMam} = Access,
-    case acl:match_rule(ServerHost, AccessRoute, From) of
-       allow ->
-           do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
-                     From, To, Packet, DefRoomOpts, QueueType);
-       deny ->
-           Lang = xmpp:get_lang(Packet),
-           ErrText = ?T("Access denied by service policy"),
-           Err = xmpp:err_forbidden(ErrText, Lang),
-           ejabberd_router:route_error(Packet, Err)
-    end.
+-spec register_iq_handlers([binary()], pos_integer()) -> ok.
+register_iq_handlers(Hosts, 1) ->
+    %% Only register handlers on first worker
+    lists:foreach(
+      fun(Host) ->
+             gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_REGISTER,
+                                           ?MODULE, process_register),
+             gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_VCARD,
+                                           ?MODULE, process_vcard),
+             gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUCSUB,
+                                           ?MODULE, process_mucsub),
+             gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE,
+                                           ?MODULE, process_muc_unique),
+             gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO,
+                                           ?MODULE, process_disco_info),
+             gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS,
+                                           ?MODULE, process_disco_items)
+      end, Hosts);
+register_iq_handlers(_, _) ->
+    ok.
 
-do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper,
-         _From, #jid{luser = <<"">>, lresource = <<"">>} = _To,
-         #iq{} = IQ, _DefRoomOpts, _QueueType) ->
-    ejabberd_router:process_iq(IQ);
-do_route1(Host, ServerHost, Access, _HistorySize, _RoomShaper,
-         From, #jid{luser = <<"">>, lresource = <<"">>} = _To,
-         #message{lang = Lang, body = Body, type = Type} = Packet, _, _) ->
-    {_AccessRoute, _AccessCreate, AccessAdmin, _AccessPersistent, _AccessMam} = Access,
-    if Type == error ->
-           ok;
-       true ->
-           case acl:match_rule(ServerHost, AccessAdmin, From) of
-               allow ->
-                   Msg = xmpp:get_text(Body),
-                   broadcast_service_message(ServerHost, Host, Msg);
-               deny ->
-                   ErrText = ?T("Only service administrators are allowed "
-                                "to send service messages"),
-                   Err = xmpp:err_forbidden(ErrText, Lang),
-                   ejabberd_router:route_error(Packet, Err)
-           end
-    end;
-do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper,
-         _From, #jid{luser = <<"">>} = _To, Packet, _DefRoomOpts, _) ->
-    Err = xmpp:err_service_unavailable(),
-    ejabberd_router:route_error(Packet, Err);
-do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
-         From, To, Packet, DefRoomOpts, QueueType) ->
-    {Room, _, Nick} = jid:tolower(To),
+-spec unregister_iq_handlers([binary()], pos_integer()) -> ok.
+unregister_iq_handlers(Hosts, 1) ->
+    %% Only unregister handlers on first worker
+    lists:foreach(
+      fun(Host) ->
+             gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_REGISTER),
+             gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_VCARD),
+             gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUCSUB),
+             gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE),
+             gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO),
+             gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS)
+      end, Hosts);
+unregister_iq_handlers(_, _) ->
+    ok.
+
+-spec register_routes(binary(), [binary()], pos_integer()) -> ok.
+register_routes(ServerHost, Hosts, 1) ->
+    %% Only register routes on first worker
+    lists:foreach(
+      fun(Host) ->
+             ejabberd_router:register_route(
+               Host, ServerHost, {apply, ?MODULE, route})
+      end, Hosts);
+register_routes(_, _, _) ->
+    ok.
+
+-spec unregister_routes([binary()], pos_integer()) -> ok.
+unregister_routes(Hosts, 1) ->
+    %% Only unregister routes on first worker
+    lists:foreach(
+      fun(Host) ->
+             ejabberd_router:unregister_route(Host)
+      end, Hosts);
+unregister_routes(_, _) ->
+    ok.
+
+-spec route_to_room(stanza(), binary()) -> ok.
+route_to_room(Packet, ServerHost) ->
+    From = xmpp:get_from(Packet),
+    To = xmpp:get_to(Packet),
+    {Room, Host, Nick} = jid:tolower(To),
     RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
     case RMod:find_online_room(ServerHost, Room, Host) of
        error ->
            case is_create_request(Packet) of
                true ->
-                   case check_create_room(
-                          ServerHost, Host, Room, From, Access) of
+                   case check_create_room(ServerHost, Host, Room, From) of
                        true ->
-                           {ok, Pid} = start_new_room(
-                                         Host, ServerHost, Access,
-                                         Room, HistorySize,
-                                         RoomShaper, From, Nick, DefRoomOpts,
-                                         QueueType),
-                           RMod:register_online_room(ServerHost, Room, Host, Pid),
-                           mod_muc_room:route(Pid, Packet),
-                           ok;
+                           case start_new_room(Host, ServerHost, Room, From, Nick) of
+                               {ok, Pid} ->
+                                   mod_muc_room:route(Pid, Packet);
+                               _Err ->
+                                   Err = xmpp:err_internal_server_error(),
+                                   ejabberd_router:route_error(Packet, Err)
+                           end;
                        false ->
                            Lang = xmpp:get_lang(Packet),
                            ErrText = ?T("Room creation is denied by service policy"),
@@ -467,9 +531,7 @@ do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
                    ejabberd_router:route_error(Packet, Err)
            end;
        {ok, Pid} ->
-           ?DEBUG("MUC: send to process ~p~n", [Pid]),
-           mod_muc_room:route(Pid, Packet),
-           ok
+           mod_muc_room:route(Pid, Packet)
     end.
 
 -spec process_vcard(iq()) -> iq().
@@ -612,11 +674,9 @@ is_create_request(#iq{type = T} = IQ) when T == get; T == set ->
 is_create_request(_) ->
     false.
 
--spec check_create_room(binary(), binary(), binary(), jid(), tuple())
-      -> boolean().
-check_create_room(ServerHost, Host, Room, From, Access) ->
-    {_AccessRoute, AccessCreate, AccessAdmin,
-     _AccessPersistent, _AccessMam} = Access,
+-spec check_create_room(binary(), binary(), binary(), jid()) -> boolean().
+check_create_room(ServerHost, Host, Room, From) ->
+    AccessCreate = mod_muc_opt:access_create(ServerHost),
     case acl:match_rule(ServerHost, AccessCreate, From) of
        allow ->
            case mod_muc_opt:max_room_id(ServerHost) of
@@ -624,8 +684,8 @@ check_create_room(ServerHost, Host, Room, From, Access) ->
                    Regexp = mod_muc_opt:regexp_room_id(ServerHost),
                    case re:run(Room, Regexp, [unicode, {capture, none}]) of
                        match ->
-                           case acl:match_rule(
-                                  ServerHost, AccessAdmin, From) of
+                           AccessAdmin = mod_muc_opt:access_admin(ServerHost),
+                           case acl:match_rule(ServerHost, AccessAdmin, From) of
                                allow ->
                                    true;
                                _ ->
@@ -643,37 +703,56 @@ check_create_room(ServerHost, Host, Room, From, Access) ->
            false
     end.
 
+-spec get_access(binary() | gen_mod:opts()) -> access().
+get_access(ServerHost) ->
+    Access = mod_muc_opt:access(ServerHost),
+    AccessCreate = mod_muc_opt:access_create(ServerHost),
+    AccessAdmin = mod_muc_opt:access_admin(ServerHost),
+    AccessPersistent = mod_muc_opt:access_persistent(ServerHost),
+    AccessMam = mod_muc_opt:access_mam(ServerHost),
+    {Access, AccessCreate, AccessAdmin, AccessPersistent, AccessMam}.
+
+-spec get_rooms(binary(), binary()) -> [#muc_room{}].
 get_rooms(ServerHost, Host) ->
-    LServer = jid:nameprep(ServerHost),
-    Mod = gen_mod:db_mod(LServer, ?MODULE),
-    Mod:get_rooms(LServer, Host).
+    Mod = gen_mod:db_mod(ServerHost, ?MODULE),
+    Mod:get_rooms(ServerHost, Host).
 
-load_permanent_rooms(Host, ServerHost, Access,
-                    HistorySize, RoomShaper, QueueType) ->
-    RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
-    lists:foreach(
-       fun(R) ->
-           {Room, Host} = R#muc_room.name_host,
-           case proplists:get_bool(persistent, R#muc_room.opts) of
-               true ->
-                   case RMod:find_online_room(ServerHost, Room, Host) of
-                       error ->
-                           {ok, Pid} = mod_muc_room:start(Host,
-                                                          ServerHost, Access, Room,
-                                                          HistorySize, RoomShaper,
-                                                          R#muc_room.opts, QueueType),
-                           RMod:register_online_room(ServerHost, Room, Host, Pid);
-                       {ok, _} ->
-                           ok
-                   end;
-               _ ->
-                   forget_room(ServerHost, Host, Room)
-           end
-       end, get_rooms(ServerHost, Host)).
+-spec load_permanent_rooms([binary()], binary(), gen_mod:opts()) -> ok.
+load_permanent_rooms(Hosts, ServerHost, Opts) ->
+    case mod_muc_opt:preload_rooms(Opts) of
+       true ->
+           Access = get_access(Opts),
+           HistorySize = mod_muc_opt:history_size(Opts),
+           QueueType = mod_muc_opt:queue_type(Opts),
+           RoomShaper = mod_muc_opt:room_shaper(Opts),
+           RMod = gen_mod:ram_db_mod(Opts, ?MODULE),
+           lists:foreach(
+             fun(Host) ->
+                     ?DEBUG("Loading rooms at ~s", [Host]),
+                     lists:foreach(
+                       fun(R) ->
+                               {Room, _} = R#muc_room.name_host,
+                               case proplists:get_bool(persistent, R#muc_room.opts) of
+                                   true ->
+                                       case RMod:find_online_room(ServerHost, Room, Host) of
+                                           error ->
+                                               start_room(RMod, Host, ServerHost, Access,
+                                                          Room, HistorySize, RoomShaper,
+                                                          R#muc_room.opts, QueueType);
+                                           {ok, _} ->
+                                               ok
+                                       end;
+                                   _ ->
+                                       forget_room(ServerHost, Host, Room)
+                               end
+                       end, get_rooms(ServerHost, Host))
+             end, Hosts);
+       false ->
+           ok
+    end.
 
-start_new_room(Host, ServerHost, Access, Room,
-           HistorySize, RoomShaper, From,
-           Nick, DefRoomOpts, QueueType) ->
+start_new_room(Host, ServerHost, Room, From, Nick) ->
+    RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
     Opts = case restore_room(ServerHost, Host, Room) of
               error ->
                   error;
@@ -687,14 +766,52 @@ start_new_room(Host, ServerHost, Access, Room,
           end,
     case Opts of
        error ->
-           ?DEBUG("MUC: open new room '~s'~n", [Room]),
-           mod_muc_room:start(Host, ServerHost, Access, Room,
-               HistorySize, RoomShaper,
-               From, Nick, DefRoomOpts, QueueType);
+           ?DEBUG("Open new room: ~s", [Room]),
+           DefRoomOpts = mod_muc_opt:default_room_options(ServerHost),
+           start_room(RMod, Host, ServerHost, Room, DefRoomOpts, From, Nick);
        _ ->
-           ?DEBUG("MUC: restore room '~s'~n", [Room]),
-           mod_muc_room:start(Host, ServerHost, Access, Room,
-               HistorySize, RoomShaper, Opts, QueueType)
+           ?DEBUG("Restore room: ~s", [Room]),
+           start_room(RMod, Host, ServerHost, Room, Opts)
+    end.
+
+start_room(Mod, Host, ServerHost, Room, DefOpts) ->
+    Access = get_access(ServerHost),
+    HistorySize = mod_muc_opt:history_size(ServerHost),
+    QueueType = mod_muc_opt:queue_type(ServerHost),
+    RoomShaper = mod_muc_opt:room_shaper(ServerHost),
+    start_room(Mod, Host, ServerHost, Access, Room, HistorySize,
+              RoomShaper, DefOpts, QueueType).
+
+start_room(Mod, Host, ServerHost, Room, DefOpts, Creator, Nick) ->
+    Access = get_access(ServerHost),
+    HistorySize = mod_muc_opt:history_size(ServerHost),
+    QueueType = mod_muc_opt:queue_type(ServerHost),
+    RoomShaper = mod_muc_opt:room_shaper(ServerHost),
+    start_room(Mod, Host, ServerHost, Access, Room,
+              HistorySize, RoomShaper,
+              Creator, Nick, DefOpts, QueueType).
+
+start_room(Mod, Host, ServerHost, Access, Room,
+          HistorySize, RoomShaper, DefOpts, QueueType) ->
+    case mod_muc_room:start(Host, ServerHost, Access, Room,
+                           HistorySize, RoomShaper, DefOpts, QueueType) of
+       {ok, Pid} ->
+           Mod:register_online_room(ServerHost, Room, Host, Pid),
+           {ok, Pid};
+       Err ->
+           Err
+    end.
+
+start_room(Mod, Host, ServerHost, Access, Room, HistorySize,
+          RoomShaper, Creator, Nick, DefOpts, QueueType) ->
+    case mod_muc_room:start(Host, ServerHost, Access, Room,
+                           HistorySize, RoomShaper,
+                           Creator, Nick, DefOpts, QueueType) of
+       {ok, Pid} ->
+           Mod:register_online_room(ServerHost, Room, Host, Pid),
+           {ok, Pid};
+       Err ->
+           Err
     end.
 
 -spec iq_disco_items(binary(), binary(), jid(), binary(), integer(), binary(),
@@ -964,7 +1081,7 @@ mod_opt_type(max_room_id) ->
 mod_opt_type(max_rooms_discoitems) ->
     econf:non_neg_int();
 mod_opt_type(regexp_room_id) ->
-    econf:binary();
+    econf:re();
 mod_opt_type(max_room_name) ->
     econf:pos_int(infinity);
 mod_opt_type(max_user_conferences) ->
@@ -979,6 +1096,8 @@ mod_opt_type(min_message_interval) ->
     econf:number(0);
 mod_opt_type(min_presence_interval) ->
     econf:number(0);
+mod_opt_type(preload_rooms) ->
+    econf:bool();
 mod_opt_type(room_shaper) ->
     econf:atom();
 mod_opt_type(user_message_shaper) ->
@@ -1053,6 +1172,7 @@ mod_options(Host) ->
      {room_shaper, none},
      {user_message_shaper, none},
      {user_presence_shaper, none},
+     {preload_rooms, true},
      {default_room_options,
       [{allow_change_subj,true},
        {allow_private_messages,true},
index 67c42e98ffe1955c3b8ab3564d9132b688c91fbb..df6d5e784bc2b895e332e6c3a5200fae6e37ef08 100644 (file)
@@ -25,6 +25,7 @@
 -export([min_message_interval/1]).
 -export([min_presence_interval/1]).
 -export([name/1]).
+-export([preload_rooms/1]).
 -export([queue_type/1]).
 -export([ram_db_type/1]).
 -export([regexp_room_id/1]).
@@ -164,6 +165,12 @@ name(Opts) when is_map(Opts) ->
 name(Host) ->
     gen_mod:get_module_opt(Host, mod_muc, name).
 
+-spec preload_rooms(gen_mod:opts() | global | binary()) -> boolean().
+preload_rooms(Opts) when is_map(Opts) ->
+    gen_mod:get_opt(preload_rooms, Opts);
+preload_rooms(Host) ->
+    gen_mod:get_module_opt(Host, mod_muc, preload_rooms).
+
 -spec queue_type(gen_mod:opts() | global | binary()) -> 'file' | 'ram'.
 queue_type(Opts) when is_map(Opts) ->
     gen_mod:get_opt(queue_type, Opts);
@@ -176,7 +183,7 @@ ram_db_type(Opts) when is_map(Opts) ->
 ram_db_type(Host) ->
     gen_mod:get_module_opt(Host, mod_muc, ram_db_type).
 
--spec regexp_room_id(gen_mod:opts() | global | binary()) -> binary().
+-spec regexp_room_id(gen_mod:opts() | global | binary()) -> <<>> | re:mp().
 regexp_room_id(Opts) when is_map(Opts) ->
     gen_mod:get_opt(regexp_room_id, Opts);
 regexp_room_id(Host) ->
index bbbd3a2ec963a22df8e5bcff7abf0f446ae5fea1..b0d9da2763eb24aecd07d0365ae2188092f1053d 100644 (file)
 %%%----------------------------------------------------------------------
 %%% API
 %%%----------------------------------------------------------------------
+-spec start(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(),
+           atom(), jid(), binary(), [{atom(), term()}], ram | file) ->
+                  {ok, pid()} | {error, any()}.
 start(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
       Creator, Nick, DefRoomOpts, QueueType) ->
     p1_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
                            RoomShaper, Creator, Nick, DefRoomOpts, QueueType],
                    ?FSMOPTS).
 
+-spec start(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(),
+           atom(), [{atom(), term()}], ram | file) ->
+                  {ok, pid()} | {error, any()}.
 start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) ->
     p1_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
                            RoomShaper, Opts, QueueType],
                    ?FSMOPTS).
 
+-spec start_link(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(),
+                atom(), jid(), binary(), [{atom(), term()}], ram | file) ->
+                       {ok, pid()} | {error, any()}.
 start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
           Creator, Nick, DefRoomOpts, QueueType) ->
     p1_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
                                 RoomShaper, Creator, Nick, DefRoomOpts, QueueType],
                       ?FSMOPTS).
 
+-spec start_link(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(),
+                atom(), [{atom(), term()}], ram | file) ->
+                       {ok, pid()} | {error, any()}.
 start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) ->
     p1_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
                                 RoomShaper, Opts, QueueType],
@@ -756,6 +768,7 @@ terminate(Reason, _StateName,
 %%%----------------------------------------------------------------------
 -spec route(pid(), stanza()) -> ok.
 route(Pid, Packet) ->
+    ?DEBUG("Routing to MUC room ~p:~n~s", [Pid, xmpp:pp(Packet)]),
     #jid{lresource = Nick} = xmpp:get_to(Packet),
     p1_fsm:send_event(Pid, {route, Nick, Packet}).