]> granicus.if.org Git - ejabberd/commitdiff
* src/ejabberd_sm.erl: Partially rewritten to work more
authorAlexey Shchepin <alexey@process-one.net>
Mon, 23 Jan 2006 23:13:06 +0000 (23:13 +0000)
committerAlexey Shchepin <alexey@process-one.net>
Mon, 23 Jan 2006 23:13:06 +0000 (23:13 +0000)
efficiently and avoid race conditions
* src/ejabberd_c2s.erl: Likewise

* src/mod_irc/mod_irc_connection.erl: Cleanup

SVN Revision: 488

ChangeLog
src/ejabberd_c2s.erl
src/ejabberd_sm.erl
src/mod_irc/mod_irc_connection.erl

index 2e657bcecf8d29349c000ef5ad4d3760861ce08a..c485c679688a176d7f3e3f929dcac8162c8fe911 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,9 +1,22 @@
+<<<<<<< .mine
+2006-01-23  Alexey Shchepin  <alexey@sevcom.net>
+
+       * src/ejabberd_sm.erl: Partially rewritten to work more
+       efficiently and avoid race conditions
+       * src/ejabberd_c2s.erl: Likewise
+
+2006-01-21  Alexey Shchepin  <alexey@sevcom.net>
+
+       * src/mod_irc/mod_irc_connection.erl: Cleanup
+
+=======
 2006-01-20  Mickael Remond  <mickael.remond@process-one.net>
 
        * src/ejabberd_receiver.erl: Added new debugging trace: It is now
        possible to dump the XML stream received from a client (usefull for
         client debugging).
 
+>>>>>>> .r487
 2006-01-19  Alexey Shchepin  <alexey@sevcom.net>
 
        * src/aclocal.m4: Updated for zlib support
index 4a21772cb9223c6634ab09b3b156fd5c45864ab8..636cd96a2563ab2a19e4a21f5b96e2fefb1e28c7 100644 (file)
@@ -54,6 +54,7 @@
                authenticated = false,
                jid,
                user = "", server = ?MYNAME, resource = "",
+               sid,
                pres_t = ?SETS:new(),
                pres_f = ?SETS:new(),
                pres_a = ?SETS:new(),
@@ -372,8 +373,9 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
                               "(~w) Accepted legacy authentication for ~s",
                               [StateData#state.socket,
                                jlib:jid_to_string(JID)]),
+                           SID = {now(), self()},
                            ejabberd_sm:open_session(
-                             U, StateData#state.server, R),
+                             SID, U, StateData#state.server, R),
                            Res1 = jlib:make_result_iq_reply(El),
                            Res = setelement(4, Res1, []),
                            send_element(StateData, Res),
@@ -397,6 +399,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
                             StateData#state{user = U,
                                             resource = R,
                                             jid = JID,
+                                            sid = SID,
                                             pres_f = ?SETS:from_list(Fs1),
                                             pres_t = ?SETS:from_list(Ts1),
                                             privacy_list = PrivList}};
@@ -664,8 +667,9 @@ wait_for_session({xmlstreamelement, El}, StateData) ->
                    ?INFO_MSG("(~w) Opened session for ~s",
                              [StateData#state.socket,
                               jlib:jid_to_string(JID)]),
+                   SID = {now(), self()},
                    ejabberd_sm:open_session(
-                     U, StateData#state.server, R),
+                     SID, U, StateData#state.server, R),
                    Res = jlib:make_result_iq_reply(El),
                    send_element(StateData, Res),
                    change_shaper(StateData, JID),
@@ -684,7 +688,8 @@ wait_for_session({xmlstreamelement, El}, StateData) ->
                            PL -> PL
                        end,
                    {next_state, session_established,
-                    StateData#state{pres_f = ?SETS:from_list(Fs1),
+                    StateData#state{sid = SID,
+                                    pres_f = ?SETS:from_list(Fs1),
                                     pres_t = ?SETS:from_list(Ts1),
                                     privacy_list = PrivList}};
                _ ->
@@ -1037,10 +1042,12 @@ terminate(_Reason, StateName, StateData) ->
                              [{"type", "unavailable"}],
                              [{xmlelement, "status", [],
                                [{xmlcdata, "Replaced by new connection"}]}]},
