From 3d3ca010aa7d74f9f88ee7f3659c49dee0dcee6b Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 25 Aug 2000 18:05:54 +0000
Subject: [PATCH] Avoid creating a TOAST table if we can prove that the maximum
 tuple length is < TOAST_TUPLE_THRESHOLD, even with toastable column types
 present.  For example, CREATE TABLE foo (f1 int, f2 varchar(100)) does not
 require a toast table, even though varchar is a toastable type.

---
 src/backend/commands/command.c      | 182 +++++++++++++++-------------
 src/backend/utils/adt/format_type.c |  50 +++++++-
 src/include/utils/builtins.h        |   3 +-
 3 files changed, 149 insertions(+), 86 deletions(-)

diff --git a/src/backend/commands/command.c b/src/backend/commands/command.c
index 13d0ad5649..60405ebd27 100644
--- a/src/backend/commands/command.c
+++ b/src/backend/commands/command.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/commands/Attic/command.c,v 1.95 2000/08/21 17:22:32 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/commands/Attic/command.c,v 1.96 2000/08/25 18:05:54 tgl Exp $
  *
  * NOTES
  *	  The PerformAddAttribute() code, like most of the relation
@@ -19,6 +19,7 @@
  */
 #include "postgres.h"
 
+#include "access/tuptoaster.h"
 #include "catalog/catalog.h"
 #include "catalog/catname.h"
 #include "catalog/index.h"
@@ -52,6 +53,9 @@
 #include "access/genam.h"
 
 
+static bool needs_toast_table(Relation rel);
+
+
 /* --------------------------------
  *		PortalCleanup
  * --------------------------------
@@ -715,6 +719,7 @@ systable_beginscan(Relation rel, const char *indexRelname, int nkeys, ScanKey en
 		sysscan->scan = heap_beginscan(rel, false, SnapshotNow, nkeys, entry);
 	return (void *) sysscan;
 }
+
 static void
 systable_endscan(void *scan)
 {
@@ -731,6 +736,7 @@ systable_endscan(void *scan)
 		heap_endscan(sysscan->scan);
 	pfree(scan);
 }
+
 static HeapTuple
 systable_getnext(void *scan)
 {
@@ -780,6 +786,7 @@ find_attribute_walker(Node *node, int attnum)
 	}
 	return expression_tree_walker(node, find_attribute_walker, (void *) attnum);
 }
+
 static bool
 find_attribute_in_node(Node *node, int attnum)
 {
@@ -1377,17 +1384,14 @@ AlterTableCreateToastTable(const char *relationName, bool silent)
 	HeapTuple			reltup;
 	HeapTupleData		classtuple;
 	TupleDesc			tupdesc;
-	Form_pg_attribute  *att;
 	Relation			class_rel;
 	Buffer				buffer;
 	Relation			ridescs[Num_pg_class_indices];
 	Oid					toast_relid;
 	Oid					toast_idxid;
-	bool				has_toastable_attrs = false;
-	int					i;
 	char				toast_relname[NAMEDATALEN + 1];
 	char				toast_idxname[NAMEDATALEN + 1];
-	Relation			toast_rel;
+	Relation			toast_idxrel;
 	IndexInfo		   *indexInfo;
 	Oid					classObjectId[1];
 
@@ -1400,15 +1404,23 @@ AlterTableCreateToastTable(const char *relationName, bool silent)
 #endif
 
 	/*
-	 * lock the pg_class tuple for update
+	 * Grab an exclusive lock on the target table, which we will NOT
+	 * release until end of transaction.
+	 */
+	rel = heap_openr(relationName, AccessExclusiveLock);
+	myrelid = RelationGetRelid(rel);
+
+	/*
+	 * lock the pg_class tuple for update (is that really needed?)
 	 */
+	class_rel = heap_openr(RelationRelationName, RowExclusiveLock);
+
 	reltup = SearchSysCacheTuple(RELNAME, PointerGetDatum(relationName),
 								 0, 0, 0);
