granicus.if.org Git - ejabberd/commitdiff
Improve SQL timeouts handling
author Evgeny Khramtsov <ekhramtsov@process-one.net>
Wed, 31 Jul 2019 07:39:53 +0000 (10:39 +0300)
committer Evgeny Khramtsov <ekhramtsov@process-one.net>
Wed, 31 Jul 2019 07:39:53 +0000 (10:39 +0300)
Also improve some formatting

src/ejabberd_sql.erl

index b225a107a27e8c3f424497e41e982fff294d162b..d085d3ee496654913f53b55ef914c34dd14c64d2 100644 (file)
 -include("ejabberd_stacktrace.hrl").
 
 -record(state,
-       {db_ref = self()                     :: pid(),
-        db_type = odbc                      :: pgsql | mysql | sqlite | odbc | mssql,
-        db_version = undefined              :: undefined | non_neg_integer(),
-        host = <<"">>                       :: binary(),
-        pending_requests                    :: p1_queue:queue()}).
+       {db_ref               :: undefined | pid(),
+        db_type = odbc       :: pgsql | mysql | sqlite | odbc | mssql,
+        db_version           :: undefined | non_neg_integer(),
+        host                 :: binary(),
+        pending_requests     :: p1_queue:queue(),
+        overload_reported    :: undefined | integer()}).
 
 -define(STATE_KEY, ejabberd_sql_state).
-
 -define(NESTING_KEY, ejabberd_sql_nesting_level).
-
 -define(TOP_LEVEL_TXN, 0).
-
 -define(MAX_TRANSACTION_RESTARTS, 10).
-
 -define(KEEPALIVE_QUERY, [<<"SELECT 1;">>]).
-
 -define(PREPARE_KEY, ejabberd_sql_prepare).
-
 %%-define(DBGFSM, true).
-
 -ifdef(DBGFSM).
-
 -define(FSMOPTS, [{debug, [trace]}]).
-
 -else.
-
 -define(FSMOPTS, []).
-
 -endif.
 
-%%%----------------------------------------------------------------------
-%%% API
-%%%----------------------------------------------------------------------
--spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}.
-start_link(Host, I) ->
-    Proc = binary_to_atom(get_worker_name(Host, I), utf8),
-    p1_fsm:start_link({local, Proc}, ?MODULE, [Host],
-                     fsm_limit_opts() ++ ?FSMOPTS).
-
+-type state() :: #state{}.
 -type sql_query_simple() :: [sql_query() | binary()] | #sql_query{} |
                            fun(() -> any()) | fun((atom(), _) -> any()).
 -type sql_query() :: sql_query_simple() |
@@ -119,8 +101,16 @@ start_link(Host, I) ->
                             {selected, [any()]} |
                            ok.
 
--spec sql_query(binary(), sql_query()) -> sql_query_result().
+%%%----------------------------------------------------------------------
+%%% API
+%%%----------------------------------------------------------------------
+-spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}.
+start_link(Host, I) ->
+    Proc = binary_to_atom(get_worker_name(Host, I), utf8),
+    p1_fsm:start_link({local, Proc}, ?MODULE, [Host],
+                     fsm_limit_opts() ++ ?FSMOPTS).
 
+-spec sql_query(binary(), sql_query()) -> sql_query_result().
 sql_query(Host, Query) ->
     sql_call(Host, {sql_query, Query}).
 
@@ -129,7 +119,6 @@ sql_query(Host, Query) ->
 -spec sql_transaction(binary(), [sql_query()] | fun(() -> any())) ->
                              {atomic, any()} |
                              {aborted, any()}.
-
 sql_transaction(Host, Queries)
     when is_list(Queries) ->
     F = fun () ->
@@ -149,26 +138,27 @@ sql_transaction(Host, F) when is_function(F) ->
 sql_bloc(Host, F) -> sql_call(Host, {sql_bloc, F}).
 
 sql_call(Host, Msg) ->
+    Timeout = query_timeout(Host),
     case get(?STATE_KEY) of
        undefined ->
            Proc = get_worker(Host),
-           sync_send_event(Proc, {sql_cmd, Msg,
-                                  erlang:monotonic_time(millisecond)},
-                           query_timeout(Host));
+           sync_send_event(Proc, {sql_cmd, Msg, current_time() + Timeout},
+                           Timeout);
        _State ->
            nested_op(Msg)
     end.
 
 keep_alive(Host, Proc) ->
