]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/decode.c
Add macros to make AllocSetContextCreate() calls simpler and safer.
[postgresql] / src / backend / replication / logical / decode.c
1 /* -------------------------------------------------------------------------
2  *
3  * decode.c
4  *              This module decodes WAL records read using xlogreader.h's APIs for the
5  *              purpose of logical decoding by passing information to the
6  *              reorderbuffer module (containing the actual changes) and to the
7  *              snapbuild module to build a fitting catalog snapshot (to be able to
8  *              properly decode the changes in the reorderbuffer).
9  *
10  * NOTE:
11  *              This basically tries to handle all low level xlog stuff for
12  *              reorderbuffer.c and snapbuild.c. There's some minor leakage where a
13  *              specific record's struct is used to pass data along, but those just
14  *              happen to contain the right amount of data in a convenient
15  *              format. There isn't and shouldn't be much intelligence about the
16  *              contents of records in here except turning them into a more usable
17  *              format.
18  *
19  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
20  * Portions Copyright (c) 1994, Regents of the University of California
21  *
22  * IDENTIFICATION
23  *        src/backend/replication/logical/decode.c
24  *
25  * -------------------------------------------------------------------------
26  */
27 #include "postgres.h"
28
29 #include "access/heapam.h"
30 #include "access/heapam_xlog.h"
31 #include "access/transam.h"
32 #include "access/xact.h"
33 #include "access/xlog_internal.h"
34 #include "access/xlogutils.h"
35 #include "access/xlogreader.h"
36 #include "access/xlogrecord.h"
37
38 #include "catalog/pg_control.h"
39
40 #include "replication/decode.h"
41 #include "replication/logical.h"
42 #include "replication/message.h"
43 #include "replication/reorderbuffer.h"
44 #include "replication/origin.h"
45 #include "replication/snapbuild.h"
46
47 #include "storage/standby.h"
48
49 typedef struct XLogRecordBuffer	/* one WAL record plus its LSN range, as handed to every Decode* handler */
50 {
51         XLogRecPtr      origptr;	/* filled from the reader's ReadRecPtr */
52         XLogRecPtr      endptr;	/* filled from the reader's EndRecPtr */
53         XLogReaderState *record;	/* reader state through which the record's contents are accessed */
54 } XLogRecordBuffer;
55
56 /* per-rmgr handlers, one for each resource manager LogicalDecodingProcessRecord() dispatches to */
57 static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
58 static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
59 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
60 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
61 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
62 static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
63
64 /* handlers for individual record types (or groups of record types), called from the rmgr handlers */
65 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
66 static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
67 static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
68 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
69 static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
70
71 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
72                          xl_xact_parsed_commit *parsed, TransactionId xid);
73 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
74                         xl_xact_parsed_abort *parsed, TransactionId xid);
75
76 /* common helper the tuple-carrying handlers use to decode tuple data into a ReorderBufferTupleBuf */
77 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
78
79 /*
80  * Take every XLogReadRecord()ed record and perform the actions required to
81  * decode it using the output plugin already setup in the logical decoding
82  * context.
83  *
84  * NB: Note that every record's xid needs to be processed by reorderbuffer
85  * (xids contained in the content of records are not relevant for this rule).
86  * That means that for records which'd otherwise not go through the
87  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
88  * call ReorderBufferProcessXid for each record type by default, because
89  * e.g. empty xacts can be handled more efficiently if there's no previous
90  * state for them.
91  */
92 void
93 LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
94 {
95         XLogRecordBuffer buf;
96
97         buf.origptr = ctx->reader->ReadRecPtr;
98         buf.endptr = ctx->reader->EndRecPtr;
99         buf.record = record;
100
101         /* cast so we get a warning when new rmgrs are added */
102         switch ((RmgrIds) XLogRecGetRmid(record))
103         {
104                         /*
105                          * Rmgrs we care about for logical decoding. Add new rmgrs in
106                          * rmgrlist.h's order.
107                          */
108                 case RM_XLOG_ID:
109                         DecodeXLogOp(ctx, &buf);
110                         break;
111
112                 case RM_XACT_ID:
113                         DecodeXactOp(ctx, &buf);
114                         break;
115
116                 case RM_STANDBY_ID:
117                         DecodeStandbyOp(ctx, &buf);
118                         break;
119
120                 case RM_HEAP2_ID:
121                         DecodeHeap2Op(ctx, &buf);
122                         break;
123
124                 case RM_HEAP_ID:
125                         DecodeHeapOp(ctx, &buf);
126                         break;
127
128                 case RM_LOGICALMSG_ID:
129                         DecodeLogicalMsgOp(ctx, &buf);
130                         break;
131
132                         /*
133                          * Rmgrs irrelevant for logical decoding; they describe stuff not
134                          * represented in logical decoding. Add new rmgrs in rmgrlist.h's
135                          * order.
136                          */
137                 case RM_SMGR_ID:
138                 case RM_CLOG_ID:
139                 case RM_DBASE_ID:
140                 case RM_TBLSPC_ID:
141                 case RM_MULTIXACT_ID:
142                 case RM_RELMAP_ID:
143                 case RM_BTREE_ID:
144                 case RM_HASH_ID:
145                 case RM_GIN_ID:
146                 case RM_GIST_ID:
147                 case RM_SEQ_ID:
148                 case RM_SPGIST_ID:
149                 case RM_BRIN_ID:
150                 case RM_COMMIT_TS_ID:
151                 case RM_REPLORIGIN_ID:
152                 case RM_GENERIC_ID:
153                         /* just deal with xid, and done */
154                         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
155                                                                         buf.origptr);
156                         break;
157                 case RM_NEXT_ID:
158                         elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
159         }
160 }
161
162 /*
163  * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
164  */
165 static void
166 DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
167 {
168         SnapBuild  *builder = ctx->snapshot_builder;
169         uint8           info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
170
171         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
172                                                         buf->origptr);
173
174         switch (info)
175         {
176                         /* this is also used in END_OF_RECOVERY checkpoints */
177                 case XLOG_CHECKPOINT_SHUTDOWN:
178                 case XLOG_END_OF_RECOVERY:
179                         SnapBuildSerializationPoint(builder, buf->origptr);
180
181                         break;
182                 case XLOG_CHECKPOINT_ONLINE:
183
184                         /*
185                          * a RUNNING_XACTS record will have been logged near to this, we
186                          * can restart from there.
187                          */
188                         break;
189                 case XLOG_NOOP:
190                 case XLOG_NEXTOID:
191                 case XLOG_SWITCH:
192                 case XLOG_BACKUP_END:
193                 case XLOG_PARAMETER_CHANGE:
194                 case XLOG_RESTORE_POINT:
195                 case XLOG_FPW_CHANGE:
196                 case XLOG_FPI_FOR_HINT:
197                 case XLOG_FPI:
198                         break;
199                 default:
200                         elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
201         }
202 }
203
204 /*
205  * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
206  */
207 static void
208 DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
209 {
210         SnapBuild  *builder = ctx->snapshot_builder;
211         ReorderBuffer *reorder = ctx->reorder;
212         XLogReaderState *r = buf->record;
213         uint8           info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
214
215         /*
216          * No point in doing anything yet, data could not be decoded anyway. It's
217          * ok not to call ReorderBufferProcessXid() in that case, except in the
218          * assignment case there'll not be any later records with the same xid;
219          * and in the assignment case we'll not decode those xacts.
220          */
221         if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
222                 return;
223
224         switch (info)
225         {
226                 case XLOG_XACT_COMMIT:
227                 case XLOG_XACT_COMMIT_PREPARED:
228                         {
229                                 xl_xact_commit *xlrec;
230                                 xl_xact_parsed_commit parsed;
231                                 TransactionId xid;
232
233                                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
234                                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
235
236                                 if (!TransactionIdIsValid(parsed.twophase_xid))
237                                         xid = XLogRecGetXid(r);
238                                 else
239                                         xid = parsed.twophase_xid;
240
241                                 DecodeCommit(ctx, buf, &parsed, xid);
242                                 break;
243                         }
244                 case XLOG_XACT_ABORT:
245                 case XLOG_XACT_ABORT_PREPARED:
246                         {
247                                 xl_xact_abort *xlrec;
248                                 xl_xact_parsed_abort parsed;
249                                 TransactionId xid;
250
251                                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
252                                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
253
254                                 if (!TransactionIdIsValid(parsed.twophase_xid))
255                                         xid = XLogRecGetXid(r);
256                                 else
257                                         xid = parsed.twophase_xid;
258
259                                 DecodeAbort(ctx, buf, &parsed, xid);
260                                 break;
261                         }
262                 case XLOG_XACT_ASSIGNMENT:
263                         {
264                                 xl_xact_assignment *xlrec;
265                                 int                     i;
266                                 TransactionId *sub_xid;
267
268                                 xlrec = (xl_xact_assignment *) XLogRecGetData(r);
269
270                                 sub_xid = &xlrec->xsub[0];
271
272                                 for (i = 0; i < xlrec->nsubxacts; i++)
273                                 {
274                                         ReorderBufferAssignChild(reorder, xlrec->xtop,
275                                                                                          *(sub_xid++), buf->origptr);
276                                 }
277                                 break;
278                         }
279                 case XLOG_XACT_PREPARE:
280
281                         /*
282                          * Currently decoding ignores PREPARE TRANSACTION and will just
283                          * decode the transaction when the COMMIT PREPARED is sent or
284                          * throw away the transaction's contents when a ROLLBACK PREPARED
285                          * is received. In the future we could add code to expose prepared
286                          * transactions in the changestream allowing for a kind of
287                          * distributed 2PC.
288                          */
289                         ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
290                         break;
291                 default:
292                         elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
293         }
294 }
295
296 /*
297  * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
298  */
299 static void
300 DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
301 {
302         SnapBuild  *builder = ctx->snapshot_builder;
303         XLogReaderState *r = buf->record;
304         uint8           info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
305
306         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
307
308         switch (info)
309         {
310                 case XLOG_RUNNING_XACTS:
311                         {
312                                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
313
314                                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
315
316                                 /*
317                                  * Abort all transactions that we keep track of, that are
318                                  * older than the record's oldestRunningXid. This is the most
319                                  * convenient spot for doing so since, in contrast to shutdown
320                                  * or end-of-recovery checkpoints, we have information about
321                                  * all running transactions which includes prepared ones,
322                                  * while shutdown checkpoints just know that no non-prepared
323                                  * transactions are in progress.
324                                  */
325                                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
326                         }
327                         break;
328                 case XLOG_STANDBY_LOCK:
329                         break;
330                 case XLOG_INVALIDATIONS:
331                         {
332                                 xl_invalidations *invalidations =
333                                 (xl_invalidations *) XLogRecGetData(r);
334
335                                 ReorderBufferImmediateInvalidation(
336                                         ctx->reorder, invalidations->nmsgs, invalidations->msgs);
337                         }
338                         break;
339                 default:
340                         elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
341         }
342 }
343
344 /*
345  * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
346  */
347 static void
348 DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
349 {
350         uint8           info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
351         TransactionId xid = XLogRecGetXid(buf->record);
352         SnapBuild  *builder = ctx->snapshot_builder;
353
354         ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
355
356         /* no point in doing anything yet */
357         if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
358                 return;
359
360         switch (info)
361         {
362                 case XLOG_HEAP2_MULTI_INSERT:
363                         if (SnapBuildProcessChange(builder, xid, buf->origptr))
364                                 DecodeMultiInsert(ctx, buf);
365                         break;
366                 case XLOG_HEAP2_NEW_CID:
367                         {
368                                 xl_heap_new_cid *xlrec;
369
370                                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
371                                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
372
373                                 break;
374                         }
375                 case XLOG_HEAP2_REWRITE:
376
377                         /*
378                          * Although these records only exist to serve the needs of logical
379                          * decoding, all the work happens as part of crash or archive
380                          * recovery, so we don't need to do anything here.
381                          */
382                         break;
383
384                         /*
385                          * Everything else here is just low level physical stuff we're not
386                          * interested in.
387                          */
388                 case XLOG_HEAP2_FREEZE_PAGE:
389                 case XLOG_HEAP2_CLEAN:
390                 case XLOG_HEAP2_CLEANUP_INFO:
391                 case XLOG_HEAP2_VISIBLE:
392                 case XLOG_HEAP2_LOCK_UPDATED:
393                         break;
394                 default:
395                         elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
396         }
397 }
398
399 /*
400  * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
401  */
402 static void
403 DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
404 {
405         uint8           info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
406         TransactionId xid = XLogRecGetXid(buf->record);
407         SnapBuild  *builder = ctx->snapshot_builder;
408
409         ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
410
411         /* no point in doing anything yet */
412         if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
413                 return;
414
415         switch (info)
416         {
417                 case XLOG_HEAP_INSERT:
418                         if (SnapBuildProcessChange(builder, xid, buf->origptr))
419                                 DecodeInsert(ctx, buf);
420                         break;
421
422                         /*
423                          * Treat HOT update as normal updates. There is no useful
424                          * information in the fact that we could make it a HOT update
425                          * locally and the WAL layout is compatible.
426                          */
427                 case XLOG_HEAP_HOT_UPDATE:
428                 case XLOG_HEAP_UPDATE:
429                         if (SnapBuildProcessChange(builder, xid, buf->origptr))
430                                 DecodeUpdate(ctx, buf);
431                         break;
432
433                 case XLOG_HEAP_DELETE:
434                         if (SnapBuildProcessChange(builder, xid, buf->origptr))
435                                 DecodeDelete(ctx, buf);
436                         break;
437
438                 case XLOG_HEAP_INPLACE:
439
440                         /*
441                          * Inplace updates are only ever performed on catalog tuples and
442                          * can, per definition, not change tuple visibility.  Since we
443                          * don't decode catalog tuples, we're not interested in the
444                          * record's contents.
445                          *
446                          * In-place updates can be used either by XID-bearing transactions
447                          * (e.g.  in CREATE INDEX CONCURRENTLY) or by XID-less
448                          * transactions (e.g.  VACUUM).  In the former case, the commit
449                          * record will include cache invalidations, so we mark the
450                          * transaction as catalog modifying here. Currently that's
451                          * redundant because the commit will do that as well, but once we
452                          * support decoding in-progress relations, this will be important.
453                          */
454                         if (!TransactionIdIsValid(xid))
455                                 break;
456
457                         SnapBuildProcessChange(builder, xid, buf->origptr);
458                         ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
459                         break;
460
461                 case XLOG_HEAP_CONFIRM:
462                         if (SnapBuildProcessChange(builder, xid, buf->origptr))
463                                 DecodeSpecConfirm(ctx, buf);
464                         break;
465
466                 case XLOG_HEAP_LOCK:
467                         /* we don't care about row level locks for now */
468                         break;
469
470                 default:
471                         elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
472                         break;
473         }
474 }
475
476 static inline bool
477 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
478 {
479         if (ctx->callbacks.filter_by_origin_cb == NULL)
480                 return false;
481
482         return filter_by_origin_cb_wrapper(ctx, origin_id);
483 }
484
485 /*
486  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
487  */
488 static void
489 DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
490 {
491         SnapBuild  *builder = ctx->snapshot_builder;
492         XLogReaderState *r = buf->record;
493         TransactionId xid = XLogRecGetXid(r);
494         uint8           info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
495         RepOriginId origin_id = XLogRecGetOrigin(r);
496         Snapshot        snapshot;
497         xl_logical_message *message;
498
499         if (info != XLOG_LOGICAL_MESSAGE)
500                 elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
501
502         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
503
504         /* No point in doing anything yet. */
505         if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
506                 return;
507
508         message = (xl_logical_message *) XLogRecGetData(r);
509
510         if (message->dbId != ctx->slot->data.database ||
511                 FilterByOrigin(ctx, origin_id))
512                 return;
513
514         if (message->transactional &&
515                 !SnapBuildProcessChange(builder, xid, buf->origptr))
516                 return;
517         else if (!message->transactional &&
518                          (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
519                           SnapBuildXactNeedsSkip(builder, buf->origptr)))
520                 return;
521
522         snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
523         ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
524                                                           message->transactional,
525                                                           message->message, /* first part of message is
526                                                                                                  * prefix */
527                                                           message->message_size,
528                                                           message->message + message->prefix_size);
529 }
530
531 /*
532  * Consolidated commit record handling between the different form of commit
533  * records.
534  */
535 static void
536 DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
537                          xl_xact_parsed_commit *parsed, TransactionId xid)
538 {
539         XLogRecPtr      origin_lsn = InvalidXLogRecPtr;
540         TimestampTz commit_time = parsed->xact_time;
541         RepOriginId origin_id = XLogRecGetOrigin(buf->record);
542         int                     i;
543
544         if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
545         {
546                 origin_lsn = parsed->origin_lsn;
547                 commit_time = parsed->origin_timestamp;
548         }
549
550         /*
551          * Process invalidation messages, even if we're not interested in the
552          * transaction's contents, since the various caches need to always be
553          * consistent.
554          */
555         if (parsed->nmsgs > 0)
556         {
557                 ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
558                                                                           parsed->nmsgs, parsed->msgs);
559                 ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
560         }
561
562         SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
563                                            parsed->nsubxacts, parsed->subxacts);
564
565         /* ----
566          * Check whether we are interested in this specific transaction, and tell
567          * the reorderbuffer to forget the content of the (sub-)transactions
568          * if not.
569          *
570          * There can be several reasons we might not be interested in this
571          * transaction:
572          * 1) We might not be interested in decoding transactions up to this
573          *        LSN. This can happen because we previously decoded it and now just
574          *        are restarting or if we haven't assembled a consistent snapshot yet.
575          * 2) The transaction happened in another database.
576          * 3) The output plugin is not interested in the origin.
577          *
578          * We can't just use ReorderBufferAbort() here, because we need to execute
579          * the transaction's invalidations.  This currently won't be needed if
580          * we're just skipping over the transaction because currently we only do
581          * so during startup, to get to the first transaction the client needs. As
582          * we have reset the catalog caches before starting to read WAL, and we
583          * haven't yet touched any catalogs, there can't be anything to invalidate.
584          * But if we're "forgetting" this commit because it's it happened in
585          * another database, the invalidations might be important, because they
586          * could be for shared catalogs and we might have loaded data into the
587          * relevant syscaches.
588          * ---
589          */
590         if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
591                 (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
592                 FilterByOrigin(ctx, origin_id))
593         {
594                 for (i = 0; i < parsed->nsubxacts; i++)
595                 {
596                         ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
597                 }
598                 ReorderBufferForget(ctx->reorder, xid, buf->origptr);
599
600                 return;
601         }
602
603         /* tell the reorderbuffer about the surviving subtransactions */
604         for (i = 0; i < parsed->nsubxacts; i++)
605         {
606                 ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
607                                                                  buf->origptr, buf->endptr);
608         }
609
610         /* replay actions of all transaction + subtransactions in order */
611         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
612                                                 commit_time, origin_id, origin_lsn);
613 }
614
615 /*
616  * Get the data from the various forms of abort records and pass it on to
617  * snapbuild.c and reorderbuffer.c
618  */
619 static void
620 DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
621                         xl_xact_parsed_abort *parsed, TransactionId xid)
622 {
623         int                     i;
624
625         SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
626                                           parsed->nsubxacts, parsed->subxacts);
627
628         for (i = 0; i < parsed->nsubxacts; i++)
629         {
630                 ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
631                                                    buf->record->EndRecPtr);
632         }
633
634         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
635 }
636
637 /*
638  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
639  *
640  * Deletes can contain the new tuple.
641  */
642 static void
643 DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
644 {
645         XLogReaderState *r = buf->record;
646         xl_heap_insert *xlrec;
647         ReorderBufferChange *change;
648         RelFileNode target_node;
649
650         xlrec = (xl_heap_insert *) XLogRecGetData(r);
651
652         /* only interested in our database */
653         XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
654         if (target_node.dbNode != ctx->slot->data.database)
655                 return;
656
657         /* output plugin doesn't look for this origin, no need to queue */
658         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
659                 return;
660
661         change = ReorderBufferGetChange(ctx->reorder);
662         if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
663                 change->action = REORDER_BUFFER_CHANGE_INSERT;
664         else
665                 change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
666         change->origin_id = XLogRecGetOrigin(r);
667
668         memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
669
670         if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
671         {
672                 Size            datalen;
673                 char       *tupledata = XLogRecGetBlockData(r, 0, &datalen);
674                 Size            tuplelen = datalen - SizeOfHeapHeader;
675
676                 change->data.tp.newtuple =
677                         ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
678
679                 DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
680         }
681
682         change->data.tp.clear_toast_afterwards = true;
683
684         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
685 }
686
687 /*
688  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
689  * in the record, from wal into proper tuplebufs.
690  *
691  * Updates can possibly contain a new tuple and the old primary key.
692  */
693 static void
694 DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
695 {
696         XLogReaderState *r = buf->record;
697         xl_heap_update *xlrec;
698         ReorderBufferChange *change;
699         char       *data;
700         RelFileNode target_node;
701
702         xlrec = (xl_heap_update *) XLogRecGetData(r);
703
704         /* only interested in our database */
705         XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
706         if (target_node.dbNode != ctx->slot->data.database)
707                 return;
708
709         /* output plugin doesn't look for this origin, no need to queue */
710         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
711                 return;
712
713         change = ReorderBufferGetChange(ctx->reorder);
714         change->action = REORDER_BUFFER_CHANGE_UPDATE;
715         change->origin_id = XLogRecGetOrigin(r);
716         memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
717
718         if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
719         {
720                 Size            datalen;
721                 Size            tuplelen;
722
723                 data = XLogRecGetBlockData(r, 0, &datalen);
724
725                 tuplelen = datalen - SizeOfHeapHeader;
726
727                 change->data.tp.newtuple =
728                         ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
729
730                 DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
731         }
732
733         if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
734         {
735                 Size            datalen;
736                 Size            tuplelen;
737
738                 /* caution, remaining data in record is not aligned */
739                 data = XLogRecGetData(r) + SizeOfHeapUpdate;
740                 datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
741                 tuplelen = datalen - SizeOfHeapHeader;
742
743                 change->data.tp.oldtuple =
744                         ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
745
746                 DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
747         }
748
749         change->data.tp.clear_toast_afterwards = true;
750
751         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
752 }
753
754 /*
755  * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
756  *
757  * Deletes can possibly contain the old primary key.
758  */
759 static void
760 DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
761 {
762         XLogReaderState *r = buf->record;
763         xl_heap_delete *xlrec;
764         ReorderBufferChange *change;
765         RelFileNode target_node;
766
767         xlrec = (xl_heap_delete *) XLogRecGetData(r);
768
769         /* only interested in our database */
770         XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
771         if (target_node.dbNode != ctx->slot->data.database)
772                 return;
773
774         /*
775          * Super deletions are irrelevant for logical decoding, it's driven by the
776          * confirmation records.
777          */
778         if (xlrec->flags & XLH_DELETE_IS_SUPER)
779                 return;
780
781         /* output plugin doesn't look for this origin, no need to queue */
782         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
783                 return;
784
785         change = ReorderBufferGetChange(ctx->reorder);
786         change->action = REORDER_BUFFER_CHANGE_DELETE;
787         change->origin_id = XLogRecGetOrigin(r);
788
789         memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
790
791         /* old primary key stored */
792         if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
793         {
794                 Size            datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
795                 Size            tuplelen = datalen - SizeOfHeapHeader;
796
797                 Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
798
799                 change->data.tp.oldtuple =
800                         ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
801
802                 DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
803                                                 datalen, change->data.tp.oldtuple);
804         }
805
806         change->data.tp.clear_toast_afterwards = true;
807
808         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
809 }
810
811 /*
812  * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
813  *
814  * Currently MULTI_INSERT will always contain the full tuples.
815  */
816 static void
817 DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
818 {
819         XLogReaderState *r = buf->record;
820         xl_heap_multi_insert *xlrec;
821         int                     i;
822         char       *data;
823         char       *tupledata;
824         Size            tuplelen;
825         RelFileNode rnode;
826
827         xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
828
829         /* only interested in our database */
830         XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
831         if (rnode.dbNode != ctx->slot->data.database)
832                 return;
833
834         /* output plugin doesn't look for this origin, no need to queue */
835         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
836                 return;
837
838         tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
839
840         data = tupledata;
841         for (i = 0; i < xlrec->ntuples; i++)
842         {
843                 ReorderBufferChange *change;
844                 xl_multi_insert_tuple *xlhdr;
845                 int                     datalen;
846                 ReorderBufferTupleBuf *tuple;
847
848                 change = ReorderBufferGetChange(ctx->reorder);
849                 change->action = REORDER_BUFFER_CHANGE_INSERT;
850                 change->origin_id = XLogRecGetOrigin(r);
851
852                 memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
853
854                 /*
855                  * CONTAINS_NEW_TUPLE will always be set currently as multi_insert
856                  * isn't used for catalogs, but better be future proof.
857                  *
858                  * We decode the tuple in pretty much the same way as DecodeXLogTuple,
859                  * but since the layout is slightly different, we can't use it here.
860                  */
861                 if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
862                 {
863                         HeapTupleHeader header;
864
865                         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
866                         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
867                         datalen = xlhdr->datalen;
868
869                         change->data.tp.newtuple =
870                                 ReorderBufferGetTupleBuf(ctx->reorder, datalen);
871
872                         tuple = change->data.tp.newtuple;
873                         header = tuple->tuple.t_data;
874
875                         /* not a disk based tuple */
876                         ItemPointerSetInvalid(&tuple->tuple.t_self);
877
878                         /*
879                          * We can only figure this out after reassembling the
880                          * transactions.
881                          */
882                         tuple->tuple.t_tableOid = InvalidOid;
883
884                         tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
885
886                         memset(header, 0, SizeofHeapTupleHeader);
887
888                         memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
889                                    (char *) data,
890                                    datalen);
891                         data += datalen;
892
893                         header->t_infomask = xlhdr->t_infomask;
894                         header->t_infomask2 = xlhdr->t_infomask2;
895                         header->t_hoff = xlhdr->t_hoff;
896                 }
897
898                 /*
899                  * Reset toast reassembly state only after the last row in the last
900                  * xl_multi_insert_tuple record emitted by one heap_multi_insert()
901                  * call.
902                  */
903                 if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
904                         (i + 1) == xlrec->ntuples)
905                         change->data.tp.clear_toast_afterwards = true;
906                 else
907                         change->data.tp.clear_toast_afterwards = false;
908
909                 ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
910                                                                  buf->origptr, change);
911         }
912         Assert(data == tupledata + tuplelen);
913 }
914
915 /*
916  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
917  *
918  * This is pretty trivial, all the state essentially already setup by the
919  * speculative insertion.
920  */
921 static void
922 DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
923 {
924         XLogReaderState *r = buf->record;
925         ReorderBufferChange *change;
926         RelFileNode target_node;
927
928         /* only interested in our database */
929         XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
930         if (target_node.dbNode != ctx->slot->data.database)
931                 return;
932
933         /* output plugin doesn't look for this origin, no need to queue */
934         if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
935                 return;
936
937         change = ReorderBufferGetChange(ctx->reorder);
938         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
939         change->origin_id = XLogRecGetOrigin(r);
940
941         memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
942
943         change->data.tp.clear_toast_afterwards = true;
944
945         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
946 }
947
948
949 /*
950  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
951  * (but not by heap_multi_insert) into a tuplebuf.
952  *
953  * The size 'len' and the pointer 'data' in the record need to be
954  * computed outside as they are record specific.
955  */
956 static void
957 DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
958 {
959         xl_heap_header xlhdr;
960         int                     datalen = len - SizeOfHeapHeader;
961         HeapTupleHeader header;
962
963         Assert(datalen >= 0);
964
965         tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
966         header = tuple->tuple.t_data;
967
968         /* not a disk based tuple */
969         ItemPointerSetInvalid(&tuple->tuple.t_self);
970
971         /* we can only figure this out after reassembling the transactions */
972         tuple->tuple.t_tableOid = InvalidOid;
973
974         /* data is not stored aligned, copy to aligned storage */
975         memcpy((char *) &xlhdr,
976                    data,
977                    SizeOfHeapHeader);
978
979         memset(header, 0, SizeofHeapTupleHeader);
980
981         memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
982                    data + SizeOfHeapHeader,
983                    datalen);
984
985         header->t_infomask = xlhdr.t_infomask;
986         header->t_infomask2 = xlhdr.t_infomask2;
987         header->t_hoff = xlhdr.t_hoff;
988 }