-
 	if (!HeapTupleIsValid(reltup))
 		elog(ERROR, "ALTER TABLE: relation \"%s\" not found",
 			 relationName);
-	class_rel = heap_openr(RelationRelationName, RowExclusiveLock);
+
 	classtuple.t_self = reltup->t_self;
 	switch (heap_mark4update(class_rel, &classtuple, &buffer))
 	{
@@ -1422,27 +1434,18 @@ AlterTableCreateToastTable(const char *relationName, bool silent)
 	ReleaseBuffer(buffer);
 
 	/*
-	 * Grab an exclusive lock on the target table, which we will NOT
-	 * release until end of transaction.
-	 */
-	rel = heap_openr(relationName, AccessExclusiveLock);
-	myrelid = RelationGetRelid(rel);
-
-	/*
-	 * Check if there are any toastable attributes on the table
+	 * XXX is the following check sufficient? At least it would
+	 * allow to create TOAST tables for views. But why not - someone
+	 * can insert into a view, so it shouldn't be impossible to hide
+	 * huge data there :-)
 	 */
-	tupdesc = rel->rd_att;
-	att = tupdesc->attrs;
-	for (i = 0; i < tupdesc->natts; i++)
+	if (((Form_pg_class) GETSTRUCT(reltup))->relkind != RELKIND_RELATION)
 	{
-		if (att[i]->attstorage != 'p')
-		{
-			has_toastable_attrs = true;
-			break;
-		}
+		elog(ERROR, "ALTER TABLE: relation \"%s\" is not a table",
+				relationName);
 	}
 
-	if (!has_toastable_attrs)
+	if (((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid != InvalidOid)
 	{
 	    if (silent)
 		{
@@ -1452,24 +1455,14 @@ AlterTableCreateToastTable(const char *relationName, bool silent)
 			return;
 		}
 
-		elog(ERROR, "ALTER TABLE: relation \"%s\" has no toastable attributes",
-				relationName);
-	}
-
+		elog(ERROR, "ALTER TABLE: relation \"%s\" already has a toast table",
+			 relationName);
+    }
 
 	/*
-	 * XXX is the following check sufficient? At least it would
-	 * allow to create TOAST tables for views. But why not - someone
-	 * can insert into a view, so it shouldn't be impossible to hide
-	 * huge data there :-)
+	 * Check to see whether the table actually needs a TOAST table.
 	 */
-	if (((Form_pg_class) GETSTRUCT(reltup))->relkind != RELKIND_RELATION)
-	{
-		elog(ERROR, "ALTER TABLE: relation \"%s\" is not a table",
-				relationName);
-	}
-
-	if (((Form_pg_class) GETSTRUCT(reltup))->reltoastrelid != InvalidOid)
+	if (! needs_toast_table(rel))
 	{
 	    if (silent)
 		{
@@ -1479,9 +1472,9 @@ AlterTableCreateToastTable(const char *relationName, bool silent)
 			return;
 		}
 
-		elog(ERROR, "ALTER TABLE: relation \"%s\" already has a toast table",
-				relationName);
-    }
+		elog(ERROR, "ALTER TABLE: relation \"%s\" does not need a toast table",
+			 relationName);
+	}
 
 	/*
 	 * Create the toast table and its index
@@ -1518,8 +1511,9 @@ AlterTableCreateToastTable(const char *relationName, bool silent)
 	 * collision, and the toast rel will be destroyed when its master is,
 	 * so there's no need to handle the toast rel as temp.
 	 */
-	heap_create_with_catalog(toast_relname, tupdesc, RELKIND_TOASTVALUE,
-							 false, true);
+	toast_relid = heap_create_with_catalog(toast_relname, tupdesc,
+										   RELKIND_TOASTVALUE,
+										   false, true);
 
 	/* make the toast relation visible, else index creation will fail */
 	CommandCounterIncrement();
@@ -1540,18 +1534,18 @@ AlterTableCreateToastTable(const char *relationName, bool silent)
 				 BTREE_AM_OID, classObjectId,
 				 false, false, true);
 