-    case sync_send_event(Proc,
-                   {sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
-                    erlang:monotonic_time(millisecond)},
-                   query_timeout(Host)) of
+    Timeout = query_timeout(Host),
+    case sync_send_event(
+          Proc,
+          {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, current_time() + Timeout},
+          Timeout) of
        {selected,_,[[<<"1">>]]} ->
            ok;
        _Err ->
            ?ERROR_MSG("Keep alive query failed, closing connection: ~p", [_Err]),
-           sync_send_event(Proc, force_timeout, query_timeout(Host))
+           sync_send_event(Proc, force_timeout, Timeout)
     end.
 
 sync_send_event(Proc, Msg, Timeout) ->
@@ -178,15 +168,14 @@ sync_send_event(Proc, Msg, Timeout) ->
     end.
 
 -spec sql_query_t(sql_query()) -> sql_query_result().
-
 %% This function is intended to be used from inside an sql_transaction:
 sql_query_t(Query) ->
     QRes = sql_query_internal(Query),
     case QRes of
-      {error, Reason} -> throw({aborted, Reason});
+      {error, Reason} -> restart(Reason);
       Rs when is_list(Rs) ->
          case lists:keysearch(error, 1, Rs) of
-           {value, {error, Reason}} -> throw({aborted, Reason});
+           {value, {error, Reason}} -> restart(Reason);
            _ -> QRes
          end;
       _ -> QRes
@@ -372,11 +361,9 @@ connecting(Event, State) ->
                 [Event]),
     {next_state, connecting, State}.
 
-connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
-           _Timestamp},
+connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, Timestamp},
           From, State) ->
-    p1_fsm:reply(From,
-                    {error, <<"SQL connection failed">>}),
+    reply(From, {error, <<"SQL connection failed">>}, Timestamp),
     {next_state, connecting, State};
 connecting({sql_cmd, Command, Timestamp} = Req, From,
           State) ->
