wait_for_auth_result/2,
wait_for_starttls_proceed/2,
reopen_socket/2,
+ wait_before_retry/2,
stream_established/2,
handle_event/3,
handle_sync_event/4,
%% Module start with or without supervisor:
-ifdef(NO_TRANSIENT_SUPERVISORS).
-define(SUPERVISOR_START, p1_fsm:start(ejabberd_s2s_out, [From, Host, Type],
- ?FSMOPTS)).
+ ?FSMLIMITS ++ ?FSMOPTS)).
-else.
-define(SUPERVISOR_START, supervisor:start_child(ejabberd_s2s_out_sup,
[From, Host, Type])).
%% Only change this value if you now what your are doing:
-define(FSMLIMITS,[]).
%% -define(FSMLIMITS, [{max_queue, 2000}]).
+-define(FSMTIMEOUT, 5000).
-define(STREAM_HEADER,
"<?xml version='1.0'?>"
start_link(From, Host, Type) ->
p1_fsm:start_link(ejabberd_s2s_out, [From, Host, Type],
- ?FSMLIMITS ++ ?FSMOPTS).
+ ?FSMLIMITS ++ ?FSMOPTS).
start_connection(Pid) ->
p1_fsm:send_event(Pid, init).
server = Server,
new = New,
verify = Verify,
- timer = Timer}}.
+ timer = Timer}, ?FSMTIMEOUT}.
%%----------------------------------------------------------------------
%% Func: StateName/2
send_text(NewStateData, io_lib:format(?STREAM_HEADER,
[StateData#state.server,
Version])),
- {next_state, wait_for_stream, NewStateData};
+ {next_state, wait_for_stream, NewStateData, ?FSMTIMEOUT};
{error, _Reason} ->
?INFO_MSG("s2s connection: ~s -> ~s (remote server not found)",
[StateData#state.myname, StateData#state.server]),
- {stop, normal, StateData}
+ wait_before_reconnect(StateData, 300000)
+ %%{stop, normal, StateData}
end;
open_socket(stop, StateData) ->
+ ?INFO_MSG("s2s connection: ~s -> ~s (stopped in open socket)",
+ [StateData#state.myname, StateData#state.server]),
+ {stop, normal, StateData};
+open_socket(timeout, StateData) ->
+ ?INFO_MSG("s2s connection: ~s -> ~s (timeout in open socket)",
+ [StateData#state.myname, StateData#state.server]),
{stop, normal, StateData};
open_socket(_, StateData) ->
- {next_state, open_socket, StateData}.
+ {next_state, open_socket, StateData, ?FSMTIMEOUT}.
%%----------------------------------------------------------------------
open_socket1(Addr, Port) ->
send_db_request(StateData);
{"jabber:server", "jabber:server:dialback", true} when
StateData#state.use_v10 ->
- {next_state, wait_for_features, StateData};
+ {next_state, wait_for_features, StateData, ?FSMTIMEOUT};
{"jabber:server", "", true} when StateData#state.use_v10 ->
- {next_state, wait_for_features, StateData#state{db_enabled = false}};
+ {next_state, wait_for_features, StateData#state{db_enabled = false}, ?FSMTIMEOUT};
_ ->
send_text(StateData, ?INVALID_NAMESPACE_ERR),
?INFO_MSG("Closing s2s connection: ~s -> ~s (invalid namespace)",
?DEBUG("recv verify: ~p", [{From, To, Id, Type}]),
case StateData#state.verify of
false ->
- {next_state, wait_for_validation, StateData};
+ %% TODO: Should'nt we close the connection here ?
+ {next_state, wait_for_validation, StateData, ?FSMTIMEOUT};
{Pid, _Key, _SID} ->
case Type of
"valid" ->
StateData#state.verify == false ->
{stop, normal, StateData};
true ->
- {next_state, wait_for_validation, StateData}
+ {next_state, wait_for_validation, StateData,
+ ?FSMTIMEOUT*3}
end
end;
_ ->
- {next_state, wait_for_validation, StateData}
+ {next_state, wait_for_validation, StateData, ?FSMTIMEOUT*3}
end;
wait_for_validation({xmlstreamend, Name}, StateData) ->
+ ?INFO_MSG("wait for validation: ~s -> ~s (xmlstreamend)",
+ [StateData#state.myname, StateData#state.server]),
{stop, normal, StateData};
wait_for_validation({xmlstreamerror, _}, StateData) ->
+ ?INFO_MSG("wait for validation: ~s -> ~s (xmlstreamerror)",
+ [StateData#state.myname, StateData#state.server]),
send_text(StateData,
?INVALID_XML_ERR ++ ?STREAM_TRAILER),
{stop, normal, StateData};
wait_for_validation(timeout, StateData) ->
+ ?INFO_MSG("wait_for_validation: ~s -> ~s (connect timeout)",
+ [StateData#state.myname, StateData#state.server]),
{stop, normal, StateData};
wait_for_validation(closed, StateData) ->
+ ?INFO_MSG("wait for validation: ~s -> ~s (closed)",
+ [StateData#state.myname, StateData#state.server]),
{stop, normal, StateData}.
jlib:encode_base64(
StateData#state.myname)}]}),
{next_state, wait_for_auth_result,
- StateData#state{try_auth = false}};
+ StateData#state{try_auth = false}, ?FSMTIMEOUT};
StartTLS and StateData#state.tls and
(not StateData#state.tls_enabled) ->
send_element(StateData,
{xmlelement, "starttls",
[{"xmlns", ?NS_TLS}], []}),
- {next_state, wait_for_starttls_proceed, StateData};
+ {next_state, wait_for_starttls_proceed, StateData,
+ ?FSMTIMEOUT};
StartTLSRequired and (not StateData#state.tls) ->
?DEBUG("restarted: ~p", [{StateData#state.myname,
StateData#state.server}]),
ejabberd_socket:close(StateData#state.socket),
{next_state, reopen_socket,
StateData#state{socket = undefined,
- use_v10 = false}};
+ use_v10 = false}, ?FSMTIMEOUT};
StateData#state.db_enabled ->
send_db_request(StateData);
true ->
% TODO: clear message queue
ejabberd_socket:close(StateData#state.socket),
{next_state, reopen_socket, StateData#state{socket = undefined,
- use_v10 = false}}
+ use_v10 = false}, ?FSMTIMEOUT}
end;
_ ->
send_text(StateData,
{next_state, wait_for_stream,
StateData#state{streamid = new_id(),
authenticated = true
- }};
+ }, ?FSMTIMEOUT};
_ ->
send_text(StateData,
xml:element_to_string(?SERR_BAD_FORMAT) ++
StateData#state.server}]),
ejabberd_socket:close(StateData#state.socket),
{next_state, reopen_socket,
- StateData#state{socket = undefined}};
+ StateData#state{socket = undefined}, ?FSMTIMEOUT};
_ ->
send_text(StateData,
xml:element_to_string(?SERR_BAD_FORMAT) ++
io_lib:format(?STREAM_HEADER,
[StateData#state.server,
" version='1.0'"])),
- {next_state, wait_for_stream, NewStateData};
+ {next_state, wait_for_stream, NewStateData, ?FSMTIMEOUT};
_ ->
send_text(StateData,
xml:element_to_string(?SERR_BAD_FORMAT) ++
reopen_socket({xmlstreamelement, El}, StateData) ->
- {next_state, reopen_socket, StateData};
+ {next_state, reopen_socket, StateData, ?FSMTIMEOUT};
reopen_socket({xmlstreamend, Name}, StateData) ->
- {next_state, reopen_socket, StateData};
+ {next_state, reopen_socket, StateData, ?FSMTIMEOUT};
reopen_socket({xmlstreamerror, _}, StateData) ->
- {next_state, reopen_socket, StateData};
+ {next_state, reopen_socket, StateData, ?FSMTIMEOUT};
reopen_socket(timeout, StateData) ->
+ ?INFO_MSG("reopen socket: timeout", []),
{stop, normal, StateData};
reopen_socket(closed, StateData) ->
p1_fsm:send_event(self(), init),
- {next_state, open_socket, StateData}.
+ {next_state, open_socket, StateData, ?FSMTIMEOUT}.
+%% This state is use to avoid reconnecting to often to bad sockets
+wait_before_retry(Event, StateData) ->
+ {next_state, wait_before_retry, StateData, ?FSMTIMEOUT}.
stream_established({xmlstreamelement, El}, StateData) ->
?DEBUG("s2S stream established", []),
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
- {next_state, StateName, StateData}.
+ {next_state, StateName, StateData, get_timeout_interval(StateName)}.
%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%%----------------------------------------------------------------------
handle_sync_event(Event, From, StateName, StateData) ->
Reply = ok,
- {reply, Reply, StateName, StateData}.
+ {reply, Reply, StateName, StateData, get_timeout_interval(StateName)}.
code_change(OldVsn, StateName, StateData, Extra) ->
{ok, StateName, StateData}.
send_text(StateData, Text),
cancel_timer(StateData#state.timer),
Timer = erlang:start_timer(?S2STIMEOUT, self(), []),
- {next_state, StateName, StateData#state{timer = Timer}};
+ {next_state, StateName, StateData#state{timer = Timer},
+ get_timeout_interval(StateName)};
handle_info({send_element, El}, StateName, StateData) ->
- cancel_timer(StateData#state.timer),
- Timer = erlang:start_timer(?S2STIMEOUT, self(), []),
case StateName of
stream_established ->
+ cancel_timer(StateData#state.timer),
+ Timer = erlang:start_timer(?S2STIMEOUT, self(), []),
send_element(StateData, El),
{next_state, StateName, StateData#state{timer = Timer}};
+ %% In this state we bounce all message: We are waiting before
+ %% trying to reconnect
+ wait_before_retry ->
+ bounce_element(El, ?ERR_REMOTE_SERVER_NOT_FOUND),
+ {next_state, StateName, StateData};
_ ->
Q = queue:in(El, StateData#state.queue),
- {next_state, StateName, StateData#state{queue = Q,
- timer = Timer}}
+ {next_state, StateName, StateData#state{queue = Q},
+ get_timeout_interval(StateName)}
end;
+handle_info({timeout, Timer, _}, wait_before_retry,
+ #state{timer = Timer} = StateData) ->
+ ?INFO_MSG("Reconnect delay expired: Will now retry to connect to ~s when needed.", [StateData#state.server]),
+ {stop, normal, StateData};
+
handle_info({timeout, Timer, _}, StateName,
#state{timer = Timer} = StateData) ->
+ ?INFO_MSG("Closing connection with ~s: timeout", [StateData#state.server]),
{stop, normal, StateData};
handle_info(_, StateName, StateData) ->
- {next_state, StateName, StateData}.
+ {next_state, StateName, StateData, get_timeout_interval(StateName)}.
%%----------------------------------------------------------------------
%% Func: terminate/3
ok
end.
+%% Bounce a single message (xmlelement)
+bounce_element(El, Error) ->
+ Err = jlib:make_error_reply(El, Error),
+ From = jlib:string_to_jid(xml:get_tag_attr_s("from", El)),
+ To = jlib:string_to_jid(xml:get_tag_attr_s("to", El)),
+ ejabberd_router:route(To, From, Err).
+
bounce_queue(Q, Error) ->
case queue:out(Q) of
{{value, El}, Q1} ->
- Err = jlib:make_error_reply(El, Error),
- From = jlib:string_to_jid(xml:get_tag_attr_s("from", El)),
- To = jlib:string_to_jid(xml:get_tag_attr_s("to", El)),
- ejabberd_router:route(To, From, Err),
+ bounce_element(El, Error),
bounce_queue(Q1, Error);
- {empty, Q1} ->
+ {empty, _} ->
ok
end.
"error" ->
ok;
_ ->
- Err = jlib:make_error_reply(El, Error),
- From = jlib:string_to_jid(xml:get_attr_s("from", Attrs)),
- To = jlib:string_to_jid(xml:get_attr_s("to", Attrs)),
- ejabberd_router:route(To, From, Err)
+ bounce_element(El, Error)
end,
bounce_messages(Error)
after 0 ->
{"id", SID}],
[{xmlcdata, Key2}]})
end,
- {next_state, wait_for_validation, StateData#state{new = New}}.
+ {next_state, wait_for_validation, StateData#state{new = New}, ?FSMTIMEOUT*6}.
is_verify_res({xmlelement, Name, Attrs, Els}) when Name == "db:result" ->
%% Log new outgoing connections:
log_s2s_out(_, Myname, Server) ->
?INFO_MSG("Trying to open s2s connection: ~s -> ~s",[Myname, Server]).
+
+%% Calcultate timeout depending on which state we are in:
+%% Can return integer > 0 | infinity
+get_timeout_interval(StateName) ->
+ case StateName of
+ %% Validation implies dialback: Networking can take longer:
+ wait_for_validation ->
+ ?FSMTIMEOUT*6;
+ %% When stream is established, we only rely on S2S Timeout timer:
+ stream_established ->
+ infinity;
+ _ ->
+ ?FSMTIMEOUT
+ end.
+
+%% This function is intended to be called at the end of a state
+%% function that want to wait for a reconnect delay before stopping.
+wait_before_reconnect(StateData, Delay) ->
+ %% bounce queue manage by process and Erlang message queue
+ bounce_queue(StateData#state.queue, ?ERR_REMOTE_SERVER_NOT_FOUND),
+ bounce_messages(?ERR_REMOTE_SERVER_NOT_FOUND),
+ cancel_timer(StateData#state.timer),
+ Timer = erlang:start_timer(Delay, self(), []),
+ {next_state, wait_before_retry, StateData#state{timer=Timer,
+ queue = queue:new()}}.