allow_host/2,
incoming_s2s_number/0,
outgoing_s2s_number/0,
- migrate/1
+ migrate/1,
+ clean_temporarily_blocked_table/0,
+ list_temporarily_blocked_hosts/0,
+ external_host_overloaded/1,
+ is_temporarly_blocked/1
]).
%% gen_server callbacks
-define(PREFIXED_NS,
[{?NS_XMPP, ?NS_XMPP_pfx}, {?NS_DIALBACK, ?NS_DIALBACK_pfx}]).
+-define(S2S_OVERLOAD_BLOCK_PERIOD, 60).
+%% once a server is temporarly blocked, it stay blocked for 60 seconds
+
-record(s2s, {fromto, pid, key}).
-record(state, {}).
+-record(temporarily_blocked, {host, timestamp}).
+
%%====================================================================
%% API
%%====================================================================
ok
end.
+clean_temporarily_blocked_table() ->
+ mnesia:clear_table(temporarily_blocked).
+list_temporarily_blocked_hosts() ->
+ ets:tab2list(temporarily_blocked).
+
+external_host_overloaded(Host) ->
+ ?INFO_MSG("Disabling connections from ~s for ~s seconds", [Host, ?S2S_OVERLOAD_BLOCK_PERIOD]),
+ mnesia:transaction( fun() ->
+ mnesia:write(#temporarily_blocked{host = Host, timestamp = now()})
+ end).
+
+is_temporarly_blocked(Host) ->
+ case mnesia:dirty_read(temporarily_blocked, Host) of
+ [] -> false;
+ [#temporarily_blocked{timestamp = T}=Entry] ->
+ case timer:now_diff(now(), T) of
+ N when N > ?S2S_OVERLOAD_BLOCK_PERIOD * 1000 * 1000 ->
+ mnesia:dirty_delete_object(Entry),
+ false;
+ _ ->
+ true
+ end
+ end.
+
+
remove_connection(FromTo, Pid, Key) ->
case catch mnesia:dirty_match_object(s2s, #s2s{fromto = FromTo,
pid = Pid,
mnesia:add_table_copy(s2s, node(), ram_copies),
ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100),
ejabberd_commands:register_commands(commands()),
+ mnesia:create_table(temporarily_blocked, [{ram_copies, [node()]}, {attributes, record_info(fields, temporarily_blocked)}]),
{ok, #state{}}.
%%--------------------------------------------------------------------
%% Check if host is in blacklist or white list
allow_host(MyServer, S2SHost) ->
+ allow_host2(MyServer, S2SHost) andalso (not is_temporarly_blocked(S2SHost)).
+allow_host2(MyServer, S2SHost) ->
Hosts = ?MYHOSTS,
case lists:dropwhile(
fun(ParentDomain) ->
-module(ejabberd_s2s_in).
-author('alexey@process-one.net').
--behaviour(gen_fsm).
+-behaviour(p1_fsm).
%% External exports
-export([start/2,
-define(FSMOPTS, []).
-endif.
+-define(FSMLIMITS, [{max_queue, 2000}]). %% if queue grows more than this, we shutdown this connection.
+
%% Module start with or without supervisor:
-ifdef(NO_TRANSIENT_SUPERVISORS).
--define(SUPERVISOR_START, gen_fsm:start(ejabberd_s2s_in, [SockData, Opts],
- ?FSMOPTS)).
+-define(SUPERVISOR_START, p1_fsm:start(ejabberd_s2s_in, [SockData, Opts],
+ ?FSMOPTS ++ ?FSMLIMITS)).
-else.
-define(SUPERVISOR_START, supervisor:start_child(ejabberd_s2s_in_sup,
[SockData, Opts])).
?SUPERVISOR_START.
start_link(SockData, Opts) ->
- gen_fsm:start_link(ejabberd_s2s_in, [SockData, Opts], ?FSMOPTS).
+ p1_fsm:start_link(ejabberd_s2s_in, [SockData, Opts], ?FSMOPTS ++ ?FSMLIMITS).
socket_type() ->
xml_stream.
false
end,
if
- AuthRes ->
+ AllowRemoteHost = ejabberd_s2s:allow_host("", AuthDomain),
+ AuthRes andalso AllowRemoteHost ->
(StateData#state.sockmod):reset_stream(
StateData#state.socket),
send_element(StateData,
%%----------------------------------------------------------------------
terminate(Reason, _StateName, StateData) ->
?DEBUG("terminated: ~p", [Reason]),
+ case Reason of
+ {process_limit, _} ->
+ [ejabberd_s2s:external_host_overloaded(Host) || Host <- get_external_hosts(StateData)];
+ _ ->
+ ok
+ end,
(StateData#state.sockmod):close(StateData#state.socket),
ok.
+get_external_hosts(StateData) ->
+ case StateData#state.authenticated of
+ true ->
+ [StateData#state.auth_domain];
+ false ->
+ Connections = StateData#state.connections,
+ [D || {{D, _}, established} <- dict:to_list(Connections)]
+ end.
+
+
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------