granicus.if.org Git - postgresql/blobdiff - src/backend/utils/init/flatfiles.c
Message style improvements
[postgresql] / src / backend / utils / init / flatfiles.c
index 7087a9887d413a9b76751926eca9e10d7e1e4532..8867ec3e015fc520582de52978bc42821fc99b3d 100644 (file)
@@ -4,9 +4,9 @@
  *       Routines for maintaining "flat file" images of the shared catalogs.
  *
  * We use flat files so that the postmaster and not-yet-fully-started
- * backends can look at the contents of pg_database, pg_authid, and 
- * pg_auth_members for authentication purposes.  This module is 
- * responsible for keeping the flat-file images as nearly in sync with 
+ * backends can look at the contents of pg_database, pg_authid, and
+ * pg_auth_members for authentication purposes.  This module is
+ * responsible for keeping the flat-file images as nearly in sync with
  * database reality as possible.
  *
  * The tricky part of the write_xxx_file() routines in this module is that
  * a way that this is OK.
  *
  *
- * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/utils/init/flatfiles.c,v 1.13 2005/07/28 22:27:02 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/init/flatfiles.c,v 1.21 2006/07/14 14:52:25 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -33,7 +33,9 @@
 #include <unistd.h>
 
 #include "access/heapam.h"
+#include "access/transam.h"
 #include "access/twophase_rmgr.h"
+#include "access/xact.h"
 #include "catalog/pg_auth_members.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_database.h"
 #include "miscadmin.h"
 #include "storage/fd.h"
 #include "storage/pmsignal.h"
-#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/flatfiles.h"
 #include "utils/resowner.h"
-#include "utils/syscache.h"
 
 
 /* Actual names of the flat files (within $PGDATA) */
@@ -55,7 +55,7 @@
 #define AUTH_FLAT_FILE         "global/pg_auth"
 
 /* Info bits in a flatfiles 2PC record */
-#define FF_BIT_DATABASE        1
+#define FF_BIT_DATABASE 1
 #define FF_BIT_AUTH            2
 
 
@@ -163,7 +163,7 @@ name_okay(const char *str)
 /*
  * write_database_file: update the flat database file
  *
- * A side effect is to determine the oldest database's datfrozenxid
+ * A side effect is to determine the oldest database's datminxid
  * so we can set or update the XID wrap limit.
  */
 static void
@@ -177,12 +177,12 @@ write_database_file(Relation drel)
        HeapScanDesc scan;
        HeapTuple       tuple;
        NameData        oldest_datname;
-       TransactionId oldest_datfrozenxid = InvalidTransactionId;
+       TransactionId oldest_datminxid = InvalidTransactionId;
 
        /*
         * Create a temporary filename to be renamed later.  This prevents the
-        * backend from clobbering the flat file while the postmaster
-        * might be reading from it.
+        * backend from clobbering the flat file while the postmaster might be
+        * reading from it.
         */
        filename = database_getflatfilename();
        bufsize = strlen(filename) + 12;
@@ -208,25 +208,27 @@ write_database_file(Relation drel)
                char       *datname;
                Oid                     datoid;
                Oid                     dattablespace;
-               TransactionId datfrozenxid;
+               TransactionId datminxid,
+                                       datvacuumxid;
 
                datname = NameStr(dbform->datname);
                datoid = HeapTupleGetOid(tuple);
                dattablespace = dbform->dattablespace;
-               datfrozenxid = dbform->datfrozenxid;
+               datminxid = dbform->datminxid;
+               datvacuumxid = dbform->datvacuumxid;
 
                /*
-                * Identify the oldest datfrozenxid, ignoring databases that are not
-                * connectable (we assume they are safely frozen).  This must match
+                * Identify the oldest datminxid, ignoring databases that are not
+                * connectable (we assume they are safely frozen).  This must match
                 * the logic in vac_truncate_clog() in vacuum.c.
                 */
                if (dbform->datallowconn &&
-                       TransactionIdIsNormal(datfrozenxid))
+                       TransactionIdIsNormal(datminxid))
                {
-                       if (oldest_datfrozenxid == InvalidTransactionId ||
-                               TransactionIdPrecedes(datfrozenxid, oldest_datfrozenxid))
+                       if (oldest_datminxid == InvalidTransactionId ||
+                               TransactionIdPrecedes(datminxid, oldest_datminxid))
                        {
-                               oldest_datfrozenxid = datfrozenxid;
+                               oldest_datminxid = datminxid;
                                namestrcpy(&oldest_datname, datname);
                        }
                }
