]> granicus.if.org Git - ejabberd/commitdiff
*** empty log message ***
authorAlexey Shchepin <alexey@process-one.net>
Sun, 9 Feb 2003 19:17:23 +0000 (19:17 +0000)
committerAlexey Shchepin <alexey@process-one.net>
Sun, 9 Feb 2003 19:17:23 +0000 (19:17 +0000)
SVN Revision: 66

src/ejabberd.cfg
src/ejabberd_c2s.erl
src/ejabberd_config.erl
src/shaper.erl [new file with mode: 0644]

index c91c7113942c19816518a7e37050032f3d2d1c65..3705030ac11e8414d0f0c2a33e0cd1ab109602b3 100644 (file)
@@ -1,6 +1,6 @@
 % $Id$
 
-override_acls.
+%override_acls.
 
 {acl, admin, {user, "aleksey"}}.
 {acl, admin, {user, "ermine"}}.
@@ -14,10 +14,13 @@ override_acls.
 {acl, jabberorg, {server, "jabber.org"}}.
 {acl, aleksey, {user, "aleksey", "jabber.ru"}}.
 
-{acl, test,  {user_glob, "test.*"}}.
+%{acl, test,  {user_regexp, "^test"}}.
 %{acl, test2, {user_glob, "test*"}}.
 
 
+{shaper, normal, {maxrate, 1000}}.
+
+
 {access, disco_admin, [{allow, admin},
                        {deny, all}]}.
 
@@ -26,9 +29,15 @@ override_acls.
 {access, c2s, [{deny, blocked},
               {allow, all}]}.
 
+
+{access, c2s_shaper, [{none, admin},
+                     {normal, all}]}.
+
+
 {host, "e.localhost"}.
 
