From e55704d8b2fe522fbc9435acbb5bc59033478bd5 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Tue, 10 Dec 2013 18:33:45 -0500 Subject: [PATCH] Add new wal_level, logical, sufficient for logical decoding. When wal_level=logical, we'll log columns from the old tuple as configured by the REPLICA IDENTITY facility added in commit 07cacba983ef79be4a84fcd0e0ca3b5fcb85dd65. This makes it possible a properly-configured logical replication solution to correctly follow table updates even if they change the chosen key columns, or, with REPLICA IDENTITY FULL, even if the table has no key at all. Note that updates which do not modify the replica identity column won't log anything extra, making the choice of a good key (i.e. one that will rarely be changed) important to performance when wal_level=logical is configured. Each insert, update, or delete to a catalog table will also log the CMIN and/or CMAX values of stamped by the current transaction. This is necessary because logical decoding will require access to historical snapshots of the catalog in order to decode some data types, and the CMIN/CMAX values that we may need in order to judge row visibility may have been overwritten by the time we need them. Andres Freund, reviewed in various versions by myself, Heikki Linnakangas, KONDO Mitsumasa, and many others. --- doc/src/sgml/backup.sgml | 4 +- doc/src/sgml/config.sgml | 44 +- doc/src/sgml/high-availability.sgml | 5 +- src/backend/access/heap/heapam.c | 603 +++++++++++++++--- src/backend/access/rmgrdesc/heapdesc.c | 9 + src/backend/access/rmgrdesc/xlogdesc.c | 1 + src/backend/access/transam/twophase.c | 4 +- src/backend/access/transam/xact.c | 41 +- src/backend/access/transam/xlog.c | 8 +- src/backend/catalog/index.c | 2 +- src/backend/commands/trigger.c | 3 +- src/backend/postmaster/postmaster.c | 4 +- src/backend/utils/cache/relcache.c | 50 +- src/backend/utils/misc/postgresql.conf.sample | 2 +- src/bin/pg_controldata/pg_controldata.c | 2 + src/include/access/heapam_xlog.h | 70 +- src/include/access/xact.h | 1 + src/include/access/xlog.h | 8 +- src/include/access/xlog_internal.h | 2 +- src/include/utils/rel.h | 24 + src/include/utils/relcache.h | 12 +- src/tools/pgindent/typedefs.list | 3 + 22 files changed, 745 insertions(+), 157 deletions(-) diff --git a/doc/src/sgml/backup.sgml b/doc/src/sgml/backup.sgml index 995933c62d..a2361d780f 100644 --- a/doc/src/sgml/backup.sgml +++ b/doc/src/sgml/backup.sgml @@ -587,7 +587,7 @@ tar -cf backup.tar /usr/local/pgsql/data To enable WAL archiving, set the - configuration parameter to archive (or hot_standby), + configuration parameter to archive or higher, to on, and specify the shell command to use in the configuration parameter. In practice @@ -1259,7 +1259,7 @@ restore_command = 'cp /mnt/server/archivedir/%f %p' If more flexibility in copying the backup files is needed, a lower level process can be used for standalone hot backups as well. To prepare for low level standalone hot backups, set wal_level to - archive (or hot_standby), archive_mode to + archive or higher, archive_mode to on, and set up an archive_command that performs archiving only when a switch file exists. For example: diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index fee83c1496..f33a16b7aa 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1648,10 +1648,12 @@ include 'filename' wal_level determines how much information is written to the WAL. The default value is minimal, which writes only the information needed to recover from a crash or immediate - shutdown. archive adds logging required for WAL archiving, - and hot_standby further adds information required to run - read-only queries on a standby server. - This parameter can only be set at server start. + shutdown. archive adds logging required for WAL archiving; + hot_standby further adds information required to run + read-only queries on a standby server; and, finally + logical adds information necessary to support logical + decoding. Each level includes the information logged at all lower + levels. This parameter can only be set at server start. In minimal level, WAL-logging of some bulk @@ -1665,24 +1667,30 @@ include 'filename' COPY into tables that were created or truncated in the same transaction - But minimal WAL does not contain - enough information to reconstruct the data from a base backup and the - WAL logs, so either archive or hot_standby - level must be used to enable - WAL archiving () and streaming - replication. + But minimal WAL does not contain enough information to reconstruct the + data from a base backup and the WAL logs, so archive or + higher must be used to enable WAL archiving + () and streaming replication. In hot_standby level, the same information is logged as with archive, plus information needed to reconstruct the status of running transactions from the WAL. To enable read-only queries on a standby server, wal_level must be set to - hot_standby on the primary, and + hot_standby or higher on the primary, and must be enabled in the standby. It is - thought that there is - little measurable difference in performance between using - hot_standby and archive levels, so feedback - is welcome if any production impacts are noticeable. + thought that there is little measurable difference in performance + between using hot_standby and archive levels, + so feedback is welcome if any production impacts are noticeable. + + + In logical level, the same information is logged as + with hot_standby, plus information needed to allow + extracting logical changesets from the WAL. Using a level of + logical will increase the WAL volume, particularly if many + tables are configured for REPLICA IDENTITY FULL and + many UPDATE and DELETE statements are + executed. @@ -2239,9 +2247,9 @@ include 'filename' disabled. WAL sender processes count towards the total number of connections, so the parameter cannot be set higher than . This parameter can only - be set at server start. wal_level must be set - to archive or hot_standby to allow - connections from standby servers. + be set at server start. wal_level must be set to + archive or higher to allow connections from standby + servers. diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index c8f6fa8a54..e2e5ac93ab 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1861,8 +1861,9 @@ LOG: database system is ready to accept read only connections Consistency information is recorded once per checkpoint on the primary. It is not possible to enable hot standby when reading WAL written during a period when wal_level was not set to - hot_standby on the primary. Reaching a consistent state can - also be delayed in the presence of both of these conditions: + hot_standby or logical on the primary. Reaching + a consistent state can also be delayed in the presence of both of these + conditions: diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 2035a2158f..249fffeb06 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -85,12 +85,14 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, - HeapTuple newtup, bool all_visible_cleared, - bool new_all_visible_cleared); + HeapTuple newtup, HeapTuple old_key_tup, + bool all_visible_cleared, bool new_all_visible_cleared); static void HeapSatisfiesHOTandKeyUpdate(Relation relation, - Bitmapset *hot_attrs, Bitmapset *key_attrs, - bool *satisfies_hot, bool *satisfies_key, - HeapTuple oldtup, HeapTuple newtup); + Bitmapset *hot_attrs, + Bitmapset *key_attrs, Bitmapset *id_attrs, + bool *satisfies_hot, bool *satisfies_key, + bool *satisfies_id, + HeapTuple oldtup, HeapTuple newtup); static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask, uint16 old_infomask2, TransactionId add_to_xmax, LockTupleMode mode, bool is_update, @@ -108,6 +110,9 @@ static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status, static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status, int *remaining, uint16 infomask); +static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); +static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_modified, + bool *copy); /* @@ -2103,11 +2108,24 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, xl_heap_insert xlrec; xl_heap_header xlhdr; XLogRecPtr recptr; - XLogRecData rdata[3]; + XLogRecData rdata[4]; Page page = BufferGetPage(buffer); uint8 info = XLOG_HEAP_INSERT; + bool need_tuple_data; + + /* + * For logical decoding, we need the tuple even if we're doing a + * full page write, so make sure to log it separately. (XXX We could + * alternatively store a pointer into the FPW). + * + * Also, if this is a catalog, we need to transmit combocids to + * properly decode, so log that as well. + */ + need_tuple_data = RelationIsLogicallyLogged(relation); + if (RelationIsAccessibleInLogicalDecoding(relation)) + log_heap_new_cid(relation, heaptup); - xlrec.all_visible_cleared = all_visible_cleared; + xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; xlrec.target.node = relation->rd_node; xlrec.target.tid = heaptup->t_self; rdata[0].data = (char *) &xlrec; @@ -2126,17 +2144,35 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ rdata[1].data = (char *) &xlhdr; rdata[1].len = SizeOfHeapHeader; - rdata[1].buffer = buffer; + rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer; rdata[1].buffer_std = true; rdata[1].next = &(rdata[2]); /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits); rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits); - rdata[2].buffer = buffer; + rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer; rdata[2].buffer_std = true; rdata[2].next = NULL; + /* + * Make a separate rdata entry for the tuple's buffer if we're + * doing logical decoding, so that an eventual FPW doesn't + * remove the tuple's data. + */ + if (need_tuple_data) + { + rdata[2].next = &(rdata[3]); + + rdata[3].data = NULL; + rdata[3].len = 0; + rdata[3].buffer = buffer; + rdata[3].buffer_std = true; + rdata[3].next = NULL; + + xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + } + /* * If this is the single and first tuple on page, we can reinit the * page instead of restoring the whole thing. Set flag, and hide @@ -2146,7 +2182,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, PageGetMaxOffsetNumber(page) == FirstOffsetNumber) { info |= XLOG_HEAP_INIT_PAGE; - rdata[1].buffer = rdata[2].buffer = InvalidBuffer; + rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer; } recptr = XLogInsert(RM_HEAP_ID, info, rdata); @@ -2272,6 +2308,8 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, Page page; bool needwal; Size saveFreeSpace; + bool need_tuple_data = RelationIsLogicallyLogged(relation); + bool need_cids = RelationIsAccessibleInLogicalDecoding(relation); needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation); saveFreeSpace = RelationGetTargetPageFreeSpace(relation, @@ -2358,7 +2396,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, { XLogRecPtr recptr; xl_heap_multi_insert *xlrec; - XLogRecData rdata[2]; + XLogRecData rdata[3]; uint8 info = XLOG_HEAP2_MULTI_INSERT; char *tupledata; int totaldatalen; @@ -2388,7 +2426,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, /* the rest of the scratch space is used for tuple data */ tupledata = scratchptr; - xlrec->all_visible_cleared = all_visible_cleared; + xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; xlrec->node = relation->rd_node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntuples = nthispage; @@ -2420,6 +2458,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, datalen); tuphdr->datalen = datalen; scratchptr += datalen; + + /* + * We don't use heap_multi_insert for catalog tuples yet, but + * better be prepared... + */ + if (need_cids) + log_heap_new_cid(relation, heaptup); } totaldatalen = scratchptr - tupledata; Assert((scratchptr - scratch) < BLCKSZ); @@ -2431,17 +2476,34 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, rdata[1].data = tupledata; rdata[1].len = totaldatalen; - rdata[1].buffer = buffer; + rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer; rdata[1].buffer_std = true; rdata[1].next = NULL; + /* + * Make a separate rdata entry for the tuple's buffer if + * we're doing logical decoding, so that an eventual FPW + * doesn't remove the tuple's data. + */ + if (need_tuple_data) + { + rdata[1].next = &(rdata[2]); + + rdata[2].data = NULL; + rdata[2].len = 0; + rdata[2].buffer = buffer; + rdata[2].buffer_std = true; + rdata[2].next = NULL; + xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + } + /* * If we're going to reinitialize the whole page using the WAL * record, hide buffer reference from XLogInsert. */ if (init) { - rdata[1].buffer = InvalidBuffer; + rdata[1].buffer = rdata[2].buffer = InvalidBuffer; info |= XLOG_HEAP_INIT_PAGE; } @@ -2561,6 +2623,8 @@ heap_delete(Relation relation, ItemPointer tid, bool have_tuple_lock = false; bool iscombo; bool all_visible_cleared = false; + HeapTuple old_key_tuple = NULL; /* replica identity of the tuple */ + bool old_key_copied = false; Assert(ItemPointerIsValid(tid)); @@ -2734,6 +2798,12 @@ l1: /* replace cid with a combo cid if necessary */ HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo); + /* + * Compute replica identity tuple before entering the critical section so + * we don't PANIC upon a memory allocation failure. + */ + old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied); + START_CRIT_SECTION(); /* @@ -2786,9 +2856,13 @@ l1: { xl_heap_delete xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; + XLogRecData rdata[4]; + + /* For logical decode we need combocids to properly decode the catalog */ + if (RelationIsAccessibleInLogicalDecoding(relation)) + log_heap_new_cid(relation, &tp); - xlrec.all_visible_cleared = all_visible_cleared; + xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, tp.t_data->t_infomask2); xlrec.target.node = relation->rd_node; @@ -2805,6 +2879,37 @@ l1: rdata[1].buffer_std = true; rdata[1].next = NULL; + /* + * Log replica identity of the deleted tuple if there is one + */ + if (old_key_tuple != NULL) + { + xl_heap_header xlhdr; + + xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2; + xlhdr.t_infomask = old_key_tuple->t_data->t_infomask; + xlhdr.t_hoff = old_key_tuple->t_data->t_hoff; + + rdata[1].next = &(rdata[2]); + rdata[2].data = (char*)&xlhdr; + rdata[2].len = SizeOfHeapHeader; + rdata[2].buffer = InvalidBuffer; + rdata[2].next = NULL; + + rdata[2].next = &(rdata[3]); + rdata[3].data = (char *) old_key_tuple->t_data + + offsetof(HeapTupleHeaderData, t_bits); + rdata[3].len = old_key_tuple->t_len + - offsetof(HeapTupleHeaderData, t_bits); + rdata[3].buffer = InvalidBuffer; + rdata[3].next = NULL; + + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + else + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + } + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata); PageSetLSN(page, recptr); @@ -2850,6 +2955,9 @@ l1: pgstat_count_heap_delete(relation); + if (old_key_tuple != NULL && old_key_copied) + heap_freetuple(old_key_tuple); + return HeapTupleMayBeUpdated; } @@ -2934,9 +3042,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, TransactionId xid = GetCurrentTransactionId(); Bitmapset *hot_attrs; Bitmapset *key_attrs; + Bitmapset *id_attrs; ItemId lp; HeapTupleData oldtup; HeapTuple heaptup; + HeapTuple old_key_tuple = NULL; + bool old_key_copied = false; Page page; BlockNumber block; MultiXactStatus mxact_status; @@ -2952,6 +3063,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, bool iscombo; bool satisfies_hot; bool satisfies_key; + bool satisfies_id; bool use_hot_update = false; bool key_intact; bool all_visible_cleared = false; @@ -2979,8 +3091,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, * Note that we get a copy here, so we need not worry about relcache flush * happening midway through. */ - hot_attrs = RelationGetIndexAttrBitmap(relation, false); - key_attrs = RelationGetIndexAttrBitmap(relation, true); + hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_ALL); + key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY); + id_attrs = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_IDENTITY_KEY); block = ItemPointerGetBlockNumber(otid); buffer = ReadBuffer(relation, block); @@ -3038,9 +3152,9 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, * is updates that don't manipulate key columns, not those that * serendipitiously arrive at the same key values. */ - HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs, + HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs, id_attrs, &satisfies_hot, &satisfies_key, - &oldtup, newtup); + &satisfies_id, &oldtup, newtup); if (satisfies_key) { *lockmode = LockTupleNoKeyExclusive; @@ -3514,6 +3628,14 @@ l2: PageSetFull(page); } + /* + * Compute replica identity tuple before entering the critical section so + * we don't PANIC upon a memory allocation failure. + * ExtractReplicaIdentity() will return NULL if nothing needs to be + * logged. + */ + old_key_tuple = ExtractReplicaIdentity(relation, &oldtup, !satisfies_id, &old_key_copied); + /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); @@ -3589,11 +3711,23 @@ l2: /* XLOG stuff */ if (RelationNeedsWAL(relation)) { - XLogRecPtr recptr = log_heap_update(relation, buffer, - newbuf, &oldtup, heaptup, - all_visible_cleared, - all_visible_cleared_new); + XLogRecPtr recptr; + /* + * For logical decoding we need combocids to properly decode the + * catalog. + */ + if (RelationIsAccessibleInLogicalDecoding(relation)) + { + log_heap_new_cid(relation, &oldtup); + log_heap_new_cid(relation, heaptup); + } + + recptr = log_heap_update(relation, buffer, + newbuf, &oldtup, heaptup, + old_key_tuple, + all_visible_cleared, + all_visible_cleared_new); if (newbuf != buffer) { PageSetLSN(BufferGetPage(newbuf), recptr); @@ -3644,6 +3778,9 @@ l2: heap_freetuple(heaptup); } + if (old_key_tuple != NULL && old_key_copied) + heap_freetuple(old_key_tuple); + bms_free(hot_attrs); bms_free(key_attrs); @@ -3731,63 +3868,72 @@ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum, /* * Check which columns are being updated. * - * This simultaneously checks conditions for HOT updates and for FOR KEY - * SHARE updates. Since much of the time they will be checking very similar - * sets of columns, and doing the same tests on them, it makes sense to - * optimize and do them together. + * This simultaneously checks conditions for HOT updates, for FOR KEY + * SHARE updates, and REPLICA IDENTITY concerns. Since much of the time they + * will be checking very similar sets of columns, and doing the same tests on + * them, it makes sense to optimize and do them together. * - * We receive two bitmapsets comprising the two sets of columns we're + * We receive three bitmapsets comprising the three sets of columns we're * interested in. Note these are destructively modified; that is OK since * this is invoked at most once in heap_update. * * hot_result is set to TRUE if it's okay to do a HOT update (i.e. it does not * modified indexed columns); key_result is set to TRUE if the update does not - * modify columns used in the key. + * modify columns used in the key; id_result is set to TRUE if the update does + * not modify columns in any index marked as the REPLICA IDENTITY. */ static void -HeapSatisfiesHOTandKeyUpdate(Relation relation, - Bitmapset *hot_attrs, Bitmapset *key_attrs, +HeapSatisfiesHOTandKeyUpdate(Relation relation, Bitmapset *hot_attrs, + Bitmapset *key_attrs, Bitmapset *id_attrs, bool *satisfies_hot, bool *satisfies_key, + bool *satisfies_id, HeapTuple oldtup, HeapTuple newtup) { int next_hot_attnum; int next_key_attnum; + int next_id_attnum; bool hot_result = true; bool key_result = true; - bool key_done = false; - bool hot_done = false; + bool id_result = true; - next_hot_attnum = bms_first_member(hot_attrs); - if (next_hot_attnum == -1) - hot_done = true; - else - /* Adjust for system attributes */ - next_hot_attnum += FirstLowInvalidHeapAttributeNumber; + /* If REPLICA IDENTITY is set to FULL, id_attrs will be empty. */ + Assert(bms_is_subset(id_attrs, key_attrs)); + Assert(bms_is_subset(key_attrs, hot_attrs)); + /* + * If one of these sets contains no remaining bits, bms_first_member will + * return -1, and after adding FirstLowInvalidHeapAttributeNumber (which + * is negative!) we'll get an attribute number that can't possibly be + * real, and thus won't match any actual attribute number. + */ + next_hot_attnum = bms_first_member(hot_attrs); + next_hot_attnum += FirstLowInvalidHeapAttributeNumber; next_key_attnum = bms_first_member(key_attrs); - if (next_key_attnum == -1) - key_done = true; - else - /* Adjust for system attributes */ - next_key_attnum += FirstLowInvalidHeapAttributeNumber; + next_key_attnum += FirstLowInvalidHeapAttributeNumber; + next_id_attnum = bms_first_member(id_attrs); + next_id_attnum += FirstLowInvalidHeapAttributeNumber; for (;;) { - int check_now; bool changed; + int check_now; - /* both bitmapsets are now empty */ - if (key_done && hot_done) - break; - - /* XXX there's probably an easier way ... */ - if (hot_done) - check_now = next_key_attnum; - if (key_done) + /* + * Since the HOT attributes are a superset of the key attributes and + * the key attributes are a superset of the id attributes, this logic + * is guaranteed to identify the next column that needs to be + * checked. + */ + if (hot_result && next_hot_attnum > FirstLowInvalidHeapAttributeNumber) check_now = next_hot_attnum; + else if (key_result && next_key_attnum > FirstLowInvalidHeapAttributeNumber) + check_now = next_key_attnum; + else if (id_result && next_id_attnum > FirstLowInvalidHeapAttributeNumber) + check_now = next_id_attnum; else - check_now = Min(next_hot_attnum, next_key_attnum); + break; + /* See whether it changed. */ changed = !heap_tuple_attr_equals(RelationGetDescr(relation), check_now, oldtup, newtup); if (changed) @@ -3796,34 +3942,42 @@ HeapSatisfiesHOTandKeyUpdate(Relation relation, hot_result = false; if (check_now == next_key_attnum) key_result = false; - } + if (check_now == next_id_attnum) + id_result = false; - /* if both are false now, we can stop checking */ - if (!hot_result && !key_result) - break; + /* if all are false now, we can stop checking */ + if (!hot_result && !key_result && !id_result) + break; + } - if (check_now == next_hot_attnum) + /* + * Advance the next attribute numbers for the sets that contain + * the attribute we just checked. As we work our way through the + * columns, the next_attnum values will rise; but when each set + * becomes empty, bms_first_member() will return -1 and the attribute + * number will end up with a value less than + * FirstLowInvalidHeapAttributeNumber. + */ + if (hot_result && check_now == next_hot_attnum) { next_hot_attnum = bms_first_member(hot_attrs); - if (next_hot_attnum == -1) - hot_done = true; - else - /* Adjust for system attributes */ - next_hot_attnum += FirstLowInvalidHeapAttributeNumber; + next_hot_attnum += FirstLowInvalidHeapAttributeNumber; } - if (check_now == next_key_attnum) + if (key_result && check_now == next_key_attnum) { next_key_attnum = bms_first_member(key_attrs); - if (next_key_attnum == -1) - key_done = true; - else - /* Adjust for system attributes */ - next_key_attnum += FirstLowInvalidHeapAttributeNumber; + next_key_attnum += FirstLowInvalidHeapAttributeNumber; + } + if (id_result && check_now == next_id_attnum) + { + next_id_attnum = bms_first_member(id_attrs); + next_id_attnum += FirstLowInvalidHeapAttributeNumber; } } *satisfies_hot = hot_result; *satisfies_key = key_result; + *satisfies_id = id_result; } /* @@ -6140,14 +6294,17 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, + HeapTuple old_key_tuple, bool all_visible_cleared, bool new_all_visible_cleared) { xl_heap_update xlrec; - xl_heap_header xlhdr; + xl_heap_header_len xlhdr; + xl_heap_header_len xlhdr_idx; uint8 info; XLogRecPtr recptr; - XLogRecData rdata[4]; + XLogRecData rdata[7]; Page page = BufferGetPage(newbuf); + bool need_tuple_data = RelationIsLogicallyLogged(reln); /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); @@ -6163,9 +6320,12 @@ log_heap_update(Relation reln, Buffer oldbuf, xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask, oldtup->t_data->t_infomask2); xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data); - xlrec.all_visible_cleared = all_visible_cleared; + xlrec.flags = 0; + if (all_visible_cleared) + xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED; xlrec.newtid = newtup->t_self; - xlrec.new_all_visible_cleared = new_all_visible_cleared; + if (new_all_visible_cleared) + xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapUpdate; @@ -6178,33 +6338,86 @@ log_heap_update(Relation reln, Buffer oldbuf, rdata[1].buffer_std = true; rdata[1].next = &(rdata[2]); - xlhdr.t_infomask2 = newtup->t_data->t_infomask2; - xlhdr.t_infomask = newtup->t_data->t_infomask; - xlhdr.t_hoff = newtup->t_data->t_hoff; + xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2; + xlhdr.header.t_infomask = newtup->t_data->t_infomask; + xlhdr.header.t_hoff = newtup->t_data->t_hoff; + xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits); /* - * As with insert records, we need not store the rdata[2] segment if we - * decide to store the whole buffer instead. + * As with insert records, we need not store the rdata[2] segment + * if we decide to store the whole buffer instead unless we're + * doing logical decoding. */ rdata[2].data = (char *) &xlhdr; - rdata[2].len = SizeOfHeapHeader; - rdata[2].buffer = newbuf; + rdata[2].len = SizeOfHeapHeaderLen; + rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf; rdata[2].buffer_std = true; rdata[2].next = &(rdata[3]); /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ - rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits); + rdata[3].data = (char *) newtup->t_data + + offsetof(HeapTupleHeaderData, t_bits); rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits); - rdata[3].buffer = newbuf; + rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf; rdata[3].buffer_std = true; rdata[3].next = NULL; + /* + * Separate storage for the FPW buffer reference of the new page in the + * wal_level >= logical case. + */ + if (need_tuple_data) + { + rdata[3].next = &(rdata[4]); + + rdata[4].data = NULL, + rdata[4].len = 0; + rdata[4].buffer = newbuf; + rdata[4].buffer_std = true; + rdata[4].next = NULL; + xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + + /* We need to log a tuple identity */ + if (old_key_tuple) + { + /* don't really need this, but its more comfy to decode */ + xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2; + xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask; + xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff; + xlhdr_idx.t_len = old_key_tuple->t_len; + + rdata[4].next = &(rdata[5]); + rdata[5].data = (char *) &xlhdr_idx; + rdata[5].len = SizeOfHeapHeaderLen; + rdata[5].buffer = InvalidBuffer; + rdata[5].next = &(rdata[6]); + + /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ + rdata[6].data = (char *) old_key_tuple->t_data + + offsetof(HeapTupleHeaderData, t_bits); + rdata[6].len = old_key_tuple->t_len + - offsetof(HeapTupleHeaderData, t_bits); + rdata[6].buffer = InvalidBuffer; + rdata[6].next = NULL; + + if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + else + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + } + } + /* If new tuple is the single and first tuple on page... */ if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber && PageGetMaxOffsetNumber(page) == FirstOffsetNumber) { + XLogRecData *rcur = &rdata[2]; info |= XLOG_HEAP_INIT_PAGE; - rdata[2].buffer = rdata[3].buffer = InvalidBuffer; + while (rcur != NULL) + { + rcur->buffer = InvalidBuffer; + rcur = rcur->next; + } } recptr = XLogInsert(RM_HEAP_ID, info, rdata); @@ -6339,6 +6552,184 @@ log_newpage_buffer(Buffer buffer, bool page_std) return log_newpage(&rnode, forkNum, blkno, page, page_std); } +/* + * Perform XLogInsert of a XLOG_HEAP2_NEW_CID record + * + * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog + * tuples. + */ +static XLogRecPtr +log_heap_new_cid(Relation relation, HeapTuple tup) +{ + xl_heap_new_cid xlrec; + + XLogRecPtr recptr; + XLogRecData rdata[1]; + HeapTupleHeader hdr = tup->t_data; + + Assert(ItemPointerIsValid(&tup->t_self)); + Assert(tup->t_tableOid != InvalidOid); + + xlrec.top_xid = GetTopTransactionId(); + xlrec.target.node = relation->rd_node; + xlrec.target.tid = tup->t_self; + + /* + * If the tuple got inserted & deleted in the same TX we definitely have a + * combocid, set cmin and cmax. + */ + if (hdr->t_infomask & HEAP_COMBOCID) + { + Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID)); + Assert(!(hdr->t_infomask & HEAP_XMIN_INVALID)); + xlrec.cmin = HeapTupleHeaderGetCmin(hdr); + xlrec.cmax = HeapTupleHeaderGetCmax(hdr); + xlrec.combocid = HeapTupleHeaderGetRawCommandId(hdr); + } + /* No combocid, so only cmin or cmax can be set by this TX */ + else + { + /* + * Tuple inserted. + * + * We need to check for LOCK ONLY because multixacts might be + * transferred to the new tuple in case of FOR KEY SHARE updates in + * which case there will be a xmax, although the tuple just got + * inserted. + */ + if (hdr->t_infomask & HEAP_XMAX_INVALID || + HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask)) + { + xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr); + xlrec.cmax = InvalidCommandId; + } + /* Tuple from a different tx updated or deleted. */ + else + { + xlrec.cmin = InvalidCommandId; + xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr); + + } + xlrec.combocid = InvalidCommandId; + } + + rdata[0].data = (char *) &xlrec; + rdata[0].len = SizeOfHeapNewCid; + rdata[0].buffer = InvalidBuffer; + rdata[0].next = NULL; + + recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata); + + return recptr; +} + +/* + * Build a heap tuple representing the configured REPLICA IDENTITY to represent + * the old tuple in a UPDATE or DELETE. + * + * Returns NULL if there's no need to log a identity or if there's no suitable + * key in the Relation relation. + */ +static HeapTuple +ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy) +{ + TupleDesc desc = RelationGetDescr(relation); + Relation idx_rel; + TupleDesc idx_desc; + char replident = relation->rd_rel->relreplident; + HeapTuple key_tuple = NULL; + bool copy_oid = false; + bool nulls[MaxHeapAttributeNumber]; + Datum values[MaxHeapAttributeNumber]; + int natt; + + *copy = false; + + if (!RelationIsLogicallyLogged(relation)) + return NULL; + + if (replident == REPLICA_IDENTITY_NOTHING) + return NULL; + + if (replident == REPLICA_IDENTITY_FULL) + { + /* + * When logging the entire old tuple, it very well could contain + * toasted columns. If so, force them to be inlined. + */ + if (HeapTupleHasExternal(tp)) + { + *copy = true; + tp = toast_flatten_tuple(tp, RelationGetDescr(relation)); + } + return tp; + } + + /* if the key hasn't changed and we're only logging the key, we're done */ + if (!key_changed) + return NULL; + + /* needs to already have been fetched? */ + if (relation->rd_indexvalid == 0) + RelationGetIndexList(relation); + + if (!OidIsValid(relation->rd_replidindex)) + { + elog(DEBUG4, "Could not find configured replica identity for table \"%s\"", + RelationGetRelationName(relation)); + return NULL; + } + + idx_rel = RelationIdGetRelation(relation->rd_replidindex); + idx_desc = RelationGetDescr(idx_rel); + + /* deform tuple, so we have fast access to columns */ + heap_deform_tuple(tp, desc, values, nulls); + + /* set all columns to NULL, regardless of whether they actually are */ + memset(nulls, 1, sizeof(nulls)); + + /* + * Now set all columns contained in the index to NOT NULL, they cannot + * currently be NULL. + */ + for (natt = 0; natt < idx_desc->natts; natt++) + { + int attno = idx_rel->rd_index->indkey.values[natt]; + + if (attno == ObjectIdAttributeNumber) + copy_oid = true; + else if (attno < 0) + elog(ERROR, "system column in index"); + else + nulls[attno - 1] = false; + } + + key_tuple = heap_form_tuple(desc, values, nulls); + *copy = true; + RelationClose(idx_rel); + + /* XXX: we could also do this unconditionally, the space is used anyway */ + if (copy_oid) + HeapTupleSetOid(key_tuple, HeapTupleGetOid(tp)); + + /* + * If the tuple, which by here only contains indexed columns, still has + * toasted columns, force them to be inlined. This is somewhat unlikely + * since there's limits on the size of indexed columns, so we don't + * duplicate toast_flatten_tuple()s functionality in the above loop over + * the indexed columns, even if it would be more efficient. + */ + if (HeapTupleHasExternal(key_tuple)) + { + HeapTuple oldtup = key_tuple; + key_tuple = toast_flatten_tuple(oldtup, RelationGetDescr(relation)); + heap_freetuple(oldtup); + } + + return key_tuple; +} + /* * Handles CLEANUP_INFO */ @@ -6714,7 +7105,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); Buffer vmbuffer = InvalidBuffer; @@ -6763,7 +7154,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, record->xl_xid); - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); /* Make sure there is no forward chain link in t_ctid */ @@ -6797,7 +7188,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); Buffer vmbuffer = InvalidBuffer; @@ -6868,7 +7259,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) PageSetLSN(page, lsn); - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -6931,7 +7322,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->node); Buffer vmbuffer = InvalidBuffer; @@ -7014,7 +7405,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) PageSetLSN(page, lsn); - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -7053,7 +7444,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) HeapTupleHeaderData hdr; char data[MaxHeapTupleSize]; } tbuf; - xl_heap_header xlhdr; + xl_heap_header_len xlhdr; int hsize; uint32 newlen; Size freespace; @@ -7062,7 +7453,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid); @@ -7140,7 +7531,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, record->xl_xid); - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); /* @@ -7164,7 +7555,7 @@ newt:; * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->new_all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid); @@ -7222,13 +7613,13 @@ newsame:; if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_update_redo: invalid max offset number"); - hsize = SizeOfHeapUpdate + SizeOfHeapHeader; + hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen; - newlen = record->xl_len - hsize; - Assert(newlen <= MaxHeapTupleSize); memcpy((char *) &xlhdr, (char *) xlrec + SizeOfHeapUpdate, - SizeOfHeapHeader); + SizeOfHeapHeaderLen); + newlen = xlhdr.t_len; + Assert(newlen <= MaxHeapTupleSize); htup = &tbuf.hdr; MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ @@ -7236,9 +7627,9 @@ newsame:; (char *) xlrec + hsize, newlen); newlen += offsetof(HeapTupleHeaderData, t_bits); - htup->t_infomask2 = xlhdr.t_infomask2; - htup->t_infomask = xlhdr.t_infomask; - htup->t_hoff = xlhdr.t_hoff; + htup->t_infomask2 = xlhdr.header.t_infomask2; + htup->t_infomask = xlhdr.header.t_infomask; + htup->t_hoff = xlhdr.header.t_hoff; HeapTupleHeaderSetXmin(htup, record->xl_xid); HeapTupleHeaderSetCmin(htup, FirstCommandId); @@ -7250,7 +7641,7 @@ newsame:; if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); - if (xlrec->new_all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ @@ -7501,6 +7892,12 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record) case XLOG_HEAP2_LOCK_UPDATED: heap_xlog_lock_updated(lsn, record); break; + case XLOG_HEAP2_NEW_CID: + /* + * Nothing to do on a real replay, only used during logical + * decoding. + */ + break; default: elog(PANIC, "heap2_redo: unknown op code %u", info); } diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c index e14c053910..39c53d0022 100644 --- a/src/backend/access/rmgrdesc/heapdesc.c +++ b/src/backend/access/rmgrdesc/heapdesc.c @@ -184,6 +184,15 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec) xlrec->infobits_set); out_target(buf, &(xlrec->target)); } + else if (info == XLOG_HEAP2_NEW_CID) + { + xl_heap_new_cid *xlrec = (xl_heap_new_cid *) rec; + + appendStringInfo(buf, "new_cid: "); + out_target(buf, &(xlrec->target)); + appendStringInfo(buf, "; cmin: %u, cmax: %u, combo: %u", + xlrec->cmin, xlrec->cmax, xlrec->combocid); + } else appendStringInfoString(buf, "UNKNOWN"); } diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 1d70494233..dd67b1f643 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -28,6 +28,7 @@ const struct config_enum_entry wal_level_options[] = { {"minimal", WAL_LEVEL_MINIMAL, false}, {"archive", WAL_LEVEL_ARCHIVE, false}, {"hot_standby", WAL_LEVEL_HOT_STANDBY, false}, + {"logical", WAL_LEVEL_LOGICAL, false}, {NULL, 0, false} }; diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e975f8d26d..48203d2c81 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -47,6 +47,7 @@ #include "access/twophase.h" #include "access/twophase_rmgr.h" #include "access/xact.h" +#include "access/xlog.h" #include "access/xlogutils.h" #include "catalog/pg_type.h" #include "catalog/storage.h" @@ -1920,7 +1921,8 @@ RecoverPreparedTransactions(void) * the prepared transaction generated xid assignment records. Test * here must match one used in AssignTransactionId(). */ - if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS) + if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || + XLogLogicalInfoActive())) overwriteOK = true; /* diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b467b5c89d..ad0d19c4a9 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -148,6 +148,7 @@ typedef struct TransactionStateData int prevSecContext; /* previous SecurityRestrictionContext */ bool prevXactReadOnly; /* entry-time xact r/o state */ bool startedInRecovery; /* did we start in recovery? */ + bool didLogXid; /* has xid been included in WAL record? */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -177,6 +178,7 @@ static TransactionStateData TopTransactionStateData = { 0, /* previous SecurityRestrictionContext */ false, /* entry-time xact r/o state */ false, /* startedInRecovery */ + false, /* didLogXid */ NULL /* link to parent state block */ }; @@ -394,6 +396,19 @@ GetCurrentTransactionIdIfAny(void) return CurrentTransactionState->transactionId; } +/* + * MarkCurrentTransactionIdLoggedIfAny + * + * Remember that the current xid - if it is assigned - now has been wal logged. + */ +void +MarkCurrentTransactionIdLoggedIfAny(void) +{ + if (TransactionIdIsValid(CurrentTransactionState->transactionId)) + CurrentTransactionState->didLogXid = true; +} + + /* * GetStableLatestTransactionId * @@ -435,6 +450,7 @@ AssignTransactionId(TransactionState s) { bool isSubXact = (s->parent != NULL); ResourceOwner currentOwner; + bool log_unknown_top = false; /* Assert that caller didn't screw up */ Assert(!TransactionIdIsValid(s->transactionId)); @@ -469,6 +485,20 @@ AssignTransactionId(TransactionState s) pfree(parents); } + /* + * When wal_level=logical, guarantee that a subtransaction's xid can only + * be seen in the WAL stream if its toplevel xid has been logged + * before. If necessary we log a xact_assignment record with fewer than + * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set + * for a transaction even though it appears in a WAL record, we just might + * superfluously log something. That can happen when an xid is included + * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in + * xl_standby_locks. + */ + if (isSubXact && XLogLogicalInfoActive() && + !TopTransactionStateData.didLogXid) + log_unknown_top = true; + /* * Generate a new Xid and record it in PG_PROC and pg_subtrans. * @@ -523,6 +553,9 @@ AssignTransactionId(TransactionState s) * top-level transaction that each subxact belongs to. This is correct in * recovery only because aborted subtransactions are separately WAL * logged. + * + * This is correct even for the case where several levels above us didn't + * have an xid assigned as we recursed up to them beforehand. */ if (isSubXact && XLogStandbyInfoActive()) { @@ -533,7 +566,8 @@ AssignTransactionId(TransactionState s) * ensure this test matches similar one in * RecoverPreparedTransactions() */ - if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS) + if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS || + log_unknown_top) { XLogRecData rdata[2]; xl_xact_assignment xlrec; @@ -552,13 +586,15 @@ AssignTransactionId(TransactionState s) rdata[0].next = &rdata[1]; rdata[1].data = (char *) unreportedXids; - rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId); + rdata[1].len = nUnreportedXids * sizeof(TransactionId); rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata); nUnreportedXids = 0; + /* mark top, not current xact as having been logged */ + TopTransactionStateData.didLogXid = true; } } } @@ -1737,6 +1773,7 @@ StartTransaction(void) * initialize reported xid accounting */ nUnreportedXids = 0; + s->didLogXid = false; /* * must initialize resource-management stuff first diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b68230d196..6fa5479c92 100755 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1191,6 +1191,8 @@ begin:; */ WALInsertSlotRelease(); + MarkCurrentTransactionIdLoggedIfAny(); + END_CRIT_SECTION(); /* @@ -5961,7 +5963,7 @@ CheckRequiredParameterValues(void) { if (ControlFile->wal_level < WAL_LEVEL_HOT_STANDBY) ereport(ERROR, - (errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" on the master server"), + (errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" or higher on the master server"), errhint("Either set wal_level to \"hot_standby\" on the master, or turn off hot_standby here."))); /* We ignore autovacuum_max_workers when we make this test. */ @@ -9650,7 +9652,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("WAL level not sufficient for making an online backup"), - errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start."))); + errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start."))); if (strlen(backupidstr) > MAXPGPATH) ereport(ERROR, @@ -9988,7 +9990,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("WAL level not sufficient for making an online backup"), - errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start."))); + errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start."))); /* * OK to update backup counters and forcePageWrites diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 02752406f5..aa31429b8a 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -3321,7 +3321,7 @@ reindex_relation(Oid relid, int flags) /* Ensure rd_indexattr is valid; see comments for RelationSetIndexList */ if (is_pg_class) - (void) RelationGetIndexAttrBitmap(rel, false); + (void) RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_ALL); PG_TRY(); { diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 0008fc633b..4eff1845f5 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -2355,7 +2355,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, * concurrency. */ modifiedCols = GetModifiedColumns(relinfo, estate); - keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc, true); + keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc, + INDEX_ATTR_BITMAP_KEY); if (bms_overlap(keyCols, modifiedCols)) lockmode = LockTupleExclusive; else diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index d294a5a47f..048a1894e5 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -816,10 +816,10 @@ PostmasterMain(int argc, char *argv[]) } if (XLogArchiveMode && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, - (errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\" or \"hot_standby\""))); + (errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\", \"hot_standby\" or \"logical\""))); if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, - (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\""))); + (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\" or \"logical\""))); /* * Other one-time internal sanity checks can go here, if they are fast. diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index d0acca871e..03bbb9f338 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -3818,8 +3818,9 @@ RelationGetIndexPredicate(Relation relation) * simple index keys, but attributes used in expressions and partial-index * predicates.) * - * If "keyAttrs" is true, only attributes that can be referenced by foreign - * keys are considered. + * Depending on attrKind, a bitmap covering the attnums for all index columns, + * for all key columns or for all the columns the configured replica identity + * are returned. * * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that * we can include system attributes (e.g., OID) in the bitmap representation. @@ -3832,17 +3833,28 @@ RelationGetIndexPredicate(Relation relation) * be bms_free'd when not needed anymore. */ Bitmapset * -RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) +RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) { - Bitmapset *indexattrs; - Bitmapset *uindexattrs; + Bitmapset *indexattrs; /* indexed columns */ + Bitmapset *uindexattrs; /* columns in unique indexes */ + Bitmapset *idindexattrs; /* columns in the the replica identity */ List *indexoidlist; ListCell *l; MemoryContext oldcxt; /* Quick exit if we already computed the result. */ if (relation->rd_indexattr != NULL) - return bms_copy(keyAttrs ? relation->rd_keyattr : relation->rd_indexattr); + switch(attrKind) + { + case INDEX_ATTR_BITMAP_IDENTITY_KEY: + return bms_copy(relation->rd_idattr); + case INDEX_ATTR_BITMAP_KEY: + return bms_copy(relation->rd_keyattr); + case INDEX_ATTR_BITMAP_ALL: + return bms_copy(relation->rd_indexattr); + default: + elog(ERROR, "unknown attrKind %u", attrKind); + } /* Fast path if definitely no indexes */ if (!RelationGetForm(relation)->relhasindex) @@ -3869,13 +3881,16 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) */ indexattrs = NULL; uindexattrs = NULL; + idindexattrs = NULL; foreach(l, indexoidlist) { Oid indexOid = lfirst_oid(l); Relation indexDesc; IndexInfo *indexInfo; int i; - bool isKey; + bool isKey; /* candidate key */ + bool isIDKey; /* replica identity index */ + indexDesc = index_open(indexOid, AccessShareLock); @@ -3887,6 +3902,9 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) indexInfo->ii_Expressions == NIL && indexInfo->ii_Predicate == NIL; + /* Is this index the configured (or default) replica identity? */ + isIDKey = indexOid == relation->rd_replidindex; + /* Collect simple attribute references */ for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++) { @@ -3896,6 +3914,11 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) { indexattrs = bms_add_member(indexattrs, attrnum - FirstLowInvalidHeapAttributeNumber); + + if (isIDKey) + idindexattrs = bms_add_member(idindexattrs, + attrnum - FirstLowInvalidHeapAttributeNumber); + if (isKey) uindexattrs = bms_add_member(uindexattrs, attrnum - FirstLowInvalidHeapAttributeNumber); @@ -3917,10 +3940,21 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) oldcxt = MemoryContextSwitchTo(CacheMemoryContext); relation->rd_indexattr = bms_copy(indexattrs); relation->rd_keyattr = bms_copy(uindexattrs); + relation->rd_idattr = bms_copy(idindexattrs); MemoryContextSwitchTo(oldcxt); /* We return our original working copy for caller to play with */ - return keyAttrs ? uindexattrs : indexattrs; + switch(attrKind) + { + case INDEX_ATTR_BITMAP_IDENTITY_KEY: + return idindexattrs; + case INDEX_ATTR_BITMAP_KEY: + return uindexattrs; + case INDEX_ATTR_BITMAP_ALL: + return indexattrs; + default: + elog(ERROR, "unknown attrKind %u", attrKind); + } } /* diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 983cae7fda..6096fe4445 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -170,7 +170,7 @@ # - Settings - -#wal_level = minimal # minimal, archive, or hot_standby +#wal_level = minimal # minimal, archive, hot_standby or logical # (change requires restart) #fsync = on # turns forced synchronization on or off #synchronous_commit = on # synchronization level; diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index fde483a616..8c6cf24d23 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -77,6 +77,8 @@ wal_level_str(WalLevel wal_level) return "archive"; case WAL_LEVEL_HOT_STANDBY: return "hot_standby"; + case WAL_LEVEL_LOGICAL: + return "logical"; } return _("unrecognized wal_level"); } diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 63b73d0329..438e79db48 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -55,6 +55,22 @@ #define XLOG_HEAP2_VISIBLE 0x40 #define XLOG_HEAP2_MULTI_INSERT 0x50 #define XLOG_HEAP2_LOCK_UPDATED 0x60 +#define XLOG_HEAP2_NEW_CID 0x70 + +/* + * xl_heap_* ->flag values, 8 bits are available. + */ +/* PD_ALL_VISIBLE was cleared */ +#define XLOG_HEAP_ALL_VISIBLE_CLEARED (1<<0) +/* PD_ALL_VISIBLE was cleared in the 2nd page */ +#define XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED (1<<1) +#define XLOG_HEAP_CONTAINS_OLD_TUPLE (1<<2) +#define XLOG_HEAP_CONTAINS_OLD_KEY (1<<3) +#define XLOG_HEAP_CONTAINS_NEW_TUPLE (1<<4) + +/* convenience macro for checking whether any form of old tuple was logged */ +#define XLOG_HEAP_CONTAINS_OLD \ + (XLOG_HEAP_CONTAINS_OLD_TUPLE | XLOG_HEAP_CONTAINS_OLD_KEY) /* * All what we need to find changed tuple @@ -78,10 +94,10 @@ typedef struct xl_heap_delete xl_heaptid target; /* deleted tuple id */ TransactionId xmax; /* xmax of the deleted tuple */ uint8 infobits_set; /* infomask bits */ - bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */ + uint8 flags; } xl_heap_delete; -#define SizeOfHeapDelete (offsetof(xl_heap_delete, all_visible_cleared) + sizeof(bool)) +#define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8)) /* * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted @@ -100,15 +116,29 @@ typedef struct xl_heap_header #define SizeOfHeapHeader (offsetof(xl_heap_header, t_hoff) + sizeof(uint8)) +/* + * Variant of xl_heap_header that contains the length of the tuple, which is + * useful if the length of the tuple cannot be computed using the overall + * record length. E.g. because there are several tuples inside a single + * record. + */ +typedef struct xl_heap_header_len +{ + uint16 t_len; + xl_heap_header header; +} xl_heap_header_len; + +#define SizeOfHeapHeaderLen (offsetof(xl_heap_header_len, header) + SizeOfHeapHeader) + /* This is what we need to know about insert */ typedef struct xl_heap_insert { xl_heaptid target; /* inserted tuple id */ - bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */ + uint8 flags; /* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */ } xl_heap_insert; -#define SizeOfHeapInsert (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool)) +#define SizeOfHeapInsert (offsetof(xl_heap_insert, flags) + sizeof(uint8)) /* * This is what we need to know about a multi-insert. The record consists of @@ -120,7 +150,7 @@ typedef struct xl_heap_multi_insert { RelFileNode node; BlockNumber blkno; - bool all_visible_cleared; + uint8 flags; uint16 ntuples; OffsetNumber offsets[1]; @@ -147,13 +177,12 @@ typedef struct xl_heap_update TransactionId old_xmax; /* xmax of the old tuple */ TransactionId new_xmax; /* xmax of the new tuple */ ItemPointerData newtid; /* new inserted tuple id */ - uint8 old_infobits_set; /* infomask bits to set on old tuple */ - bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */ - bool new_all_visible_cleared; /* same for the page of newtid */ + uint8 old_infobits_set; /* infomask bits to set on old tuple */ + uint8 flags; /* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */ } xl_heap_update; -#define SizeOfHeapUpdate (offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool)) +#define SizeOfHeapUpdate (offsetof(xl_heap_update, flags) + sizeof(uint8)) /* * This is what we need to know about vacuum page cleanup/redirect @@ -263,6 +292,29 @@ typedef struct xl_heap_visible #define SizeOfHeapVisible (offsetof(xl_heap_visible, cutoff_xid) + sizeof(TransactionId)) +typedef struct xl_heap_new_cid +{ + /* + * store toplevel xid so we don't have to merge cids from different + * transactions + */ + TransactionId top_xid; + CommandId cmin; + CommandId cmax; + /* + * don't really need the combocid since we have the actual values + * right in this struct, but the padding makes it free and its + * useful for debugging. + */ + CommandId combocid; + /* + * Store the relfilenode/ctid pair to facilitate lookups. + */ + xl_heaptid target; +} xl_heap_new_cid; + +#define SizeOfHeapNewCid (offsetof(xl_heap_new_cid, target) + SizeOfHeapTid) + extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple, TransactionId *latestRemovedXid); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 1d3e7d8938..9632378865 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -215,6 +215,7 @@ extern TransactionId GetCurrentTransactionId(void); extern TransactionId GetCurrentTransactionIdIfAny(void); extern TransactionId GetStableLatestTransactionId(void); extern SubTransactionId GetCurrentSubTransactionId(void); +extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); extern TimestampTz GetCurrentTransactionStartTimestamp(void); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 002862cca5..7415a261bb 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -197,7 +197,8 @@ typedef enum WalLevel { WAL_LEVEL_MINIMAL = 0, WAL_LEVEL_ARCHIVE, - WAL_LEVEL_HOT_STANDBY + WAL_LEVEL_HOT_STANDBY, + WAL_LEVEL_LOGICAL } WalLevel; extern int wal_level; @@ -210,9 +211,12 @@ extern int wal_level; */ #define XLogIsNeeded() (wal_level >= WAL_LEVEL_ARCHIVE) -/* Do we need to WAL-log information required only for Hot Standby? */ +/* Do we need to WAL-log information required only for Hot Standby and logical replication? */ #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY) +/* Do we need to WAL-log information required only for logical replication? */ +#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) + #ifdef WAL_DEBUG extern bool XLOG_DEBUG; #endif diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 9fba8c3db8..64ba55355b 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -55,7 +55,7 @@ typedef struct BkpBlock /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD078 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD079 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData { diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 21d5871454..ad878cf1a2 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -104,6 +104,7 @@ typedef struct RelationData List *rd_indexlist; /* list of OIDs of indexes on relation */ Bitmapset *rd_indexattr; /* identifies columns used in indexes */ Bitmapset *rd_keyattr; /* cols that can be ref'd by foreign keys */ + Bitmapset *rd_idattr; /* included in replica identity index */ Oid rd_oidindex; /* OID of unique index on OID, if any */ LockInfoData rd_lockInfo; /* lock mgr's info for locking relation */ RuleLock *rd_rules; /* rewrite rules */ @@ -453,6 +454,29 @@ typedef struct StdRdOptions */ #define RelationIsPopulated(relation) ((relation)->rd_rel->relispopulated) +/* + * RelationIsAccessibleInLogicalDecoding + * True if we need to log enough information to have access via + * decoding snapshot. + */ +#define RelationIsAccessibleInLogicalDecoding(relation) \ + (XLogLogicalInfoActive() && \ + RelationNeedsWAL(relation) && \ + IsCatalogRelation(relation)) + +/* + * RelationIsLogicallyLogged + * True if we need to log enough information to extract the data from the + * WAL stream. + * + * We don't log information for unlogged tables (since they don't WAL log + * anyway) and for system tables (their content is hard to make sense of, and + * it would complicate decoding slightly for little gain). + */ +#define RelationIsLogicallyLogged(relation) \ + (XLogLogicalInfoActive() && \ + RelationNeedsWAL(relation) && \ + !IsCatalogRelation(relation)) /* routines in utils/cache/relcache.c */ extern void RelationIncrementReferenceCount(Relation rel); diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h index 8ac2549cb3..d7604ec113 100644 --- a/src/include/utils/relcache.h +++ b/src/include/utils/relcache.h @@ -41,7 +41,17 @@ extern List *RelationGetIndexList(Relation relation); extern Oid RelationGetOidIndex(Relation relation); extern List *RelationGetIndexExpressions(Relation relation); extern List *RelationGetIndexPredicate(Relation relation); -extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs); + +typedef enum IndexAttrBitmapKind +{ + INDEX_ATTR_BITMAP_ALL, + INDEX_ATTR_BITMAP_KEY, + INDEX_ATTR_BITMAP_IDENTITY_KEY +} IndexAttrBitmapKind; + +extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation, + IndexAttrBitmapKind keyAttrs); + extern void RelationGetExclusionInfo(Relation indexRelation, Oid **operators, Oid **procs, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b20eb0d5ac..c5200372ae 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -791,6 +791,7 @@ IdentifySystemCmd IncrementVarSublevelsUp_context Index IndexArrayKeyInfo +IndexAttrBitmapKind IndexBuildCallback IndexBuildResult IndexBulkDeleteCallback @@ -2419,11 +2420,13 @@ xl_heap_cleanup_info xl_heap_delete xl_heap_freeze xl_heap_header +xl_heap_header_len xl_heap_inplace xl_heap_insert xl_heap_lock xl_heap_lock_updated xl_heap_multi_insert +xl_heap_new_cid xl_heap_newpage xl_heap_update xl_heap_visible -- 2.40.0