From 50b645aa92cd356df2c42ac6a903304c49df1e53 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Thu, 5 Jul 2018 09:31:55 +0300 Subject: [PATCH] Move shaper to p1_utils repo --- rebar.config | 2 +- src/ejabberd_bosh.erl | 10 +++---- src/ejabberd_config.erl | 2 +- src/{shaper.erl => ejabberd_shaper.erl} | 37 ++++++------------------- src/ejabberd_stun.erl | 2 +- src/ejabberd_sup.erl | 2 +- src/mod_muc.erl | 2 +- src/mod_muc_room.erl | 22 +++++++-------- src/mod_proxy65_stream.erl | 8 +++--- src/xmpp_socket.erl | 6 ++-- src/xmpp_stream_in.erl | 2 +- src/xmpp_stream_out.erl | 2 +- 12 files changed, 39 insertions(+), 58 deletions(-) rename src/{shaper.erl => ejabberd_shaper.erl} (82%) diff --git a/rebar.config b/rebar.config index 0d5e04031..1252ae641 100644 --- a/rebar.config +++ b/rebar.config @@ -20,7 +20,7 @@ {deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", {tag, {if_version_above, "17", "3.4.2", "3.2.1"}}}}, - {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.12"}}}, + {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "b49f804"}}, {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.14"}}}, {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.23"}}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.12"}}}, diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl index 7423211ba..33dcf1614 100644 --- a/src/ejabberd_bosh.erl +++ b/src/ejabberd_bosh.erl @@ -88,7 +88,7 @@ sid = <<"">> :: binary(), el_ibuf :: p1_queue:queue(), el_obuf :: p1_queue:queue(), - shaper_state = none :: shaper:shaper(), + shaper_state = none :: ejabberd_shaper:shaper(), c2s_pid :: pid() | undefined, xmpp_ver = <<"">> :: binary(), inactivity_timer :: reference() | undefined, @@ -281,7 +281,7 @@ init([#body{attrs = Attrs}, IP, SID]) -> Opts1 = ejabberd_c2s_config:get_c2s_limits(), Opts2 = [{xml_socket, true} | Opts1], Shaper = none, - ShaperState = shaper:new(Shaper), + ShaperState = ejabberd_shaper:new(Shaper), Socket = make_socket(self(), IP), XMPPVer = get_attr('xmpp:version', Attrs), XMPPDomain = get_attr(to, Attrs), @@ -355,7 +355,7 @@ wait_for_session(#body{attrs = Attrs} = Req, From, {'xmlns:stream', ?NS_STREAM}, {from, State#state.host} | Polling]}, {ShaperState, _} = - shaper:update(State#state.shaper_state, Req#body.size), + ejabberd_shaper:update(State#state.shaper_state, Req#body.size), State1 = State#state{wait_timeout = Wait, prev_rid = RID, prev_key = NewKey, prev_poll = PollTime, shaper_state = ShaperState, @@ -393,7 +393,7 @@ active(#body{attrs = Attrs, size = Size} = Req, From, "~p~n** State: ~p", [Req, From, State]), {ShaperState, Pause} = - shaper:update(State#state.shaper_state, Size), + ejabberd_shaper:update(State#state.shaper_state, Size), State1 = State#state{shaper_state = ShaperState}, if Pause > 0 -> TRef = start_shaper_timer(Pause), @@ -524,7 +524,7 @@ handle_event({become_controller, C2SPid}, StateName, {next_state, StateName, State1}; handle_event({change_shaper, Shaper}, StateName, State) -> - NewShaperState = shaper:new(Shaper), + NewShaperState = ejabberd_shaper:new(Shaper), {next_state, StateName, State#state{shaper_state = NewShaperState}}; handle_event(_Event, StateName, State) -> diff --git a/src/ejabberd_config.erl b/src/ejabberd_config.erl index e0f3f6ade..f469fcbe9 100644 --- a/src/ejabberd_config.erl +++ b/src/ejabberd_config.erl @@ -1245,7 +1245,7 @@ transform_terms(Terms) -> ejabberd_s2s, ejabberd_listener, ejabberd_sql_sup, - shaper, + ejabberd_shaper, ejabberd_s2s_out, acl, ejabberd_config], diff --git a/src/shaper.erl b/src/ejabberd_shaper.erl similarity index 82% rename from src/shaper.erl rename to src/ejabberd_shaper.erl index 8adcf6056..8b6ca4125 100644 --- a/src/shaper.erl +++ b/src/ejabberd_shaper.erl @@ -1,5 +1,5 @@ %%%---------------------------------------------------------------------- -%%% File : shaper.erl +%%% File : ejabberd_shaper.erl %%% Author : Alexey Shchepin %%% Purpose : Functions to control connections traffic %%% Created : 9 Feb 2003 by Alexey Shchepin @@ -23,7 +23,7 @@ %%% %%%---------------------------------------------------------------------- --module(shaper). +-module(ejabberd_shaper). -behaviour(gen_server). -behaviour(ejabberd_config). @@ -39,19 +39,13 @@ -include("logger.hrl"). --record(maxrate, {maxrate = 0 :: integer(), - burst_size = 0 :: integer(), - acquired_credit = 0 :: integer(), - lasttime = 0 :: integer()}). - -record(shaper, {name :: {atom(), global}, maxrate :: integer(), burst_size :: integer()}). -record(state, {}). --type shaper() :: none | #maxrate{}. - +-type shaper() :: none | p1_shaper:state(). -export_type([shaper/0]). -spec start_link() -> {ok, pid()} | {error, any()}. @@ -84,7 +78,6 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. -spec load_from_config() -> ok | {error, any()}. - load_from_config() -> Shapers = ejabberd_config:get_option(shaper, []), case mnesia:transaction( @@ -105,7 +98,6 @@ load_from_config() -> end. -spec get_max_rate(atom()) -> none | non_neg_integer(). - get_max_rate(none) -> none; get_max_rate(Name) -> @@ -122,29 +114,18 @@ new(none) -> new(Name) -> case ets:lookup(shaper, {Name, global}) of [#shaper{maxrate = R, burst_size = B}] -> - - #maxrate{maxrate = R, burst_size = B, - acquired_credit = B, - lasttime = p1_time_compat:system_time(micro_seconds)}; + p1_shaper:new(R, B); [] -> none end. -spec update(shaper(), integer()) -> {shaper(), integer()}. - update(none, _Size) -> {none, 0}; -update(#maxrate{maxrate = MR, burst_size = BS, - acquired_credit = AC, lasttime = L} = State, Size) -> - Now = p1_time_compat:system_time(micro_seconds), - AC2 = min(BS, AC + (MR*(Now - L) div 1000000) - Size), - - Pause = if AC2 >= 0 -> 0; - true -> -1000*AC2 div MR - end, - ?DEBUG("MaxRate=~p, BurstSize=~p, AcquiredCredit=~p, Size=~p, NewAcquiredCredit=~p, Pause=~p", - [MR, BS, AC, Size, AC2, Pause]), - {State#maxrate{acquired_credit = AC2, lasttime = Now}, - Pause}. +update(Shaper, Size) -> + Result = p1_shaper:update(Shaper, Size), + ?DEBUG("Shaper update:~n~s =>~n~s", + [p1_shaper:pp(Shaper), p1_shaper:pp(Result)]), + Result. transform_options(Opts) -> lists:foldl(fun transform_options/2, [], Opts). diff --git a/src/ejabberd_stun.erl b/src/ejabberd_stun.erl index 8aae66221..53ecd5cc1 100644 --- a/src/ejabberd_stun.erl +++ b/src/ejabberd_stun.erl @@ -107,7 +107,7 @@ prepare_turn_opts(Opts, _UseTurn = true) -> _ -> [] end, - MaxRate = shaper:get_max_rate(Shaper), + MaxRate = ejabberd_shaper:get_max_rate(Shaper), Opts1 = Realm ++ [{auth_fun, AuthFun},{shaper, MaxRate} | lists:keydelete(shaper, 1, Opts)], set_certfile(Opts1). diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index f692575c1..73cb5b99f 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -53,7 +53,7 @@ init([]) -> simple_supervisor(ejabberd_s2s_out), simple_supervisor(ejabberd_service), worker(acl), - worker(shaper), + worker(ejabberd_shaper), supervisor(ejabberd_backend_sup), supervisor(ejabberd_rdbms), supervisor(ejabberd_riak_sup), diff --git a/src/mod_muc.erl b/src/mod_muc.erl index 7320e19cf..3be8d1688 100644 --- a/src/mod_muc.erl +++ b/src/mod_muc.erl @@ -84,7 +84,7 @@ max_rooms_discoitems = 100 :: non_neg_integer(), queue_type = ram :: ram | file, default_room_opts = [] :: list(), - room_shaper = none :: shaper:shaper()}). + room_shaper = none :: ejabberd_shaper:shaper()}). -type muc_room_opts() :: [{atom(), any()}]. -callback init(binary(), gen_mod:opts()) -> any(). diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index 0778cc157..e7351fb33 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -122,7 +122,7 @@ start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueT init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Creator, _Nick, DefRoomOpts, QueueType]) -> process_flag(trap_exit, true), - Shaper = shaper:new(RoomShaper), + Shaper = ejabberd_shaper:new(RoomShaper), RoomQueue = room_queue_new(ServerHost, Shaper, QueueType), State = set_affiliation(Creator, owner, #state{host = Host, server_host = ServerHost, @@ -141,7 +141,7 @@ init([Host, ServerHost, Access, Room, HistorySize, {ok, normal_state, State1}; init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType]) -> process_flag(trap_exit, true), - Shaper = shaper:new(RoomShaper), + Shaper = ejabberd_shaper:new(RoomShaper), RoomQueue = room_queue_new(ServerHost, Shaper, QueueType), State = set_opts(Opts, #state{host = Host, server_host = ServerHost, @@ -169,7 +169,7 @@ normal_state({route, <<"">>, * 1000000), Size = element_size(Packet), {MessageShaper, MessageShaperInterval} = - shaper:update(Activity#activity.message_shaper, Size), + ejabberd_shaper:update(Activity#activity.message_shaper, Size), if Activity#activity.message /= undefined -> ErrText = <<"Traffic rate limit is exceeded">>, Err = xmpp:err_resource_constraint(ErrText, Lang), @@ -178,7 +178,7 @@ normal_state({route, <<"">>, Now >= Activity#activity.message_time + MinMessageInterval, MessageShaperInterval == 0 -> {RoomShaper, RoomShaperInterval} = - shaper:update(StateData#state.room_shaper, Size), + ejabberd_shaper:update(StateData#state.room_shaper, Size), RoomQueueEmpty = case StateData#state.room_queue of undefined -> true; RQ -> p1_queue:is_empty(RQ) @@ -1503,7 +1503,7 @@ get_max_users_admin_threshold(StateData) -> gen_mod:get_module_opt(StateData#state.server_host, mod_muc, max_users_admin_threshold). --spec room_queue_new(binary(), shaper:shaper(), _) -> p1_queue:queue(). +-spec room_queue_new(binary(), ejabberd_shaper:shaper(), _) -> p1_queue:queue(). room_queue_new(ServerHost, Shaper, QueueType) -> HaveRoomShaper = Shaper /= none, HaveMessageShaper = gen_mod:get_module_opt( @@ -1533,10 +1533,10 @@ get_user_activity(JID, StateData) -> {ok, _P, A} -> A; error -> MessageShaper = - shaper:new(gen_mod:get_module_opt(StateData#state.server_host, + ejabberd_shaper:new(gen_mod:get_module_opt(StateData#state.server_host, mod_muc, user_message_shaper)), PresenceShaper = - shaper:new(gen_mod:get_module_opt(StateData#state.server_host, + ejabberd_shaper:new(gen_mod:get_module_opt(StateData#state.server_host, mod_muc, user_presence_shaper)), #activity{message_shaper = MessageShaper, presence_shaper = PresenceShaper} @@ -1575,10 +1575,10 @@ store_user_activity(JID, UserActivity, StateData) -> of true -> {_, MessageShaperInterval} = - shaper:update(UserActivity#activity.message_shaper, + ejabberd_shaper:update(UserActivity#activity.message_shaper, 100000), {_, PresenceShaperInterval} = - shaper:update(UserActivity#activity.presence_shaper, + ejabberd_shaper:update(UserActivity#activity.presence_shaper, 100000), Delay = lists:max([MessageShaperInterval, PresenceShaperInterval, @@ -1620,7 +1620,7 @@ prepare_room_queue(StateData) -> Packet = Activity#activity.message, Size = element_size(Packet), {RoomShaper, RoomShaperInterval} = - shaper:update(StateData#state.room_shaper, Size), + ejabberd_shaper:update(StateData#state.room_shaper, Size), erlang:send_after(RoomShaperInterval, self(), process_room_queue), StateData#state{room_shaper = RoomShaper}; @@ -1629,7 +1629,7 @@ prepare_room_queue(StateData) -> {_Nick, Packet} = Activity#activity.presence, Size = element_size(Packet), {RoomShaper, RoomShaperInterval} = - shaper:update(StateData#state.room_shaper, Size), + ejabberd_shaper:update(StateData#state.room_shaper, Size), erlang:send_after(RoomShaperInterval, self(), process_room_queue), StateData#state{room_shaper = RoomShaper}; diff --git a/src/mod_proxy65_stream.erl b/src/mod_proxy65_stream.erl index 418e4588f..0646e28c8 100644 --- a/src/mod_proxy65_stream.erl +++ b/src/mod_proxy65_stream.erl @@ -53,7 +53,7 @@ sha1 = <<"">> :: binary(), host = <<"">> :: binary(), auth_type = anonymous :: plain | anonymous, - shaper = none :: shaper:shaper()}). + shaper = none :: ejabberd_shaper:shaper()}). %% Unused callbacks handle_event(_Event, StateName, StateData) -> @@ -248,7 +248,7 @@ relay(MySocket, PeerSocket, Shaper) -> {ok, Data} -> case gen_tcp:send(PeerSocket, Data) of ok -> - {NewShaper, Pause} = shaper:update(Shaper, byte_size(Data)), + {NewShaper, Pause} = ejabberd_shaper:update(Shaper, byte_size(Data)), if Pause > 0 -> timer:sleep(Pause); true -> pass end, @@ -278,11 +278,11 @@ select_auth_method(anonymous, AuthMethods) -> find_maxrate(Shaper, JID1, JID2, Host) -> MaxRate1 = case acl:match_rule(Host, Shaper, JID1) of deny -> none; - R1 -> shaper:new(R1) + R1 -> ejabberd_shaper:new(R1) end, MaxRate2 = case acl:match_rule(Host, Shaper, JID2) of deny -> none; - R2 -> shaper:new(R2) + R2 -> ejabberd_shaper:new(R2) end, if MaxRate1 == none; MaxRate2 == none -> none; true -> lists:max([MaxRate1, MaxRate2]) diff --git a/src/xmpp_socket.erl b/src/xmpp_socket.erl index 7c0500ce8..5eedce67e 100644 --- a/src/xmpp_socket.erl +++ b/src/xmpp_socket.erl @@ -70,7 +70,7 @@ socket :: socket(), max_stanza_size = infinity :: timeout(), xml_stream :: undefined | fxml_stream:xml_stream_state(), - shaper = none :: none | shaper:shaper(), + shaper = none :: none | ejabberd_shaper:shaper(), receiver :: receiver()}). -type socket_state() :: #socket_state{}. @@ -263,7 +263,7 @@ recv(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) -> end. change_shaper(#socket_state{receiver = undefined} = SocketData, Shaper) -> - ShaperState = shaper:new(Shaper), + ShaperState = ejabberd_shaper:new(Shaper), SocketData#socket_state{shaper = ShaperState}; change_shaper(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Shaper) -> @@ -373,7 +373,7 @@ parse(#socket_state{xml_stream = XMLStream, when is_binary(Data) -> ?DEBUG("(~s) Received XML on stream = ~p", [pp(SocketData), Data]), XMLStream1 = fxml_stream:parse(XMLStream, Data), - {ShaperState1, Pause} = shaper:update(ShaperState, byte_size(Data)), + {ShaperState1, Pause} = ejabberd_shaper:update(ShaperState, byte_size(Data)), Ret = if Pause > 0 -> activate_after(Socket, self(), Pause); true -> diff --git a/src/xmpp_stream_in.erl b/src/xmpp_stream_in.erl index 29e0d1016..fa1038ead 100644 --- a/src/xmpp_stream_in.erl +++ b/src/xmpp_stream_in.erl @@ -183,7 +183,7 @@ get_transport(#{socket := Socket, owner := Owner}) get_transport(_) -> erlang:error(badarg). --spec change_shaper(state(), shaper:shaper()) -> state(). +-spec change_shaper(state(), ejabberd_shaper:shaper()) -> state(). change_shaper(#{socket := Socket, owner := Owner} = State, Shaper) when Owner == self() -> Socket1 = xmpp_socket:change_shaper(Socket, Shaper), diff --git a/src/xmpp_stream_out.erl b/src/xmpp_stream_out.erl index 2031f0038..27accf59e 100644 --- a/src/xmpp_stream_out.erl +++ b/src/xmpp_stream_out.erl @@ -216,7 +216,7 @@ get_transport(#{socket := Socket, owner := Owner}) get_transport(_) -> erlang:error(badarg). --spec change_shaper(state(), shaper:shaper()) -> state(). +-spec change_shaper(state(), ejabberd_shaper:shaper()) -> state(). change_shaper(#{socket := Socket, owner := Owner} = State, Shaper) when Owner == self() -> Socket1 = xmpp_socket:change_shaper(Socket, Shaper), -- 2.40.0