@@ -242,13 +244,14 @@ write_database_file(Relation drel)
                }
 
                /*
-                * The file format is: "dbname" oid tablespace frozenxid
+                * The file format is: "dbname" oid tablespace minxid vacuumxid
                 *
-                * The xid is not needed for backend startup, but may be of use
-                * for forensic purposes.
+                * The xids are not needed for backend startup, but are of use to
+                * autovacuum, and might also be helpful for forensic purposes.
                 */
                fputs_quote(datname, fp);
-               fprintf(fp, " %u %u %u\n", datoid, dattablespace, datfrozenxid);
+               fprintf(fp, " %u %u %u %u\n",
+                               datoid, dattablespace, datminxid, datvacuumxid);
        }
        heap_endscan(scan);
 
@@ -259,8 +262,8 @@ write_database_file(Relation drel)
                                                tempname)));
 
        /*
-        * Rename the temp file to its final name, deleting the old flat file.
-        * We expect that rename(2) is an atomic action.
+        * Rename the temp file to its final name, deleting the old flat file. We
+        * expect that rename(2) is an atomic action.
         */
        if (rename(tempname, filename))
                ereport(ERROR,
@@ -269,10 +272,10 @@ write_database_file(Relation drel)
                                                tempname, filename)));
 
        /*
-        * Set the transaction ID wrap limit using the oldest datfrozenxid
+        * Set the transaction ID wrap limit using the oldest datminxid
         */
-       if (oldest_datfrozenxid != InvalidTransactionId)
-               SetTransactionIdLimit(oldest_datfrozenxid, &oldest_datname);
+       if (oldest_datminxid != InvalidTransactionId)
+               SetTransactionIdLimit(oldest_datminxid, &oldest_datname);
 }
 
 
@@ -292,16 +295,18 @@ write_database_file(Relation drel)
  * and build data structures in-memory before writing the file.
  */
 