-	/* make the index visible in this transaction */
-	CommandCounterIncrement();
+	/*
+	 * Update toast rel's pg_class entry to show that it has an index.
+	 * NOTE this also does CommandCounterIncrement() to make index visible.
+	 */
+	setRelhasindexInplace(toast_relid, true, false);
 
 	/*
-	 * Get the OIDs of the newly created objects
+	 * Get the OID of the newly created index
 	 */
-	toast_rel = heap_openr(toast_relname, NoLock);
-	toast_relid = RelationGetRelid(toast_rel);
-	heap_close(toast_rel, NoLock);
-	toast_rel = index_openr(toast_idxname);
-	toast_idxid = RelationGetRelid(toast_rel);
-	index_close(toast_rel);
+	toast_idxrel = index_openr(toast_idxname);
+	toast_idxid = RelationGetRelid(toast_idxrel);
+	index_close(toast_idxrel);
 
 	/*
 	 * Store the toast table- and index-Oid's in the relation tuple
@@ -1569,36 +1563,6 @@ AlterTableCreateToastTable(const char *relationName, bool silent)
 
 	heap_freetuple(reltup);
 
-	/*
-	 * Finally update the toast relations pg_class tuple to say
-	 * it has an index.
-	 */
-	reltup = SearchSysCacheTuple(RELNAME, PointerGetDatum(toast_relname),
-								 0, 0, 0);
-	if (!HeapTupleIsValid(reltup))
-		elog(ERROR, "ALTER TABLE: just created toast relation \"%s\" not found",
-			 toast_relname);
-	classtuple.t_self = reltup->t_self;
-	switch (heap_mark4update(class_rel, &classtuple, &buffer))
-	{
-		case HeapTupleSelfUpdated:
-		case HeapTupleMayBeUpdated:
-			break;
-		default:
-			elog(ERROR, "couldn't lock pg_class tuple");
-	}
-	reltup = heap_copytuple(&classtuple);
-	ReleaseBuffer(buffer);
-
-	((Form_pg_class) GETSTRUCT(reltup))->relhasindex = true;
-	heap_update(class_rel, &reltup->t_self, reltup, NULL);
-
-	CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, ridescs);
-	CatalogIndexInsert(ridescs, Num_pg_class_indices, class_rel, reltup);
-	CatalogCloseIndices(Num_pg_class_indices, ridescs);
-
-	heap_freetuple(reltup);
-
 	/*
 	 * Close relations and make changes visible
 	 */
@@ -1608,6 +1572,56 @@ AlterTableCreateToastTable(const char *relationName, bool silent)
 	CommandCounterIncrement();
 }
 
