ejabberd_auth:start(),
ejabberd_router:start(),
ejabberd_sm:start(),
+ ejabberd_s2s:start(),
ejabberd_local:start(),
ejabberd_listener:start(),
loop(Port).
-define(DEBUG(F,A),[]).
-endif.
+
+-define(MYNAME,"127.0.0.1").
+
handle_info/3,
terminate/3]).
--record(state, {socket, sender, receiver, streamid,
- user = "", server = "localhost", resource = ""}).
-
-include("ejabberd.hrl").
+-record(state, {socket, sender, receiver, streamid,
+ user = "", server = ?MYNAME, resource = ""}).
+
-define(DBGFSM, true).
-ifdef(DBGFSM).
wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) ->
% TODO
Header = io_lib:format(?STREAM_HEADER,
- [StateData#state.streamid, "localhost"]),
+ [StateData#state.streamid, ?MYNAME]),
send_text(StateData#state.sender, Header),
case lists:keysearch("xmlns:stream", 1, Attrs) of
{value, {"xmlns:stream", "http://etherx.jabber.org/streams"}} ->
spawn(ejabberd_local, init, []).
init() ->
- ejabberd_router:register_local_route("localhost"),
+ ejabberd_router:register_local_route(?MYNAME),
loop().
loop() ->
case mnesia:transaction(F) of
{atomic, error} ->
% TODO: start s2s instead of below
- {xmlelement, Name, Attrs, SubTags} = Packet,
- case xml:get_attr_s("type", Attrs) of
- "error" ->
- ok;
- _ ->
- Err = jlib:make_error_reply(Packet,
- "502", "Service Unavailable"),
- ejabberd_router ! {route, To, From, Err}
- end;
+ ejabberd_s2s ! {route, From, To, Packet};
+ %{xmlelement, Name, Attrs, SubTags} = Packet,
+ %case xml:get_attr_s("type", Attrs) of
+ % "error" ->
+ % ok;
+ % _ ->
+ % Err = jlib:make_error_reply(Packet,
+ % "502", "Service Unavailable"),
+ % ejabberd_router ! {route, To, From, Err}
+ %end;
{atomic, {ok, Node, Pid}} ->
case node() of
Node ->
--- /dev/null
+%%%----------------------------------------------------------------------
+%%% File : ejabberd_s2s.erl
+%%% Author : Alexey Shchepin <alexey@sevcom.net>
+%%% Purpose :
+%%% Created : 7 Dec 2002 by Alexey Shchepin <alexey@sevcom.net>
+%%% Id : $Id$
+%%%----------------------------------------------------------------------
+
+-module(ejabberd_s2s).
+-author('alexey@sevcom.net').
+-vsn('$Revision$ ').
+
+-export([start/0, init/0, open_session/2, close_session/2,
+ have_connection/1,
+ get_key/1]).
+
+-include_lib("mnemosyne/include/mnemosyne.hrl").
+-include("ejabberd.hrl").
+
+-record(s2s, {server, node, key}).
+-record(mys2s, {server, pid}).
+
+
+start() ->
+ spawn(ejabberd_s2s, init, []).
+
+init() ->
+ register(ejabberd_s2s, self()),
+ mnesia:create_table(s2s,[{ram_copies, [node()]},
+ {attributes, record_info(fields, s2s)}]),
+ mnesia:add_table_index(session, node),
+ mnesia:create_table(mys2s,
+ [{ram_copies, [node()]},
+ {local_content, true},
+ {attributes, record_info(fields, mys2s)}]),
+ mnesia:subscribe(system),
+ loop().
+
+loop() ->
+ receive
+ %{open_connection, User, Resource, From} ->
+ % replace_and_register_my_connection(User, Resource, From),
+ % replace_alien_connection(User, Resource),
+ % loop();
+ {closed_conection, Server} ->
+ remove_connection(Server),
+ loop();
+ %{replace, User, Resource} ->
+ % replace_my_connection(User, Resource),
+ % loop();
+ {mnesia_system_event, {mnesia_down, Node}} ->
+ clean_table_from_bad_node(Node),
+ loop();
+ {route, From, To, Packet} ->
+ do_route(From, To, Packet),
+ loop();
+ _ ->
+ loop()
+ end.
+
+
+open_session(User, Resource) ->
+ ejabberd_s2s ! {open_session, User, Resource, self()}.
+
+close_session(User, Resource) ->
+ ejabberd_s2s ! {close_session, User, Resource}.
+
+%replace_alien_connection(User, Resource) ->
+% F = fun() ->
+% [ID] = mnemosyne:eval(query [X.id || X <- table(user_resource),
+% X.user = User,
+% X.resource = Resource]
+% end),
+% Es = mnesia:read({session, ID}),
+% mnesia:write(#session{id = ID, node = node()}),
+% Es
+% end,
+% case mnesia:transaction(F) of
+% {atomic, Rs} ->
+% lists:foreach(
+% fun(R) ->
+% if R#session.node /= node() ->
+% {ejabberd_s2s, R#session.node} !
+% {replace, User, Resource};
+% true ->
+% ok
+% end
+% end, Rs);
+% _ ->
+% false
+% end.
+%
+%
+%replace_my_connection(User, Resource) ->
+% F = fun() ->
+% [ID] = mnemosyne:eval(query [X.id || X <- table(user_resource),
+% X.user = User,
+% X.resource = Resource]
+% end),
+%
+% Es = mnesia:read({mysession, ID}),
+% mnesia:delete({mysession, ID}),
+% Es
+% end,
+% case mnesia:transaction(F) of
+% {atomic, Rs} ->
+% lists:foreach(
+% fun(R) ->
+% (R#mysession.info)#mysession_info.pid ! replaced
+% end, Rs);
+% _ ->
+% false
+% end.
+
+remove_connection(Server) ->
+ F = fun() ->
+ mnesia:delete({mys2s, Server}),
+ mnesia:delete({s2s, Server})
+ end,
+ mnesia:transaction(F).
+
+%replace_and_register_my_connection(User, Resource, Pid) ->
+% F = fun() ->
+% IDs = mnemosyne:eval(query [X.id || X <- table(user_resource),
+% X.user = User,
+% X.resource = Resource]
+% end),
+%
+% ID = case IDs of
+% [Id] -> Id;
+% [] ->
+% [CurID] =
+% mnemosyne:eval(
+% query [X.id ||
+% X <- table(user_resource_id_seq)]
+% end),
+% mnesia:write(
+% #user_resource_id_seq{id = CurID + 1}),
+% mnesia:write(
+% #user_resource{id = CurID,
+% user = User,
+% resource = Resource}),
+% CurID
+% end,
+% Es = mnesia:read({mysession, ID}),
+% mnesia:write(#mysession{id = ID,
+% info = #mysession_info{pid = Pid}}),
+% Es
+% end,
+% case mnesia:transaction(F) of
+% {atomic, Rs} ->
+% lists:foreach(
+% fun(R) ->
+% (R#mysession.info)#mysession_info.pid ! replaced
+% end, Rs);
+% _ ->
+% false
+% end.
+
+
+clean_table_from_bad_node(Node) ->
+ F = fun() ->
+ Es = mnesia:index_read(s2s, Node, #s2s.node),
+ lists:foreach(fun(E) ->
+ mnesia:delete_object(s2s, E, write)
+ end, Es)
+ end,
+ mnesia:transaction(F).
+
+have_connection(Server) ->
+ F = fun() ->
+ [E] = mnesia:read({s2s, Server})
+ end,
+ case mnesia:transaction(F) of
+ {atomic, _} ->
+ true;
+ _ ->
+ false
+ end.
+
+get_key(Server) ->
+ F = fun() ->
+ [E] = mnesia:read({s2s, Server}),
+ E
+ end,
+ case mnesia:transaction(F) of
+ {atomic, E} ->
+ E#s2s.key;
+ _ ->
+ ""
+ end.
+
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+do_route(From, To, Packet) ->
+ ?DEBUG("s2s manager~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n",
+ [From, To, Packet, 8]),
+ {User, Server, Resource} = To,
+ Key = lists:flatten(io_lib:format("~p", [random:uniform(65536*65536)])),
+ F = fun() ->
+ case mnesia:read({mys2s, Server}) of
+ [] ->
+ case mnesia:read({s2s, Server}) of
+ [Er] ->
+ {remote, Er#s2s.node};
+ [] ->
+ % TODO
+ mnesia:write(#s2s{server = Server,
+ node = node(),
+ key = Key}),
+ new
+ end;
+ [El] ->
+ {local, El#mys2s.pid}
+ end
+ end,
+ case mnesia:transaction(F) of
+ {atomic, {local, Pid}} ->
+ ?DEBUG("sending to process ~p~n", [Pid]),
+ % TODO
+ {xmlelement, Name, Attrs, Els} = Packet,
+ NewAttrs = jlib:replace_from_to_attrs(jlib:jid_to_string(From),
+ jlib:jid_to_string(To),
+ Attrs),
+ send_element(Pid, {xmlelement, Name, NewAttrs, Els}),
+ ok;
+ {atomic, {remote, Node}} ->
+ ?DEBUG("sending to node ~p~n", [Node]),
+ {ejabberd_s2s, Node} ! {route, From, To, Packet},
+ ok;
+ {atomic, new} ->
+ ?DEBUG("starting new s2s connection~n", []),
+ Pid = ejabberd_s2s_out:start(Server, {new, Key}),
+ mnesia:transaction(fun() -> mnesia:write(#mys2s{server = Server,
+ pid = Pid}) end),
+ {xmlelement, Name, Attrs, Els} = Packet,
+ NewAttrs = jlib:replace_from_to_attrs(jlib:jid_to_string(From),
+ jlib:jid_to_string(To),
+ Attrs),
+ send_element(Pid, {xmlelement, Name, NewAttrs, Els}),
+ ok;
+ {atomic, not_exists} ->
+ ?DEBUG("packet droped~n", []),
+ ok;
+ {aborted, Reason} ->
+ ?DEBUG("delivery failed: ~p~n", [Reason]),
+ false
+ end.
+
+send_element(Pid, El) ->
+ Pid ! {send_element, El}.
-export([start/1, receiver/2, send_text/2, send_element/2]).
%% gen_fsm callbacks
--export([init/1, wait_for_stream/2, wait_for_key/2, session_established/2,
+-export([init/1,
+ wait_for_stream/2,
+ wait_for_key/2,
+ wait_for_verification/2,
+ stream_established/2,
handle_info/3,
terminate/3]).
--record(state, {socket, receiver, streamid,
- myself = "localhost", server}).
-
-include("ejabberd.hrl").
+-record(state, {socket, receiver, streamid,
+ myself = ?MYNAME, server, queue}).
+
-define(DBGFSM, true).
-ifdef(DBGFSM).
ReceiverPid = spawn(?MODULE, receiver, [Socket, self()]),
{ok, wait_for_stream, #state{socket = Socket,
receiver = ReceiverPid,
- streamid = new_id()}}.
+ streamid = new_id(),
+ queue = queue:new()}}.
%%----------------------------------------------------------------------
%% Func: StateName/2
case is_key_packet(El) of
{key, To, From, Id, Key} ->
io:format("GET KEY: ~p~n", [{To, From, Id, Key}]),
- % TODO
ejabberd_s2s_out:start(From, {verify, self(), Key}),
- {next_state, wait_for_key, StateData};
+ {next_state,
+ wait_for_verification,
+ StateData#state{server = From}};
{verify, To, From, Id, Key} ->
io:format("VERIFY KEY: ~p~n", [{To, From, Id, Key}]),
- % TODO
+ Key1 = ejabberd_s2s:get_key(From),
+ Type = if Key == Key1 -> "valid";
+ true -> "invalid"
+ end,
+ send_element(StateData#state.socket,
+ {xmlelement,
+ "db:verify",
+ [{"from", ?MYNAME},
+ {"to", From},
+ {"id", Id},
+ {"type", Type}],
+ []}),
{next_state, wait_for_key, StateData};
_ ->
{next_state, wait_for_key, StateData}
wait_for_key(closed, StateData) ->
{stop, normal, StateData}.
-session_established({xmlstreamelement, El}, StateData) ->
+wait_for_verification(valid, StateData) ->
+ send_element(StateData#state.socket,
+ {xmlelement,
+ "db:result",
+ [{"from", ?MYNAME},
+ {"to", StateData#state.server},
+ {"type", "valid"}],
+ []}),
+ send_queue(StateData#state.socket, StateData#state.queue),
+ {next_state, stream_established, StateData};
+
+wait_for_verification(invalid, StateData) ->
+ send_element(StateData#state.socket,
+ {xmlelement,
+ "db:result",
+ [{"from", ?MYNAME},
+ {"to", StateData#state.server},
+ {"type", "invalid"}],
+ []}),
+ {stop, normal, StateData};
+
+wait_for_verification({xmlstreamelement, El}, StateData) ->
+ case is_key_packet(El) of
+ {verify, To, From, Id, Key} ->
+ io:format("VERIFY KEY: ~p~n", [{To, From, Id, Key}]),
+ Key1 = ejabberd_s2s:get_key(From),
+ Type = if Key == Key1 -> "valid";
+ true -> "invalid"
+ end,
+ send_element(StateData#state.socket,
+ {xmlelement,
+ "db:verify",
+ [{"from", ?MYNAME},
+ {"to", From},
+ {"id", Id},
+ {"type", Type}],
+ []}),
+ {next_state, wait_for_verification, StateData};
+ _ ->
+ {next_state, wait_for_verification, StateData}
+ end;
+
+wait_for_verification({xmlstreamend, Name}, StateData) ->
+ % TODO
+ {stop, normal, StateData};
+
+wait_for_verification(closed, StateData) ->
+ {stop, normal, StateData}.
+
+stream_established({xmlstreamelement, El}, StateData) ->
{xmlelement, Name, Attrs, Els} = El,
% TODO
- FromJID = {StateData#state.server},
+ From = xml:get_attr_s("from", Attrs),
+ FromJID1 = jlib:string_to_jid(From),
+ FromJID = case FromJID1 of
+ {Node, Server, Resource} ->
+ if Server == StateData#state.server -> FromJID1;
+ true -> error
+ end;
+ _ -> error
+ end,
To = xml:get_attr_s("to", Attrs),
ToJID = case To of
- "" ->
- {"", StateData#state.server, ""};
- _ ->
- jlib:string_to_jid(To)
+ "" -> error;
+ _ -> jlib:string_to_jid(To)
end,
- case ToJID of
- error ->
- % TODO
- error;
- _ ->
- %?DEBUG("FromJID=~w, ToJID=~w, El=~w~n", [FromJID, ToJID, El]),
- ejabberd_router:route(FromJID, ToJID, El)
+ if ((Name == "iq") or (Name == "message") or (Name == "presence")) and
+ (ToJID /= error) and (FromJID /= error) ->
+ ejabberd_router:route(FromJID, ToJID, El);
+ true ->
+ error
end,
- {next_state, session_established, StateData};
+ {next_state, stream_established, StateData};
+
+stream_established({xmlstreamend, Name}, StateData) ->
+ % TODO
+ {stop, normal, StateData};
-session_established(closed, StateData) ->
+stream_established(closed, StateData) ->
% TODO
{stop, normal, StateData}.
%%----------------------------------------------------------------------
handle_info({send_text, Text}, StateName, StateData) ->
send_text(StateData#state.socket, Text),
- {next_state, StateName, StateData}.
+ {next_state, StateName, StateData};
+handle_info({send_element, El}, StateName, StateData) ->
+ case StateName of
+ stream_established ->
+ send_element(StateData#state.socket, El),
+ {next_state, StateName, StateData};
+ _ ->
+ Q = queue:in(El, StateData#state.queue),
+ {next_state, StateName, StateData#state{queue = Q}}
+ end.
+
%%----------------------------------------------------------------------
%% Func: terminate/3
% %ejabberd_sm:close_session(StateData#state.user,
% % StateData#state.resource)
% end,
+ %ejabberd_s2s ! {closed_conection, StateData#state.server},
gen_tcp:close(StateData#state.socket),
ok.
send_element(Socket, El) ->
send_text(Socket, xml:element_to_string(El)).
+send_queue(Socket, Q) ->
+ case queue:out(Q) of
+ {{value, El}, Q1} ->
+ send_element(Socket, El),
+ send_queue(Socket, Q1);
+ {empty, Q1} ->
+ ok
+ end.
+
+
new_id() ->
lists:flatten(io_lib:format("~p", [random:uniform(65536*65536)])).
is_key_packet(_) ->
false.
-get_auth_tags([{xmlelement, Name, Attrs, Els}| L], U, P, D, R) ->
- CData = xml:get_cdata(Els),
- case Name of
- "username" ->
- get_auth_tags(L, CData, P, D, R);
- "password" ->
- get_auth_tags(L, U, CData, D, R);
- "digest" ->
- get_auth_tags(L, U, P, CData, R);
- "resource" ->
- get_auth_tags(L, U, P, D, CData);
- _ ->
- get_auth_tags(L, U, P, D, R)
- end;
-get_auth_tags([_ | L], U, P, D, R) ->
- get_auth_tags(L, U, P, D, R);
-get_auth_tags([], U, P, D, R) ->
- {U, P, D, R}.
-
-
%% gen_fsm callbacks
-export([init/1,
+ open_socket/2,
wait_for_stream/2,
- wait_for_key/2,
+ wait_for_validation/2,
wait_for_verification/2,
- session_established/2,
+ stream_established/2,
handle_info/3,
terminate/3]).
--record(state, {socket, receiver, streamid,
- myself = "localhost", server, type, xmlpid}).
-
-include("ejabberd.hrl").
+-record(state, {socket, receiver, streamid,
+ myself = ?MYNAME, server, type, xmlpid, queue}).
+
-define(DBGFSM, true).
-ifdef(DBGFSM).
%%% API
%%%----------------------------------------------------------------------
start(Host, Type) ->
- gen_fsm:start(ejabberd_s2s_out, [Host, Type], ?FSMOPTS).
+ {ok, Pid} = gen_fsm:start(ejabberd_s2s_out, [Host, Type], ?FSMOPTS),
+ Pid.
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
-init([Host, Type]) ->
- {ok, Socket} = gen_tcp:connect(Host, 5569,
- [binary, {packet, 0}]),
- XMLStreamPid = xml_stream:start(self()),
- %ReceiverPid = spawn(?MODULE, receiver, [Socket, self()]),
- send_text(Socket, ?STREAM_HEADER),
- {ok, wait_for_stream, #state{socket = Socket,
- xmlpid = XMLStreamPid,
- streamid = new_id(),
- server = Host,
- type = Type}}.
+init([Server, Type]) ->
+ gen_fsm:send_event(self(), init),
+ {ok, open_socket, #state{queue = queue:new(),
+ server = Server,
+ type = Type}}.
%%----------------------------------------------------------------------
%% Func: StateName/2
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
-state_name(Event, StateData) ->
- {next_state, state_name, StateData}.
+open_socket(init, StateData) ->
+ case gen_tcp:connect(StateData#state.server, 5569,
+ [binary, {packet, 0}]) of
+ {ok, Socket} ->
+ XMLStreamPid = xml_stream:start(self()),
+ send_text(Socket, ?STREAM_HEADER),
+ {next_state, wait_for_stream,
+ StateData#state{socket = Socket,
+ xmlpid = XMLStreamPid,
+ streamid = new_id()}};
+ {error, Reason} ->
+ ?DEBUG("s2s_out: connect return ~p~n", [Reason]),
+ Text = case Reason of
+ timeout -> "Server Connect Timeout";
+ _ -> "Server Connect Failed"
+ end,
+ bounce_messages(Text),
+ {stop, normal, StateData}
+ end.
+
wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) ->
% TODO
case {xml:get_attr_s("xmlns", Attrs), xml:get_attr_s("xmlns:db", Attrs)} of
{"jabber:server", "jabber:server:dialback"} ->
case StateData#state.type of
- new ->
+ {new, Key} ->
+ send_element(StateData#state.socket,
+ {xmlelement,
+ "db:result",
+ [{"from", ?MYNAME},
+ {"to", StateData#state.server}],
+ [{xmlcdata, Key}]}),
% TODO
- {next_state, wait_for_key, StateData};
+ {next_state, wait_for_validation, StateData};
{verify, Pid, Key} ->
send_element(StateData#state.socket,
{xmlelement,
"db:verify",
- [{"from", "127.0.0.1"},
+ [{"from", ?MYNAME},
{"to", StateData#state.server}],
[{xmlcdata, Key}]}),
{next_state, wait_for_verification, StateData}
{stop, normal, StateData}.
-wait_for_key({xmlstreamelement, El}, StateData) ->
- case is_key_packet(El) of
- {key, To, From, Id, Key} ->
- io:format("GET KEY: ~p~n", [{To, From, Id, Key}]),
- % TODO
- {next_state, wait_for_key, StateData};
- {verify, To, From, Id, Key} ->
- io:format("VERIFY KEY: ~p~n", [{To, From, Id, Key}]),
- % TODO
- {next_state, wait_for_key, StateData};
+
+wait_for_validation({xmlstreamelement, El}, StateData) ->
+ case is_verify_res(El) of
+ {result, To, From, Id, Type} ->
+ case Type of
+ "valid" ->
+ % TODO
+ send_queue(StateData#state.socket, StateData#state.queue),
+ {next_state, stream_established, StateData};
+ _ ->
+ % TODO: bounce packets
+ {stop, normal, StateData}
+ end;
_ ->
- {next_state, wait_for_key, StateData}
+ {next_state, wait_for_validation, StateData}
end;
-wait_for_key({xmlstreamend, Name}, StateData) ->
+wait_for_validation({xmlstreamend, Name}, StateData) ->
% TODO
{stop, normal, StateData};
-wait_for_key(closed, StateData) ->
+wait_for_validation(closed, StateData) ->
{stop, normal, StateData}.
+
wait_for_verification({xmlstreamelement, El}, StateData) ->
case is_verify_res(El) of
{result, To, From, Id, Type} ->
+ {verify, Pid, Key} = StateData#state.type,
case Type of
"valid" ->
- io:format("VALID KEY~n", []);
+ io:format("VALID KEY~n", []),
+ gen_fsm:send_event(Pid, valid);
% TODO
_ ->
% TODO
- ok
+ gen_fsm:send_event(Pid, invalid)
end,
{stop, normal, StateData};
_ ->
wait_for_verification(closed, StateData) ->
{stop, normal, StateData}.
-session_established({xmlstreamelement, El}, StateData) ->
+
+stream_established({xmlstreamelement, El}, StateData) ->
{xmlelement, Name, Attrs, Els} = El,
% TODO
- FromJID = {StateData#state.server},
+ From = xml:get_attr_s("from", Attrs),
+ FromJID1 = jlib:string_to_jid(From),
+ FromJID = case FromJID1 of
+ {Node, Server, Resource} ->
+ if Server == StateData#state.server -> FromJID1;
+ true -> error
+ end;
+ _ -> error
+ end,
To = xml:get_attr_s("to", Attrs),
ToJID = case To of
- "" ->
- {"", StateData#state.server, ""};
- _ ->
- jlib:string_to_jid(To)
+ "" -> error;
+ _ -> jlib:string_to_jid(To)
end,
- case ToJID of
- error ->
- % TODO
- error;
- _ ->
- %?DEBUG("FromJID=~w, ToJID=~w, El=~w~n", [FromJID, ToJID, El]),
- ejabberd_router:route(FromJID, ToJID, El)
+ if ((Name == "iq") or (Name == "message") or (Name == "presence")) and
+ (ToJID /= error) and (FromJID /= error) ->
+ ejabberd_router:route(FromJID, ToJID, El);
+ true ->
+ error
end,
- {next_state, session_established, StateData};
+ {next_state, stream_established, StateData};
-session_established(closed, StateData) ->
+stream_established({xmlstreamend, Name}, StateData) ->
+ % TODO
+ {stop, normal, StateData};
+
+stream_established(closed, StateData) ->
% TODO
{stop, normal, StateData}.
handle_info({send_text, Text}, StateName, StateData) ->
send_text(StateData#state.socket, Text),
{next_state, StateName, StateData};
+handle_info({send_element, El}, StateName, StateData) ->
+ case StateName of
+ stream_established ->
+ send_element(StateData#state.socket, El),
+ {next_state, StateName, StateData};
+ _ ->
+ Q = queue:in(El, StateData#state.queue),
+ {next_state, StateName, StateData#state{queue = Q}}
+ end;
handle_info({tcp, Socket, Data}, StateName, StateData) ->
xml_stream:send_text(StateData#state.xmlpid, Data),
{next_state, StateName, StateData};
handle_info({tcp_closed, Socket}, StateName, StateData) ->
- self() ! closed,
+ gen_fsm:send_event(self(), closed),
{next_state, StateName, StateData};
handle_info({tcp_error, Socket, Reason}, StateName, StateData) ->
- self() ! closed,
+ gen_fsm:send_event(self(), closed),
{next_state, StateName, StateData}.
%%----------------------------------------------------------------------
%% Returns: any
%%----------------------------------------------------------------------
terminate(Reason, StateName, StateData) ->
-% case StateData#state.user of
-% "" ->
-% ok;
-% _ ->
-% %ejabberd_sm:close_session(StateData#state.user,
-% % StateData#state.resource)
-% end,
- gen_tcp:close(StateData#state.socket),
+ ?DEBUG("s2s_out: terminate ~p~n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!~n", [[Reason, StateName, StateData]]),
+ case StateData#state.type of
+ {new, Key} ->
+ ejabberd_s2s ! {closed_conection, StateData#state.server};
+ _ ->
+ ok
+ end,
+ case StateData#state.socket of
+ undefined ->
+ ok;
+ Socket ->
+ gen_tcp:close(Socket)
+ end,
ok.
%%%----------------------------------------------------------------------
send_element(Socket, El) ->
send_text(Socket, xml:element_to_string(El)).
+send_queue(Socket, Q) ->
+ case queue:out(Q) of
+ {{value, El}, Q1} ->
+ send_element(Socket, El),
+ send_queue(Socket, Q1);
+ {empty, Q1} ->
+ ok
+ end.
+
new_id() ->
lists:flatten(io_lib:format("~p", [random:uniform(65536*65536)])).
+bounce_messages(Reason) ->
+ receive
+ {send_element, El} ->
+ {xmlelement, Name, Attrs, SubTags} = El,
+ case xml:get_attr_s("type", Attrs) of
+ "error" ->
+ ok;
+ _ ->
+ Err = jlib:make_error_reply(El,
+ "502", Reason),
+ From = jlib:string_to_jid(xml:get_attr_s("from", Attrs)),
+ To = jlib:string_to_jid(xml:get_attr_s("to", Attrs)),
+ ejabberd_router ! {route, To, From, Err}
+ end,
+ bounce_messages(Reason)
+ after 0 ->
+ ok
+ end.
is_key_packet({xmlelement, Name, Attrs, Els}) when Name == "db:result" ->
{key,
is_verify_res(_) ->
false.
-get_auth_tags([{xmlelement, Name, Attrs, Els}| L], U, P, D, R) ->
- CData = xml:get_cdata(Els),
- case Name of
- "username" ->
- get_auth_tags(L, CData, P, D, R);
- "password" ->
- get_auth_tags(L, U, CData, D, R);
- "digest" ->
- get_auth_tags(L, U, P, CData, R);
- "resource" ->
- get_auth_tags(L, U, P, D, CData);
- _ ->
- get_auth_tags(L, U, P, D, R)
- end;
-get_auth_tags([_ | L], U, P, D, R) ->
- get_auth_tags(L, U, P, D, R);
-get_auth_tags([], U, P, D, R) ->
- {U, P, D, R}.
-
-
-
-
{local_content, true},
{attributes, record_info(fields, mysession)}]),
mnesia:subscribe(system),
- %ejabberd_router:register_local_route("localhost"),
loop().
loop() ->