]> granicus.if.org Git - postgresql/commitdiff
Fix a low-probability crash in our qsort implementation.
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 17 Jul 2015 02:57:46 +0000 (22:57 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 17 Jul 2015 02:57:46 +0000 (22:57 -0400)
It's standard for quicksort implementations, after having partitioned the
input into two subgroups, to recurse to process the smaller partition and
then handle the larger partition by iterating.  This method guarantees
that no more than log2(N) levels of recursion can be needed.  However,
Bentley and McIlroy argued that checking to see which partition is smaller
isn't worth the cycles, and so their code doesn't do that but just always
recurses on the left partition.  In most cases that's fine; but with
worst-case input we might need O(N) levels of recursion, and that means
that qsort could be driven to stack overflow.  Such an overflow seems to
be the only explanation for today's report from Yiqing Jin of a SIGSEGV
in med3_tuple while creating an index of a couple billion entries with a
very large maintenance_work_mem setting.  Therefore, let's spend the few
additional cycles and lines of code needed to choose the smaller partition
for recursion.

Also, fix up the qsort code so that it properly uses size_t not int for
some intermediate values representing numbers of items.  This would only
be a live risk when sorting more than INT_MAX bytes (in qsort/qsort_arg)
or tuples (in qsort_tuple), which I believe would never happen with any
caller in the current core code --- but perhaps it could happen with
call sites in third-party modules?  In any case, this is trouble waiting
to happen, and the corrected code is probably if anything shorter and
faster than before, since it removes sign-extension steps that had to
happen when converting between int and size_t.

In passing, move a couple of CHECK_FOR_INTERRUPTS() calls so that it's
not necessary to preserve the value of "r" across them, and prettify
the output of gen_qsort_tuple.pl a little.

Back-patch to all supported branches.  The odds of hitting this issue
are probably higher in 9.4 and up than before, due to the new ability
to allocate sort workspaces exceeding 1GB, but there's no good reason
to believe that it's impossible to crash older branches this way.

src/backend/utils/sort/gen_qsort_tuple.pl
src/port/qsort.c
src/port/qsort_arg.c

index 18dd751b38272fd9eaf3abf844dfc7857b4df560..6186d0a5babda42aea2b72efb2fb50e890812aa0 100644 (file)
 #
 #      Modifications from vanilla NetBSD source:
 #        Add do ... while() macro fix
-#        Remove __inline, _DIAGASSERTs, __P
-#        Remove ill-considered "swap_cnt" switch to insertion sort,
-#        in favor of a simple check for presorted input.
-#     Instead of sorting arbitrary objects, we're always sorting SortTuples
-#     Add CHECK_FOR_INTERRUPTS()
+#        Remove __inline, _DIAGASSERTs, __P
+#        Remove ill-considered "swap_cnt" switch to insertion sort,
+#        in favor of a simple check for presorted input.
+#        Take care to recurse on the smaller partition, to bound stack usage.
+#
+#     Instead of sorting arbitrary objects, we're always sorting SortTuples.
+#     Add CHECK_FOR_INTERRUPTS().
 #
 # CAUTION: if you change this file, see also qsort.c and qsort_arg.c
 #
@@ -43,9 +45,11 @@ $EXTRAARGS   = ', SortSupport ssup';
 $EXTRAPARAMS = ', ssup';
 $CMPPARAMS   = ', ssup';
 print <<'EOM';
+
 #define cmp_ssup(a, b, ssup) \
        ApplySortComparator((a)->datum1, (a)->isnull1, \
                                                (b)->datum1, (b)->isnull1, ssup)
+
 EOM
 emit_qsort_implementation();
 
@@ -53,7 +57,8 @@ sub emit_qsort_boilerplate
 {
        print <<'EOM';
 /*
- * autogenerated by src/backend/utils/sort/gen_qsort_tuple.pl, do not edit
+ * autogenerated by src/backend/utils/sort/gen_qsort_tuple.pl, do not edit!
+ *
  * This file is included by tuplesort.c, rather than compiled separately.
  */
 
@@ -78,7 +83,7 @@ sub emit_qsort_boilerplate
  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED.     IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
@@ -92,8 +97,16 @@ sub emit_qsort_boilerplate
  * Qsort routine based on J. L. Bentley and M. D. McIlroy,
  * "Engineering a sort function",
  * Software--Practice and Experience 23 (1993) 1249-1265.
+ *
  * We have modified their original by adding a check for already-sorted input,
  * which seems to be a win per discussions on pgsql-hackers around 2006-03-21.
+ *
+ * Also, we recurse on the smaller partition and iterate on the larger one,
+ * which ensures we cannot recurse more than log(N) levels (since the
+ * partition recursed to is surely no more than half of the input).  Bentley
+ * and McIlroy explicitly rejected doing this on the grounds that it's "not
+ * worth the effort", but we have seen crashes in the field due to stack
+ * overrun, so that judgment seems wrong.
  */
 
 static void
@@ -114,7 +127,8 @@ swapfunc(SortTuple *a, SortTuple *b, size_t n)
                *(b) = t;                                               \
        } while (0);
 
-#define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n))
+#define vecswap(a, b, n) if ((n) > 0) swapfunc(a, b, n)
+
 EOM
 }
 