-typedef struct {
+typedef struct
+{
        Oid                     roleid;
        bool            rolcanlogin;
-       char*           rolname;
-       char*           rolpassword;
-       char*           rolvaliduntil;
-       List*           member_of;
+       char       *rolname;
+       char       *rolpassword;
+       char       *rolvaliduntil;
+       List       *member_of;
 } auth_entry;
 
-typedef struct {
+typedef struct
+{
        Oid                     roleid;
        Oid                     memberid;
 } authmem_entry;
@@ -311,11 +316,13 @@ typedef struct {
 static int
 oid_compar(const void *a, const void *b)
 {
-       const auth_entry *a_auth = (const auth_entry*) a;
-       const auth_entry *b_auth = (const auth_entry*) b;
+       const auth_entry *a_auth = (const auth_entry *) a;
+       const auth_entry *b_auth = (const auth_entry *) b;
 
-       if (a_auth->roleid < b_auth->roleid) return -1;
-       if (a_auth->roleid > b_auth->roleid) return 1;
+       if (a_auth->roleid < b_auth->roleid)
+               return -1;
+       if (a_auth->roleid > b_auth->roleid)
+               return 1;
        return 0;
 }
 
@@ -323,21 +330,23 @@ oid_compar(const void *a, const void *b)
 static int
 name_compar(const void *a, const void *b)
 {
-       const auth_entry *a_auth = (const auth_entry*) a;
-       const auth_entry *b_auth = (const auth_entry*) b;
+       const auth_entry *a_auth = (const auth_entry *) a;
+       const auth_entry *b_auth = (const auth_entry *) b;
 
-       return strcmp(a_auth->rolname,b_auth->rolname);
+       return strcmp(a_auth->rolname, b_auth->rolname);
 }
 
 /* qsort comparator for sorting authmem_entry array by memberid */
 static int
 mem_compar(const void *a, const void *b)
 {
-       const authmem_entry *a_auth = (const authmem_entry*) a;
-       const authmem_entry *b_auth = (const authmem_entry*) b;
+       const authmem_entry *a_auth = (const authmem_entry *) a;
+       const authmem_entry *b_auth = (const authmem_entry *) b;
 
-       if (a_auth->memberid < b_auth->memberid) return -1;
-       if (a_auth->memberid > b_auth->memberid) return 1;
+       if (a_auth->memberid < b_auth->memberid)
+               return -1;
+       if (a_auth->memberid > b_auth->memberid)
+               return 1;
        return 0;
 }
 
@@ -351,7 +360,7 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
        char       *filename,
                           *tempname;
        int                     bufsize;
-       BlockNumber     totalblocks;
+       BlockNumber totalblocks;
        FILE       *fp;
        mode_t          oumask;
        HeapScanDesc scan;
@@ -361,13 +370,13 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
        int                     curr_mem = 0;
        int                     total_mem = 0;
        int                     est_rows;
-       auth_entry  *auth_info;
+       auth_entry *auth_info;
        authmem_entry *authmem_info;
 
        /*
         * Create a temporary filename to be renamed later.  This prevents the
-        * backend from clobbering the flat file while the postmaster might
-        * be reading from it.
+        * backend from clobbering the flat file while the postmaster might be
+        * reading from it.
         */
        filename = auth_getflatfilename();
        bufsize = strlen(filename) + 12;
@@ -384,29 +393,29 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
                                                tempname)));
 
        /*
-        * Read pg_authid and fill temporary data structures.  Note we must
-        * read all roles, even those without rolcanlogin.
+        * Read pg_authid and fill temporary data structures.  Note we must read
+        * all roles, even those without rolcanlogin.
         */
        totalblocks = RelationGetNumberOfBlocks(rel_authid);
        totalblocks = totalblocks ? totalblocks : 1;
-       est_rows = totalblocks * (BLCKSZ / (sizeof(HeapTupleHeaderData)+sizeof(FormData_pg_authid)));
-       auth_info = (auth_entry*) palloc(est_rows*sizeof(auth_entry));
+       est_rows = totalblocks * (BLCKSZ / (sizeof(HeapTupleHeaderData) + sizeof(FormData_pg_authid)));
+       auth_info = (auth_entry *) palloc(est_rows * sizeof(auth_entry));
 
        scan = heap_beginscan(rel_authid, SnapshotNow, 0, NULL);
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
                Form_pg_authid aform = (Form_pg_authid) GETSTRUCT(tuple);
                HeapTupleHeader tup = tuple->t_data;
-               char       *tp;                         /* ptr to tuple data */
-               long            off;                    /* offset in tuple data */
+               char       *tp;                 /* ptr to tuple data */
+               long            off;            /* offset in tuple data */
                bits8      *bp = tup->t_bits;   /* ptr to null bitmask in tuple */
                Datum           datum;
 
                if (curr_role >= est_rows)
                {
                        est_rows *= 2;
-                       auth_info = (auth_entry*)
-                               repalloc(auth_info, est_rows*sizeof(auth_entry));
+                       auth_info = (auth_entry *)
+                               repalloc(auth_info, est_rows * sizeof(auth_entry));
                }
 
                auth_info[curr_role].roleid = HeapTupleGetOid(tuple);