-{listen, [{5522, ejabberd_c2s,     start, [{access, c2s}]},
+{listen, [{5522, ejabberd_c2s,     start, [{access, c2s},
+                                          {shaper, c2s_shaper}]},
          %{5523, ejabberd_c2s,     start,
          % [{access, c2s}, {ssl, [{certfile, "./ssl.pem"}]}]},
           {5269, ejabberd_s2s_in,  start, []},
index 13e7cc169b78d77506a56575c32ff59bc3de125e..a6b4e691a08138efff2afa36d1a1ae36b938c911 100644 (file)
@@ -13,7 +13,7 @@
 -behaviour(gen_fsm).
 
 %% External exports
--export([start/2, receiver/3, sender/2, send_text/2, send_element/2]).
+-export([start/2, receiver/4, send_text/2, send_element/2]).
 
 %% gen_fsm callbacks
 -export([init/1, wait_for_stream/2, wait_for_auth/2, session_established/2,
@@ -28,8 +28,9 @@
 
 -define(SETS, gb_sets).
 
--record(state, {socket, sender, receiver, streamid,
+-record(state, {socket, receiver, streamid,
                access,
+               shaper,
                user = "", server = ?MYNAME, resource = "",
                pres_t = ?SETS:new(),
                pres_f = ?SETS:new(),
@@ -76,19 +77,20 @@ start(SockData, Opts) ->
 %%          {stop, StopReason}                   
 %%----------------------------------------------------------------------
 init([{SockMod, Socket}, Opts]) ->
-    SenderPid = spawn(?MODULE, sender, [Socket, SockMod]),
-    ReceiverPid = spawn(?MODULE, receiver, [Socket, SockMod, self()]),
+    ReceiverPid = spawn(?MODULE, receiver, [Socket, SockMod, none, self()]),
     Access = case lists:keysearch(access, 1, Opts) of
-                {value, {_, A}} ->
-                    A;
-                _ ->
-                    all
+                {value, {_, A}} -> A;
+                _ -> all
+            end,
+    Shaper = case lists:keysearch(shaper, 1, Opts) of
+                {value, {_, S}} -> S;
+                _ -> none
             end,
     {ok, wait_for_stream, #state{socket   = Socket,
                                 receiver = ReceiverPid,
-                                sender   = SenderPid,
                                 streamid = new_id(),
-                                access   = Access}}.
+                                access   = Access,
+                                shaper   = Shaper}}.
 
 %%----------------------------------------------------------------------
 %% Func: StateName/2
@@ -101,13 +103,13 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) ->
     % TODO
     Header = io_lib:format(?STREAM_HEADER,
                           [StateData#state.streamid, ?MYNAME]),
-    send_text(StateData#state.sender, Header),
+    send_text(StateData#state.socket, Header),
     case lists:keysearch("xmlns:stream", 1, Attrs) of
        {value, {"xmlns:stream", "http://etherx.jabber.org/streams"}} ->
            % TODO
            {next_state, wait_for_auth, StateData};
        _ ->
-           send_text(StateData#state.sender, ?INVALID_NS_ERR ?STREAM_TRAILER),
+           send_text(StateData#state.socket, ?INVALID_NS_ERR ?STREAM_TRAILER),
            {stop, normal, StateData}
     end;
 
@@ -119,18 +121,20 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
     case is_auth_packet(El) of
        {auth, ID, {U, P, D, ""}} ->
            Err = jlib:make_error_reply(El, "406", "Not Acceptable"),
-           send_element(StateData#state.sender, Err),
+           send_element(StateData#state.socket, Err),
            {next_state, wait_for_auth, StateData};
        {auth, ID, {U, P, D, R}} ->
            io:format("AUTH: ~p~n", [{U, P, D, R}]),
-           case acl:match_rule(StateData#state.access, {U, ?MYNAME, R}) of
+           JID = {U, ?MYNAME, R},
+           case acl:match_rule(StateData#state.access, JID) of
                allow ->
                    case ejabberd_auth:check_password(
                           U, P, StateData#state.streamid, D) of
                        true ->
                            ejabberd_sm:open_session(U, R),
                            Res = jlib:make_result_iq_reply(El),
-                           send_element(StateData#state.sender, Res),
+                           send_element(StateData#state.socket, Res),
+                           change_shaper(StateData, JID),
                            {Fs, Ts} = mod_roster:get_subscription_lists(U),
                            {next_state, session_established,
                             StateData#state{user = U,
@@ -140,12 +144,12 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
                        _ ->
                            Err = jlib:make_error_reply(
                                    El, "401", "Unauthorized"),
-                           send_element(StateData#state.sender, Err),
+                           send_element(StateData#state.socket, Err),
                            {next_state, wait_for_auth, StateData}
                    end;
                _ ->
                    Err = jlib:make_error_reply(El, "405", "Not Allowed"),
-                   send_element(StateData#state.sender, Err),
+                   send_element(StateData#state.socket, Err),
                    {next_state, wait_for_auth, StateData}
            end;
        _ ->
@@ -158,7 +162,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
                                                {"", "", ""},
                                                jlib:iq_to_xml(ResIQ)),
                    Res = jlib:remove_attr("to", Res1),
-                   send_element(StateData#state.sender, Res),
+                   send_element(StateData#state.socket, Res),
                    {next_state, wait_for_auth, StateData};
                _ ->
                    {next_state, wait_for_auth, StateData}
@@ -262,7 +266,7 @@ code_change(OldVsn, StateName, StateData, Extra) ->
 %%          {stop, Reason, NewStateData}                         
 %%----------------------------------------------------------------------
 handle_info({send_text, Text}, StateName, StateData) ->
-    send_text(StateData#state.sender, Text),
+    send_text(StateData#state.socket, Text),
     {next_state, StateName, StateData};
 handle_info(replaced, StateName, StateData) ->
     % TODO
@@ -333,7 +337,7 @@ handle_info({route, From, To, Packet}, StateName, StateData) ->
                                                jlib:jid_to_string(To),
                                                NewAttrs),
            Text = xml:element_to_string({xmlelement, Name, Attrs2, Els}),
-           send_text(StateData#state.sender, Text),
+           send_text(StateData#state.socket, Text),
            {next_state, StateName, NewState};
        true ->
            {next_state, StateName, NewState}
@@ -360,43 +364,46 @@ terminate(Reason, StateName, StateData) ->
             presence_broadcast(From, StateData#state.pres_a, Packet),
             presence_broadcast(From, StateData#state.pres_i, Packet)
     end,
-    StateData#state.sender ! close,
     ok.
 
 %%%----------------------------------------------------------------------
 %%% Internal functions
 %%%----------------------------------------------------------------------
 
-receiver(Socket, SockMod, C2SPid) ->
+receiver(Socket, SockMod, Shaper, C2SPid) ->
     XMLStreamPid = xml_stream:start(C2SPid),
-    receiver(Socket, SockMod, C2SPid, XMLStreamPid).
+    ShaperState = shaper:new(Shaper),
+    receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid).
 
-receiver(Socket, SockMod, C2SPid, XMLStreamPid) ->
+receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid) ->
     case SockMod:recv(Socket, 0) of
         {ok, Text} ->
+           ShaperSt1 = receive
+                           {change_shaper, Shaper} ->
+                               io:format("RECV: ChShaper to ~p~n", [Shaper]),
+                               shaper:new(Shaper)
+                       after 0 ->
+                               ShaperState
+                       end,
+           NewShaperState = shaper:update(ShaperSt1, size(Text)),
            xml_stream:send_text(XMLStreamPid, Text),
-           receiver(Socket, SockMod, C2SPid, XMLStreamPid);
+           receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamPid);
         {error, Reason} ->
            exit(XMLStreamPid, closed),
            gen_fsm:send_event(C2SPid, closed),
            ok
     end.
 
-sender(Socket, SockMod) ->
-    receive
-       {send_text, Text} ->
-           SockMod:send(Socket,Text),
-           sender(Socket, SockMod);
-       close ->
-           SockMod:close(Socket),
-           ok
-    end.
+change_shaper(StateData, JID) ->
+    Shaper =  acl:match_rule(StateData#state.shaper, JID),
+    StateData#state.receiver ! {change_shaper, Shaper}.
+
+send_text(Socket, Text) ->
+    gen_tcp:send(Socket,Text).
 
-send_text(Pid, Text) ->
-    Pid ! {send_text, Text}.
+send_element(Socket, El) ->
+    send_text(Socket, xml:element_to_string(El)).
 
-send_element(Pid, El) ->
-    send_text(Pid, xml:element_to_string(El)).
 
 new_id() ->
     randoms:get_string().
index 16dc956024fefa4c42d583e5de8e5df4a6f36f2b..9afa704dcab8ca4b223613de94fe594c79d5f3e8 100644 (file)
@@ -62,6 +62,10 @@ process_term(Term, State) ->
            State#state{opts = [#config{key = {access, RuleName},
                                        value = Rules} |
                                State#state.opts]};
+       {shaper, Name, Data} ->
+           State#state{opts = [#config{key = {shaper, Name},
+                                       value = Data} |
+                               State#state.opts]};
        {Opt, Val} ->
            add_option(Opt, Val, State)
     end.
diff --git a/src/shaper.erl b/src/shaper.erl
new file mode 100644 (file)
index 0000000..811315f
--- /dev/null
@@ -0,0 +1,58 @@
+%%%----------------------------------------------------------------------
+%%% File    : shaper.erl
+%%% Author  : Alexey Shchepin <alexey@sevcom.net>
+%%% Purpose : Functions to control connections traffic
+%%% Created :  9 Feb 2003 by Alexey Shchepin <alexey@sevcom.net>
+%%% Id      : $Id$
+%%%----------------------------------------------------------------------
+
+-module(shaper).
+-author('alexey@sevcom.net').
+-vsn('$Revision$ ').
+
+-export([new/1, new1/1, update/2]).
+
+-record(maxrate, {maxrate, lastrate, lasttime}).
+
+
+new(Name) ->
+    Data = case ejabberd_config:get_global_option({shaper, Name}) of
+              undefined ->
+                  none;
+              D ->
+                  D
+          end,
+    new1(Data).
+
+
+new1(none) ->
+    none;
+new1({maxrate, MaxRate}) ->
+    #maxrate{maxrate = MaxRate,
+            lastrate = 0,
+            lasttime = now_to_usec(now())}.
+
+
+update(none, Size) ->
+    none;
+update(#maxrate{} = State, Size) ->
+    MinInterv = 1000 * Size /
+       (2 * State#maxrate.maxrate - State#maxrate.lastrate),
+    Interv = (now_to_usec(now()) - State#maxrate.lasttime) / 1000,
+    %io:format("State: ~p, Size=~p~nM=~p, I=~p~n",
+    %          [State, Size, MinInterv, Interv]),
+    if
+       MinInterv > Interv ->
+           timer:sleep(1 + trunc(MinInterv - Interv));
+       true ->
+           ok
+    end,
+    Now = now_to_usec(now()),
+    State#maxrate{
+      lastrate = (State#maxrate.lastrate +
+                 1000000 * Size / (Now - State#maxrate.lasttime))/2,
+      lasttime = Now}.
+
+
+now_to_usec({MSec, Sec, USec}) ->
+    (MSec*1000000 + Sec)*1000000 + USec.