@@ -141,8 +155,9 @@ qsort_$SUFFIX(SortTuple *a, size_t n$EXTRAARGS)
                           *pl,
                           *pm,
                           *pn;
-       int                     d,
-                               r,
+       size_t          d1,
+                               d2;
+       int                     r,
                                presorted;
 
 loop:
@@ -173,7 +188,8 @@ loop:
                pn = a + (n - 1);
                if (n > 40)
                {
-                       d = (n / 8);
+                       size_t          d = (n / 8);
+
                        pl = med3_$SUFFIX(pl, pl + d, pl + 2 * d$EXTRAPARAMS);
                        pm = med3_$SUFFIX(pm - d, pm, pm + d$EXTRAPARAMS);
                        pn = med3_$SUFFIX(pn - 2 * d, pn - d, pn$EXTRAPARAMS);
@@ -187,23 +203,23 @@ loop:
        {
                while (pb <= pc && (r = cmp_$SUFFIX(pb, a$CMPPARAMS)) <= 0)
                {
-                       CHECK_FOR_INTERRUPTS();
                        if (r == 0)
                        {
                                swap(pa, pb);
                                pa++;
                        }
                        pb++;
+                       CHECK_FOR_INTERRUPTS();
                }
                while (pb <= pc && (r = cmp_$SUFFIX(pc, a$CMPPARAMS)) >= 0)
                {
-                       CHECK_FOR_INTERRUPTS();
                        if (r == 0)
                        {
                                swap(pc, pd);
                                pd--;
                        }
                        pc--;
+                       CHECK_FOR_INTERRUPTS();
                }
                if (pb > pc)
                        break;
@@ -212,21 +228,39 @@ loop:
                pc--;
        }
        pn = a + n;
-       r = Min(pa - a, pb - pa);
-       vecswap(a, pb - r, r);
-       r = Min(pd - pc, pn - pd - 1);
-       vecswap(pb, pn - r, r);
-       if ((r = pb - pa) > 1)
-               qsort_$SUFFIX(a, r$EXTRAPARAMS);
-       if ((r = pd - pc) > 1)
+       d1 = Min(pa - a, pb - pa);
+       vecswap(a, pb - d1, d1);
+       d1 = Min(pd - pc, pn - pd - 1);
+       vecswap(pb, pn - d1, d1);
+       d1 = pb - pa;
+       d2 = pd - pc;
+       if (d1 <= d2)
        {
-               /* Iterate rather than recurse to save stack space */
-               a = pn - r;
-               n = r;
-               goto loop;
+               /* Recurse on left partition, then iterate on right partition */
+               if (d1 > 1)
+                       qsort_$SUFFIX(a, d1$EXTRAPARAMS);
+               if (d2 > 1)
+               {
+                       /* Iterate rather than recurse to save stack space */
+                       /* qsort_$SUFFIX(pn - d2, d2$EXTRAPARAMS); */
+                       a = pn - d2;
+                       n = d2;
+                       goto loop;
+               }
+       }
+       else
+       {
+               /* Recurse on right partition, then iterate on left partition */
+               if (d2 > 1)
+                       qsort_$SUFFIX(pn - d2, d2$EXTRAPARAMS);
+               if (d1 > 1)
+               {
+                       /* Iterate rather than recurse to save stack space */
+                       /* qsort_$SUFFIX(a, d1$EXTRAPARAMS); */
+                       n = d1;
+                       goto loop;
+               }
        }
-/*             qsort_$SUFFIX(pn - r, r$EXTRAPARAMS);*/
 }
-
 EOM
 }
index fa35b1b15387d1de5a005e2097e35f1d37e6b708..1a8ee08c8beea120e527a41bcb7574a54fe37ba7 100644 (file)
@@ -6,6 +6,7 @@
  *       Remove __inline, _DIAGASSERTs, __P
  *       Remove ill-considered "swap_cnt" switch to insertion sort,
  *       in favor of a simple check for presorted input.
