-record(muc_online_room, {name_host, pid}).
-record(muc_registered, {us_host, nick}).
--record(state, {host, server_host, access, history_size, default_room_opts}).
+-record(state, {host,
+ server_host,
+ access,
+ history_size,
+ default_room_opts,
+ room_shaper}).
-define(PROCNAME, ejabberd_mod_muc).
AccessPersistent = gen_mod:get_opt(access_persistent, Opts, all),
HistorySize = gen_mod:get_opt(history_size, Opts, 20),
DefRoomOpts = gen_mod:get_opt(default_room_options, Opts, []),
+ RoomShaper = gen_mod:get_opt(room_shaper, Opts, none),
ejabberd_router:register_route(MyHost),
- load_permanent_rooms(MyHost, Host, {Access, AccessCreate, AccessAdmin, AccessPersistent},
- HistorySize),
+ load_permanent_rooms(MyHost, Host,
+ {Access, AccessCreate, AccessAdmin, AccessPersistent},
+ HistorySize,
+ RoomShaper),
{ok, #state{host = MyHost,
server_host = Host,
access = {Access, AccessCreate, AccessAdmin, AccessPersistent},
default_room_opts = DefRoomOpts,
- history_size = HistorySize}}.
+ history_size = HistorySize,
+ room_shaper = RoomShaper}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
server_host = ServerHost,
access = Access,
default_room_opts = DefRoomOpts,
- history_size = HistorySize} = State) ->
- case catch do_route(Host, ServerHost, Access, HistorySize, From, To, Packet, DefRoomOpts) of
+ history_size = HistorySize,
+ room_shaper = RoomShaper} = State) ->
+ case catch do_route(Host, ServerHost, Access, HistorySize, RoomShaper,
+ From, To, Packet, DefRoomOpts) of
{'EXIT', Reason} ->
?ERROR_MSG("~p", [Reason]);
_ ->
supervisor:terminate_child(ejabberd_sup, Proc),
supervisor:delete_child(ejabberd_sup, Proc).
-do_route(Host, ServerHost, Access, HistorySize, From, To, Packet, DefRoomOpts) ->
+do_route(Host, ServerHost, Access, HistorySize, RoomShaper,
+ From, To, Packet, DefRoomOpts) ->
{AccessRoute, _AccessCreate, _AccessAdmin, _AccessPersistent} = Access,
case acl:match_rule(ServerHost, AccessRoute, From) of
allow ->
- do_route1(Host, ServerHost, Access, HistorySize, From, To, Packet, DefRoomOpts);
+ do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
+ From, To, Packet, DefRoomOpts);
_ ->
{xmlelement, _Name, Attrs, _Els} = Packet,
Lang = xml:get_attr_s("xml:lang", Attrs),
end.
-do_route1(Host, ServerHost, Access, HistorySize, From, To, Packet, DefRoomOpts) ->
+do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
+ From, To, Packet, DefRoomOpts) ->
{_AccessRoute, AccessCreate, AccessAdmin, _AccessPersistent} = Access,
{Room, _, Nick} = jlib:jid_tolower(To),
{xmlelement, Name, Attrs, _Els} = Packet,
?DEBUG("MUC: open new room '~s'~n", [Room]),
{ok, Pid} = mod_muc_room:start(
Host, ServerHost, Access,
- Room, HistorySize, From,
+ Room, HistorySize,
+ RoomShaper, From,
Nick, DefRoomOpts),
register_room(Host, Room, Pid),
mod_muc_room:route(Pid, From, Nick, Packet),
-load_permanent_rooms(Host, ServerHost, Access, HistorySize) ->
+load_permanent_rooms(Host, ServerHost, Access, HistorySize, RoomShaper) ->
case catch mnesia:dirty_select(
muc_room, [{#muc_room{name_host = {'_', Host}, _ = '_'},
[],
Access,
Room,
HistorySize,
+ RoomShaper,
R#muc_room.opts),
register_room(Host, Room, Pid);
_ ->
%% External exports
--export([start_link/8,
- start_link/6,
- start/8,
- start/6,
+-export([start_link/9,
+ start_link/7,
+ start/9,
+ start/7,
route/4]).
%% gen_fsm callbacks
-record(activity, {message_time = 0,
presence_time = 0,
+ message_shaper,
+ presence_shaper,
+ message,
presence}).
-record(state, {room,
subject = "",
subject_author = "",
just_created = false,
- activity = ?DICT:new()}).
+ activity = ?DICT:new(),
+ room_shaper,
+ room_queue = queue:new()}).
%-define(DBGFSM, true).
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
-start(Host, ServerHost, Access, Room, HistorySize, Creator, Nick, DefRoomOpts) ->
+start(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
+ Creator, Nick, DefRoomOpts) ->
Supervisor = gen_mod:get_module_proc(ServerHost, ejabberd_mod_muc_sup),
supervisor:start_child(
- Supervisor, [Host, ServerHost, Access, Room, HistorySize, Creator, Nick, DefRoomOpts]).
+ Supervisor, [Host, ServerHost, Access, Room, HistorySize, RoomShaper,
+ Creator, Nick, DefRoomOpts]).
-start(Host, ServerHost, Access, Room, HistorySize, Opts) ->
+start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
Supervisor = gen_mod:get_module_proc(ServerHost, ejabberd_mod_muc_sup),
supervisor:start_child(
- Supervisor, [Host, ServerHost, Access, Room, HistorySize, Opts]).
+ Supervisor, [Host, ServerHost, Access, Room, HistorySize, RoomShaper,
+ Opts]).
-start_link(Host, ServerHost, Access, Room, HistorySize, Creator, Nick, DefRoomOpts) ->
- gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, Creator, Nick, DefRoomOpts],
+start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
+ Creator, Nick, DefRoomOpts) ->
+ gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
+ RoomShaper, Creator, Nick, DefRoomOpts],
?FSMOPTS).
-start_link(Host, ServerHost, Access, Room, HistorySize, Opts) ->
- gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, Opts],
+start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
+ gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
+ RoomShaper, Opts],
?FSMOPTS).
%%%----------------------------------------------------------------------
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
-init([Host, ServerHost, Access, Room, HistorySize, Creator, _Nick, DefRoomOpts]) ->
+init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Creator, _Nick, DefRoomOpts]) ->
+ Shaper = shaper:new(RoomShaper),
State = set_affiliation(Creator, owner,
#state{host = Host,
server_host = ServerHost,
room = Room,
history = lqueue_new(HistorySize),
jid = jlib:make_jid(Room, Host, ""),
- just_created = true}),
+ just_created = true,
+ room_shaper = Shaper}),
State1 = set_opts(DefRoomOpts, State),
{ok, normal_state, State1};
-init([Host, ServerHost, Access, Room, HistorySize, Opts]) ->
+init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts]) ->
+ Shaper = shaper:new(RoomShaper),
State = set_opts(Opts, #state{host = Host,
server_host = ServerHost,
access = Access,
room = Room,
history = lqueue_new(HistorySize),
- jid = jlib:make_jid(Room, Host, "")}),
+ jid = jlib:make_jid(Room, Host, ""),
+ room_shaper = Shaper}),
{ok, normal_state, State}.
%%----------------------------------------------------------------------
true ->
case xml:get_attr_s("type", Attrs) of
"groupchat" ->
- Activity = case ?DICT:find(jlib:jid_tolower(From),
- StateData#state.activity) of
- {ok, A} -> A;
- error -> #activity{}
- end,
+ Activity = get_user_activity(From, StateData),
Now = now_to_usec(now()),
MinMessageInterval =
trunc(gen_mod:get_module_opt(
StateData#state.server_host,
mod_muc, min_message_interval, 0) * 1000000),
+ Size = lists:flatlength(xml:element_to_string(Packet)),
+ {MessageShaper, MessageShaperInterval} =
+ shaper:update(Activity#activity.message_shaper, Size),
if
- Now >= Activity#activity.message_time + MinMessageInterval ->
- NewActivity = Activity#activity{message_time = Now},
+ Activity#activity.message /= undefined ->
+ ErrText = "Traffic rate limit is exceeded",
+ Err = jlib:make_error_reply(
+ Packet, ?ERRT_RESOURCE_CONSTRAINT(Lang, ErrText)),
+ ejabberd_router:route(
+ StateData#state.jid,
+ From, Err),
+ {next_state, normal_state, StateData};
+ Now >= Activity#activity.message_time + MinMessageInterval,
+ MessageShaperInterval == 0 ->
+ {RoomShaper, RoomShaperInterval} =
+ shaper:update(StateData#state.room_shaper, Size),
+ RoomQueueEmpty = queue:is_empty(
+ StateData#state.room_queue),
+ if
+ RoomShaperInterval == 0,
+ RoomQueueEmpty ->
+ NewActivity = Activity#activity{
+ message_time = Now,
+ message_shaper = MessageShaper},
+ StateData1 =
+ StateData#state{
+ activity = ?DICT:store(
+ jlib:jid_tolower(From),
+ NewActivity,
+ StateData#state.activity),
+ room_shaper = RoomShaper},
+ process_groupchat_message(From, Packet, StateData1);
+ true ->
+ StateData1 =
+ if
+ RoomQueueEmpty ->
+ erlang:send_after(
+ RoomShaperInterval, self(),
+ process_room_queue),
+ StateData#state{
+ room_shaper = RoomShaper};
+ true ->
+ StateData
+ end,
+ NewActivity = Activity#activity{
+ message_time = Now,
+ message_shaper = MessageShaper,
+ message = Packet},
+ RoomQueue = queue:in(
+ {message, From},
+ StateData#state.room_queue),
+ StateData2 =
+ StateData1#state{
+ activity = ?DICT:store(
+ jlib:jid_tolower(From),
+ NewActivity,
+ StateData#state.activity),
+ room_queue = RoomQueue},
+ {next_state, normal_state, StateData2}
+ end;
+ true ->
+ MessageInterval =
+ (Activity#activity.message_time +
+ MinMessageInterval - Now) div 1000,
+ Interval = lists:max([MessageInterval,
+ MessageShaperInterval]),
+ erlang:send_after(
+ Interval, self(), {process_user_message, From}),
+ NewActivity = Activity#activity{
+ message = Packet,
+ message_shaper = MessageShaper},
StateData1 =
StateData#state{
activity = ?DICT:store(
jlib:jid_tolower(From),
NewActivity,
StateData#state.activity)},
- process_groupchat_message(From, Packet, StateData1);
- true ->
- ErrText = "Message frequency is too high",
- Err = jlib:make_error_reply(
- Packet, ?ERRT_NOT_ACCEPTABLE(Lang, ErrText)),
- ejabberd_router:route(
- StateData#state.jid,
- From, Err),
- {next_state, normal_state, StateData}
+ {next_state, normal_state, StateData1}
end;
"error" ->
case is_user_online(From, StateData) of
normal_state({route, From, Nick,
{xmlelement, "presence", _Attrs, _Els} = Packet},
StateData) ->
- Activity = case ?DICT:find(jlib:jid_tolower(From),
- StateData#state.activity) of
- {ok, A} -> A;
- error -> #activity{}
- end,
+ Activity = get_user_activity(From, StateData),
Now = now_to_usec(now()),
MinPresenceInterval =
trunc(gen_mod:get_module_opt(
Interval = (Activity#activity.presence_time +
MinPresenceInterval - Now) div 1000,
erlang:send_after(
- Interval, self(), {process_presence, From});
+ Interval, self(), {process_user_presence, From});
true ->
ok
end,
_ ->
translate:translate(Lang, "private, ")
end,
- Len = length(?DICT:to_list(StateData#state.users)),
+ Len = ?DICT:fold(fun(_, _, Acc) -> Acc + 1 end, 0,
+ StateData#state.users),
" (" ++ Desc ++ integer_to_list(Len) ++ ")";
_ ->
""
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
-handle_info({process_presence, From}, normal_state = _StateName, StateData) ->
- Activity = case ?DICT:find(jlib:jid_tolower(From),
- StateData#state.activity) of
- {ok, A} -> A;
- error -> #activity{}
- end,
+handle_info({process_user_presence, From}, normal_state = _StateName, StateData) ->
+ Activity = get_user_activity(From, StateData),
Now = now_to_usec(now()),
{Nick, Packet} = Activity#activity.presence,
NewActivity = Activity#activity{presence_time = Now,
NewActivity,
StateData#state.activity)},
process_presence(From, Nick, Packet, StateData1);
+handle_info({process_user_message, From}, normal_state = _StateName, StateData) ->
+ Activity = get_user_activity(From, StateData),
+ Now = now_to_usec(now()),
+ Packet = Activity#activity.message,
+ NewActivity = Activity#activity{message_time = Now,
+ message = undefined},
+ StateData1 =
+ StateData#state{
+ activity = ?DICT:store(
+ jlib:jid_tolower(From),
+ NewActivity,
+ StateData#state.activity)},
+ process_groupchat_message(From, Packet, StateData1);
+handle_info(process_room_queue, normal_state = StateName, StateData) ->
+ case queue:out(StateData#state.room_queue) of
+ {{value, {message, From}}, RoomQueue} ->
+ Activity = get_user_activity(From, StateData),
+ Packet = Activity#activity.message,
+ NewActivity = Activity#activity{message = undefined},
+ StateData1 =
+ StateData#state{
+ activity = ?DICT:store(
+ jlib:jid_tolower(From),
+ NewActivity,
+ StateData#state.activity),
+ room_queue = RoomQueue},
+ StateData2 = prepare_room_queue(StateData1),
+ process_groupchat_message(From, Packet, StateData2);
+ {{value, {presence, From}}, RoomQueue} ->
+ Activity = get_user_activity(From, StateData),
+ {Nick, Packet} = Activity#activity.presence,
+ NewActivity = Activity#activity{presence = undefined},
+ StateData1 =
+ StateData#state{
+ activity = ?DICT:store(
+ jlib:jid_tolower(From),
+ NewActivity,
+ StateData#state.activity),
+ room_queue = RoomQueue},
+ StateData2 = prepare_room_queue(StateData1),
+ process_presence(From, Nick, Packet, StateData2);
+ {empty, _} ->
+ {next_state, StateName, StateData}
+ end;
handle_info(_Info, StateName, StateData) ->
{next_state, StateName, StateData}.
process_groupchat_message(From, {xmlelement, "message", Attrs, _Els} = Packet,
StateData) ->
Lang = xml:get_attr_s("xml:lang", Attrs),
- {ok, #user{nick = FromNick, role = Role}} =
- ?DICT:find(jlib:jid_tolower(From),
- StateData#state.users),
- if
- (Role == moderator) or (Role == participant) ->
- {NewStateData1, IsAllowed} =
- case check_subject(Packet) of
- false ->
- {StateData, true};
- Subject ->
- case can_change_subject(Role,
- StateData) of
- true ->
- NSD =
- StateData#state{
- subject = Subject,
- subject_author =
- FromNick},
- case (NSD#state.config)#config.persistent of
+ case is_user_online(From, StateData) of
+ true ->
+ {ok, #user{nick = FromNick, role = Role}} =
+ ?DICT:find(jlib:jid_tolower(From),
+ StateData#state.users),
+ if
+ (Role == moderator) or (Role == participant) ->
+ {NewStateData1, IsAllowed} =
+ case check_subject(Packet) of
+ false ->
+ {StateData, true};
+ Subject ->
+ case can_change_subject(Role,
+ StateData) of
+ true ->
+ NSD =
+ StateData#state{
+ subject = Subject,
+ subject_author =
+ FromNick},
+ case (NSD#state.config)#config.persistent of
+ true ->
+ mod_muc:store_room(
+ NSD#state.host,
+ NSD#state.room,
+ make_opts(NSD));
+ _ ->
+ ok
+ end,
+ {NSD, true};
+ _ ->
+ {StateData, false}
+ end
+ end,
+ case IsAllowed of
+ true ->
+ lists:foreach(
+ fun({_LJID, Info}) ->
+ ejabberd_router:route(
+ jlib:jid_replace_resource(
+ StateData#state.jid,
+ FromNick),
+ Info#user.jid,
+ Packet)
+ end,
+ ?DICT:to_list(StateData#state.users)),
+ NewStateData2 =
+ add_message_to_history(FromNick,
+ Packet,
+ NewStateData1),
+ {next_state, normal_state, NewStateData2};
+ _ ->
+ Err =
+ case (StateData#state.config)#config.allow_change_subj of
true ->
- mod_muc:store_room(
- NSD#state.host,
- NSD#state.room,
- make_opts(NSD));
+ ?ERRT_FORBIDDEN(
+ Lang,
+ "Only moderators and participants "
+ "are allowed to change subject in this room");
_ ->
- ok
+ ?ERRT_FORBIDDEN(
+ Lang,
+ "Only moderators "
+ "are allowed to change subject in this room")
end,
- {NSD, true};
- _ ->
- {StateData, false}
- end
- end,
- case IsAllowed of
+ ejabberd_router:route(
+ StateData#state.jid,
+ From,
+ jlib:make_error_reply(Packet, Err)),
+ {next_state, normal_state, StateData}
+ end;
true ->
- lists:foreach(
- fun({_LJID, Info}) ->
- ejabberd_router:route(
- jlib:jid_replace_resource(
- StateData#state.jid,
- FromNick),
- Info#user.jid,
- Packet)
- end,
- ?DICT:to_list(StateData#state.users)),
- NewStateData2 =
- add_message_to_history(FromNick,
- Packet,
- NewStateData1),
- {next_state, normal_state, NewStateData2};
- _ ->
- Err =
- case (StateData#state.config)#config.allow_change_subj of
- true ->
- ?ERRT_FORBIDDEN(
- Lang,
- "Only moderators and participants "
- "are allowed to change subject in this room");
- _ ->
- ?ERRT_FORBIDDEN(
- Lang,
- "Only moderators "
- "are allowed to change subject in this room")
- end,
+ ErrText = "Visitors are not allowed to send messages to all occupants",
+ Err = jlib:make_error_reply(
+ Packet, ?ERRT_FORBIDDEN(Lang, ErrText)),
ejabberd_router:route(
StateData#state.jid,
- From,
- jlib:make_error_reply(Packet, Err)),
+ From, Err),
{next_state, normal_state, StateData}
end;
- true ->
- ErrText = "Visitors are not allowed to send messages to all occupants",
+ false ->
+ ErrText = "Only occupants are allowed to send messages to the conference",
Err = jlib:make_error_reply(
- Packet, ?ERRT_FORBIDDEN(Lang, ErrText)),
- ejabberd_router:route(
- StateData#state.jid,
- From, Err),
+ Packet, ?ERRT_NOT_ACCEPTABLE(Lang, ErrText)),
+ ejabberd_router:route(StateData#state.jid, From, Err),
{next_state, normal_state, StateData}
end.
gen_mod:get_module_opt(StateData#state.server_host,
mod_muc, max_users_admin_threshold, 5).
+get_user_activity(JID, StateData) ->
+ case ?DICT:find(jlib:jid_tolower(JID),
+ StateData#state.activity) of
+ {ok, A} -> A;
+ error ->
+ MessageShaper =
+ shaper:new(gen_mod:get_module_opt(
+ StateData#state.server_host,
+ mod_muc, user_message_shaper, none)),
+ PresenceShaper =
+ shaper:new(gen_mod:get_module_opt(
+ StateData#state.server_host,
+ mod_muc, user_presence_shaper, none)),
+ #activity{message_shaper = MessageShaper,
+ presence_shaper = PresenceShaper}
+ end.
+
+prepare_room_queue(StateData) ->
+ case queue:out(StateData#state.room_queue) of
+ {{value, {message, From}}, _RoomQueue} ->
+ Activity = get_user_activity(From, StateData),
+ Packet = Activity#activity.message,
+ Size = lists:flatlength(xml:element_to_string(Packet)),
+ {RoomShaper, RoomShaperInterval} =
+ shaper:update(StateData#state.room_shaper, Size),
+ erlang:send_after(
+ RoomShaperInterval, self(),
+ process_room_queue),
+ StateData#state{
+ room_shaper = RoomShaper};
+ {{value, {presence, From}}, _RoomQueue} ->
+ Activity = get_user_activity(From, StateData),
+ {_Nick, Packet} = Activity#activity.presence,
+ Size = lists:flatlength(xml:element_to_string(Packet)),
+ {RoomShaper, RoomShaperInterval} =
+ shaper:update(StateData#state.room_shaper, Size),
+ erlang:send_after(
+ RoomShaperInterval, self(),
+ process_room_queue),
+ StateData#state{
+ room_shaper = RoomShaper};
+ {empty, _} ->
+ StateData
+ end.
+
add_online_user(JID, Nick, Role, StateData) ->
LJID = jlib:jid_tolower(JID),