+/*
+ * Check to see whether the table needs a TOAST table.  It does only if
+ * (1) there are any toastable attributes, and (2) the maximum length
+ * of a tuple could exceed TOAST_TUPLE_THRESHOLD.  (We don't want to
+ * create a toast table for something like "f1 varchar(20)".)
+ */
+static bool
+needs_toast_table(Relation rel)
+{
+	int32		data_length = 0;
+	bool		maxlength_unknown = false;
+	bool		has_toastable_attrs = false;
+	TupleDesc	tupdesc;
+	Form_pg_attribute  *att;
+	int32		tuple_length;
+	int			i;
+
+	tupdesc = rel->rd_att;
+	att = tupdesc->attrs;
+
+	for (i = 0; i < tupdesc->natts; i++)
+	{
+		data_length = att_align(data_length, att[i]->attlen, att[i]->attalign);
+		if (att[i]->attlen >= 0)
+		{
+			/* Fixed-length types are never toastable */
+			data_length += att[i]->attlen;
+		}
+		else
+		{
+			int32	maxlen = type_maximum_size(att[i]->atttypid,
+											   att[i]->atttypmod);
+
+			if (maxlen < 0)
+				maxlength_unknown = true;
+			else
+				data_length += maxlen;
+			if (att[i]->attstorage != 'p')
+				has_toastable_attrs = true;
+		}
+	}
+	if (!has_toastable_attrs)
+		return false;			/* nothing to toast? */
+	if (maxlength_unknown)
+		return true;			/* any unlimited-length attrs? */
+	tuple_length = MAXALIGN(offsetof(HeapTupleHeaderData, t_bits) +
+							BITMAPLEN(tupdesc->natts)) +
+		MAXALIGN(data_length);
+	return (tuple_length > TOAST_TUPLE_THRESHOLD);
+}
 
 
 /*
diff --git a/src/backend/utils/adt/format_type.c b/src/backend/utils/adt/format_type.c
index 003ab35aa1..19a8849b81 100644
--- a/src/backend/utils/adt/format_type.c
+++ b/src/backend/utils/adt/format_type.c
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/utils/adt/format_type.c,v 1.3 2000/08/21 18:23:18 tgl Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/utils/adt/format_type.c,v 1.4 2000/08/25 18:05:54 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -20,6 +20,7 @@
 #include "fmgr.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
+#include "utils/numeric.h"
 #include "utils/syscache.h"
 
 #define MAX_INT32_LEN 11
@@ -214,6 +215,53 @@ format_type_internal(Oid type_oid, int32 typemod)
 }
 
 
+/*
+ * type_maximum_size --- determine maximum length of a varlena column
+ *
+ * If the max length is indeterminate, return -1.  In particular, we return
+ * -1 for any type not known to this routine.  We assume the caller has
+ * already determined that the type is a varlena type, so it's not
+ * necessary to look up the type's pg_type tuple here.
+ *
+ * This may appear unrelated to format_type(), but in fact the two routines
+ * share knowledge of the encoding of typmod for different types, so it's
+ * convenient to keep them together.
+ */
+int32
+type_maximum_size(Oid type_oid, int32 typemod)
+{
+	if (typemod <= 0)
+		return -1;
+
+	switch (type_oid)
+	{
+		case BPCHAROID:
+		case VARCHAROID:
+			/* typemod includes varlena header */
+			return typemod;
+
+		case NUMERICOID:
+			/* precision (ie, max # of digits) is in upper bits of typmod */
+			if (typemod > VARHDRSZ)
+			{
+				int		precision = ((typemod - VARHDRSZ) >> 16) & 0xffff;
+
+				/* Numeric stores 2 decimal digits/byte, plus header */
+				return (precision + 1) / 2 + NUMERIC_HDRSZ;
+			}
+			break;
+
+		case VARBITOID:
+		case ZPBITOID:
+			/* typemod is the (max) number of bits */
+			return (typemod + (BITSPERBYTE-1)) / BITSPERBYTE
+				+ 2 * sizeof(int32);
+	}
+
+	/* Unknown type, or unlimited-length type such as 'text' */
+	return -1;
+}
+
 
 /*
  * oidvectortypes			- converts a vector of type OIDs to "typname" list
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index f1ebe40b59..2c42899b1c 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: builtins.h,v 1.134 2000/08/24 03:29:14 tgl Exp $
+ * $Id: builtins.h,v 1.135 2000/08/25 18:05:53 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -583,5 +583,6 @@ extern Datum PG_char_to_encoding(PG_FUNCTION_ARGS);
 /* format_type.c */
 extern Datum format_type(PG_FUNCTION_ARGS);
 extern Datum oidvectortypes(PG_FUNCTION_ARGS);
+extern int32 type_maximum_size(Oid type_oid, int32 typemod);
 
 #endif	 /* BUILTINS_H */
-- 
2.40.0