+ *       Take care to recurse on the smaller partition, to bound stack usage.
  *
  *     CAUTION: if you change this file, see also qsort_arg.c, gen_qsort_tuple.pl
  *
@@ -54,9 +55,18 @@ static void swapfunc(char *, char *, size_t, int);
  * Qsort routine based on J. L. Bentley and M. D. McIlroy,
  * "Engineering a sort function",
  * Software--Practice and Experience 23 (1993) 1249-1265.
+ *
  * We have modified their original by adding a check for already-sorted input,
  * which seems to be a win per discussions on pgsql-hackers around 2006-03-21.
+ *
+ * Also, we recurse on the smaller partition and iterate on the larger one,
+ * which ensures we cannot recurse more than log(N) levels (since the
+ * partition recursed to is surely no more than half of the input).  Bentley
+ * and McIlroy explicitly rejected doing this on the grounds that it's "not
+ * worth the effort", but we have seen crashes in the field due to stack
+ * overrun, so that judgment seems wrong.
  */
+
 #define swapcode(TYPE, parmi, parmj, n) \
 do {           \
        size_t i = (n) / sizeof (TYPE);                 \
@@ -89,7 +99,7 @@ swapfunc(char *a, char *b, size_t n, int swaptype)
        } else                                                  \
                swapfunc(a, b, es, swaptype)
 
-#define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)
+#define vecswap(a, b, n) if ((n) > 0) swapfunc(a, b, n, swaptype)
 
 static char *
 med3(char *a, char *b, char *c, int (*cmp) (const void *, const void *))
@@ -109,8 +119,9 @@ pg_qsort(void *a, size_t n, size_t es, int (*cmp) (const void *, const void *))
                           *pl,
                           *pm,
                           *pn;
-       int                     d,
-                               r,
+       size_t          d1,
+                               d2;
+       int                     r,
                                swaptype,
                                presorted;
 
@@ -141,7 +152,8 @@ loop:SWAPINIT(a, es);
                pn = (char *) a + (n - 1) * es;
                if (n > 40)
                {
-                       d = (n / 8) * es;
+                       size_t          d = (n / 8) * es;
+
                        pl = med3(pl, pl + d, pl + 2 * d, cmp);
                        pm = med3(pm - d, pm, pm + d, cmp);
                        pn = med3(pn - 2 * d, pn - d, pn, cmp);
@@ -178,27 +190,46 @@ loop:SWAPINIT(a, es);
                pc -= es;
        }
        pn = (char *) a + n * es;
-       r = Min(pa - (char *) a, pb - pa);
-       vecswap(a, pb - r, r);
-       r = Min(pd - pc, pn - pd - es);
-       vecswap(pb, pn - r, r);
-       if ((r = pb - pa) > es)
-               qsort(a, r / es, es, cmp);
-       if ((r = pd - pc) > es)
+       d1 = Min(pa - (char *) a, pb - pa);
+       vecswap(a, pb - d1, d1);
+       d1 = Min(pd - pc, pn - pd - es);
+       vecswap(pb, pn - d1, d1);
+       d1 = pb - pa;
+       d2 = pd - pc;
+       if (d1 <= d2)
        {
-               /* Iterate rather than recurse to save stack space */
-               a = pn - r;
-               n = r / es;
-               goto loop;
+               /* Recurse on left partition, then iterate on right partition */
+               if (d1 > es)
+                       pg_qsort(a, d1 / es, es, cmp);
+               if (d2 > es)
+               {
+                       /* Iterate rather than recurse to save stack space */
+                       /* pg_qsort(pn - d2, d2 / es, es, cmp); */
+                       a = pn - d2;
+                       n = d2 / es;
+                       goto loop;
+               }
+       }
+       else
+       {
+               /* Recurse on right partition, then iterate on left partition */
+               if (d2 > es)
+                       pg_qsort(pn - d2, d2 / es, es, cmp);
+               if (d1 > es)
+               {
+                       /* Iterate rather than recurse to save stack space */
+                       /* pg_qsort(a, d1 / es, es, cmp); */
+                       n = d1 / es;
+                       goto loop;
+               }
        }
-/*             qsort(pn - r, r / es, es, cmp);*/
 }
 
 /*
- * qsort wrapper for strcmp.
+ * qsort comparator wrapper for strcmp.
  */
 int
 pg_qsort_strcmp(const void *a, const void *b)
 {
-       return strcmp(*(char *const *) a, *(char *const *) b);
+       return strcmp(*(const char *const *) a, *(const char *const *) b);
 }