-                   ejabberd_sm:unset_presence(StateData#state.user,
-                                              StateData#state.server,
-                                              StateData#state.resource,
-                                              "Replaced by new connection"),
+                   ejabberd_sm:close_session_unset_presence(
+                     StateData#state.sid,
+                     StateData#state.user,
+                     StateData#state.server,
+                     StateData#state.resource,
+                     "Replaced by new connection"),
                    presence_broadcast(
                      StateData, From, StateData#state.pres_a, Packet),
                    presence_broadcast(
@@ -1049,25 +1056,24 @@ terminate(_Reason, StateName, StateData) ->
                    ?INFO_MSG("(~w) Close session for ~s",
                              [StateData#state.socket,
                               jlib:jid_to_string(StateData#state.jid)]),
-                   ejabberd_sm:close_session(StateData#state.user,
-                                             StateData#state.server,
-                                             StateData#state.resource),
 
-                   Tmp = ?SETS:new(),
+                   EmptySet = ?SETS:new(),
                    case StateData of
                        #state{pres_last = undefined,
-                              pres_a = Tmp,
-                              pres_i = Tmp,
+                              pres_a = EmptySet,
+                              pres_i = EmptySet,
                               pres_invis = false} ->
-                           ok;
+                           ejabberd_sm:close_session(StateData#state.sid);
                        _ ->
                            From = StateData#state.jid,
                            Packet = {xmlelement, "presence",
                                      [{"type", "unavailable"}], []},
-                           ejabberd_sm:unset_presence(StateData#state.user,
-                                                      StateData#state.server,
-                                                      StateData#state.resource,
-                                                      ""),
+                           ejabberd_sm:close_session_unset_presence(
+                             StateData#state.sid,
+                             StateData#state.user,
+                             StateData#state.server,
+                             StateData#state.resource,
+                             ""),
                            presence_broadcast(
                              StateData, From, StateData#state.pres_a, Packet),
                            presence_broadcast(
@@ -1189,7 +1195,8 @@ presence_update(From, Packet, StateData) ->
                         StatusTag ->
                            xml:get_tag_cdata(StatusTag)
                     end,
-           ejabberd_sm:unset_presence(StateData#state.user,
+           ejabberd_sm:unset_presence(StateData#state.sid,
+                                      StateData#state.user,
                                       StateData#state.server,
                                       StateData#state.resource,
                                       Status),
@@ -1493,7 +1500,8 @@ update_priority(El, StateData) ->
                          0
                  end
          end,
-    ejabberd_sm:set_presence(StateData#state.user,
+    ejabberd_sm:set_presence(StateData#state.sid,
+                            StateData#state.user,
                             StateData#state.server,
                             StateData#state.resource,
                             Pri).
index 79876dd35da89a05ab25ab42aef469e60a01d745..34d2fec47a306a56f1d4d433af4089ac93c59b60 100644 (file)
 
 -export([start_link/0, init/0,
         route/3,
-        open_session/3, close_session/3,
+        open_session/4, close_session/1,
         bounce_offline_message/3,
         disconnect_removed_user/2,
         get_user_resources/2,
-        set_presence/4,
-        unset_presence/4,
+        set_presence/5,
+        unset_presence/5,
+        close_session_unset_presence/5,
         dirty_get_sessions_list/0,
         dirty_get_my_sessions_list/0,
         get_vh_session_list/1,
@@ -29,8 +30,7 @@
 -include("ejabberd.hrl").
 -include("jlib.hrl").
 
--record(session, {usr, us, pid}).
--record(presence, {usr, us, priority}).
+-record(session, {sid, usr, us, priority}).
 
 start_link() ->
     Pid = proc_lib:spawn_link(ejabberd_sm, init, []),
@@ -39,14 +39,12 @@ start_link() ->
 
 init() ->
     update_tables(),
-    mnesia:create_table(session, [{ram_copies, [node()]},
-                                 {attributes, record_info(fields, session)}]),
+    mnesia:create_table(session,
+                       [{ram_copies, [node()]},
+                        {attributes, record_info(fields, session)}]),
+    mnesia:add_table_index(session, usr),
     mnesia:add_table_index(session, us),
     mnesia:add_table_copy(session, node(), ram_copies),
-    mnesia:create_table(presence,
-                       [{ram_copies, [node()]},
-                        {attributes, record_info(fields, presence)}]),
-    mnesia:add_table_index(presence, us),
     mnesia:subscribe(system),
     ets:new(sm_iqtable, [named_table]),
     lists:foreach(
@@ -101,59 +99,57 @@ route(From, To, Packet) ->
            ok
     end.
 
-open_session(User, Server, Resource) ->
-    register_connection(User, Server, Resource, self()).
-
-close_session(User, Server, Resource) ->
-    remove_connection(User, Server, Resource).
+open_session(SID, User, Server, Resource) ->
+    set_session(SID, User, Server, Resource, undefined).
 
-
-register_connection(User, Server, Resource, Pid) ->
+set_session(SID, User, Server, Resource, Priority) ->
     LUser = jlib:nodeprep(User),
     LServer = jlib:nameprep(Server),
     LResource = jlib:resourceprep(Resource),
     US = {LUser, LServer},
     USR = {LUser, LServer, LResource},
     F = fun() ->
-               Ss = mnesia:wread({session, USR}),
-               mnesia:write(#session{usr = USR, us = US, pid = Pid}),
-               Ss
-        end,
-    case mnesia:transaction(F) of
-       {atomic, Ss} ->
+               mnesia:write(#session{sid = SID,
+                                     usr = USR,
+                                     us = US,
+                                     priority = Priority})
+       end,
+    mnesia:sync_dirty(F),
+    SIDs = mnesia:dirty_select(
+            session,
+            [{#session{sid = '$1', usr = USR, _ = '_'}, [], ['$1']}]),
+    if
+       SIDs == [] ->
+           ok;
+       true ->
+           MaxSID = lists:max(SIDs),
            lists:foreach(
-             fun(R) ->
-                     R#session.pid ! replaced
-             end, Ss);
-       _ ->
-           false
+             fun({_, Pid} = S) when S /= MaxSID ->
+                     Pid ! replaced;
+                (_) ->
+                     ok
+             end, SIDs)
     end.
 
-
-remove_connection(User, Server, Resource) ->
-    LUser = jlib:nodeprep(User),
-    LResource = jlib:resourceprep(Resource),
-    LServer = jlib:nameprep(Server),
-    USR = {LUser, LServer, LResource},
+close_session(SID) ->
     F = fun() ->
-               mnesia:delete({session, USR})
+               mnesia:delete({session, SID})
         end,
-    mnesia:transaction(F).
+    mnesia:sync_dirty(F).
 
 
 clean_table_from_bad_node(Node) ->
     F = fun() ->
                Es = mnesia:select(
                       session,
-                      [{#session{pid = '$1', _ = '_'},
+                      [{#session{sid = {'_', '$1'}, _ = '_'},
                         [{'==', {node, '$1'}, Node}],
                         ['$_']}]),
                lists:foreach(fun(E) ->
-                                     mnesia:delete_object(E),
-                                     mnesia:delete({presence, E#session.usr})
+                                     mnesia:delete_object(E)
                              end, Es)
         end,
-    mnesia:transaction(F).
+    mnesia:sync_dirty(F).
 
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
@@ -257,7 +253,7 @@ do_route(From, To, Packet) ->
            end;
        _ ->
            USR = {LUser, LServer, LResource},
-           case mnesia:dirty_read({session, USR}) of
+           case mnesia:dirty_index_read(session, USR, #session.usr) of
                [] ->
                    case Name of
                        "message" ->
@@ -275,8 +271,9 @@ do_route(From, To, Packet) ->
                        _ ->
                            ?DEBUG("packet droped~n", [])
                    end;
-               [Sess] ->
-                   Pid = Sess#session.pid,
+               Ss ->
+                   Session = lists:max(Ss),
+                   Pid = element(2, Session#session.sid),
                    ?DEBUG("sending to process ~p~n", [Pid]),
                    Pid ! {route, From, To, Packet}
            end
@@ -290,11 +287,12 @@ route_message(From, To, Packet) ->
                           Priority >= 0 ->
            LResource = jlib:resourceprep(R),
            USR = {LUser, LServer, LResource},
-           case mnesia:dirty_read({session, USR}) of
+           case mnesia:dirty_index_read(session, USR, #session.usr) of
                [] ->
                    ok;                         % Race condition
-               [Sess] ->
-                   Pid = Sess#session.pid,
+               Ss ->
+                   Session = lists:max(Ss),
+                   Pid = element(2, Session#session.sid),
                    ?DEBUG("sending to process ~p~n", [Pid]),
                    Pid ! {route, From, To, Packet}
            end;
@@ -337,53 +335,67 @@ get_user_resources(User, Server) ->
     case catch mnesia:dirty_index_read(session, US, #session.us) of
        {'EXIT', _Reason} ->
            [];
-       Rs ->
-           lists:map(fun(R) ->
-                             element(3, R#session.usr)
-                     end, Rs)
+       Ss ->
+           [element(3, S#session.usr) || S <- clean_session_list(Ss)]
+    end.
+
+clean_session_list(Ss) ->
+    clean_session_list(lists:keysort(#session.usr, Ss), []).
+
+clean_session_list([], Res) ->
+    Res;
+clean_session_list([S], Res) ->
+    [S | Res];
+clean_session_list([S1, S2 | Rest], Res) ->
+    if
+       S1#session.usr == S2#session.usr ->
+           if
+               S1#session.sid > S2#session.sid ->
+                   clean_session_list([S1 | Rest], Res);
+               true ->
+                   clean_session_list([S2 | Rest], Res)
+           end;
+       true ->
+           clean_session_list([S2 | Rest], [S1 | Res])
     end.
 
 
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
-set_presence(User, Server, Resource, Priority) ->
-    LUser = jlib:nodeprep(User),
-    LServer = jlib:nameprep(Server),
-    USR = {User, Server, Resource},
-    US = {LUser, LServer},
-    F = fun() ->
-               mnesia:write(#presence{usr = USR, us = US,
-                                      priority = Priority})
-       end,
-    mnesia:transaction(F).
+set_presence(SID, User, Server, Resource, Priority) ->
+    set_session(SID, User, Server, Resource, Priority).
 
-unset_presence(User, Server, Resource, Status) ->
-    USR = {User, Server, Resource},
-    F = fun() ->
-               mnesia:delete({presence, USR})
-       end,
-    mnesia:transaction(F),
+unset_presence(SID, User, Server, Resource, Status) ->
+    set_session(SID, User, Server, Resource, undefined),
+    ejabberd_hooks:run(unset_presence_hook, jlib:nameprep(Server),
+                      [User, Server, Resource, Status]).
+
+close_session_unset_presence(SID, User, Server, Resource, Status) ->
+    close_session(SID),
     ejabberd_hooks:run(unset_presence_hook, jlib:nameprep(Server),
                       [User, Server, Resource, Status]).
 
 get_user_present_resources(LUser, LServer) ->
     US = {LUser, LServer},
-    case catch mnesia:dirty_index_read(presence, US, #presence.us) of
+    case catch mnesia:dirty_index_read(session, US, #session.us) of
        {'EXIT', _Reason} ->
            [];
-       Rs ->
-           lists:map(fun(R) ->
-                             {R#presence.priority, element(3, R#presence.usr)}
-                     end, Rs)
+       Ss ->
+           [{S#session.priority, element(3, S#session.usr)} ||
+               S <- clean_session_list(Ss), is_integer(S#session.priority)]
     end.
 
 dirty_get_sessions_list() ->
-    mnesia:dirty_all_keys(session).
+    mnesia:dirty_select(
+      session,
+      [{#session{usr = '$1', _ = '_'},
+       [],
+       ['$1']}]).
 
 dirty_get_my_sessions_list() ->
     mnesia:dirty_select(
       session,
-      [{#session{pid = '$1', _ = '_'},
+      [{#session{sid = {'_', '$1'}, _ = '_'},
        [{'==', {node, '$1'}, node()}],
        ['$_']}]).
 
@@ -447,16 +459,16 @@ update_tables() ->
        [ur, user, pid] ->
            mnesia:delete_table(session);
        [usr, us, pid] ->
+           mnesia:delete_table(session);
+       [sid, usr, us, priority] ->
            ok;
        {'EXIT', _} ->
            ok
     end,
-    case catch mnesia:table_info(presence, attributes) of
-       [ur, user, priority] ->
+    case lists:member(presence, mnesia:system_info(tables)) of
+       true ->
            mnesia:delete_table(presence);
-       [usr, us, priority] ->
-           ok;
-       {'EXIT', _} ->
+       false ->
            ok
     end,
     case lists:member(local_session, mnesia:system_info(tables)) of
index aae1decdc65073b3c9d1d9a067b08210d7dddf37..8343a6f922d056150868d6b849b3e552a677c5ab 100644 (file)
@@ -13,7 +13,7 @@
 -behaviour(gen_fsm).
 
 %% External exports
--export([start/5, receiver/2, route_chan/4, route_nick/3]).
+-export([start/5, route_chan/4, route_nick/3]).
 
 %% gen_fsm callbacks
 -export([init/1,
@@ -31,7 +31,7 @@
 
 -define(SETS, gb_sets).
 
--record(state, {socket, encoding, receiver, queue,
+-record(state, {socket, encoding, queue,
                user, host, server, nick,
                channels = dict:new(),
                inbuf = "", outbuf = ""}).
@@ -523,21 +523,6 @@ terminate(Reason, StateName, StateData) ->
 %%% Internal functions
 %%%----------------------------------------------------------------------
 
-receiver(Socket, C2SPid) ->
-    XMLStreamPid = xml_stream:start(C2SPid),
-    receiver(Socket, C2SPid, XMLStreamPid).
-
-receiver(Socket, C2SPid, XMLStreamPid) ->
-    case gen_tcp:recv(Socket, 0) of
-        {ok, Text} ->
-           xml_stream:send_text(XMLStreamPid, Text),
-           receiver(Socket, C2SPid, XMLStreamPid);
-        {error, Reason} ->
-           exit(XMLStreamPid, closed),
-           gen_fsm:send_event(C2SPid, closed),
-           ok
-    end.
-
 send_text(#state{socket = Socket, encoding = Encoding}, Text) ->
     CText = iconv:convert("utf-8", Encoding, lists:flatten(Text)),
     %?DEBUG("IRC OUTu: ~s~nIRC OUTk: ~s~n", [Text, CText]),