]> granicus.if.org Git - ejabberd/commitdiff
Merge 1855 from trunk.
authorBadlop <badlop@process-one.net>
Fri, 6 Mar 2009 11:42:56 +0000 (11:42 +0000)
committerBadlop <badlop@process-one.net>
Fri, 6 Mar 2009 11:42:56 +0000 (11:42 +0000)
* src/eldap/eldap.erl: implemented queue for pending
queries (thanks to Evgeniy Khramtsov)

SVN Revision: 1973

ChangeLog
src/eldap/eldap.erl

index 1ab428396c0c0732a37f4e015d23c46aec94a4aa..ff3490c77f66d92fadfc4a31225b44244891a269 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,8 @@
 2009-03-06  Badlop  <badlop@process-one.net>
 
+       * src/eldap/eldap.erl: implemented queue for pending
+       queries (thanks to Evgeniy Khramtsov)
+
        * src/eldap/eldap.erl: Close a connection on tcp_error (thanks to
        Evgeniy Khramtsov)
 
index 799012fc54da9fe7bb9066872b3d468e6957db49..24e234cf70677228a5d66f1a2570a290ff2ddf37 100644 (file)
 -define(RETRY_TIMEOUT, 500).
 -define(BIND_TIMEOUT, 10000).
 -define(CMD_TIMEOUT, 100000).
+%% Used in gen_fsm sync calls.
+-define(CALL_TIMEOUT, ?CMD_TIMEOUT + ?BIND_TIMEOUT + ?RETRY_TIMEOUT).
+%% Used as a timeout for gen_tcp:send/2
+-define(SEND_TIMEOUT, 30000).
 -define(MAX_TRANSACTION_ID, 65535).
 -define(MIN_TRANSACTION_ID, 0).
 
                id = 0,        % LDAP Request ID 
                bind_timer,    % Ref to bind timeout
                dict,          % dict holding operation params and results
-               bind_q         % Queue for bind() requests
+               req_q          % Queue for requests
        }).
 
 %%%----------------------------------------------------------------------
@@ -141,7 +145,8 @@ close(Handle) ->
 %%% --------------------------------------------------------------------
 add(Handle, Entry, Attributes) when list(Entry),list(Attributes) ->
     Handle1 = get_handle(Handle),
-    gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)}).
+    gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)},
+                           ?CALL_TIMEOUT).
 
 %%% Do sanity check !
 add_attrs(Attrs) ->
@@ -166,7 +171,7 @@ add_attrs(Attrs) ->
 %%% --------------------------------------------------------------------
 delete(Handle, Entry) when list(Entry) ->
     Handle1 = get_handle(Handle),
-    gen_fsm:sync_send_event(Handle1, {delete, Entry}).
+    gen_fsm:sync_send_event(Handle1, {delete, Entry}, ?CALL_TIMEOUT).
 
 %%% --------------------------------------------------------------------
 %%% Modify an entry. Given an entry a number of modification
@@ -181,7 +186,7 @@ delete(Handle, Entry) when list(Entry) ->
 %%% --------------------------------------------------------------------
 modify(Handle, Object, Mods) when list(Object), list(Mods) ->
     Handle1 = get_handle(Handle),
-    gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}).
+    gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}, ?CALL_TIMEOUT).
 
 %%%
 %%% Modification operations. 
@@ -214,7 +219,10 @@ m(Operation, Type, Values) ->
 modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup) 
   when list(Entry),list(NewRDN),atom(DelOldRDN),list(NewSup) ->
     Handle1 = get_handle(Handle),
-    gen_fsm:sync_send_event(Handle1, {modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)}).
+    gen_fsm:sync_send_event(
+      Handle1,
+      {modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)},
+      ?CALL_TIMEOUT).
 
 
 %%% --------------------------------------------------------------------
@@ -228,7 +236,7 @@ modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup)
 bind(Handle, RootDN, Passwd) 
   when list(RootDN),list(Passwd) ->
     Handle1 = get_handle(Handle),
-    gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, infinity).
+    gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, ?CALL_TIMEOUT).
 
 %%% Sanity checks !
 
@@ -273,7 +281,7 @@ search(Handle, L) when list(L) ->
 
 call_search(Handle, A) ->
     Handle1 = get_handle(Handle),