index c0aee733be5f6f14971c6f7d309d529bb809d8c0..24acd2cd4e4a4456472f68cb26d95add61b48759 100644 (file)
@@ -6,6 +6,7 @@
  *       Remove __inline, _DIAGASSERTs, __P
  *       Remove ill-considered "swap_cnt" switch to insertion sort,
  *       in favor of a simple check for presorted input.
+ *       Take care to recurse on the smaller partition, to bound stack usage.
  *
  *     CAUTION: if you change this file, see also qsort.c, gen_qsort_tuple.pl
  *
@@ -54,9 +55,18 @@ static void swapfunc(char *, char *, size_t, int);
  * Qsort routine based on J. L. Bentley and M. D. McIlroy,
  * "Engineering a sort function",
  * Software--Practice and Experience 23 (1993) 1249-1265.
+ *
  * We have modified their original by adding a check for already-sorted input,
  * which seems to be a win per discussions on pgsql-hackers around 2006-03-21.
+ *
+ * Also, we recurse on the smaller partition and iterate on the larger one,
+ * which ensures we cannot recurse more than log(N) levels (since the
+ * partition recursed to is surely no more than half of the input).  Bentley
+ * and McIlroy explicitly rejected doing this on the grounds that it's "not
+ * worth the effort", but we have seen crashes in the field due to stack
+ * overrun, so that judgment seems wrong.
  */
+
 #define swapcode(TYPE, parmi, parmj, n) \
 do {           \
        size_t i = (n) / sizeof (TYPE);                 \
@@ -89,7 +99,7 @@ swapfunc(char *a, char *b, size_t n, int swaptype)
        } else                                                  \
                swapfunc(a, b, es, swaptype)
 
-#define vecswap(a, b, n) if ((n) > 0) swapfunc((a), (b), (size_t)(n), swaptype)
+#define vecswap(a, b, n) if ((n) > 0) swapfunc(a, b, n, swaptype)
 
 static char *
 med3(char *a, char *b, char *c, qsort_arg_comparator cmp, void *arg)
@@ -109,8 +119,9 @@ qsort_arg(void *a, size_t n, size_t es, qsort_arg_comparator cmp, void *arg)
                           *pl,
                           *pm,
                           *pn;
-       int                     d,
-                               r,
+       size_t          d1,
+                               d2;
+       int                     r,
                                swaptype,
                                presorted;
 
@@ -141,7 +152,8 @@ loop:SWAPINIT(a, es);
                pn = (char *) a + (n - 1) * es;
                if (n > 40)
                {
-                       d = (n / 8) * es;
+                       size_t          d = (n / 8) * es;
+
                        pl = med3(pl, pl + d, pl + 2 * d, cmp, arg);
                        pm = med3(pm - d, pm, pm + d, cmp, arg);
                        pn = med3(pn - 2 * d, pn - d, pn, cmp, arg);
@@ -178,18 +190,37 @@ loop:SWAPINIT(a, es);
                pc -= es;
        }
        pn = (char *) a + n * es;
-       r = Min(pa - (char *) a, pb - pa);
-       vecswap(a, pb - r, r);
-       r = Min(pd - pc, pn - pd - es);
-       vecswap(pb, pn - r, r);
-       if ((r = pb - pa) > es)
-               qsort_arg(a, r / es, es, cmp, arg);
-       if ((r = pd - pc) > es)
+       d1 = Min(pa - (char *) a, pb - pa);
+       vecswap(a, pb - d1, d1);
+       d1 = Min(pd - pc, pn - pd - es);
+       vecswap(pb, pn - d1, d1);
+       d1 = pb - pa;
+       d2 = pd - pc;
+       if (d1 <= d2)
        {
-               /* Iterate rather than recurse to save stack space */
-               a = pn - r;
-               n = r / es;
-               goto loop;
+               /* Recurse on left partition, then iterate on right partition */
+               if (d1 > es)
+                       qsort_arg(a, d1 / es, es, cmp, arg);
+               if (d2 > es)
+               {
+                       /* Iterate rather than recurse to save stack space */
+                       /* qsort_arg(pn - d2, d2 / es, es, cmp, arg); */
+                       a = pn - d2;
+                       n = d2 / es;
+                       goto loop;
+               }
+       }
+       else
+       {
+               /* Recurse on right partition, then iterate on left partition */
+               if (d2 > es)
+                       qsort_arg(pn - d2, d2 / es, es, cmp, arg);
+               if (d1 > es)
+               {
+                       /* Iterate rather than recurse to save stack space */
+                       /* qsort_arg(a, d1 / es, es, cmp, arg); */
+                       n = d1 / es;
+                       goto loop;
+               }
        }
-/*             qsort_arg(pn - r, r / es, es, cmp, arg);*/
 }