From be44ed27b86ebd165bbedf06a4ac5a8eb943d43c Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Thu, 21 Jan 2016 19:47:15 -0500 Subject: [PATCH] Improve index AMs' opclass validation procedures. The amvalidate functions added in commit 65c5fcd353a859da were on the crude side. Improve them in a few ways: * Perform signature checking for operators and support functions. * Apply more thorough checks for missing operators and functions, where possible. * Instead of reporting problems as ERRORs, report most problems as INFO messages and make the amvalidate function return FALSE. This allows more than one problem to be discovered per run. * Report object names rather than OIDs, and work a bit harder on making the messages understandable. Also, remove a few more opr_sanity regression test queries that are now superseded by the amvalidate checks. --- src/backend/access/brin/brin_validate.c | 260 ++++++++++++++------ src/backend/access/gin/ginvalidate.c | 233 +++++++++++++----- src/backend/access/gist/gistvalidate.c | 241 +++++++++++++++---- src/backend/access/hash/hashvalidate.c | 289 +++++++++++++++++------ src/backend/access/index/Makefile | 2 +- src/backend/access/index/amvalidate.c | 246 +++++++++++++++++++ src/backend/access/nbtree/nbtvalidate.c | 274 ++++++++++++--------- src/backend/access/spgist/spgvalidate.c | 211 +++++++++++++---- src/backend/utils/cache/lsyscache.c | 23 ++ src/include/access/amvalidate.h | 36 +++ src/include/utils/lsyscache.h | 1 + src/test/regress/expected/opr_sanity.out | 204 +--------------- src/test/regress/sql/opr_sanity.sql | 171 +------------- 13 files changed, 1417 insertions(+), 774 deletions(-) create mode 100644 src/backend/access/index/amvalidate.c create mode 100644 src/include/access/amvalidate.h diff --git a/src/backend/access/brin/brin_validate.c b/src/backend/access/brin/brin_validate.c index 956ecb95a0..4cb1bca8e4 100644 --- a/src/backend/access/brin/brin_validate.c +++ b/src/backend/access/brin/brin_validate.c @@ -13,31 +13,46 @@ */ #include "postgres.h" +#include "access/amvalidate.h" #include "access/brin_internal.h" #include "access/htup_details.h" #include "catalog/pg_amop.h" #include "catalog/pg_amproc.h" #include "catalog/pg_opclass.h" -#include "utils/catcache.h" +#include "catalog/pg_opfamily.h" +#include "catalog/pg_type.h" +#include "utils/builtins.h" #include "utils/syscache.h" /* * Validator for a BRIN opclass. + * + * Some of the checks done here cover the whole opfamily, and therefore are + * redundant when checking each opclass in a family. But they don't run long + * enough to be much of a problem, so we accept the duplication rather than + * complicate the amvalidate API. */ bool brinvalidate(Oid opclassoid) { + bool result = true; HeapTuple classtup; Form_pg_opclass classform; Oid opfamilyoid; Oid opcintype; - int numclassops; - int32 classfuncbits; + char *opclassname; + HeapTuple familytup; + Form_pg_opfamily familyform; + char *opfamilyname; CatCList *proclist, *oprlist; - int i, - j; + uint64 allfuncs = 0; + uint64 allops = 0; + List *grouplist; + OpFamilyOpFuncGroup *opclassgroup; + int i; + ListCell *lc; /* Fetch opclass information */ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); @@ -47,106 +62,217 @@ brinvalidate(Oid opclassoid) opfamilyoid = classform->opcfamily; opcintype = classform->opcintype; + opclassname = NameStr(classform->opcname); - ReleaseSysCache(classtup); + /* Fetch opfamily information */ + familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid)); + if (!HeapTupleIsValid(familytup)) + elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid); + familyform = (Form_pg_opfamily) GETSTRUCT(familytup); + + opfamilyname = NameStr(familyform->opfname); /* Fetch all operators and support functions of the opfamily */ oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid)); proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid)); - /* We'll track the ops and functions belonging to the named opclass */ - numclassops = 0; - classfuncbits = 0; - - /* Check support functions */ + /* Check individual support functions */ for (i = 0; i < proclist->n_members; i++) { HeapTuple proctup = &proclist->members[i]->tuple; Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); + bool ok; - /* Check that only allowed procedure numbers exist */ - if (procform->amprocnum < 1 || - procform->amprocnum > BRIN_LAST_OPTIONAL_PROCNUM) - ereport(ERROR, + /* Check procedure numbers and function signatures */ + switch (procform->amprocnum) + { + case BRIN_PROCNUM_OPCINFO: + ok = check_amproc_signature(procform->amproc, INTERNALOID, true, + 1, 1, INTERNALOID); + break; + case BRIN_PROCNUM_ADDVALUE: + ok = check_amproc_signature(procform->amproc, BOOLOID, true, + 4, 4, INTERNALOID, INTERNALOID, + INTERNALOID, INTERNALOID); + break; + case BRIN_PROCNUM_CONSISTENT: + ok = check_amproc_signature(procform->amproc, BOOLOID, true, + 3, 3, INTERNALOID, INTERNALOID, + INTERNALOID); + break; + case BRIN_PROCNUM_UNION: + ok = check_amproc_signature(procform->amproc, BOOLOID, true, + 3, 3, INTERNALOID, INTERNALOID, + INTERNALOID); + break; + default: + /* Complain if it's not a valid optional proc number */ + if (procform->amprocnum < BRIN_FIRST_OPTIONAL_PROCNUM || + procform->amprocnum > BRIN_LAST_OPTIONAL_PROCNUM) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("brin opfamily %s contains function %s with invalid support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + continue; /* omit bad proc numbers from allfuncs */ + } + /* Can't check signatures of optional procs, so assume OK */ + ok = true; + break; + } + + if (!ok) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin opfamily %u contains invalid support number %d for procedure %u", - opfamilyoid, - procform->amprocnum, procform->amproc))); - - /* Remember functions that are specifically for the named opclass */ - if (procform->amproclefttype == opcintype && - procform->amprocrighttype == opcintype) - classfuncbits |= (1 << procform->amprocnum); + errmsg("brin opfamily %s contains function %s with wrong signature for support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + } + + /* Track all valid procedure numbers seen in opfamily */ + allfuncs |= ((uint64) 1) << procform->amprocnum; } - /* Check operators */ + /* Check individual operators */ for (i = 0; i < oprlist->n_members; i++) { HeapTuple oprtup = &oprlist->members[i]->tuple; Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup); - bool found = false; - /* TODO: Check that only allowed strategy numbers exist */ - if (oprform->amopstrategy < 1) - ereport(ERROR, + /* Check that only allowed strategy numbers exist */ + if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin opfamily %u contains invalid strategy number %d for operator %u", - opfamilyoid, - oprform->amopstrategy, oprform->amopopr))); - - /* TODO: check more thoroughly for missing support functions */ - for (j = 0; j < proclist->n_members; j++) + errmsg("brin opfamily %s contains operator %s with invalid strategy number %d", + opfamilyname, + format_operator(oprform->amopopr), + oprform->amopstrategy))); + result = false; + } + else { - HeapTuple proctup = &proclist->members[j]->tuple; - Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); - - /* note only the operator's lefttype matters */ - if (procform->amproclefttype == oprform->amoplefttype && - procform->amprocrighttype == oprform->amoplefttype) - { - found = true; - break; - } + /* + * The set of operators supplied varies across BRIN opfamilies. + * Our plan is to identify all operator strategy numbers used in + * the opfamily and then complain about datatype combinations that + * are missing any operator(s). However, consider only numbers + * that appear in some non-cross-type case, since cross-type + * operators may have unique strategies. (This is not a great + * heuristic, in particular an erroneous number used in a + * cross-type operator will not get noticed; but the core BRIN + * opfamilies are messy enough to make it necessary.) + */ + if (oprform->amoplefttype == oprform->amoprighttype) + allops |= ((uint64) 1) << oprform->amopstrategy; } - if (!found) - ereport(ERROR, - (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin opfamily %u lacks support function for operator %u", - opfamilyoid, oprform->amopopr))); - /* brin doesn't support ORDER BY operators */ if (oprform->amoppurpose != AMOP_SEARCH || OidIsValid(oprform->amopsortfamily)) - ereport(ERROR, + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("brin opfamily %s contains invalid ORDER BY specification for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + + /* Check operator signature --- same for all brin strategies */ + if (!check_amop_signature(oprform->amopopr, BOOLOID, + oprform->amoplefttype, + oprform->amoprighttype)) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin opfamily %u contains invalid ORDER BY specification for operator %u", - opfamilyoid, oprform->amopopr))); + errmsg("brin opfamily %s contains operator %s with wrong signature", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + } + + /* Now check for inconsistent groups of operators/functions */ + grouplist = identify_opfamily_groups(oprlist, proclist); + opclassgroup = NULL; + foreach(lc, grouplist) + { + OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc); + + /* Remember the group exactly matching the test opclass */ + if (thisgroup->lefttype == opcintype && + thisgroup->righttype == opcintype) + opclassgroup = thisgroup; - /* Count operators that are specifically for the named opclass */ - if (oprform->amoplefttype == opcintype && - oprform->amoprighttype == opcintype) - numclassops++; + /* + * Some BRIN opfamilies expect cross-type support functions to exist, + * and some don't. We don't know exactly which are which, so if we + * find a cross-type operator for which there are no support functions + * at all, let it pass. (Don't expect that all operators exist for + * such cross-type cases, either.) + */ + if (thisgroup->functionset == 0 && + thisgroup->lefttype != thisgroup->righttype) + continue; + + /* + * Else complain if there seems to be an incomplete set of either + * operators or support functions for this datatype pair. + */ + if (thisgroup->operatorset != allops) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("brin opfamily %s is missing operator(s) for types %s and %s", + opfamilyname, + format_type_be(thisgroup->lefttype), + format_type_be(thisgroup->righttype)))); + result = false; + } + if (thisgroup->functionset != allfuncs) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("brin opfamily %s is missing support function(s) for types %s and %s", + opfamilyname, + format_type_be(thisgroup->lefttype), + format_type_be(thisgroup->righttype)))); + result = false; + } } - /* Check that the named opclass is complete */ - if (numclassops == 0) - ereport(ERROR, + /* Check that the originally-named opclass is complete */ + if (!opclassgroup || opclassgroup->operatorset != allops) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin opclass %u is missing operator(s)", - opclassoid))); + errmsg("brin opclass %s is missing operator(s)", + opclassname))); + result = false; + } for (i = 1; i <= BRIN_MANDATORY_NPROCS; i++) { - if ((classfuncbits & (1 << i)) != 0) + if (opclassgroup && + (opclassgroup->functionset & (((int64) 1) << i)) != 0) continue; /* got it */ - ereport(ERROR, + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin opclass %u is missing required support function %d", - opclassoid, i))); + errmsg("brin opclass %s is missing support function %d", + opclassname, i))); + result = false; } ReleaseCatCacheList(proclist); ReleaseCatCacheList(oprlist); + ReleaseSysCache(familytup); + ReleaseSysCache(classtup); - return true; + return result; } diff --git a/src/backend/access/gin/ginvalidate.c b/src/backend/access/gin/ginvalidate.c index e934fe824b..b87833bd5a 100644 --- a/src/backend/access/gin/ginvalidate.c +++ b/src/backend/access/gin/ginvalidate.c @@ -13,12 +13,16 @@ */ #include "postgres.h" +#include "access/amvalidate.h" #include "access/gin_private.h" #include "access/htup_details.h" #include "catalog/pg_amop.h" #include "catalog/pg_amproc.h" #include "catalog/pg_opclass.h" -#include "utils/catcache.h" +#include "catalog/pg_opfamily.h" +#include "catalog/pg_type.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/syscache.h" @@ -28,15 +32,22 @@ bool ginvalidate(Oid opclassoid) { + bool result = true; HeapTuple classtup; Form_pg_opclass classform; Oid opfamilyoid; Oid opcintype; - int numclassops; - int32 classfuncbits; + Oid opckeytype; + char *opclassname; + HeapTuple familytup; + Form_pg_opfamily familyform; + char *opfamilyname; CatCList *proclist, *oprlist; + List *grouplist; + OpFamilyOpFuncGroup *opclassgroup; int i; + ListCell *lc; /* Fetch opclass information */ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); @@ -46,100 +57,212 @@ ginvalidate(Oid opclassoid) opfamilyoid = classform->opcfamily; opcintype = classform->opcintype; + opckeytype = classform->opckeytype; + if (!OidIsValid(opckeytype)) + opckeytype = opcintype; + opclassname = NameStr(classform->opcname); - ReleaseSysCache(classtup); + /* Fetch opfamily information */ + familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid)); + if (!HeapTupleIsValid(familytup)) + elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid); + familyform = (Form_pg_opfamily) GETSTRUCT(familytup); + + opfamilyname = NameStr(familyform->opfname); /* Fetch all operators and support functions of the opfamily */ oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid)); proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid)); - /* We'll track the ops and functions belonging to the named opclass */ - numclassops = 0; - classfuncbits = 0; - - /* Check support functions */ + /* Check individual support functions */ for (i = 0; i < proclist->n_members; i++) { HeapTuple proctup = &proclist->members[i]->tuple; Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); + bool ok; + + /* + * All GIN support functions should be registered with matching + * left/right types + */ + if (procform->amproclefttype != procform->amprocrighttype) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gin opfamily %s contains support procedure %s with cross-type registration", + opfamilyname, + format_procedure(procform->amproc)))); + result = false; + } + + /* + * We can't check signatures except within the specific opclass, since + * we need to know the associated opckeytype in many cases. + */ + if (procform->amproclefttype != opcintype) + continue; - /* Check that only allowed procedure numbers exist */ - if (procform->amprocnum < 1 || - procform->amprocnum > GINNProcs) - ereport(ERROR, + /* Check procedure numbers and function signatures */ + switch (procform->amprocnum) + { + case GIN_COMPARE_PROC: + ok = check_amproc_signature(procform->amproc, INT4OID, false, + 2, 2, opckeytype, opckeytype); + break; + case GIN_EXTRACTVALUE_PROC: + /* Some opclasses omit nullFlags */ + ok = check_amproc_signature(procform->amproc, INTERNALOID, false, + 2, 3, opcintype, INTERNALOID, + INTERNALOID); + break; + case GIN_EXTRACTQUERY_PROC: + /* Some opclasses omit nullFlags and searchMode */ + ok = check_amproc_signature(procform->amproc, INTERNALOID, false, + 5, 7, opcintype, INTERNALOID, + INT2OID, INTERNALOID, INTERNALOID, + INTERNALOID, INTERNALOID); + break; + case GIN_CONSISTENT_PROC: + /* Some opclasses omit queryKeys and nullFlags */ + ok = check_amproc_signature(procform->amproc, BOOLOID, false, + 6, 8, INTERNALOID, INT2OID, + opcintype, INT4OID, + INTERNALOID, INTERNALOID, + INTERNALOID, INTERNALOID); + break; + case GIN_COMPARE_PARTIAL_PROC: + ok = check_amproc_signature(procform->amproc, INT4OID, false, + 4, 4, opckeytype, opckeytype, + INT2OID, INTERNALOID); + break; + case GIN_TRICONSISTENT_PROC: + ok = check_amproc_signature(procform->amproc, CHAROID, false, + 7, 7, INTERNALOID, INT2OID, + opcintype, INT4OID, + INTERNALOID, INTERNALOID, + INTERNALOID); + break; + default: + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gin opfamily %s contains function %s with invalid support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + continue; /* don't want additional message */ + } + + if (!ok) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin opfamily %u contains invalid support number %d for procedure %u", - opfamilyoid, - procform->amprocnum, procform->amproc))); - - /* Remember functions that are specifically for the named opclass */ - if (procform->amproclefttype == opcintype && - procform->amprocrighttype == opcintype) - classfuncbits |= (1 << procform->amprocnum); + errmsg("gin opfamily %s contains function %s with wrong signature for support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + } } - /* Check operators */ + /* Check individual operators */ for (i = 0; i < oprlist->n_members; i++) { HeapTuple oprtup = &oprlist->members[i]->tuple; Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup); /* TODO: Check that only allowed strategy numbers exist */ - if (oprform->amopstrategy < 1) - ereport(ERROR, + if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin opfamily %u contains invalid strategy number %d for operator %u", - opfamilyoid, - oprform->amopstrategy, oprform->amopopr))); + errmsg("gin opfamily %s contains operator %s with invalid strategy number %d", + opfamilyname, + format_operator(oprform->amopopr), + oprform->amopstrategy))); + result = false; + } /* gin doesn't support ORDER BY operators */ if (oprform->amoppurpose != AMOP_SEARCH || OidIsValid(oprform->amopsortfamily)) - ereport(ERROR, + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin opfamily %u contains invalid ORDER BY specification for operator %u", - opfamilyoid, oprform->amopopr))); + errmsg("gin opfamily %s contains invalid ORDER BY specification for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } - /* Count operators that are specifically for the named opclass */ - if (oprform->amoplefttype == opcintype && - oprform->amoprighttype == opcintype) - numclassops++; + /* Check operator signature --- same for all gin strategies */ + if (!check_amop_signature(oprform->amopopr, BOOLOID, + oprform->amoplefttype, + oprform->amoprighttype)) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gin opfamily %s contains operator %s with wrong signature", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } } - /* Check that the named opclass is complete */ + /* Now check for inconsistent groups of operators/functions */ + grouplist = identify_opfamily_groups(oprlist, proclist); + opclassgroup = NULL; + foreach(lc, grouplist) + { + OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc); - /* XXX needs work: we need to detect applicability of ANYARRAY operators */ -#ifdef NOT_USED - if (numclassops == 0) - ereport(ERROR, - (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin opclass %u is missing operator(s)", - opclassoid))); -#endif + /* Remember the group exactly matching the test opclass */ + if (thisgroup->lefttype == opcintype && + thisgroup->righttype == opcintype) + opclassgroup = thisgroup; + + /* + * There is not a lot we can do to check the operator sets, since each + * GIN opclass is more or less a law unto itself, and some contain + * only operators that are binary-compatible with the opclass datatype + * (meaning that empty operator sets can be OK). That case also means + * that we shouldn't insist on nonempty function sets except for the + * opclass's own group. + */ + } + /* Check that the originally-named opclass is complete */ for (i = 1; i <= GINNProcs; i++) { - if ((classfuncbits & (1 << i)) != 0) + if (opclassgroup && (opclassgroup->functionset & (1 << i)) != 0) continue; /* got it */ if (i == GIN_COMPARE_PARTIAL_PROC) continue; /* optional method */ if (i == GIN_CONSISTENT_PROC || i == GIN_TRICONSISTENT_PROC) - continue; /* don't need to have both, see check below - * loop */ - ereport(ERROR, + continue; /* don't need both, see check below loop */ + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin opclass %u is missing required support function %d", - opclassoid, i))); + errmsg("gin opclass %s is missing support function %d", + opclassname, i))); + result = false; } - if ((classfuncbits & (1 << GIN_CONSISTENT_PROC)) == 0 && - (classfuncbits & (1 << GIN_TRICONSISTENT_PROC)) == 0) - ereport(ERROR, + if (!opclassgroup || + ((opclassgroup->functionset & (1 << GIN_CONSISTENT_PROC)) == 0 && + (opclassgroup->functionset & (1 << GIN_TRICONSISTENT_PROC)) == 0)) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin opclass %u is missing required support function", - opclassoid))); + errmsg("gin opclass %s is missing support function %d or %d", + opclassname, + GIN_CONSISTENT_PROC, GIN_TRICONSISTENT_PROC))); + result = false; + } + ReleaseCatCacheList(proclist); ReleaseCatCacheList(oprlist); + ReleaseSysCache(familytup); + ReleaseSysCache(classtup); - return true; + return result; } diff --git a/src/backend/access/gist/gistvalidate.c b/src/backend/access/gist/gistvalidate.c index 86b5aeaec5..190b9787bb 100644 --- a/src/backend/access/gist/gistvalidate.c +++ b/src/backend/access/gist/gistvalidate.c @@ -13,12 +13,16 @@ */ #include "postgres.h" +#include "access/amvalidate.h" #include "access/gist_private.h" #include "access/htup_details.h" #include "catalog/pg_amop.h" #include "catalog/pg_amproc.h" #include "catalog/pg_opclass.h" -#include "utils/catcache.h" +#include "catalog/pg_opfamily.h" +#include "catalog/pg_type.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/syscache.h" @@ -28,15 +32,22 @@ bool gistvalidate(Oid opclassoid) { + bool result = true; HeapTuple classtup; Form_pg_opclass classform; Oid opfamilyoid; Oid opcintype; - int numclassops; - int32 classfuncbits; + Oid opckeytype; + char *opclassname; + HeapTuple familytup; + Form_pg_opfamily familyform; + char *opfamilyname; CatCList *proclist, *oprlist; + List *grouplist; + OpFamilyOpFuncGroup *opclassgroup; int i; + ListCell *lc; /* Fetch opclass information */ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); @@ -46,88 +57,218 @@ gistvalidate(Oid opclassoid) opfamilyoid = classform->opcfamily; opcintype = classform->opcintype; + opckeytype = classform->opckeytype; + if (!OidIsValid(opckeytype)) + opckeytype = opcintype; + opclassname = NameStr(classform->opcname); - ReleaseSysCache(classtup); + /* Fetch opfamily information */ + familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid)); + if (!HeapTupleIsValid(familytup)) + elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid); + familyform = (Form_pg_opfamily) GETSTRUCT(familytup); + + opfamilyname = NameStr(familyform->opfname); /* Fetch all operators and support functions of the opfamily */ oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid)); proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid)); - /* We'll track the ops and functions belonging to the named opclass */ - numclassops = 0; - classfuncbits = 0; - - /* Check support functions */ + /* Check individual support functions */ for (i = 0; i < proclist->n_members; i++) { HeapTuple proctup = &proclist->members[i]->tuple; Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); + bool ok; + + /* + * All GiST support functions should be registered with matching + * left/right types + */ + if (procform->amproclefttype != procform->amprocrighttype) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gist opfamily %s contains support procedure %s with cross-type registration", + opfamilyname, + format_procedure(procform->amproc)))); + result = false; + } + + /* + * We can't check signatures except within the specific opclass, since + * we need to know the associated opckeytype in many cases. + */ + if (procform->amproclefttype != opcintype) + continue; - /* Check that only allowed procedure numbers exist */ - if (procform->amprocnum < 1 || - procform->amprocnum > GISTNProcs) - ereport(ERROR, + /* Check procedure numbers and function signatures */ + switch (procform->amprocnum) + { + case GIST_CONSISTENT_PROC: + ok = check_amproc_signature(procform->amproc, BOOLOID, false, + 5, 5, INTERNALOID, opcintype, + INT2OID, OIDOID, INTERNALOID); + break; + case GIST_UNION_PROC: + ok = check_amproc_signature(procform->amproc, opckeytype, false, + 2, 2, INTERNALOID, INTERNALOID); + break; + case GIST_COMPRESS_PROC: + case GIST_DECOMPRESS_PROC: + case GIST_FETCH_PROC: + ok = check_amproc_signature(procform->amproc, INTERNALOID, true, + 1, 1, INTERNALOID); + break; + case GIST_PENALTY_PROC: + ok = check_amproc_signature(procform->amproc, INTERNALOID, true, + 3, 3, INTERNALOID, + INTERNALOID, INTERNALOID); + break; + case GIST_PICKSPLIT_PROC: + ok = check_amproc_signature(procform->amproc, INTERNALOID, true, + 2, 2, INTERNALOID, INTERNALOID); + break; + case GIST_EQUAL_PROC: + ok = check_amproc_signature(procform->amproc, INTERNALOID, false, + 3, 3, opckeytype, opckeytype, + INTERNALOID); + break; + case GIST_DISTANCE_PROC: + ok = check_amproc_signature(procform->amproc, FLOAT8OID, false, + 5, 5, INTERNALOID, opcintype, + INT2OID, OIDOID, INTERNALOID); + break; + default: + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gist opfamily %s contains function %s with invalid support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + continue; /* don't want additional message */ + } + + if (!ok) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opfamily %u contains invalid support number %d for procedure %u", - opfamilyoid, - procform->amprocnum, procform->amproc))); - - /* Remember functions that are specifically for the named opclass */ - if (procform->amproclefttype == opcintype && - procform->amprocrighttype == opcintype) - classfuncbits |= (1 << procform->amprocnum); + errmsg("gist opfamily %s contains function %s with wrong signature for support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + } } - /* Check operators */ + /* Check individual operators */ for (i = 0; i < oprlist->n_members; i++) { HeapTuple oprtup = &oprlist->members[i]->tuple; Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup); + Oid op_rettype; /* TODO: Check that only allowed strategy numbers exist */ if (oprform->amopstrategy < 1) - ereport(ERROR, + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opfamily %u contains invalid strategy number %d for operator %u", - opfamilyoid, - oprform->amopstrategy, oprform->amopopr))); - - /* GiST supports ORDER BY operators, but must have distance proc */ - if (oprform->amoppurpose != AMOP_SEARCH && - oprform->amoplefttype == opcintype && - oprform->amoprighttype == opcintype && - (classfuncbits & (1 << GIST_DISTANCE_PROC)) == 0) - ereport(ERROR, + errmsg("gist opfamily %s contains operator %s with invalid strategy number %d", + opfamilyname, + format_operator(oprform->amopopr), + oprform->amopstrategy))); + result = false; + } + + /* GiST supports ORDER BY operators */ + if (oprform->amoppurpose != AMOP_SEARCH) + { + /* ... but must have matching distance proc */ + if (!OidIsValid(get_opfamily_proc(opfamilyoid, + oprform->amoplefttype, + oprform->amoplefttype, + GIST_DISTANCE_PROC))) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gist opfamily %s contains unsupported ORDER BY specification for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + /* ... and operator result must match the claimed btree opfamily */ + op_rettype = get_op_rettype(oprform->amopopr); + if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype)) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("gist opfamily %s contains incorrect ORDER BY opfamily specification for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + } + else + { + /* Search operators must always return bool */ + op_rettype = BOOLOID; + } + + /* Check operator signature */ + if (!check_amop_signature(oprform->amopopr, op_rettype, + oprform->amoplefttype, + oprform->amoprighttype)) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opfamily %u contains unsupported ORDER BY specification for operator %u", - opfamilyoid, oprform->amopopr))); + errmsg("gist opfamily %s contains operator %s with wrong signature", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + } + + /* Now check for inconsistent groups of operators/functions */ + grouplist = identify_opfamily_groups(oprlist, proclist); + opclassgroup = NULL; + foreach(lc, grouplist) + { + OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc); - /* Count operators that are specifically for the named opclass */ - /* XXX we consider only lefttype here */ - if (oprform->amoplefttype == opcintype) - numclassops++; + /* Remember the group exactly matching the test opclass */ + if (thisgroup->lefttype == opcintype && + thisgroup->righttype == opcintype) + opclassgroup = thisgroup; + + /* + * There is not a lot we can do to check the operator sets, since each + * GiST opclass is more or less a law unto itself, and some contain + * only operators that are binary-compatible with the opclass datatype + * (meaning that empty operator sets can be OK). That case also means + * that we shouldn't insist on nonempty function sets except for the + * opclass's own group. + */ } - /* Check that the named opclass is complete */ - if (numclassops == 0) - ereport(ERROR, - (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opclass %u is missing operator(s)", - opclassoid))); + /* Check that the originally-named opclass is complete */ for (i = 1; i <= GISTNProcs; i++) { - if ((classfuncbits & (1 << i)) != 0) + if (opclassgroup && (opclassgroup->functionset & (1 << i)) != 0) continue; /* got it */ if (i == GIST_DISTANCE_PROC || i == GIST_FETCH_PROC) continue; /* optional methods */ - ereport(ERROR, + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist opclass %u is missing required support function %d", - opclassoid, i))); + errmsg("gist opclass %s is missing support function %d", + opclassname, i))); + result = false; } ReleaseCatCacheList(proclist); ReleaseCatCacheList(oprlist); + ReleaseSysCache(familytup); + ReleaseSysCache(classtup); - return true; + return result; } diff --git a/src/backend/access/hash/hashvalidate.c b/src/backend/access/hash/hashvalidate.c index abd678483c..f0395c5ce5 100644 --- a/src/backend/access/hash/hashvalidate.c +++ b/src/backend/access/hash/hashvalidate.c @@ -13,16 +13,24 @@ */ #include "postgres.h" +#include "access/amvalidate.h" #include "access/hash.h" #include "access/htup_details.h" #include "catalog/pg_amop.h" #include "catalog/pg_amproc.h" #include "catalog/pg_opclass.h" +#include "catalog/pg_opfamily.h" +#include "catalog/pg_proc.h" +#include "catalog/pg_type.h" +#include "parser/parse_coerce.h" #include "utils/builtins.h" -#include "utils/catcache.h" +#include "utils/fmgroids.h" #include "utils/syscache.h" +static bool check_hash_func_signature(Oid funcid, Oid restype, Oid argtype); + + /* * Validator for a hash opclass. * @@ -30,23 +38,26 @@ * redundant when checking each opclass in a family. But they don't run long * enough to be much of a problem, so we accept the duplication rather than * complicate the amvalidate API. - * - * Some of the code here relies on the fact that hash has only one operator - * strategy and support function; we don't have to check for incomplete sets. */ bool hashvalidate(Oid opclassoid) { + bool result = true; HeapTuple classtup; Form_pg_opclass classform; Oid opfamilyoid; Oid opcintype; - int numclassops; - int32 classfuncbits; + char *opclassname; + HeapTuple familytup; + Form_pg_opfamily familyform; + char *opfamilyname; CatCList *proclist, *oprlist; - int i, - j; + List *grouplist; + OpFamilyOpFuncGroup *opclassgroup; + List *hashabletypes = NIL; + int i; + ListCell *lc; /* Fetch opclass information */ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); @@ -56,102 +67,246 @@ hashvalidate(Oid opclassoid) opfamilyoid = classform->opcfamily; opcintype = classform->opcintype; + opclassname = NameStr(classform->opcname); - ReleaseSysCache(classtup); + /* Fetch opfamily information */ + familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid)); + if (!HeapTupleIsValid(familytup)) + elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid); + familyform = (Form_pg_opfamily) GETSTRUCT(familytup); + + opfamilyname = NameStr(familyform->opfname); /* Fetch all operators and support functions of the opfamily */ oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid)); proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid)); - /* We'll track the ops and functions belonging to the named opclass */ - numclassops = 0; - classfuncbits = 0; - - /* Check support functions */ + /* Check individual support functions */ for (i = 0; i < proclist->n_members; i++) { HeapTuple proctup = &proclist->members[i]->tuple; Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); - /* Check that only allowed procedure numbers exist */ - if (procform->amprocnum != HASHPROC) - ereport(ERROR, + /* + * All hash functions should be registered with matching left/right + * types + */ + if (procform->amproclefttype != procform->amprocrighttype) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash opfamily %u contains invalid support number %d for procedure %u", - opfamilyoid, - procform->amprocnum, procform->amproc))); - - /* Remember functions that are specifically for the named opclass */ - if (procform->amproclefttype == opcintype && - procform->amprocrighttype == opcintype) - classfuncbits |= (1 << procform->amprocnum); + errmsg("hash opfamily %s contains support procedure %s with cross-type registration", + opfamilyname, + format_procedure(procform->amproc)))); + result = false; + } + + /* Check procedure numbers and function signatures */ + switch (procform->amprocnum) + { + case HASHPROC: + if (!check_hash_func_signature(procform->amproc, INT4OID, + procform->amproclefttype)) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("hash opfamily %s contains function %s with wrong signature for support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + } + else + { + /* Remember which types we can hash */ + hashabletypes = + list_append_unique_oid(hashabletypes, + procform->amproclefttype); + } + break; + default: + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("hash opfamily %s contains function %s with invalid support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + break; + } } - /* Check operators */ + /* Check individual operators */ for (i = 0; i < oprlist->n_members; i++) { HeapTuple oprtup = &oprlist->members[i]->tuple; Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup); - bool leftFound = false, - rightFound = false; /* Check that only allowed strategy numbers exist */ if (oprform->amopstrategy < 1 || oprform->amopstrategy > HTMaxStrategyNumber) - ereport(ERROR, + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash opfamily %u contains invalid strategy number %d for operator %u", - opfamilyoid, - oprform->amopstrategy, oprform->amopopr))); + errmsg("hash opfamily %s contains operator %s with invalid strategy number %d", + opfamilyname, + format_operator(oprform->amopopr), + oprform->amopstrategy))); + result = false; + } - /* - * There should be relevant hash procedures for each operator - */ - for (j = 0; j < proclist->n_members; j++) + /* hash doesn't support ORDER BY operators */ + if (oprform->amoppurpose != AMOP_SEARCH || + OidIsValid(oprform->amopsortfamily)) { - HeapTuple proctup = &proclist->members[j]->tuple; - Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); - - if (procform->amproclefttype == oprform->amoplefttype) - leftFound = true; - if (procform->amproclefttype == oprform->amoprighttype) - rightFound = true; + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("hash opfamily %s contains invalid ORDER BY specification for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; } - if (!leftFound || !rightFound) - ereport(ERROR, + /* Check operator signature --- same for all hash strategies */ + if (!check_amop_signature(oprform->amopopr, BOOLOID, + oprform->amoplefttype, + oprform->amoprighttype)) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash opfamily %u lacks support function for operator %u", - opfamilyoid, oprform->amopopr))); + errmsg("hash opfamily %s contains operator %s with wrong signature", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } - /* hash doesn't support ORDER BY operators */ - if (oprform->amoppurpose != AMOP_SEARCH || - OidIsValid(oprform->amopsortfamily)) - ereport(ERROR, + /* There should be relevant hash procedures for each datatype */ + if (!list_member_oid(hashabletypes, oprform->amoplefttype) || + !list_member_oid(hashabletypes, oprform->amoprighttype)) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash opfamily %u contains invalid ORDER BY specification for operator %u", - opfamilyoid, oprform->amopopr))); + errmsg("hash opfamily %s lacks support function for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + } + + /* Now check for inconsistent groups of operators/functions */ + grouplist = identify_opfamily_groups(oprlist, proclist); + opclassgroup = NULL; + foreach(lc, grouplist) + { + OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc); + + /* Remember the group exactly matching the test opclass */ + if (thisgroup->lefttype == opcintype && + thisgroup->righttype == opcintype) + opclassgroup = thisgroup; - /* Count operators that are specifically for the named opclass */ - if (oprform->amoplefttype == opcintype && - oprform->amoprighttype == opcintype) - numclassops++; + /* + * Complain if there seems to be an incomplete set of operators for + * this datatype pair (implying that we have a hash function but no + * operator). + */ + if (thisgroup->operatorset != (1 << HTEqualStrategyNumber)) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("hash opfamily %s is missing operator(s) for types %s and %s", + opfamilyname, + format_type_be(thisgroup->lefttype), + format_type_be(thisgroup->righttype)))); + result = false; + } } - /* Check that the named opclass is complete */ - if (numclassops != HTMaxStrategyNumber) - ereport(ERROR, + /* Check that the originally-named opclass is supported */ + /* (if group is there, we already checked it adequately above) */ + if (!opclassgroup) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash opclass %u is missing operator(s)", - opclassoid))); - if ((classfuncbits & (1 << HASHPROC)) == 0) - ereport(ERROR, + errmsg("hash opclass %s is missing operator(s)", + opclassname))); + result = false; + } + + /* + * Complain if the opfamily doesn't have entries for all possible + * combinations of its supported datatypes. While missing cross-type + * operators are not fatal, it seems reasonable to insist that all + * built-in hash opfamilies be complete. + */ + if (list_length(grouplist) != + list_length(hashabletypes) * list_length(hashabletypes)) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash opclass %u is missing required support function", - opclassoid))); + errmsg("hash opfamily %s is missing cross-type operator(s)", + opfamilyname))); + result = false; + } ReleaseCatCacheList(proclist); ReleaseCatCacheList(oprlist); + ReleaseSysCache(familytup); + ReleaseSysCache(classtup); + + return result; +} + + +/* + * We need a custom version of check_amproc_signature because of assorted + * hacks in the core hash opclass definitions. + */ +static bool +check_hash_func_signature(Oid funcid, Oid restype, Oid argtype) +{ + bool result = true; + HeapTuple tp; + Form_pg_proc procform; + + tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for function %u", funcid); + procform = (Form_pg_proc) GETSTRUCT(tp); + + if (procform->prorettype != restype || procform->proretset || + procform->pronargs != 1) + result = false; + + if (!IsBinaryCoercible(argtype, procform->proargtypes.values[0])) + { + /* + * Some of the built-in hash opclasses cheat by using hash functions + * that are different from but physically compatible with the opclass + * datatype. In some of these cases, even a "binary coercible" check + * fails because there's no relevant cast. For the moment, fix it by + * having a whitelist of allowed cases. Test the specific function + * identity, not just its input type, because hashvarlena() takes + * INTERNAL and allowing any such function seems too scary. + */ + if (funcid == F_HASHINT4 && + (argtype == DATEOID || + argtype == ABSTIMEOID || argtype == RELTIMEOID || + argtype == XIDOID || argtype == CIDOID)) + /* okay, allowed use of hashint4() */ ; + else if (funcid == F_TIMESTAMP_HASH && + argtype == TIMESTAMPTZOID) + /* okay, allowed use of timestamp_hash() */ ; + else if (funcid == F_HASHCHAR && + argtype == BOOLOID) + /* okay, allowed use of hashchar() */ ; + else if (funcid == F_HASHVARLENA && + argtype == BYTEAOID) + /* okay, allowed use of hashvarlena() */ ; + else + result = false; + } - return true; + ReleaseSysCache(tp); + return result; } diff --git a/src/backend/access/index/Makefile b/src/backend/access/index/Makefile index b82e5d727f..cb38be8f55 100644 --- a/src/backend/access/index/Makefile +++ b/src/backend/access/index/Makefile @@ -12,6 +12,6 @@ subdir = src/backend/access/index top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = amapi.o genam.o indexam.o +OBJS = amapi.o amvalidate.o genam.o indexam.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/index/amvalidate.c b/src/backend/access/index/amvalidate.c new file mode 100644 index 0000000000..1a3c5f16b9 --- /dev/null +++ b/src/backend/access/index/amvalidate.c @@ -0,0 +1,246 @@ +/*------------------------------------------------------------------------- + * + * amvalidate.c + * Support routines for index access methods' amvalidate functions. + * + * Copyright (c) 2016, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/access/index/amvalidate.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/amvalidate.h" +#include "access/htup_details.h" +#include "catalog/pg_am.h" +#include "catalog/pg_amop.h" +#include "catalog/pg_amproc.h" +#include "catalog/pg_opclass.h" +#include "catalog/pg_operator.h" +#include "catalog/pg_proc.h" +#include "parser/parse_coerce.h" +#include "utils/syscache.h" + + +/* + * identify_opfamily_groups() returns a List of OpFamilyOpFuncGroup structs, + * one for each combination of lefttype/righttype present in the family's + * operator and support function lists. If amopstrategy K is present for + * this datatype combination, we set bit 1 << K in operatorset, and similarly + * for the support functions. With uint64 fields we can handle operator and + * function numbers up to 63, which is plenty for the foreseeable future. + * + * The given CatCLists are expected to represent a single opfamily fetched + * from the AMOPSTRATEGY and AMPROCNUM caches, so that they will be in + * order by those caches' second and third cache keys, namely the datatypes. + */ +List * +identify_opfamily_groups(CatCList *oprlist, CatCList *proclist) +{ + List *result = NIL; + OpFamilyOpFuncGroup *thisgroup; + Form_pg_amop oprform; + Form_pg_amproc procform; + int io, + ip; + + /* We need the lists to be ordered; should be true in normal operation */ + if (!oprlist->ordered || !proclist->ordered) + elog(ERROR, "cannot validate operator family without ordered data"); + + /* + * Advance through the lists concurrently. Thanks to the ordering, we + * should see all operators and functions of a given datatype pair + * consecutively. + */ + thisgroup = NULL; + io = ip = 0; + if (io < oprlist->n_members) + { + oprform = (Form_pg_amop) GETSTRUCT(&oprlist->members[io]->tuple); + io++; + } + else + oprform = NULL; + if (ip < proclist->n_members) + { + procform = (Form_pg_amproc) GETSTRUCT(&proclist->members[ip]->tuple); + ip++; + } + else + procform = NULL; + + while (oprform || procform) + { + if (oprform && thisgroup && + oprform->amoplefttype == thisgroup->lefttype && + oprform->amoprighttype == thisgroup->righttype) + { + /* Operator belongs to current group; include it and advance */ + + /* Ignore strategy numbers outside supported range */ + if (oprform->amopstrategy > 0 && oprform->amopstrategy < 64) + thisgroup->operatorset |= ((uint64) 1) << oprform->amopstrategy; + + if (io < oprlist->n_members) + { + oprform = (Form_pg_amop) GETSTRUCT(&oprlist->members[io]->tuple); + io++; + } + else + oprform = NULL; + continue; + } + + if (procform && thisgroup && + procform->amproclefttype == thisgroup->lefttype && + procform->amprocrighttype == thisgroup->righttype) + { + /* Procedure belongs to current group; include it and advance */ + + /* Ignore function numbers outside supported range */ + if (procform->amprocnum > 0 && procform->amprocnum < 64) + thisgroup->functionset |= ((uint64) 1) << procform->amprocnum; + + if (ip < proclist->n_members) + { + procform = (Form_pg_amproc) GETSTRUCT(&proclist->members[ip]->tuple); + ip++; + } + else + procform = NULL; + continue; + } + + /* Time for a new group */ + thisgroup = (OpFamilyOpFuncGroup *) palloc(sizeof(OpFamilyOpFuncGroup)); + if (oprform && + (!procform || + (oprform->amoplefttype < procform->amproclefttype || + (oprform->amoplefttype == procform->amproclefttype && + oprform->amoprighttype < procform->amprocrighttype)))) + { + thisgroup->lefttype = oprform->amoplefttype; + thisgroup->righttype = oprform->amoprighttype; + } + else + { + thisgroup->lefttype = procform->amproclefttype; + thisgroup->righttype = procform->amprocrighttype; + } + thisgroup->operatorset = thisgroup->functionset = 0; + result = lappend(result, thisgroup); + } + + return result; +} + +/* + * Validate the signature (argument and result types) of an opclass support + * function. Return TRUE if OK, FALSE if not. + * + * The "..." represents maxargs argument-type OIDs. If "exact" is TRUE, they + * must match the function arg types exactly, else only binary-coercibly. + * In any case the function result type must match restype exactly. + */ +bool +check_amproc_signature(Oid funcid, Oid restype, bool exact, + int minargs, int maxargs,...) +{ + bool result = true; + HeapTuple tp; + Form_pg_proc procform; + va_list ap; + int i; + + tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for function %u", funcid); + procform = (Form_pg_proc) GETSTRUCT(tp); + + if (procform->prorettype != restype || procform->proretset || + procform->pronargs < minargs || procform->pronargs > maxargs) + result = false; + + va_start(ap, maxargs); + for (i = 0; i < maxargs; i++) + { + Oid argtype = va_arg(ap, Oid); + + if (i >= procform->pronargs) + continue; + if (exact ? (argtype != procform->proargtypes.values[i]) : + !IsBinaryCoercible(argtype, procform->proargtypes.values[i])) + result = false; + } + va_end(ap); + + ReleaseSysCache(tp); + return result; +} + +/* + * Validate the signature (argument and result types) of an opclass operator. + * Return TRUE if OK, FALSE if not. + * + * Currently, we can hard-wire this as accepting only binary operators. Also, + * we can insist on exact type matches, since the given lefttype/righttype + * come from pg_amop and should always match the operator exactly. + */ +bool +check_amop_signature(Oid opno, Oid restype, Oid lefttype, Oid righttype) +{ + bool result = true; + HeapTuple tp; + Form_pg_operator opform; + + tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno)); + if (!HeapTupleIsValid(tp)) /* shouldn't happen */ + elog(ERROR, "cache lookup failed for operator %u", opno); + opform = (Form_pg_operator) GETSTRUCT(tp); + + if (opform->oprresult != restype || opform->oprkind != 'b' || + opform->oprleft != lefttype || opform->oprright != righttype) + result = false; + + ReleaseSysCache(tp); + return result; +} + +/* + * Is the datatype a legitimate input type for the btree opfamily? + */ +bool +opfamily_can_sort_type(Oid opfamilyoid, Oid datatypeoid) +{ + bool result = false; + CatCList *opclist; + int i; + + /* + * We search through all btree opclasses to see if one matches. This is a + * bit inefficient but there is no better index available. It also saves + * making an explicit check that the opfamily belongs to btree. + */ + opclist = SearchSysCacheList1(CLAAMNAMENSP, ObjectIdGetDatum(BTREE_AM_OID)); + + for (i = 0; i < opclist->n_members; i++) + { + HeapTuple classtup = &opclist->members[i]->tuple; + Form_pg_opclass classform = (Form_pg_opclass) GETSTRUCT(classtup); + + if (classform->opcfamily == opfamilyoid && + classform->opcintype == datatypeoid) + { + result = true; + break; + } + } + + ReleaseCatCacheList(opclist); + + return result; +} diff --git a/src/backend/access/nbtree/nbtvalidate.c b/src/backend/access/nbtree/nbtvalidate.c index b814b54a36..e05fe4875f 100644 --- a/src/backend/access/nbtree/nbtvalidate.c +++ b/src/backend/access/nbtree/nbtvalidate.c @@ -13,13 +13,15 @@ */ #include "postgres.h" +#include "access/amvalidate.h" #include "access/htup_details.h" #include "access/nbtree.h" #include "catalog/pg_amop.h" #include "catalog/pg_amproc.h" #include "catalog/pg_opclass.h" +#include "catalog/pg_opfamily.h" +#include "catalog/pg_type.h" #include "utils/builtins.h" -#include "utils/catcache.h" #include "utils/syscache.h" @@ -34,19 +36,22 @@ bool btvalidate(Oid opclassoid) { + bool result = true; HeapTuple classtup; Form_pg_opclass classform; Oid opfamilyoid; Oid opcintype; - int numclassops; - int32 classfuncbits; + char *opclassname; + HeapTuple familytup; + Form_pg_opfamily familyform; + char *opfamilyname; CatCList *proclist, *oprlist; - Oid lastlefttype, - lastrighttype; - int numOps; - int i, - j; + List *grouplist; + OpFamilyOpFuncGroup *opclassgroup; + List *familytypes; + int i; + ListCell *lc; /* Fetch opclass information */ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); @@ -56,45 +61,63 @@ btvalidate(Oid opclassoid) opfamilyoid = classform->opcfamily; opcintype = classform->opcintype; + opclassname = NameStr(classform->opcname); - ReleaseSysCache(classtup); + /* Fetch opfamily information */ + familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid)); + if (!HeapTupleIsValid(familytup)) + elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid); + familyform = (Form_pg_opfamily) GETSTRUCT(familytup); + + opfamilyname = NameStr(familyform->opfname); /* Fetch all operators and support functions of the opfamily */ oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid)); proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid)); - /* We rely on the oprlist to be ordered */ - if (!oprlist->ordered) - elog(ERROR, "cannot validate btree opclass without ordered data"); - - /* We'll track the ops and functions belonging to the named opclass */ - numclassops = 0; - classfuncbits = 0; - - /* Check support functions */ + /* Check individual support functions */ for (i = 0; i < proclist->n_members; i++) { HeapTuple proctup = &proclist->members[i]->tuple; Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); + bool ok; + + /* Check procedure numbers and function signatures */ + switch (procform->amprocnum) + { + case BTORDER_PROC: + ok = check_amproc_signature(procform->amproc, INT4OID, true, + 2, 2, procform->amproclefttype, + procform->amprocrighttype); + break; + case BTSORTSUPPORT_PROC: + ok = check_amproc_signature(procform->amproc, VOIDOID, true, + 1, 1, INTERNALOID); + break; + default: + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("btree opfamily %s contains function %s with invalid support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + continue; /* don't want additional message */ + } - /* Check that only allowed procedure numbers exist */ - if (procform->amprocnum != BTORDER_PROC && - procform->amprocnum != BTSORTSUPPORT_PROC) - ereport(ERROR, + if (!ok) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree opfamily %u contains invalid support number %d for procedure %u", - opfamilyoid, - procform->amprocnum, procform->amproc))); - - /* Remember functions that are specifically for the named opclass */ - if (procform->amproclefttype == opcintype && - procform->amprocrighttype == opcintype) - classfuncbits |= (1 << procform->amprocnum); + errmsg("btree opfamily %s contains function %s with wrong signature for support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + } } - /* Check operators */ - lastlefttype = lastrighttype = InvalidOid; - numOps = 0; + /* Check individual operators */ for (i = 0; i < oprlist->n_members; i++) { HeapTuple oprtup = &oprlist->members[i]->tuple; @@ -103,102 +126,127 @@ btvalidate(Oid opclassoid) /* Check that only allowed strategy numbers exist */ if (oprform->amopstrategy < 1 || oprform->amopstrategy > BTMaxStrategyNumber) - ereport(ERROR, + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree opfamily %u contains invalid strategy number %d for operator %u", - opfamilyoid, - oprform->amopstrategy, oprform->amopopr))); + errmsg("btree opfamily %s contains operator %s with invalid strategy number %d", + opfamilyname, + format_operator(oprform->amopopr), + oprform->amopstrategy))); + result = false; + } - /* - * Check that we have all strategies for each supported datatype - * combination. This is easy since the list will be sorted in - * datatype order and there can't be duplicate strategy numbers. - */ - if (oprform->amoplefttype == lastlefttype && - oprform->amoprighttype == lastrighttype) - numOps++; - else + /* btree doesn't support ORDER BY operators */ + if (oprform->amoppurpose != AMOP_SEARCH || + OidIsValid(oprform->amopsortfamily)) { - /* reached a group boundary, so check ... */ - if (numOps > 0 && numOps != BTMaxStrategyNumber) - ereport(ERROR, - (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree opfamily %u has a partial set of operators for datatypes %s and %s", - opfamilyoid, - format_type_be(lastlefttype), - format_type_be(lastrighttype)))); - /* ... and reset for new group */ - lastlefttype = oprform->amoplefttype; - lastrighttype = oprform->amoprighttype; - numOps = 1; + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("btree opfamily %s contains invalid ORDER BY specification for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + + /* Check operator signature --- same for all btree strategies */ + if (!check_amop_signature(oprform->amopopr, BOOLOID, + oprform->amoplefttype, + oprform->amoprighttype)) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("btree opfamily %s contains operator %s with wrong signature", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; } + } + + /* Now check for inconsistent groups of operators/functions */ + grouplist = identify_opfamily_groups(oprlist, proclist); + opclassgroup = NULL; + familytypes = NIL; + foreach(lc, grouplist) + { + OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc); + + /* Remember the group exactly matching the test opclass */ + if (thisgroup->lefttype == opcintype && + thisgroup->righttype == opcintype) + opclassgroup = thisgroup; + + /* + * Identify all distinct data types handled in this opfamily. This + * implementation is O(N^2), but there aren't likely to be enough + * types in the family for it to matter. + */ + familytypes = list_append_unique_oid(familytypes, thisgroup->lefttype); + familytypes = list_append_unique_oid(familytypes, thisgroup->righttype); /* - * There should be a relevant support function for each operator, but - * we only need to check this once per pair of datatypes. + * Complain if there seems to be an incomplete set of either operators + * or support functions for this datatype pair. The only thing that + * is considered optional is the sortsupport function. */ - if (numOps == 1) + if (thisgroup->operatorset != + ((1 << BTLessStrategyNumber) | + (1 << BTLessEqualStrategyNumber) | + (1 << BTEqualStrategyNumber) | + (1 << BTGreaterEqualStrategyNumber) | + (1 << BTGreaterStrategyNumber))) { - bool found = false; - - for (j = 0; j < proclist->n_members; j++) - { - HeapTuple proctup = &proclist->members[j]->tuple; - Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); - - if (procform->amprocnum == BTORDER_PROC && - procform->amproclefttype == oprform->amoplefttype && - procform->amprocrighttype == oprform->amoprighttype) - { - found = true; - break; - } - } - - if (!found) - ereport(ERROR, - (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree opfamily %u lacks support function for operator %u", - opfamilyoid, oprform->amopopr))); + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("btree opfamily %s is missing operator(s) for types %s and %s", + opfamilyname, + format_type_be(thisgroup->lefttype), + format_type_be(thisgroup->righttype)))); + result = false; } - - /* btree doesn't support ORDER BY operators */ - if (oprform->amoppurpose != AMOP_SEARCH || - OidIsValid(oprform->amopsortfamily)) - ereport(ERROR, + if ((thisgroup->functionset & (1 << BTORDER_PROC)) == 0) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree opfamily %u contains invalid ORDER BY specification for operator %u", - opfamilyoid, oprform->amopopr))); - - /* Count operators that are specifically for the named opclass */ - if (oprform->amoplefttype == opcintype && - oprform->amoprighttype == opcintype) - numclassops++; + errmsg("btree opfamily %s is missing support function for types %s and %s", + opfamilyname, + format_type_be(thisgroup->lefttype), + format_type_be(thisgroup->righttype)))); + result = false; + } } - /* don't forget to check the last batch of operators for completeness */ - if (numOps > 0 && numOps != BTMaxStrategyNumber) - ereport(ERROR, - (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree opfamily %u has a partial set of operators for datatypes %s and %s", - opfamilyoid, - format_type_be(lastlefttype), - format_type_be(lastrighttype)))); - - /* Check that the named opclass is complete */ - if (numclassops != BTMaxStrategyNumber) - ereport(ERROR, + /* Check that the originally-named opclass is supported */ + /* (if group is there, we already checked it adequately above) */ + if (!opclassgroup) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree opclass %u is missing operator(s)", - opclassoid))); - if ((classfuncbits & (1 << BTORDER_PROC)) == 0) - ereport(ERROR, + errmsg("btree opclass %s is missing operator(s)", + opclassname))); + result = false; + } + + /* + * Complain if the opfamily doesn't have entries for all possible + * combinations of its supported datatypes. While missing cross-type + * operators are not fatal, they do limit the planner's ability to derive + * additional qual clauses from equivalence classes, so it seems + * reasonable to insist that all built-in btree opfamilies be complete. + */ + if (list_length(grouplist) != + list_length(familytypes) * list_length(familytypes)) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree opclass %u is missing required support function", - opclassoid))); + errmsg("btree opfamily %s is missing cross-type operator(s)", + opfamilyname))); + result = false; + } ReleaseCatCacheList(proclist); ReleaseCatCacheList(oprlist); + ReleaseSysCache(familytup); + ReleaseSysCache(classtup); - return true; + return result; } diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c index c2d2d466d9..ba7c828453 100644 --- a/src/backend/access/spgist/spgvalidate.c +++ b/src/backend/access/spgist/spgvalidate.c @@ -13,30 +13,44 @@ */ #include "postgres.h" +#include "access/amvalidate.h" #include "access/htup_details.h" #include "access/spgist_private.h" #include "catalog/pg_amop.h" #include "catalog/pg_amproc.h" #include "catalog/pg_opclass.h" -#include "utils/catcache.h" +#include "catalog/pg_opfamily.h" +#include "catalog/pg_type.h" +#include "utils/builtins.h" #include "utils/syscache.h" /* * Validator for an SP-GiST opclass. + * + * Some of the checks done here cover the whole opfamily, and therefore are + * redundant when checking each opclass in a family. But they don't run long + * enough to be much of a problem, so we accept the duplication rather than + * complicate the amvalidate API. */ bool spgvalidate(Oid opclassoid) { + bool result = true; HeapTuple classtup; Form_pg_opclass classform; Oid opfamilyoid; Oid opcintype; - int numclassops; - int32 classfuncbits; + char *opclassname; + HeapTuple familytup; + Form_pg_opfamily familyform; + char *opfamilyname; CatCList *proclist, *oprlist; + List *grouplist; + OpFamilyOpFuncGroup *opclassgroup; int i; + ListCell *lc; /* Fetch opclass information */ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); @@ -46,84 +60,185 @@ spgvalidate(Oid opclassoid) opfamilyoid = classform->opcfamily; opcintype = classform->opcintype; + opclassname = NameStr(classform->opcname); - ReleaseSysCache(classtup); + /* Fetch opfamily information */ + familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid)); + if (!HeapTupleIsValid(familytup)) + elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid); + familyform = (Form_pg_opfamily) GETSTRUCT(familytup); + + opfamilyname = NameStr(familyform->opfname); /* Fetch all operators and support functions of the opfamily */ oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid)); proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid)); - /* We'll track the ops and functions belonging to the named opclass */ - numclassops = 0; - classfuncbits = 0; - - /* Check support functions */ + /* Check individual support functions */ for (i = 0; i < proclist->n_members; i++) { HeapTuple proctup = &proclist->members[i]->tuple; Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup); - - /* Check that only allowed procedure numbers exist */ - if (procform->amprocnum < 1 || - procform->amprocnum > SPGISTNProc) - ereport(ERROR, + bool ok; + + /* + * All SP-GiST support functions should be registered with matching + * left/right types + */ + if (procform->amproclefttype != procform->amprocrighttype) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("spgist opfamily %s contains support procedure %s with cross-type registration", + opfamilyname, + format_procedure(procform->amproc)))); + result = false; + } + + /* Check procedure numbers and function signatures */ + switch (procform->amprocnum) + { + case SPGIST_CONFIG_PROC: + case SPGIST_CHOOSE_PROC: + case SPGIST_PICKSPLIT_PROC: + case SPGIST_INNER_CONSISTENT_PROC: + ok = check_amproc_signature(procform->amproc, VOIDOID, true, + 2, 2, INTERNALOID, INTERNALOID); + break; + case SPGIST_LEAF_CONSISTENT_PROC: + ok = check_amproc_signature(procform->amproc, BOOLOID, true, + 2, 2, INTERNALOID, INTERNALOID); + break; + default: + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("spgist opfamily %s contains function %s with invalid support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + continue; /* don't want additional message */ + } + + if (!ok) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist opfamily %u contains invalid support number %d for procedure %u", - opfamilyoid, - procform->amprocnum, procform->amproc))); - - /* Remember functions that are specifically for the named opclass */ - if (procform->amproclefttype == opcintype && - procform->amprocrighttype == opcintype) - classfuncbits |= (1 << procform->amprocnum); + errmsg("spgist opfamily %s contains function %s with wrong signature for support number %d", + opfamilyname, + format_procedure(procform->amproc), + procform->amprocnum))); + result = false; + } } - /* Check operators */ + /* Check individual operators */ for (i = 0; i < oprlist->n_members; i++) { HeapTuple oprtup = &oprlist->members[i]->tuple; Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup); /* TODO: Check that only allowed strategy numbers exist */ - if (oprform->amopstrategy < 1) - ereport(ERROR, + if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist opfamily %u contains invalid strategy number %d for operator %u", - opfamilyoid, - oprform->amopstrategy, oprform->amopopr))); + errmsg("spgist opfamily %s contains operator %s with invalid strategy number %d", + opfamilyname, + format_operator(oprform->amopopr), + oprform->amopstrategy))); + result = false; + } /* spgist doesn't support ORDER BY operators */ if (oprform->amoppurpose != AMOP_SEARCH || OidIsValid(oprform->amopsortfamily)) - ereport(ERROR, + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("spgist opfamily %s contains invalid ORDER BY specification for operator %s", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + + /* Check operator signature --- same for all spgist strategies */ + if (!check_amop_signature(oprform->amopopr, BOOLOID, + oprform->amoplefttype, + oprform->amoprighttype)) + { + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist opfamily %u contains invalid ORDER BY specification for operator %u", - opfamilyoid, oprform->amopopr))); + errmsg("spgist opfamily %s contains operator %s with wrong signature", + opfamilyname, + format_operator(oprform->amopopr)))); + result = false; + } + } - /* Count operators that are specifically for the named opclass */ - if (oprform->amoplefttype == opcintype && - oprform->amoprighttype == opcintype) - numclassops++; + /* Now check for inconsistent groups of operators/functions */ + grouplist = identify_opfamily_groups(oprlist, proclist); + opclassgroup = NULL; + foreach(lc, grouplist) + { + OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc); + + /* Remember the group exactly matching the test opclass */ + if (thisgroup->lefttype == opcintype && + thisgroup->righttype == opcintype) + opclassgroup = thisgroup; + + /* + * Complain if there are any datatype pairs with functions but no + * operators. This is about the best we can do for now to detect + * missing operators. + */ + if (thisgroup->operatorset == 0) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("spgist opfamily %s is missing operator(s) for types %s and %s", + opfamilyname, + format_type_be(thisgroup->lefttype), + format_type_be(thisgroup->righttype)))); + result = false; + } + + /* + * Complain if we're missing functions for any datatype, remembering + * that SP-GiST doesn't use cross-type support functions. + */ + if (thisgroup->lefttype != thisgroup->righttype) + continue; + + for (i = 1; i <= SPGISTNProc; i++) + { + if ((thisgroup->functionset & (1 << i)) != 0) + continue; /* got it */ + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("spgist opfamily %s is missing support function %d for type %s", + opfamilyname, i, + format_type_be(thisgroup->lefttype)))); + result = false; + } } - /* Check that the named opclass is complete */ - if (numclassops == 0) - ereport(ERROR, - (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist opclass %u is missing operator(s)", - opclassoid))); - for (i = 1; i <= SPGISTNProc; i++) + /* Check that the originally-named opclass is supported */ + /* (if group is there, we already checked it adequately above) */ + if (!opclassgroup) { - if ((classfuncbits & (1 << i)) != 0) - continue; /* got it */ - ereport(ERROR, + ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist opclass %u is missing required support function %d", - opclassoid, i))); + errmsg("spgist opclass %s is missing operator(s)", + opclassname))); + result = false; } ReleaseCatCacheList(proclist); ReleaseCatCacheList(oprlist); + ReleaseSysCache(familytup); + ReleaseSysCache(classtup); - return true; + return result; } diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index a180d2b507..cb26d79afb 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -1102,6 +1102,29 @@ get_opname(Oid opno) return NULL; } +/* + * get_op_rettype + * Given operator oid, return the operator's result type. + */ +Oid +get_op_rettype(Oid opno) +{ + HeapTuple tp; + + tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno)); + if (HeapTupleIsValid(tp)) + { + Form_pg_operator optup = (Form_pg_operator) GETSTRUCT(tp); + Oid result; + + result = optup->oprresult; + ReleaseSysCache(tp); + return result; + } + else + return InvalidOid; +} + /* * op_input_types * diff --git a/src/include/access/amvalidate.h b/src/include/access/amvalidate.h new file mode 100644 index 0000000000..7f4289a996 --- /dev/null +++ b/src/include/access/amvalidate.h @@ -0,0 +1,36 @@ +/*------------------------------------------------------------------------- + * + * amvalidate.h + * Support routines for index access methods' amvalidate functions. + * + * Copyright (c) 2016, PostgreSQL Global Development Group + * + * src/include/access/amvalidate.h + * + *------------------------------------------------------------------------- + */ +#ifndef AMVALIDATE_H +#define AMVALIDATE_H + +#include "utils/catcache.h" + + +/* Struct returned (in a list) by identify_opfamily_groups() */ +typedef struct OpFamilyOpFuncGroup +{ + Oid lefttype; /* amoplefttype/amproclefttype */ + Oid righttype; /* amoprighttype/amprocrighttype */ + uint64 operatorset; /* bitmask of operators with these types */ + uint64 functionset; /* bitmask of support funcs with these types */ +} OpFamilyOpFuncGroup; + + +/* Functions in access/index/amvalidate.c */ +extern List *identify_opfamily_groups(CatCList *oprlist, CatCList *proclist); +extern bool check_amproc_signature(Oid funcid, Oid restype, bool exact, + int minargs, int maxargs,...); +extern bool check_amop_signature(Oid opno, Oid restype, + Oid lefttype, Oid righttype); +extern bool opfamily_can_sort_type(Oid opfamilyoid, Oid datatypeoid); + +#endif /* AMVALIDATE_H */ diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 9efdea22fd..dcb89807e9 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -75,6 +75,7 @@ extern Oid get_opclass_family(Oid opclass); extern Oid get_opclass_input_type(Oid opclass); extern RegProcedure get_opcode(Oid opno); extern char *get_opname(Oid opno); +extern Oid get_op_rettype(Oid opno); extern void op_input_types(Oid opno, Oid *lefttype, Oid *righttype); extern bool op_mergejoinable(Oid opno, Oid inputtype); extern bool op_hashjoinable(Oid opno, Oid inputtype); diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out index 45f13f3d06..7c09fa3d59 100644 --- a/src/test/regress/expected/opr_sanity.out +++ b/src/test/regress/expected/opr_sanity.out @@ -1567,6 +1567,7 @@ WHERE p1.oid != p2.oid AND (0 rows) -- Ask access methods to validate opclasses +-- (this replaces a lot of SQL-level checks that used to be done in this file) SELECT oid, opcname FROM pg_opclass WHERE NOT amvalidate(oid); oid | opcname -----+--------- @@ -1610,15 +1611,6 @@ WHERE NOT ((p1.amoppurpose = 's' AND p1.amopsortfamily = 0) OR ------------+-------------- (0 rows) --- amoplefttype/amoprighttype must match the operator -SELECT p1.oid, p2.oid -FROM pg_amop AS p1, pg_operator AS p2 -WHERE p1.amopopr = p2.oid AND NOT - (p1.amoplefttype = p2.oprleft AND p1.amoprighttype = p2.oprright); - oid | oid ------+----- -(0 rows) - -- amopmethod must match owning opfamily's opfmethod SELECT p1.oid, p2.oid FROM pg_amop AS p1, pg_opfamily AS p2 @@ -1627,44 +1619,6 @@ WHERE p1.amopfamily = p2.oid AND p1.amopmethod != p2.opfmethod; -----+----- (0 rows) --- amopsortfamily, if present, must reference a btree family -SELECT p1.amopfamily, p1.amopstrategy -FROM pg_amop AS p1 -WHERE p1.amopsortfamily <> 0 AND NOT EXISTS - (SELECT 1 from pg_opfamily op WHERE op.oid = p1.amopsortfamily - AND op.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'btree')); - amopfamily | amopstrategy -------------+-------------- -(0 rows) - --- Check that amopopr points at a reasonable-looking operator, ie a binary --- operator. If it's a search operator it had better yield boolean, --- otherwise an input type of its sort opfamily. -SELECT p1.amopfamily, p1.amopopr, p2.oid, p2.oprname -FROM pg_amop AS p1, pg_operator AS p2 -WHERE p1.amopopr = p2.oid AND - p2.oprkind != 'b'; - amopfamily | amopopr | oid | oprname -------------+---------+-----+--------- -(0 rows) - -SELECT p1.amopfamily, p1.amopopr, p2.oid, p2.oprname -FROM pg_amop AS p1, pg_operator AS p2 -WHERE p1.amopopr = p2.oid AND p1.amoppurpose = 's' AND - p2.oprresult != 'bool'::regtype; - amopfamily | amopopr | oid | oprname -------------+---------+-----+--------- -(0 rows) - -SELECT p1.amopfamily, p1.amopopr, p2.oid, p2.oprname -FROM pg_amop AS p1, pg_operator AS p2 -WHERE p1.amopopr = p2.oid AND p1.amoppurpose = 'o' AND NOT EXISTS - (SELECT 1 FROM pg_opclass op - WHERE opcfamily = p1.amopsortfamily AND opcintype = p2.oprresult); - amopfamily | amopopr | oid | oprname -------------+---------+-----+--------- -(0 rows) - -- Make a list of all the distinct operator names being used in particular -- strategy slots. This is a bit hokey, since the list might need to change -- in future releases, but it's an effective way of spotting mistakes such as @@ -1843,72 +1797,6 @@ WHERE p1.amopopr = p2.oid AND p2.oprcode = p3.oid AND ------------+---------+---------+-------- (0 rows) --- Multiple-datatype btree opfamilies should provide closed sets of equality --- operators; that is if you provide int2 = int4 and int4 = int8 then you --- should also provide int2 = int8 (and commutators of all these). This is --- important because the planner tries to deduce additional qual clauses from --- transitivity of mergejoinable operators. If there are clauses --- int2var = int4var and int4var = int8var, the planner will want to deduce --- int2var = int8var ... so there should be a way to represent that. While --- a missing cross-type operator is now only an efficiency loss rather than --- an error condition, it still seems reasonable to insist that all built-in --- opfamilies be complete. --- check commutative closure -SELECT p1.amoplefttype, p1.amoprighttype -FROM pg_amop AS p1 -WHERE p1.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'btree') AND - p1.amopstrategy = 3 AND - p1.amoplefttype != p1.amoprighttype AND - NOT EXISTS(SELECT 1 FROM pg_amop p2 WHERE - p2.amopfamily = p1.amopfamily AND - p2.amoplefttype = p1.amoprighttype AND - p2.amoprighttype = p1.amoplefttype AND - p2.amopstrategy = 3); - amoplefttype | amoprighttype ---------------+--------------- -(0 rows) - --- check transitive closure -SELECT p1.amoplefttype, p1.amoprighttype, p2.amoprighttype -FROM pg_amop AS p1, pg_amop AS p2 -WHERE p1.amopfamily = p2.amopfamily AND - p1.amoprighttype = p2.amoplefttype AND - p1.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'btree') AND - p2.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'btree') AND - p1.amopstrategy = 3 AND p2.amopstrategy = 3 AND - p1.amoplefttype != p1.amoprighttype AND - p2.amoplefttype != p2.amoprighttype AND - NOT EXISTS(SELECT 1 FROM pg_amop p3 WHERE - p3.amopfamily = p1.amopfamily AND - p3.amoplefttype = p1.amoplefttype AND - p3.amoprighttype = p2.amoprighttype AND - p3.amopstrategy = 3); - amoplefttype | amoprighttype | amoprighttype ---------------+---------------+--------------- -(0 rows) - --- We also expect that built-in multiple-datatype hash opfamilies provide --- complete sets of cross-type operators. Again, this isn't required, but --- it is reasonable to expect it for built-in opfamilies. --- if same family has x=x and y=y, it should have x=y -SELECT p1.amoplefttype, p2.amoplefttype -FROM pg_amop AS p1, pg_amop AS p2 -WHERE p1.amopfamily = p2.amopfamily AND - p1.amoplefttype = p1.amoprighttype AND - p2.amoplefttype = p2.amoprighttype AND - p1.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'hash') AND - p2.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'hash') AND - p1.amopstrategy = 1 AND p2.amopstrategy = 1 AND - p1.amoplefttype != p2.amoplefttype AND - NOT EXISTS(SELECT 1 FROM pg_amop p3 WHERE - p3.amopfamily = p1.amopfamily AND - p3.amoplefttype = p1.amoplefttype AND - p3.amoprighttype = p2.amoplefttype AND - p3.amopstrategy = 1); - amoplefttype | amoplefttype ---------------+-------------- -(0 rows) - -- **************** pg_amproc **************** -- Look for illegal values in pg_amproc fields SELECT p1.amprocfamily, p1.amprocnum @@ -1919,96 +1807,6 @@ WHERE p1.amprocfamily = 0 OR p1.amproclefttype = 0 OR p1.amprocrighttype = 0 --------------+----------- (0 rows) --- Unfortunately, we can't check the amproc link very well because the --- signature of the function may be different for different support routines --- or different base data types. --- We can check that all the referenced instances of the same support --- routine number take the same number of parameters, but that's about it --- for a general check... -SELECT p1.amprocfamily, p1.amprocnum, - p2.oid, p2.proname, - p3.opfname, - p4.amprocfamily, p4.amprocnum, - p5.oid, p5.proname, - p6.opfname -FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3, - pg_amproc AS p4, pg_proc AS p5, pg_opfamily AS p6 -WHERE p1.amprocfamily = p3.oid AND p4.amprocfamily = p6.oid AND - p3.opfmethod = p6.opfmethod AND p1.amprocnum = p4.amprocnum AND - p1.amproc = p2.oid AND p4.amproc = p5.oid AND - (p2.proretset OR p5.proretset OR p2.pronargs != p5.pronargs); - amprocfamily | amprocnum | oid | proname | opfname | amprocfamily | amprocnum | oid | proname | opfname ---------------+-----------+-----+---------+---------+--------------+-----------+-----+---------+--------- -(0 rows) - --- For btree, though, we can do better since we know the support routines --- must be of the form cmp(lefttype, righttype) returns int4 --- or sortsupport(internal) returns void. -SELECT p1.amprocfamily, p1.amprocnum, - p2.oid, p2.proname, - p3.opfname -FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3 -WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'btree') - AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND - (CASE WHEN amprocnum = 1 - THEN prorettype != 'int4'::regtype OR proretset OR pronargs != 2 - OR proargtypes[0] != amproclefttype - OR proargtypes[1] != amprocrighttype - WHEN amprocnum = 2 - THEN prorettype != 'void'::regtype OR proretset OR pronargs != 1 - OR proargtypes[0] != 'internal'::regtype - ELSE true END); - amprocfamily | amprocnum | oid | proname | opfname ---------------+-----------+-----+---------+--------- -(0 rows) - --- For hash we can also do a little better: the support routines must be --- of the form hash(lefttype) returns int4. There are several cases where --- we cheat and use a hash function that is physically compatible with the --- datatype even though there's no cast, so this check does find a small --- number of entries. -SELECT p1.amprocfamily, p1.amprocnum, p2.proname, p3.opfname -FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3 -WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'hash') - AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND - (amprocnum != 1 - OR proretset - OR prorettype != 'int4'::regtype - OR pronargs != 1 - OR NOT physically_coercible(amproclefttype, proargtypes[0]) - OR amproclefttype != amprocrighttype) -ORDER BY 1; - amprocfamily | amprocnum | proname | opfname ---------------+-----------+----------------+----------------- - 435 | 1 | hashint4 | date_ops - 1999 | 1 | timestamp_hash | timestamptz_ops - 2222 | 1 | hashchar | bool_ops - 2223 | 1 | hashvarlena | bytea_ops - 2225 | 1 | hashint4 | xid_ops - 2226 | 1 | hashint4 | cid_ops -(6 rows) - --- We can also check SP-GiST carefully, since the support routine signatures --- are independent of the datatype being indexed. -SELECT p1.amprocfamily, p1.amprocnum, - p2.oid, p2.proname, - p3.opfname -FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3 -WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'spgist') - AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND - (CASE WHEN amprocnum = 1 OR amprocnum = 2 OR amprocnum = 3 OR amprocnum = 4 - THEN prorettype != 'void'::regtype OR proretset OR pronargs != 2 - OR proargtypes[0] != 'internal'::regtype - OR proargtypes[1] != 'internal'::regtype - WHEN amprocnum = 5 - THEN prorettype != 'bool'::regtype OR proretset OR pronargs != 2 - OR proargtypes[0] != 'internal'::regtype - OR proargtypes[1] != 'internal'::regtype - ELSE true END); - amprocfamily | amprocnum | oid | proname | opfname ---------------+-----------+-----+---------+--------- -(0 rows) - -- Support routines that are primary members of opfamilies must be immutable -- (else it suggests that the index ordering isn't fixed). But cross-type -- members need only be stable, since they are just shorthands diff --git a/src/test/regress/sql/opr_sanity.sql b/src/test/regress/sql/opr_sanity.sql index c42c8a3561..6c9784a387 100644 --- a/src/test/regress/sql/opr_sanity.sql +++ b/src/test/regress/sql/opr_sanity.sql @@ -1037,6 +1037,7 @@ WHERE p1.oid != p2.oid AND p1.opcdefault AND p2.opcdefault; -- Ask access methods to validate opclasses +-- (this replaces a lot of SQL-level checks that used to be done in this file) SELECT oid, opcname FROM pg_opclass WHERE NOT amvalidate(oid); @@ -1073,47 +1074,12 @@ FROM pg_amop as p1 WHERE NOT ((p1.amoppurpose = 's' AND p1.amopsortfamily = 0) OR (p1.amoppurpose = 'o' AND p1.amopsortfamily <> 0)); --- amoplefttype/amoprighttype must match the operator - -SELECT p1.oid, p2.oid -FROM pg_amop AS p1, pg_operator AS p2 -WHERE p1.amopopr = p2.oid AND NOT - (p1.amoplefttype = p2.oprleft AND p1.amoprighttype = p2.oprright); - -- amopmethod must match owning opfamily's opfmethod SELECT p1.oid, p2.oid FROM pg_amop AS p1, pg_opfamily AS p2 WHERE p1.amopfamily = p2.oid AND p1.amopmethod != p2.opfmethod; --- amopsortfamily, if present, must reference a btree family - -SELECT p1.amopfamily, p1.amopstrategy -FROM pg_amop AS p1 -WHERE p1.amopsortfamily <> 0 AND NOT EXISTS - (SELECT 1 from pg_opfamily op WHERE op.oid = p1.amopsortfamily - AND op.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'btree')); - --- Check that amopopr points at a reasonable-looking operator, ie a binary --- operator. If it's a search operator it had better yield boolean, --- otherwise an input type of its sort opfamily. - -SELECT p1.amopfamily, p1.amopopr, p2.oid, p2.oprname -FROM pg_amop AS p1, pg_operator AS p2 -WHERE p1.amopopr = p2.oid AND - p2.oprkind != 'b'; - -SELECT p1.amopfamily, p1.amopopr, p2.oid, p2.oprname -FROM pg_amop AS p1, pg_operator AS p2 -WHERE p1.amopopr = p2.oid AND p1.amoppurpose = 's' AND - p2.oprresult != 'bool'::regtype; - -SELECT p1.amopfamily, p1.amopopr, p2.oid, p2.oprname -FROM pg_amop AS p1, pg_operator AS p2 -WHERE p1.amopopr = p2.oid AND p1.amoppurpose = 'o' AND NOT EXISTS - (SELECT 1 FROM pg_opclass op - WHERE opcfamily = p1.amopsortfamily AND opcintype = p2.oprresult); - -- Make a list of all the distinct operator names being used in particular -- strategy slots. This is a bit hokey, since the list might need to change -- in future releases, but it's an effective way of spotting mistakes such as @@ -1171,65 +1137,6 @@ WHERE p1.amopopr = p2.oid AND p2.oprcode = p3.oid AND p1.amoplefttype != p1.amoprighttype AND p3.provolatile = 'v'; --- Multiple-datatype btree opfamilies should provide closed sets of equality --- operators; that is if you provide int2 = int4 and int4 = int8 then you --- should also provide int2 = int8 (and commutators of all these). This is --- important because the planner tries to deduce additional qual clauses from --- transitivity of mergejoinable operators. If there are clauses --- int2var = int4var and int4var = int8var, the planner will want to deduce --- int2var = int8var ... so there should be a way to represent that. While --- a missing cross-type operator is now only an efficiency loss rather than --- an error condition, it still seems reasonable to insist that all built-in --- opfamilies be complete. - --- check commutative closure -SELECT p1.amoplefttype, p1.amoprighttype -FROM pg_amop AS p1 -WHERE p1.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'btree') AND - p1.amopstrategy = 3 AND - p1.amoplefttype != p1.amoprighttype AND - NOT EXISTS(SELECT 1 FROM pg_amop p2 WHERE - p2.amopfamily = p1.amopfamily AND - p2.amoplefttype = p1.amoprighttype AND - p2.amoprighttype = p1.amoplefttype AND - p2.amopstrategy = 3); - --- check transitive closure -SELECT p1.amoplefttype, p1.amoprighttype, p2.amoprighttype -FROM pg_amop AS p1, pg_amop AS p2 -WHERE p1.amopfamily = p2.amopfamily AND - p1.amoprighttype = p2.amoplefttype AND - p1.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'btree') AND - p2.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'btree') AND - p1.amopstrategy = 3 AND p2.amopstrategy = 3 AND - p1.amoplefttype != p1.amoprighttype AND - p2.amoplefttype != p2.amoprighttype AND - NOT EXISTS(SELECT 1 FROM pg_amop p3 WHERE - p3.amopfamily = p1.amopfamily AND - p3.amoplefttype = p1.amoplefttype AND - p3.amoprighttype = p2.amoprighttype AND - p3.amopstrategy = 3); - --- We also expect that built-in multiple-datatype hash opfamilies provide --- complete sets of cross-type operators. Again, this isn't required, but --- it is reasonable to expect it for built-in opfamilies. - --- if same family has x=x and y=y, it should have x=y -SELECT p1.amoplefttype, p2.amoplefttype -FROM pg_amop AS p1, pg_amop AS p2 -WHERE p1.amopfamily = p2.amopfamily AND - p1.amoplefttype = p1.amoprighttype AND - p2.amoplefttype = p2.amoprighttype AND - p1.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'hash') AND - p2.amopmethod = (SELECT oid FROM pg_am WHERE amname = 'hash') AND - p1.amopstrategy = 1 AND p2.amopstrategy = 1 AND - p1.amoplefttype != p2.amoplefttype AND - NOT EXISTS(SELECT 1 FROM pg_amop p3 WHERE - p3.amopfamily = p1.amopfamily AND - p3.amoplefttype = p1.amoplefttype AND - p3.amoprighttype = p2.amoplefttype AND - p3.amopstrategy = 1); - -- **************** pg_amproc **************** @@ -1240,82 +1147,6 @@ FROM pg_amproc as p1 WHERE p1.amprocfamily = 0 OR p1.amproclefttype = 0 OR p1.amprocrighttype = 0 OR p1.amprocnum < 1 OR p1.amproc = 0; --- Unfortunately, we can't check the amproc link very well because the --- signature of the function may be different for different support routines --- or different base data types. --- We can check that all the referenced instances of the same support --- routine number take the same number of parameters, but that's about it --- for a general check... - -SELECT p1.amprocfamily, p1.amprocnum, - p2.oid, p2.proname, - p3.opfname, - p4.amprocfamily, p4.amprocnum, - p5.oid, p5.proname, - p6.opfname -FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3, - pg_amproc AS p4, pg_proc AS p5, pg_opfamily AS p6 -WHERE p1.amprocfamily = p3.oid AND p4.amprocfamily = p6.oid AND - p3.opfmethod = p6.opfmethod AND p1.amprocnum = p4.amprocnum AND - p1.amproc = p2.oid AND p4.amproc = p5.oid AND - (p2.proretset OR p5.proretset OR p2.pronargs != p5.pronargs); - --- For btree, though, we can do better since we know the support routines --- must be of the form cmp(lefttype, righttype) returns int4 --- or sortsupport(internal) returns void. - -SELECT p1.amprocfamily, p1.amprocnum, - p2.oid, p2.proname, - p3.opfname -FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3 -WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'btree') - AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND - (CASE WHEN amprocnum = 1 - THEN prorettype != 'int4'::regtype OR proretset OR pronargs != 2 - OR proargtypes[0] != amproclefttype - OR proargtypes[1] != amprocrighttype - WHEN amprocnum = 2 - THEN prorettype != 'void'::regtype OR proretset OR pronargs != 1 - OR proargtypes[0] != 'internal'::regtype - ELSE true END); - --- For hash we can also do a little better: the support routines must be --- of the form hash(lefttype) returns int4. There are several cases where --- we cheat and use a hash function that is physically compatible with the --- datatype even though there's no cast, so this check does find a small --- number of entries. - -SELECT p1.amprocfamily, p1.amprocnum, p2.proname, p3.opfname -FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3 -WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'hash') - AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND - (amprocnum != 1 - OR proretset - OR prorettype != 'int4'::regtype - OR pronargs != 1 - OR NOT physically_coercible(amproclefttype, proargtypes[0]) - OR amproclefttype != amprocrighttype) -ORDER BY 1; - --- We can also check SP-GiST carefully, since the support routine signatures --- are independent of the datatype being indexed. - -SELECT p1.amprocfamily, p1.amprocnum, - p2.oid, p2.proname, - p3.opfname -FROM pg_amproc AS p1, pg_proc AS p2, pg_opfamily AS p3 -WHERE p3.opfmethod = (SELECT oid FROM pg_am WHERE amname = 'spgist') - AND p1.amprocfamily = p3.oid AND p1.amproc = p2.oid AND - (CASE WHEN amprocnum = 1 OR amprocnum = 2 OR amprocnum = 3 OR amprocnum = 4 - THEN prorettype != 'void'::regtype OR proretset OR pronargs != 2 - OR proargtypes[0] != 'internal'::regtype - OR proargtypes[1] != 'internal'::regtype - WHEN amprocnum = 5 - THEN prorettype != 'bool'::regtype OR proretset OR pronargs != 2 - OR proargtypes[0] != 'internal'::regtype - OR proargtypes[1] != 'internal'::regtype - ELSE true END); - -- Support routines that are primary members of opfamilies must be immutable -- (else it suggests that the index ordering isn't fixed). But cross-type -- members need only be stable, since they are just shorthands -- 2.40.0