-    gen_fsm:sync_send_event(Handle1, {search, A}, infinity).
+    gen_fsm:sync_send_event(Handle1, {search, A}, ?CALL_TIMEOUT).
 
 parse_search_args(Args) ->
     parse_search_args(Args, #eldap_search{scope = wholeSubtree}).
@@ -382,7 +390,7 @@ init({Hosts, Port, Rootdn, Passwd}) ->
                            passwd = Passwd,
                            id = 0,
                            dict = dict:new(),
-                           bind_q = queue:new()}, 0}.
+                           req_q = queue:new()}, 0}.
 
 %%----------------------------------------------------------------------
 %% Func: StateName/2
@@ -405,38 +413,20 @@ connecting(timeout, S) ->
 %%          {stop, Reason, NewStateData}                          |
 %%          {stop, Reason, Reply, NewStateData}                    
 %%----------------------------------------------------------------------
-connecting(_Event, _From, S) ->
-    Reply = {error, connecting},
-    {reply, Reply, connecting, S}.
+connecting(Event, From, S) ->
+    Q = queue:in({Event, From}, S#eldap.req_q),
+    {next_state, connecting, S#eldap{req_q=Q}}.
 
-wait_bind_response(_Event, _From, S) ->
-    Reply = {error, wait_bind_response},
-    {reply, Reply, wait_bind_response, S}.
+wait_bind_response(Event, From, S) ->
+    Q = queue:in({Event, From}, S#eldap.req_q),
+    {next_state, wait_bind_response, S#eldap{req_q=Q}}.
 
-active(Event, From, S) ->
-    case catch send_command(Event, From, S) of
-       {ok, NewS} ->
-           case Event of
-               {bind, _, _} ->
-                   {next_state, active_bind, NewS};
-               _ ->
-                   {next_state, active, NewS}
-           end;
-       {error, Reason} ->
-           {reply, {error, Reason}, active, S};
-       {'EXIT', Reason} ->
-           {reply, {error, Reason}, active, S}
-    end.
-
-active_bind({bind, RootDN, Passwd}, From, #eldap{bind_q=Q} = S) ->
-    NewQ = queue:in({{bind, RootDN, Passwd}, From}, Q),
-    {next_state, active_bind, S#eldap{bind_q=NewQ}};
 active_bind(Event, From, S) ->
-    case catch send_command(Event, From, S) of
-       {ok, NewS}       -> {next_state, active_bind, NewS};
-       {error, Reason}  -> {reply, {error, Reason}, active_bind, S};
-       {'EXIT', Reason} -> {reply, {error, Reason}, active_bind, S}
-    end.
+    Q = queue:in({Event, From}, S#eldap.req_q),
+    {next_state, active_bind, S#eldap{req_q=Q}}.
+
+active(Event, From, S) ->
+    process_command(S, Event, From).
 
 %%----------------------------------------------------------------------
 %% Func: handle_event/3
@@ -446,21 +436,8 @@ active_bind(Event, From, S) ->
 %%          {stop, Reason, NewStateData}                         
 %%----------------------------------------------------------------------
 handle_event(close, _StateName, S) ->
-    gen_tcp:close(S#eldap.fd),
-    {stop, closed, S};
-
-handle_event(process_bind_q, active_bind, #eldap{bind_q=Q} = S) ->
-    case queue:out(Q) of
-       {{value, {BindEvent, To}}, NewQ} ->
-           NewStateData = case catch send_command(BindEvent, To, S) of
-                              {ok, NewS}       -> NewS;
-                              {error, Reason}  -> gen_fsm:reply(To, {error, Reason}), S;
-                              {'EXIT', Reason} -> gen_fsm:reply(To, {error, Reason}), S
-                          end,
-           {next_state, active_bind, NewStateData#eldap{bind_q=NewQ}};
-       {empty, Q} ->
-           {next_state, active, S}
-    end;
+    catch gen_tcp:close(S#eldap.fd),
+    {stop, normal, S};
 
 handle_event(_Event, StateName, S) ->
     {next_state, StateName, S}.
@@ -489,50 +466,61 @@ handle_sync_event(_Event, _From, StateName, S) ->
 %% Packets arriving in various states
 %%
 handle_info({tcp, _Socket, Data}, connecting, S) ->
-    ?DEBUG("eldap. tcp packet received when disconnected!~n~p", [Data]),
+    ?DEBUG("tcp packet received when disconnected!~n~p", [Data]),
     {next_state, connecting, S};
 
 handle_info({tcp, _Socket, Data}, wait_bind_response, S) ->
     cancel_timer(S#eldap.bind_timer),
     case catch recvd_wait_bind_response(Data, S) of
-       bound                -> {next_state, active, S};
-       {fail_bind, _Reason}  -> close_and_retry(S),
-                               {next_state, connecting, S#eldap{fd = null}};
-       {'EXIT', _Reason}     -> close_and_retry(S),
-                               {next_state, connecting, S#eldap{fd = null}};
-       {error, _Reason}      -> close_and_retry(S),
-                               {next_state, connecting, S#eldap{fd = null}}
+       bound ->
+           dequeue_commands(S);
+       {fail_bind, _Reason} ->
+           {next_state, connecting, close_and_retry(S)};
+       {'EXIT', _Reason} ->
+           {next_state, connecting, close_and_retry(S)};
+       {error, _Reason} ->
+           {next_state, connecting, close_and_retry(S)}
     end;
 
 handle_info({tcp, _Socket, Data}, StateName, S)
-  when StateName==active; StateName==active_bind ->
+  when StateName == active orelse StateName == active_bind ->
     case catch recvd_packet(Data, S) of
-       {reply, Reply, To, NewS} -> gen_fsm:reply(To, Reply),
-                                   {next_state, StateName, NewS};
-       {ok, NewS}               -> {next_state, StateName, NewS};
-       {'EXIT', _Reason}         -> {next_state, StateName, S};
-       {error, _Reason}          -> {next_state, StateName, S}
+       {response, Response, RequestType} ->
+           NewS = case Response of
+                      {reply, Reply, To, S1} ->
+                          gen_fsm:reply(To, Reply),
+                          S1;
+                      {ok, S1} ->
+                          S1
+                  end,
+           if (StateName == active_bind andalso
+               RequestType == bindRequest) orelse
+              (StateName == active) ->
+                   dequeue_commands(NewS);
+              true ->
+                   {next_state, StateName, NewS}
+           end;
+       _ ->
+           {next_state, StateName, S}
     end;
 
 handle_info({tcp_closed, _Socket}, Fsm_state, S) ->
     ?WARNING_MSG("LDAP server closed the connection: ~s:~p~nIn State: ~p",
          [S#eldap.host, S#eldap.port ,Fsm_state]),
-    {ok, NextState, NewS} = close_and_rebind(S, tcp_closed),
-    {next_state, NextState, NewS};
+    {next_state, connecting, close_and_retry(S)};
 
 handle_info({tcp_error, _Socket, Reason}, Fsm_state, S) ->
     ?DEBUG("eldap received tcp_error: ~p~nIn State: ~p", [Reason, Fsm_state]),
-    {ok, NextState, NewS} = close_and_rebind(S, tcp_error),
-    {next_state, NextState, NewS};
+    {next_state, connecting, close_and_retry(S)};
 
 %%
 %% Timers
 %%
-handle_info({timeout, Timer, {cmd_timeout, Id}}, active, S) ->
+handle_info({timeout, Timer, {cmd_timeout, Id}}, StateName, S) ->
     case cmd_timeout(Timer, Id, S) of
        {reply, To, Reason, NewS} -> gen_fsm:reply(To, Reason),
-                                    {next_state, active, NewS};
-       {error, _Reason}           -> {next_state, active, S}
+                                    {next_state, StateName, NewS};
+       {error, _Reason}           -> {next_state, StateName, S}
     end;
 
 handle_info({timeout, retry_connect}, connecting, S) ->
@@ -540,8 +528,7 @@ handle_info({timeout, retry_connect}, connecting, S) ->
     {next_state, NextState, NewS};
 
 handle_info({timeout, _Timer, bind_timeout}, wait_bind_response, S) ->
-    close_and_retry(S),
-    {next_state, connecting, S#eldap{fd = null}};
+    {next_state, connecting, close_and_retry(S)};
 
 %%
 %% Make sure we don't fill the message queue with rubbish
@@ -570,6 +557,34 @@ code_change(_OldVsn, StateName, S, _Extra) ->
 %%%----------------------------------------------------------------------
 %%% Internal functions
 %%%----------------------------------------------------------------------
+dequeue_commands(S) ->
+    case queue:out(S#eldap.req_q) of
+       {{value, {Event, From}}, Q} ->
+           case process_command(S#eldap{req_q=Q}, Event, From) of
+               {_, active, NewS} -> 
+                   dequeue_commands(NewS);
+               Res ->
+                   Res
+           end;
+       {empty, _} ->
+           {next_state, active, S}
+    end.
+
+process_command(S, Event, From) ->
+    case send_command(Event, From, S) of
+       {ok, NewS} ->
+           case Event of
+               {bind, _, _} ->
+                   {next_state, active_bind, NewS};
+               _ ->
+                   {next_state, active, NewS}
+           end;
+       {error, _Reason} ->
+           Q = queue:in_r({Event, From}, S#eldap.req_q),
+           NewS = close_and_retry(S#eldap{req_q=Q}),
+           {next_state, connecting, NewS}
+    end.
+
 send_command(Command, From, S) ->
     Id = bump_id(S),
     {Name, Request} = gen_req(Command),
@@ -640,6 +655,7 @@ recvd_packet(Pkt, S) ->
            Dict = S#eldap.dict,
            Id = Msg#'LDAPMessage'.messageID,
            {Timer, From, Name, Result_so_far} = get_op_rec(Id, Dict),
+           Answer = 
            case {Name, Op} of
                {searchRequest, {searchResEntry, R}} when
                record(R,'SearchResultEntry') ->
@@ -687,14 +703,14 @@ recvd_packet(Pkt, S) ->
                    New_dict = dict:erase(Id, Dict),
                    cancel_timer(Timer),
                    Reply = check_bind_reply(Result, From),
-                   gen_fsm:send_all_state_event(self(), process_bind_q),
                    {reply, Reply, From, S#eldap{dict = New_dict}};
                {OtherName, OtherResult} ->
                    New_dict = dict:erase(Id, Dict),
                    cancel_timer(Timer),
                    {reply, {error, {invalid_result, OtherName, OtherResult}},
                     From, S#eldap{dict = New_dict}}
-           end;
+           end,
+           {response, Answer, Name};
        Error -> Error
     end.
 
@@ -775,13 +791,9 @@ check_tag(Data) ->
     end.
 
 close_and_retry(S) ->
-    gen_tcp:close(S#eldap.fd),
-    retry_connect().
-
-retry_connect() ->
-    erlang:send_after(?RETRY_TIMEOUT, self(),
-                     {timeout, retry_connect}).
-
+    catch gen_tcp:close(S#eldap.fd),
+    erlang:send_after(?RETRY_TIMEOUT, self(), {timeout, retry_connect}),
+    S#eldap{fd = null}.
 
 %%-----------------------------------------------------------------------
 %% Sort out timed out commands
@@ -832,7 +844,8 @@ polish([], Res, Ref) ->
 %%-----------------------------------------------------------------------
 connect_bind(S) ->
     Host = next_host(S#eldap.host, S#eldap.hosts),
-    TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true}, binary],
+    TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true},
+              {send_timeout, ?SEND_TIMEOUT}, binary],
     ?INFO_MSG("LDAP connection on ~s:~p", [Host, S#eldap.port]),
     case gen_tcp:connect(Host, S#eldap.port, TcpOpts) of
        {ok, Socket} ->
@@ -844,15 +857,16 @@ connect_bind(S) ->
                                                        host = Host,
                                                        bind_timer = Timer}};
                {error, Reason} ->
-                   ?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]),
-                   gen_tcp:close(Socket),
-                   retry_connect(),
-                   {ok, connecting, S#eldap{host = Host}}
+                   ?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p",
+                              [Host, S#eldap.port, Reason]),
+                   NewS = close_and_retry(S),
+                   {ok, connecting, NewS#eldap{host = Host}}
            end;
        {error, Reason} ->
-           ?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]),
-           retry_connect(),
-           {ok, connecting, S#eldap{host = Host}}
+           ?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p",
+                      [Host, S#eldap.port, Reason]),
+           NewS = close_and_retry(S),
+           {ok, connecting, NewS#eldap{host = Host}}
     end.
 
 bind_request(Socket, S) ->
@@ -997,13 +1011,3 @@ bump_id(#eldap{id = Id}) when Id > ?MAX_TRANSACTION_ID ->
     ?MIN_TRANSACTION_ID;
 bump_id(#eldap{id = Id}) ->
     Id + 1.
-
-close_and_rebind(State, Err) ->
-    F = fun(_Id, [{Timer, From, _Name}|_]) ->
-               gen_fsm:reply(From, {error, Err}),
-               cancel_timer(Timer)
-       end,
-    dict:map(F, State#eldap.dict),
-    connect_bind(State#eldap{fd = null,
-                            dict = dict:new(),
-                            bind_q=queue:new()}).