_ ->
SockMod:setopts(Socket, [{packet, 0}, {active, true}])
end,
- ws_loop(none, Socket, WsHandleLoopPid, SockMod).
+ ws_loop(none, Socket, WsHandleLoopPid, SockMod, none).
handshake(#ws{headers = Headers} = State) ->
{_, Key} = lists:keyfind(<<"Sec-Websocket-Key">>, 1,
end.
-ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) ->
+ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, Shaper) ->
receive
{DataType, _Socket, Data} when DataType =:= tcp orelse DataType =:= raw ->
- case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode) of
+ case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) of
{error, Error} ->
?DEBUG("tls decode error ~p", [Error]),
websocket_close(Socket, WsHandleLoopPid, SocketMode, 1002); % protocol error
- {NewFrameInfo, ToSend} ->
+ {NewFrameInfo, ToSend, NewShaper} ->
lists:foreach(fun(Pkt) -> SocketMode:send(Socket, Pkt)
end, ToSend),
- ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode)
+ ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper)
end;
+ {new_shaper, NewShaper} ->
+ NewShaper = case NewShaper of
+ none when Shaper /= none ->
+ activate(Socket, SocketMode, true), none;
+ _ ->
+ NewShaper
+ end,
+ ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper);
{tcp_closed, _Socket} ->
?DEBUG("tcp connection was closed, exit", []),
websocket_close(Socket, WsHandleLoopPid, SocketMode, 0);
{send, Data} ->
SocketMode:send(Socket, encode_frame(Data, 1)),
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
- SocketMode);
+ SocketMode, Shaper);
{ping, Data} ->
SocketMode:send(Socket, encode_frame(Data, 9)),
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
- SocketMode);
+ SocketMode, Shaper);
shutdown ->
?DEBUG("shutdown request received, closing websocket "
"with pid ~p",
?WARNING_MSG("received unexpected message, ignoring: ~p",
[_Ignored]),
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
- SocketMode)
+ SocketMode, Shaper)
end.
encode_frame(Data, Opcode) ->
process_frame(FrameInfo#frame_info{unprocessed = <<>>},
<<UnprocessedPre/binary, Data/binary>>).
-handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls) ->
+handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls, Shaper) ->
case fast_tls:recv_data(Socket, Data) of
{ok, NewData} ->
- handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls);
+ handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls, Shaper);
{error, Error} ->
{error, Error}
end;
-handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod) ->
- handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod).
+handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper) ->
+ handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper).
-handle_data_int(FrameInfo, Data, _Socket, WsHandleLoopPid, _SocketMode) ->
+handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) ->
{NewFrameInfo, Recv, Send} = process_frame(FrameInfo, Data),
lists:foreach(fun (El) ->
case El of
end
end,
Recv),
- {NewFrameInfo, Send}.
+ {NewFrameInfo, Send, handle_shaping(Data, Socket, SocketMode, Shaper)}.
websocket_close(Socket, WsHandleLoopPid,
SocketMode, CloseCode) when CloseCode > 0 ->
websocket_close(Socket, WsHandleLoopPid, SocketMode, _CloseCode) ->
WsHandleLoopPid ! closed,
SocketMode:close(Socket).
+
+handle_shaping(_Data, _Socket, _SocketMode, none) ->
+ none;
+handle_shaping(Data, Socket, SocketMode, Shaper) ->
+ {NewShaper, Pause} = ejabberd_shaper:update(Shaper, byte_size(Data)),
+ if Pause > 0 ->
+ activate_after(Socket, self(), Pause);
+ true -> activate(Socket, SocketMode, once)
+ end,
+ NewShaper.
+
+activate(Socket, SockMod, ActiveState) ->
+ case SockMod of
+ gen_tcp -> inet:setopts(Socket, [{active, ActiveState}]);
+ _ -> SockMod:setopts(Socket, [{active, ActiveState}])
+ end.
+
+activate_after(Socket, Pid, Pause) ->
+ if Pause > 0 ->
+ erlang:send_after(Pause, Pid, {tcp, Socket, <<>>});
+ true ->
+ Pid ! {tcp, Socket, <<>>}
+ end,
+ ok.