PartitionBoundSpec *spec = (PartitionBoundSpec *) bound;
PartitionKey key = RelationGetPartitionKey(parent);
List *my_qual = NIL;
- TupleDesc parent_tupdesc = RelationGetDescr(parent);
- AttrNumber parent_attno;
- AttrNumber *partition_attnos;
- bool found_whole_row;
Assert(key != NULL);
(int) key->strategy);
}
- /*
- * Translate vars in the generated expression to have correct attnos. Note
- * that the vars in my_qual bear attnos dictated by key which carries
- * physical attnos of the parent. We must allow for a case where physical
- * attnos of a partition can be different from the parent.
- */
- partition_attnos = (AttrNumber *)
- palloc0(parent_tupdesc->natts * sizeof(AttrNumber));
- for (parent_attno = 1; parent_attno <= parent_tupdesc->natts;
- parent_attno++)
+ return my_qual;
+}
+
+/*
+ * map_partition_varattnos - maps varattno of any Vars in expr from the
+ * parent attno to partition attno.
+ *
+ * We must allow for a case where physical attnos of a partition can be
+ * different from the parent's.
+ */
+List *
+map_partition_varattnos(List *expr, Relation partrel, Relation parent)
+{
+ TupleDesc tupdesc = RelationGetDescr(parent);
+ AttrNumber attno;
+ AttrNumber *part_attnos;
+ bool found_whole_row;
+
+ if (expr == NIL)
+ return NIL;
+
+ part_attnos = (AttrNumber *) palloc0(tupdesc->natts * sizeof(AttrNumber));
+ for (attno = 1; attno <= tupdesc->natts; attno++)
{
- Form_pg_attribute attribute = parent_tupdesc->attrs[parent_attno - 1];
+ Form_pg_attribute attribute = tupdesc->attrs[attno - 1];
char *attname = NameStr(attribute->attname);
- AttrNumber partition_attno;
+ AttrNumber part_attno;
if (attribute->attisdropped)
continue;
- partition_attno = get_attnum(RelationGetRelid(rel), attname);
- partition_attnos[parent_attno - 1] = partition_attno;
+ part_attno = get_attnum(RelationGetRelid(partrel), attname);
+ part_attnos[attno - 1] = part_attno;
}
- my_qual = (List *) map_variable_attnos((Node *) my_qual,
- 1, 0,
- partition_attnos,
- parent_tupdesc->natts,
- &found_whole_row);
- /* there can never be a whole-row reference here */
+ expr = (List *) map_variable_attnos((Node *) expr,
+ 1, 0,
+ part_attnos,
+ tupdesc->natts,
+ &found_whole_row);
+ /* There can never be a whole-row reference here */
if (found_whole_row)
elog(ERROR, "unexpected whole-row reference found in partition key");
- return my_qual;
+ return expr;
}
/*
/* Guard against stack overflow due to overly deep partition tree */
check_stack_depth();
+ /* Quick copy */
+ if (rel->rd_partcheck != NIL)
+ return copyObject(rel->rd_partcheck);
+
/* Grab at least an AccessShareLock on the parent table */
parent = heap_open(get_partition_parent(RelationGetRelid(rel)),
AccessShareLock);
- /* Quick copy */
- if (rel->rd_partcheck)
- {
- if (parent->rd_rel->relispartition)
- result = list_concat(generate_partition_qual(parent),
- copyObject(rel->rd_partcheck));
- else
- result = copyObject(rel->rd_partcheck);
-
- heap_close(parent, AccessShareLock);
- return result;
- }
-
/* Get pg_class.relpartbound */
- if (!rel->rd_rel->relispartition) /* should not happen */
- elog(ERROR, "relation \"%s\" has relispartition = false",
- RelationGetRelationName(rel));
tuple = SearchSysCache1(RELOID, RelationGetRelid(rel));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for relation %u",
my_qual = get_qual_from_partbound(rel, parent, bound);
- /* If requested, add parent's quals to the list (if any) */
+ /* Add the parent's quals to the list (if any) */
if (parent->rd_rel->relispartition)
- {
- List *parent_check;
-
- parent_check = generate_partition_qual(parent);
- result = list_concat(parent_check, my_qual);
- }
+ result = list_concat(generate_partition_qual(parent), my_qual);
else
result = my_qual;
- /* Save a copy of my_qual in the relcache */
+ /*
+ * Change Vars to have partition's attnos instead of the parent's.
+ * We do this after we concatenate the parent's quals, because
+ * we want every Var in it to bear this relation's attnos.
+ */
+ result = map_partition_varattnos(result, rel, parent);
+
+ /* Save a copy in the relcache */
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
- rel->rd_partcheck = copyObject(my_qual);
+ rel->rd_partcheck = copyObject(result);
MemoryContextSwitchTo(oldcxt);
/* Keep the parent locked until commit */
drop cascades to table part_5
drop cascades to table part_5_a
drop cascades to table part_1
+-- more tests for certain multi-level partitioning scenarios
+create table p (a int, b int) partition by range (a, b);
+create table p1 (b int, a int not null) partition by range (b);
+create table p11 (like p1);
+alter table p11 drop a;
+alter table p11 add a int;
+alter table p11 drop a;
+alter table p11 add a int not null;
+-- attnum for key attribute 'a' is different in p, p1, and p11
+select attrelid::regclass, attname, attnum
+from pg_attribute
+where attname = 'a'
+ and (attrelid = 'p'::regclass
+ or attrelid = 'p1'::regclass
+ or attrelid = 'p11'::regclass);
+ attrelid | attname | attnum
+----------+---------+--------
+ p | a | 1
+ p1 | a | 2
+ p11 | a | 4
+(3 rows)
+
+alter table p1 attach partition p11 for values from (2) to (5);
+insert into p1 (a, b) values (2, 3);
+-- check that partition validation scan correctly detects violating rows
+alter table p attach partition p1 for values from (1, 2) to (1, 10);
+ERROR: partition constraint is violated by some row
+-- cleanup
+drop table p, p1 cascade;
+NOTICE: drop cascades to table p11
-- cleanup
DROP TABLE list_parted, list_parted2, range_parted CASCADE;
+
+-- more tests for certain multi-level partitioning scenarios
+create table p (a int, b int) partition by range (a, b);
+create table p1 (b int, a int not null) partition by range (b);
+create table p11 (like p1);
+alter table p11 drop a;
+alter table p11 add a int;
+alter table p11 drop a;
+alter table p11 add a int not null;
+-- attnum for key attribute 'a' is different in p, p1, and p11
+select attrelid::regclass, attname, attnum
+from pg_attribute
+where attname = 'a'
+ and (attrelid = 'p'::regclass
+ or attrelid = 'p1'::regclass
+ or attrelid = 'p11'::regclass);
+
+alter table p1 attach partition p11 for values from (2) to (5);
+
+insert into p1 (a, b) values (2, 3);
+-- check that partition validation scan correctly detects violating rows
+alter table p attach partition p1 for values from (1, 2) to (1, 10);
+
+-- cleanup
+drop table p, p1 cascade;