@@ -415,10 +424,10 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
                auth_info[curr_role].member_of = NIL;
 
                /*
-                * We can't use heap_getattr() here because during startup we will
-                * not have any tupdesc for pg_authid.  Fortunately it's not too
-                * hard to work around this.  rolpassword is the first possibly-null
-                * field so we can compute its offset directly.
+                * We can't use heap_getattr() here because during startup we will not
+                * have any tupdesc for pg_authid.  Fortunately it's not too hard to
+                * work around this.  rolpassword is the first possibly-null field so
+                * we can compute its offset directly.
                 */
                tp = (char *) tup + tup->t_hoff;
                off = offsetof(FormData_pg_authid, rolpassword);
@@ -435,8 +444,8 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
                        datum = PointerGetDatum(tp + off);
 
                        /*
-                        * The password probably shouldn't ever be out-of-line toasted;
-                        * if it is, ignore it, since we can't handle that in startup mode.
+                        * The password probably shouldn't ever be out-of-line toasted; if
+                        * it is, ignore it, since we can't handle that in startup mode.
                         */
                        if (VARATT_IS_EXTERNAL(DatumGetPointer(datum)))
                                auth_info[curr_role].rolpassword = pstrdup("");
@@ -492,8 +501,8 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
         */
        totalblocks = RelationGetNumberOfBlocks(rel_authmem);
        totalblocks = totalblocks ? totalblocks : 1;
-       est_rows = totalblocks * (BLCKSZ / (sizeof(HeapTupleHeaderData)+sizeof(FormData_pg_auth_members)));
-       authmem_info = (authmem_entry*) palloc(est_rows*sizeof(authmem_entry));
+       est_rows = totalblocks * (BLCKSZ / (sizeof(HeapTupleHeaderData) + sizeof(FormData_pg_auth_members)));
+       authmem_info = (authmem_entry *) palloc(est_rows * sizeof(authmem_entry));
 
        scan = heap_beginscan(rel_authmem, SnapshotNow, 0, NULL);
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
@@ -503,8 +512,8 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
                if (curr_mem >= est_rows)
                {
                        est_rows *= 2;
-                       authmem_info = (authmem_entry*)
-                               repalloc(authmem_info, est_rows*sizeof(authmem_entry));
+                       authmem_info = (authmem_entry *)
+                               repalloc(authmem_info, est_rows * sizeof(authmem_entry));
                }
 
                authmem_info[curr_mem].roleid = memform->roleid;