@@ -386,10 +373,11 @@ connecting({sql_cmd, Command, Timestamp} = Req, From,
        try p1_queue:in({sql_cmd, Command, From, Timestamp},
                        State#state.pending_requests)
        catch error:full ->
+               Err = <<"SQL request queue is overfilled">>,
+               ?ERROR_MSG("~s, bouncing all pending requests", [Err]),
                Q = p1_queue:dropwhile(
-                     fun({sql_cmd, _, To, _Timestamp}) ->
-                             p1_fsm:reply(
-                               To, {error, <<"SQL connection failed">>}),
+                     fun({sql_cmd, _, To, TS}) ->
+                             reply(To, {error, Err}, TS),
                              true
                      end, State#state.pending_requests),
                p1_queue:in({sql_cmd, Command, From, Timestamp}, Q)
@@ -399,16 +387,15 @@ connecting({sql_cmd, Command, Timestamp} = Req, From,
 connecting(Request, {Who, _Ref}, State) ->
     ?WARNING_MSG("Unexpected call ~p from ~p in 'connecting'",
                 [Request, Who]),
-    {reply, {error, badarg}, connecting, State}.
+    {next_state, connecting, State}.
 
 session_established({sql_cmd, Command, Timestamp}, From,
                    State) ->
     run_sql_cmd(Command, From, State, Timestamp);
 session_established(Request, {Who, _Ref}, State) ->
-    ?WARNING_MSG("Unexpected call ~p from ~p in 'session_establ"
-                "ished'",
+    ?WARNING_MSG("Unexpected call ~p from ~p in 'session_established'",
                 [Request, Who]),
-    {reply, {error, badarg}, session_established, State}.
+    {next_state, session_established, State}.
 
 session_established({sql_cmd, Command, From, Timestamp},
                    State) ->
@@ -465,17 +452,14 @@ handle_reconnect(Reason, #state{host = Host} = State) ->
     {next_state, connecting, State}.
 
 run_sql_cmd(Command, From, State, Timestamp) ->
-    QueryTimeout = query_timeout(State#state.host),
-    case erlang:monotonic_time(millisecond) - Timestamp of
-      Age when Age < QueryTimeout ->
-         put(?NESTING_KEY, ?TOP_LEVEL_TXN),
-         put(?STATE_KEY, State),
-         abort_on_driver_error(outer_op(Command), From);
-      Age ->
-         ?ERROR_MSG("Database was not available or too slow, "
-                    "discarding ~p milliseconds old request~n~p~n",
-                    [Age, Command]),
-         {next_state, session_established, State}
+    case current_time() >= Timestamp of
+       true ->
+           State1 = report_overload(State),
+           {next_state, session_established, State1};
+       false ->
+           put(?NESTING_KEY, ?TOP_LEVEL_TXN),
+           put(?STATE_KEY, State),
+           abort_on_driver_error(outer_op(Command), From, Timestamp)
     end.
 
 %% Only called by handle_call, only handles top level operations.
@@ -620,6 +604,8 @@ sql_query_internal(#sql_query{} = Query) ->
                {error, <<"killed">>};
              exit:{normal, _} ->
                {error, <<"terminated unexpectedly">>};
+             exit:{shutdown, _} ->
+               {error, <<"shutdown">>};
              ?EX_RULE(Class, Reason, Stack) ->
                StackTrace = ?EX_STACK(Stack),
                 ?ERROR_MSG("Internal error while processing SQL query:~n** ~s",
@@ -779,30 +765,42 @@ sql_query_to_iolist(SQLQuery) ->
     generic_sql_query_format(SQLQuery).
 
 %% Generate the OTP callback return tuple depending on the driver result.
-abort_on_driver_error({error,
-                      <<"query timed out">>} = Reply,
-                     From) ->
-    p1_fsm:reply(From, Reply),
+abort_on_driver_error({error, <<"query timed out">>} = Reply, From, Timestamp) ->
+    reply(From, Reply, Timestamp),
     {stop, timeout, get(?STATE_KEY)};
-abort_on_driver_error({error,
-                      <<"Failed sending data on socket", _/binary>>} = Reply,
-                     From) ->
-    p1_fsm:reply(From, Reply),
+abort_on_driver_error({error, <<"Failed sending data on socket", _/binary>>} = Reply,
+                     From, Timestamp) ->
+    reply(From, Reply, Timestamp),
     {stop, closed, get(?STATE_KEY)};
-abort_on_driver_error({error,
-                      <<"SQL connection failed">>} = Reply,
-                     From) ->
-    p1_fsm:reply(From, Reply),
+abort_on_driver_error({error, <<"SQL connection failed">>} = Reply, From, Timestamp) ->
+    reply(From, Reply, Timestamp),
     {stop, timeout, get(?STATE_KEY)};
-abort_on_driver_error({error,
-                      <<"Communication link failure">>} = Reply,
-                     From) ->
-    p1_fsm:reply(From, Reply),
+abort_on_driver_error({error, <<"Communication link failure">>} = Reply, From, Timestamp) ->
+    reply(From, Reply, Timestamp),
     {stop, closed, get(?STATE_KEY)};
-abort_on_driver_error(Reply, From) ->
-    p1_fsm:reply(From, Reply),
+abort_on_driver_error(Reply, From, Timestamp) ->
+    reply(From, Reply, Timestamp),
     {next_state, session_established, get(?STATE_KEY)}.
 
+%% Rate-limited overload report: logs the "pool is overloaded" error only
+%% when no report has been recorded yet or the last recorded one is more
+%% than 30 seconds old. Returns the state with `overload_reported' updated
+%% when a message was logged, and the unchanged state otherwise.
+-spec report_overload(state()) -> state().
+report_overload(#state{overload_reported = PrevTime} = State) ->
+    CurrTime = current_time(),
+    case PrevTime =:= undefined orelse (CurrTime - PrevTime) > timer:seconds(30) of
+       true ->
+           ?ERROR_MSG("SQL connection pool is overloaded, "
+                      "discarding stale requests", []),
+           %% Record the very timestamp that was compared above instead of
+           %% reading the clock a second time after logging.
+           State#state{overload_reported = CurrTime};
+       false ->
+           State
+    end.
+
+%% Send Reply to the caller From via p1_fsm:reply/2, unless the current
+%% time has already reached Timestamp, in which case nothing is sent and
+%% `ok' is returned. Timestamp is compared against current_time/0, i.e. it
+%% is a deadline on the same clock (presumably the caller has stopped
+%% waiting by then; verify against sync_send_event/3).
+-spec reply({pid(), term()}, term(), integer()) -> term().
+reply(From, Reply, Timestamp) ->
+    Expired = current_time() >= Timestamp,
+    if Expired -> ok;
+       true -> p1_fsm:reply(From, Reply)
+    end.
+
 %% == pure ODBC code
 
 %% part of init/1
@@ -1143,6 +1141,9 @@ fsm_limit_opts() ->
 query_timeout(LServer) ->
     ejabberd_option:sql_query_timeout(LServer).
 
+%% Current Erlang monotonic time in milliseconds.
+-spec current_time() -> integer().
+current_time() ->
+    erlang:monotonic_time(millisecond).
+
 %% ***IMPORTANT*** This error format requires extended_errors turned on.
 extended_error({"08S01", _, Reason}) ->
     % TCP Provider: The specified network name is no longer available