@@ -515,8 +524,8 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
        heap_endscan(scan);
 
        /*
-        * Search for memberships.  We can skip all this if pg_auth_members
-        * is empty.
+        * Search for memberships.  We can skip all this if pg_auth_members is
+        * empty.
         */
        if (total_mem > 0)
        {
@@ -525,22 +534,23 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
                 */
                qsort(auth_info, total_roles, sizeof(auth_entry), oid_compar);
                qsort(authmem_info, total_mem, sizeof(authmem_entry), mem_compar);
+
                /*
                 * For each role, find what it belongs to.
                 */
                for (curr_role = 0; curr_role < total_roles; curr_role++)
                {
-                       List    *roles_list;
-                       List    *roles_names_list = NIL;
-                       ListCell *mem;
+                       List       *roles_list;
+                       List       *roles_names_list = NIL;
+                       ListCell   *mem;
 
                        /* We can skip this for non-login roles */
                        if (!auth_info[curr_role].rolcanlogin)
                                continue;
 
                        /*
-                        * This search algorithm is the same as in is_member_of_role;
-                        * we are just working with a different input data structure.
+                        * This search algorithm is the same as in is_member_of_role; we
+                        * are just working with a different input data structure.
                         */
                        roles_list = list_make1_oid(auth_info[curr_role].roleid);
 
@@ -548,17 +558,20 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
                        {
                                authmem_entry key;
                                authmem_entry *found_mem;
-                               int             first_found, last_found, i;
+                               int                     first_found,
+                                                       last_found,
+                                                       i;
 
                                key.memberid = lfirst_oid(mem);
                                found_mem = bsearch(&key, authmem_info, total_mem,
                                                                        sizeof(authmem_entry), mem_compar);
                                if (!found_mem)
                                        continue;
+
                                /*
-                                * bsearch found a match for us; but if there were
-                                * multiple matches it could have found any one of them.
-                                * Locate first and last match.
+                                * bsearch found a match for us; but if there were multiple
+                                * matches it could have found any one of them. Locate first
+                                * and last match.
                                 */
                                first_found = last_found = (found_mem - authmem_info);
                                while (first_found > 0 &&
@@ -567,30 +580,31 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
                                while (last_found + 1 < total_mem &&
                                           mem_compar(&key, &authmem_info[last_found + 1]) == 0)
                                        last_found++;
+
                                /*
                                 * Now add all the new roles to roles_list.
                                 */
                                for (i = first_found; i <= last_found; i++)
                                        roles_list = list_append_unique_oid(roles_list,
-                                                                                                               authmem_info[i].roleid);
+                                                                                                        authmem_info[i].roleid);
                        }
 
                        /*
-                        * Convert list of role Oids to list of role names.
-                        * We must do this before re-sorting auth_info.
+                        * Convert list of role Oids to list of role names. We must do
+                        * this before re-sorting auth_info.
                         *
                         * We skip the first list element (curr_role itself) since there
                         * is no point in writing that a role is a member of itself.
                         */
                        for_each_cell(mem, lnext(list_head(roles_list)))
                        {
-                               auth_entry key_auth;
+                               auth_entry      key_auth;
                                auth_entry *found_role;
 
                                key_auth.roleid = lfirst_oid(mem);
                                found_role = bsearch(&key_auth, auth_info, total_roles,
                                                                         sizeof(auth_entry), oid_compar);
-                               if (found_role)                 /* paranoia */
+                               if (found_role) /* paranoia */
                                        roles_names_list = lappend(roles_names_list,
                                                                                           found_role->rolname);
                        }
@@ -610,7 +624,7 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
 
                if (arole->rolcanlogin)
                {
-                       ListCell *mem;
+                       ListCell   *mem;
 
                        fputs_quote(arole->rolname, fp);
                        fputs(" ", fp);
@@ -635,8 +649,8 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
                                                tempname)));
 
        /*
-        * Rename the temp file to its final name, deleting the old flat file.
-        * We expect that rename(2) is an atomic action.
+        * Rename the temp file to its final name, deleting the old flat file. We
+        * expect that rename(2) is an atomic action.
         */
        if (rename(tempname, filename))
                ereport(ERROR,
@@ -654,8 +668,10 @@ write_auth_file(Relation rel_authid, Relation rel_authmem)
  * base backup which may be far out of sync with the current state.
  *
  * In theory we could skip rebuilding the flat files if no WAL replay
- * occurred, but it seems safest to just do it always.  We have to
- * scan pg_database to compute the XID wrap limit anyway.
+ * occurred, but it seems best to just do it always.  We have to
+ * scan pg_database to compute the XID wrap limit anyway.  Also, this
+ * policy means we need not force initdb to change the format of the
+ * flat files.
  *
  * In a standalone backend we pass database_only = true to skip processing
  * the auth file.  We won't need it, and building it could fail if there's
@@ -666,11 +682,13 @@ BuildFlatFiles(bool database_only)
 {
        ResourceOwner owner;
        RelFileNode rnode;
-       Relation        rel_db, rel_authid, rel_authmem;
+       Relation        rel_db,
+                               rel_authid,
+                               rel_authmem;
 
        /*
-        * We don't have any hope of running a real relcache, but we can use
-        * the same fake-relcache facility that WAL replay uses.
+        * We don't have any hope of running a real relcache, but we can use the
+        * same fake-relcache facility that WAL replay uses.
         */
        XLogInitRelationCache();
 
@@ -744,31 +762,55 @@ AtEOXact_UpdateFlatFiles(bool isCommit)
        }
 
        /*
-        * Advance command counter to be certain we see all effects of the
-        * current transaction.
+        * Advance command counter to be certain we see all effects of the current
+        * transaction.
         */
        CommandCounterIncrement();
 
        /*
-        * We use ExclusiveLock to ensure that only one backend writes the
-        * flat file(s) at a time.      That's sufficient because it's okay to
-        * allow plain reads of the tables in parallel.  There is some chance
-        * of a deadlock here (if we were triggered by a user update of one
-        * of the tables, which likely won't have gotten a strong enough lock),
-        * so get the locks we need before writing anything.
+        * Open and lock the needed catalog(s).
         *
-        * For writing the auth file, it's sufficient to ExclusiveLock pg_authid;
-        * we take just regular AccessShareLock on pg_auth_members.
+        * Even though we only need AccessShareLock, this could theoretically fail
+        * due to deadlock.  In practice, however, our transaction already holds
+        * RowExclusiveLock or better (it couldn't have updated the catalog
+        * without such a lock).  This implies that dbcommands.c and other places
+        * that force flat-file updates must not follow the common practice of
+        * dropping catalog locks before commit.
         */
        if (database_file_update_subid != InvalidSubTransactionId)
-               drel = heap_open(DatabaseRelationId, ExclusiveLock);
+               drel = heap_open(DatabaseRelationId, AccessShareLock);
 
        if (auth_file_update_subid != InvalidSubTransactionId)
        {
-               arel = heap_open(AuthIdRelationId, ExclusiveLock);
+               arel = heap_open(AuthIdRelationId, AccessShareLock);
                mrel = heap_open(AuthMemRelationId, AccessShareLock);
        }
 
+       /*
+        * Obtain special locks to ensure that two transactions don't try to write
+        * the same flat file concurrently.  Quite aside from any direct risks of
+        * corrupted output, the winning writer probably wouldn't have seen the
+        * other writer's updates.  By taking a lock and holding it till commit,
+        * we ensure that whichever updater goes second will see the other
+        * updater's changes as committed, and thus the final state of the file
+        * will include all updates.
+        *
+        * We use a lock on "database 0" to protect writing the pg_database flat
+        * file, and a lock on "role 0" to protect the auth file.  This is a bit
+        * ugly but it's not worth inventing any more-general convention.  (Any
+        * two locktags that are never used for anything else would do.)
+        *
+        * This is safe against deadlock as long as these are the very last locks
+        * acquired during the transaction.
+        */
+       if (database_file_update_subid != InvalidSubTransactionId)
+               LockSharedObject(DatabaseRelationId, InvalidOid, 0,
+                                                AccessExclusiveLock);
+
+       if (auth_file_update_subid != InvalidSubTransactionId)
+               LockSharedObject(AuthIdRelationId, InvalidOid, 0,
+                                                AccessExclusiveLock);
+
        /* Okay to write the files */
        if (database_file_update_subid != InvalidSubTransactionId)
        {
@@ -858,7 +900,7 @@ AtEOSubXact_UpdateFlatFiles(bool isCommit,
  * or pg_auth_members via general-purpose INSERT/UPDATE/DELETE commands.
  *
  * It is sufficient for this to be a STATEMENT trigger since we don't
- * care which individual rows changed.  It doesn't much matter whether
+ * care which individual rows changed. It doesn't much matter whether
  * it's a BEFORE or AFTER trigger.
  */
 Datum
@@ -901,11 +943,11 @@ flatfile_twophase_postcommit(TransactionId xid, uint16 info,
                                                         void *recdata, uint32 len)
 {
        /*
-        * Set flags to do the needed file updates at the end of my own
-        * current transaction.  (XXX this has some issues if my own
-        * transaction later rolls back, or if there is any significant
-        * delay before I commit.  OK for now because we disallow
-        * COMMIT PREPARED inside a transaction block.)
+        * Set flags to do the needed file updates at the end of my own current
+        * transaction.  (XXX this has some issues if my own transaction later
+        * rolls back, or if there is any significant delay before I commit.  OK
+        * for now because we disallow COMMIT PREPARED inside a transaction
+        * block.)
         */
        if (info & FF_BIT_DATABASE)
                database_file_update_needed();