/*-------------------------------------------------------------------------
 * worker.c
 *	   PostgreSQL logical replication worker (apply)
 *
 * Copyright (c) 2016-2019, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/worker.c
 *
 * NOTES
 *	  This file contains the worker which applies logical changes as they come
 *	  from remote logical replication stream.
 *
 *	  The main worker (apply) is started by logical replication worker
 *	  launcher for every enabled subscription in a database. It uses
 *	  walsender protocol to communicate with publisher.
 *
 *	  This module includes server facing code and shares libpqwalreceiver
 *	  module with walreceiver for providing the libpq specific functionality.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/table.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "postmaster/walwriter.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/timeout.h"
78 #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
80 typedef struct FlushPosition
84 XLogRecPtr remote_end;
87 static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
89 typedef struct SlotErrCallbackArg
91 LogicalRepRelMapEntry *rel;
96 static MemoryContext ApplyMessageContext = NULL;
97 MemoryContext ApplyContext = NULL;
99 WalReceiverConn *wrconn = NULL;
101 Subscription *MySubscription = NULL;
102 bool MySubscriptionValid = false;
104 bool in_remote_transaction = false;
105 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
107 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
109 static void store_flush_position(XLogRecPtr remote_lsn);
111 static void maybe_reread_subscription(void);
113 /* Flags set by signal handlers */
114 static volatile sig_atomic_t got_SIGHUP = false;
117 * Should this worker apply changes for given relation.
119 * This is mainly needed for initial relation data sync as that runs in
120 * separate worker process running in parallel and we need some way to skip
121 * changes coming to the main apply worker during the sync of a table.
123 * Note we need to do smaller or equals comparison for SYNCDONE state because
124 * it might hold position of end of initial slot consistent point WAL
125 * record + 1 (ie start of next record) and next record can be COMMIT of
126 * transaction we are now processing (which is what we set remote_final_lsn
127 * to in apply_handle_begin).
130 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
132 if (am_tablesync_worker())
133 return MyLogicalRepWorker->relid == rel->localreloid;
135 return (rel->state == SUBREL_STATE_READY ||
136 (rel->state == SUBREL_STATE_SYNCDONE &&
137 rel->statelsn <= remote_final_lsn));
141 * Make sure that we started local transaction.
143 * Also switches to ApplyMessageContext as necessary.
146 ensure_transaction(void)
148 if (IsTransactionState())
150 SetCurrentStatementStartTimestamp();
152 if (CurrentMemoryContext != ApplyMessageContext)
153 MemoryContextSwitchTo(ApplyMessageContext);
158 SetCurrentStatementStartTimestamp();
159 StartTransactionCommand();
161 maybe_reread_subscription();
163 MemoryContextSwitchTo(ApplyMessageContext);
169 * Executor state preparation for evaluation of constraint expressions,
170 * indexes and triggers.
172 * This is based on similar code in copy.c
175 create_estate_for_relation(LogicalRepRelMapEntry *rel)
178 ResultRelInfo *resultRelInfo;
181 estate = CreateExecutorState();
183 rte = makeNode(RangeTblEntry);
184 rte->rtekind = RTE_RELATION;
185 rte->relid = RelationGetRelid(rel->localrel);
186 rte->relkind = rel->localrel->rd_rel->relkind;
187 rte->rellockmode = AccessShareLock;
188 ExecInitRangeTable(estate, list_make1(rte));
190 resultRelInfo = makeNode(ResultRelInfo);
191 InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
193 estate->es_result_relations = resultRelInfo;
194 estate->es_num_result_relations = 1;
195 estate->es_result_relation_info = resultRelInfo;
197 estate->es_output_cid = GetCurrentCommandId(true);
199 /* Triggers might need a slot */
200 if (resultRelInfo->ri_TrigDesc)
201 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL,
204 /* Prepare to catch AFTER triggers. */
205 AfterTriggerBeginQuery();
211 * Executes default values for columns for which we can't map to remote
214 * This allows us to support tables which have more columns on the downstream
215 * than on the upstream.
218 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
219 TupleTableSlot *slot)
221 TupleDesc desc = RelationGetDescr(rel->localrel);
222 int num_phys_attrs = desc->natts;
227 ExprState **defexprs;
228 ExprContext *econtext;
230 econtext = GetPerTupleExprContext(estate);
232 /* We got all the data via replication, no need to evaluate anything. */
233 if (num_phys_attrs == rel->remoterel.natts)
236 defmap = (int *) palloc(num_phys_attrs * sizeof(int));
237 defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
239 for (attnum = 0; attnum < num_phys_attrs; attnum++)
243 if (TupleDescAttr(desc, attnum)->attisdropped)
246 if (rel->attrmap[attnum] >= 0)
249 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
253 /* Run the expression through planner */
254 defexpr = expression_planner(defexpr);
256 /* Initialize executable expression in copycontext */
257 defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
258 defmap[num_defaults] = attnum;
264 for (i = 0; i < num_defaults; i++)
265 slot->tts_values[defmap[i]] =
266 ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
270 * Error callback to give more context info about type conversion failure.
273 slot_store_error_callback(void *arg)
275 SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
276 LogicalRepRelMapEntry *rel;
281 /* Nothing to do if remote attribute number is not set */
282 if (errarg->remote_attnum < 0)
286 remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
288 /* Fetch remote type name from the LogicalRepTypMap cache */
289 remotetypname = logicalrep_typmap_gettypname(remotetypoid);
291 /* Fetch local type OID from the local sys cache */
292 localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
294 errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
295 "remote type %s, local type %s",
296 rel->remoterel.nspname, rel->remoterel.relname,
297 rel->remoterel.attnames[errarg->remote_attnum],
299 format_type_be(localtypoid));
303 * Store data in C string form into slot.
304 * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
308 slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
311 int natts = slot->tts_tupleDescriptor->natts;
313 SlotErrCallbackArg errarg;
314 ErrorContextCallback errcallback;
316 ExecClearTuple(slot);
318 /* Push callback + info on the error context stack */
320 errarg.local_attnum = -1;
321 errarg.remote_attnum = -1;
322 errcallback.callback = slot_store_error_callback;
323 errcallback.arg = (void *) &errarg;
324 errcallback.previous = error_context_stack;
325 error_context_stack = &errcallback;
327 /* Call the "in" function for each non-dropped attribute */
328 for (i = 0; i < natts; i++)
330 Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
331 int remoteattnum = rel->attrmap[i];
333 if (!att->attisdropped && remoteattnum >= 0 &&
334 values[remoteattnum] != NULL)
339 errarg.local_attnum = i;
340 errarg.remote_attnum = remoteattnum;
342 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
343 slot->tts_values[i] =
344 OidInputFunctionCall(typinput, values[remoteattnum],
345 typioparam, att->atttypmod);
346 slot->tts_isnull[i] = false;
348 errarg.local_attnum = -1;
349 errarg.remote_attnum = -1;
354 * We assign NULL to dropped attributes, NULL values, and missing
355 * values (missing values should be later filled using
356 * slot_fill_defaults).
358 slot->tts_values[i] = (Datum) 0;
359 slot->tts_isnull[i] = true;
363 /* Pop the error context stack */
364 error_context_stack = errcallback.previous;
366 ExecStoreVirtualTuple(slot);
370 * Modify slot with user data provided as C strings.
371 * This is somewhat similar to heap_modify_tuple but also calls the type
372 * input function on the user data as the input is the text representation
376 slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
377 char **values, bool *replaces)
379 int natts = slot->tts_tupleDescriptor->natts;
381 SlotErrCallbackArg errarg;
382 ErrorContextCallback errcallback;
384 slot_getallattrs(slot);
385 ExecClearTuple(slot);
387 /* Push callback + info on the error context stack */
389 errarg.local_attnum = -1;
390 errarg.remote_attnum = -1;
391 errcallback.callback = slot_store_error_callback;
392 errcallback.arg = (void *) &errarg;
393 errcallback.previous = error_context_stack;
394 error_context_stack = &errcallback;
396 /* Call the "in" function for each replaced attribute */
397 for (i = 0; i < natts; i++)
399 Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
400 int remoteattnum = rel->attrmap[i];
402 if (remoteattnum < 0)
405 if (!replaces[remoteattnum])
408 if (values[remoteattnum] != NULL)
413 errarg.local_attnum = i;
414 errarg.remote_attnum = remoteattnum;
416 getTypeInputInfo(att->atttypid, &typinput, &typioparam);
417 slot->tts_values[i] =
418 OidInputFunctionCall(typinput, values[remoteattnum],
419 typioparam, att->atttypmod);
420 slot->tts_isnull[i] = false;
422 errarg.local_attnum = -1;
423 errarg.remote_attnum = -1;
427 slot->tts_values[i] = (Datum) 0;
428 slot->tts_isnull[i] = true;
432 /* Pop the error context stack */
433 error_context_stack = errcallback.previous;
435 ExecStoreVirtualTuple(slot);
439 * Handle BEGIN message.
442 apply_handle_begin(StringInfo s)
444 LogicalRepBeginData begin_data;
446 logicalrep_read_begin(s, &begin_data);
448 remote_final_lsn = begin_data.final_lsn;
450 in_remote_transaction = true;
452 pgstat_report_activity(STATE_RUNNING, NULL);
456 * Handle COMMIT message.
458 * TODO, support tracking of multiple origins
461 apply_handle_commit(StringInfo s)
463 LogicalRepCommitData commit_data;
465 logicalrep_read_commit(s, &commit_data);
467 Assert(commit_data.commit_lsn == remote_final_lsn);
469 /* The synchronization worker runs in single transaction. */
470 if (IsTransactionState() && !am_tablesync_worker())
473 * Update origin state so we can restart streaming from correct
474 * position in case of crash.
476 replorigin_session_origin_lsn = commit_data.end_lsn;
477 replorigin_session_origin_timestamp = commit_data.committime;
479 CommitTransactionCommand();
480 pgstat_report_stat(false);
482 store_flush_position(commit_data.end_lsn);
486 /* Process any invalidation messages that might have accumulated. */
487 AcceptInvalidationMessages();
488 maybe_reread_subscription();
491 in_remote_transaction = false;
493 /* Process any tables that are being synchronized in parallel. */
494 process_syncing_tables(commit_data.end_lsn);
496 pgstat_report_activity(STATE_IDLE, NULL);
500 * Handle ORIGIN message.
502 * TODO, support tracking of multiple origins
505 apply_handle_origin(StringInfo s)
508 * ORIGIN message can only come inside remote transaction and before any
511 if (!in_remote_transaction ||
512 (IsTransactionState() && !am_tablesync_worker()))
514 (errcode(ERRCODE_PROTOCOL_VIOLATION),
515 errmsg("ORIGIN message sent out of order")));
519 * Handle RELATION message.
521 * Note we don't do validation against local schema here. The validation
522 * against local schema is postponed until first change for given relation
523 * comes as we only care about it when applying changes for it anyway and we
524 * do less locking this way.
527 apply_handle_relation(StringInfo s)
529 LogicalRepRelation *rel;
531 rel = logicalrep_read_rel(s);
532 logicalrep_relmap_update(rel);
536 * Handle TYPE message.
538 * Note we don't do local mapping here, that's done when the type is
542 apply_handle_type(StringInfo s)
546 logicalrep_read_typ(s, &typ);
547 logicalrep_typmap_update(&typ);
551 * Get replica identity index or if it is not defined a primary key.
553 * If neither is defined, returns InvalidOid
556 GetRelationIdentityOrPK(Relation rel)
560 idxoid = RelationGetReplicaIndex(rel);
562 if (!OidIsValid(idxoid))
563 idxoid = RelationGetPrimaryKeyIndex(rel);
569 * Handle INSERT message.
572 apply_handle_insert(StringInfo s)
574 LogicalRepRelMapEntry *rel;
575 LogicalRepTupleData newtup;
576 LogicalRepRelId relid;
578 TupleTableSlot *remoteslot;
579 MemoryContext oldctx;
581 ensure_transaction();
583 relid = logicalrep_read_insert(s, &newtup);
584 rel = logicalrep_rel_open(relid, RowExclusiveLock);
585 if (!should_apply_changes_for_rel(rel))
588 * The relation can't become interesting in the middle of the
589 * transaction so it's safe to unlock it.
591 logicalrep_rel_close(rel, RowExclusiveLock);
595 /* Initialize the executor state. */
596 estate = create_estate_for_relation(rel);
597 remoteslot = ExecInitExtraTupleSlot(estate,
598 RelationGetDescr(rel->localrel),
601 /* Input functions may need an active snapshot, so get one */
602 PushActiveSnapshot(GetTransactionSnapshot());
604 /* Process and store remote tuple in the slot */
605 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
606 slot_store_cstrings(remoteslot, rel, newtup.values);
607 slot_fill_defaults(rel, estate, remoteslot);
608 MemoryContextSwitchTo(oldctx);
610 ExecOpenIndices(estate->es_result_relation_info, false);
613 ExecSimpleRelationInsert(estate, remoteslot);
616 ExecCloseIndices(estate->es_result_relation_info);
619 /* Handle queued AFTER triggers. */
620 AfterTriggerEndQuery(estate);
622 ExecResetTupleTable(estate->es_tupleTable, false);
623 FreeExecutorState(estate);
625 logicalrep_rel_close(rel, NoLock);
627 CommandCounterIncrement();
631 * Check if the logical replication relation is updatable and throw
632 * appropriate error if it isn't.
635 check_relation_updatable(LogicalRepRelMapEntry *rel)
637 /* Updatable, no error. */
642 * We are in error mode so it's fine this is somewhat slow. It's better to
643 * give user correct error.
645 if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
648 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
649 errmsg("publisher did not send replica identity column "
650 "expected by the logical replication target relation \"%s.%s\"",
651 rel->remoterel.nspname, rel->remoterel.relname)));
655 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
656 errmsg("logical replication target relation \"%s.%s\" has "
657 "neither REPLICA IDENTITY index nor PRIMARY "
658 "KEY and published relation does not have "
659 "REPLICA IDENTITY FULL",
660 rel->remoterel.nspname, rel->remoterel.relname)));
664 * Handle UPDATE message.
669 apply_handle_update(StringInfo s)
671 LogicalRepRelMapEntry *rel;
672 LogicalRepRelId relid;
676 LogicalRepTupleData oldtup;
677 LogicalRepTupleData newtup;
679 TupleTableSlot *localslot;
680 TupleTableSlot *remoteslot;
682 MemoryContext oldctx;
684 ensure_transaction();
686 relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
688 rel = logicalrep_rel_open(relid, RowExclusiveLock);
689 if (!should_apply_changes_for_rel(rel))
692 * The relation can't become interesting in the middle of the
693 * transaction so it's safe to unlock it.
695 logicalrep_rel_close(rel, RowExclusiveLock);
699 /* Check if we can do the update. */
700 check_relation_updatable(rel);
702 /* Initialize the executor state. */
703 estate = create_estate_for_relation(rel);
704 remoteslot = ExecInitExtraTupleSlot(estate,
705 RelationGetDescr(rel->localrel),
707 localslot = ExecInitExtraTupleSlot(estate,
708 RelationGetDescr(rel->localrel),
710 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
712 PushActiveSnapshot(GetTransactionSnapshot());
713 ExecOpenIndices(estate->es_result_relation_info, false);
715 /* Build the search tuple. */
716 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
717 slot_store_cstrings(remoteslot, rel,
718 has_oldtup ? oldtup.values : newtup.values);
719 MemoryContextSwitchTo(oldctx);
722 * Try to find tuple using either replica identity index, primary key or
723 * if needed, sequential scan.
725 idxoid = GetRelationIdentityOrPK(rel->localrel);
726 Assert(OidIsValid(idxoid) ||
727 (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
729 if (OidIsValid(idxoid))
730 found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
732 remoteslot, localslot);
734 found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
735 remoteslot, localslot);
737 ExecClearTuple(remoteslot);
742 * Note this will fail if there are other conflicting unique indexes.
746 /* Process and store remote tuple in the slot */
747 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
748 ExecCopySlot(remoteslot, localslot);
749 slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
750 MemoryContextSwitchTo(oldctx);
752 EvalPlanQualSetSlot(&epqstate, remoteslot);
754 /* Do the actual update. */
755 ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
760 * The tuple to be updated could not be found.
762 * TODO what to do here, change the log level to LOG perhaps?
765 "logical replication did not find row for update "
766 "in replication target relation \"%s\"",
767 RelationGetRelationName(rel->localrel));
771 ExecCloseIndices(estate->es_result_relation_info);
774 /* Handle queued AFTER triggers. */
775 AfterTriggerEndQuery(estate);
777 EvalPlanQualEnd(&epqstate);
778 ExecResetTupleTable(estate->es_tupleTable, false);
779 FreeExecutorState(estate);
781 logicalrep_rel_close(rel, NoLock);
783 CommandCounterIncrement();
787 * Handle DELETE message.
792 apply_handle_delete(StringInfo s)
794 LogicalRepRelMapEntry *rel;
795 LogicalRepTupleData oldtup;
796 LogicalRepRelId relid;
800 TupleTableSlot *remoteslot;
801 TupleTableSlot *localslot;
803 MemoryContext oldctx;
805 ensure_transaction();
807 relid = logicalrep_read_delete(s, &oldtup);
808 rel = logicalrep_rel_open(relid, RowExclusiveLock);
809 if (!should_apply_changes_for_rel(rel))
812 * The relation can't become interesting in the middle of the
813 * transaction so it's safe to unlock it.
815 logicalrep_rel_close(rel, RowExclusiveLock);
819 /* Check if we can do the delete. */
820 check_relation_updatable(rel);
822 /* Initialize the executor state. */
823 estate = create_estate_for_relation(rel);
824 remoteslot = ExecInitExtraTupleSlot(estate,
825 RelationGetDescr(rel->localrel),
827 localslot = ExecInitExtraTupleSlot(estate,
828 RelationGetDescr(rel->localrel),
830 EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
832 PushActiveSnapshot(GetTransactionSnapshot());
833 ExecOpenIndices(estate->es_result_relation_info, false);
835 /* Find the tuple using the replica identity index. */
836 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
837 slot_store_cstrings(remoteslot, rel, oldtup.values);
838 MemoryContextSwitchTo(oldctx);
841 * Try to find tuple using either replica identity index, primary key or
842 * if needed, sequential scan.
844 idxoid = GetRelationIdentityOrPK(rel->localrel);
845 Assert(OidIsValid(idxoid) ||
846 (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
848 if (OidIsValid(idxoid))
849 found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
851 remoteslot, localslot);
853 found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
854 remoteslot, localslot);
855 /* If found delete it. */
858 EvalPlanQualSetSlot(&epqstate, localslot);
860 /* Do the actual delete. */
861 ExecSimpleRelationDelete(estate, &epqstate, localslot);
865 /* The tuple to be deleted could not be found. */
867 "logical replication could not find row for delete "
868 "in replication target relation \"%s\"",
869 RelationGetRelationName(rel->localrel));
873 ExecCloseIndices(estate->es_result_relation_info);
876 /* Handle queued AFTER triggers. */
877 AfterTriggerEndQuery(estate);
879 EvalPlanQualEnd(&epqstate);
880 ExecResetTupleTable(estate->es_tupleTable, false);
881 FreeExecutorState(estate);
883 logicalrep_rel_close(rel, NoLock);
885 CommandCounterIncrement();
889 * Handle TRUNCATE message.
894 apply_handle_truncate(StringInfo s)
896 bool cascade = false;
897 bool restart_seqs = false;
898 List *remote_relids = NIL;
899 List *remote_rels = NIL;
902 List *relids_logged = NIL;
905 ensure_transaction();
907 remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
909 foreach(lc, remote_relids)
911 LogicalRepRelId relid = lfirst_oid(lc);
912 LogicalRepRelMapEntry *rel;
914 rel = logicalrep_rel_open(relid, RowExclusiveLock);
915 if (!should_apply_changes_for_rel(rel))
918 * The relation can't become interesting in the middle of the
919 * transaction so it's safe to unlock it.
921 logicalrep_rel_close(rel, RowExclusiveLock);
925 remote_rels = lappend(remote_rels, rel);
926 rels = lappend(rels, rel->localrel);
927 relids = lappend_oid(relids, rel->localreloid);
928 if (RelationIsLogicallyLogged(rel->localrel))
929 relids_logged = lappend_oid(relids_logged, rel->localreloid);
933 * Even if we used CASCADE on the upstream master we explicitly default to
934 * replaying changes without further cascading. This might be later
935 * changeable with a user specified option.
937 ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
939 foreach(lc, remote_rels)
941 LogicalRepRelMapEntry *rel = lfirst(lc);
943 logicalrep_rel_close(rel, NoLock);
946 CommandCounterIncrement();
951 * Logical replication protocol message dispatcher.
954 apply_dispatch(StringInfo s)
956 char action = pq_getmsgbyte(s);
962 apply_handle_begin(s);
966 apply_handle_commit(s);
970 apply_handle_insert(s);
974 apply_handle_update(s);
978 apply_handle_delete(s);
982 apply_handle_truncate(s);
986 apply_handle_relation(s);
990 apply_handle_type(s);
994 apply_handle_origin(s);
998 (errcode(ERRCODE_PROTOCOL_VIOLATION),
999 errmsg("invalid logical replication message type \"%c\"", action)));
1004 * Figure out which write/flush positions to report to the walsender process.
1006 * We can't simply report back the last LSN the walsender sent us because the
1007 * local transaction might not yet be flushed to disk locally. Instead we
1008 * build a list that associates local with remote LSNs for every commit. When
1009 * reporting back the flush position to the sender we iterate that list and
1010 * check which entries on it are already locally flushed. Those we can report
1011 * as having been flushed.
1013 * The have_pending_txes is true if there are outstanding transactions that
1014 * need to be flushed.
1017 get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
1018 bool *have_pending_txes)
1020 dlist_mutable_iter iter;
1021 XLogRecPtr local_flush = GetFlushRecPtr();
1023 *write = InvalidXLogRecPtr;
1024 *flush = InvalidXLogRecPtr;
1026 dlist_foreach_modify(iter, &lsn_mapping)
1028 FlushPosition *pos =
1029 dlist_container(FlushPosition, node, iter.cur);
1031 *write = pos->remote_end;
1033 if (pos->local_end <= local_flush)
1035 *flush = pos->remote_end;
1036 dlist_delete(iter.cur);
1042 * Don't want to uselessly iterate over the rest of the list which
1043 * could potentially be long. Instead get the last element and
1044 * grab the write position from there.
1046 pos = dlist_tail_element(FlushPosition, node,
1048 *write = pos->remote_end;
1049 *have_pending_txes = true;
1054 *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1058 * Store current remote/local lsn pair in the tracking list.
1061 store_flush_position(XLogRecPtr remote_lsn)
1063 FlushPosition *flushpos;
1065 /* Need to do this in permanent context */
1066 MemoryContextSwitchTo(ApplyContext);
1068 /* Track commit lsn */
1069 flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1070 flushpos->local_end = XactLastCommitEnd;
1071 flushpos->remote_end = remote_lsn;
1073 dlist_push_tail(&lsn_mapping, &flushpos->node);
1074 MemoryContextSwitchTo(ApplyMessageContext);
1078 /* Update statistics of the worker. */
1080 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1082 MyLogicalRepWorker->last_lsn = last_lsn;
1083 MyLogicalRepWorker->last_send_time = send_time;
1084 MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1087 MyLogicalRepWorker->reply_lsn = last_lsn;
1088 MyLogicalRepWorker->reply_time = send_time;
1096 LogicalRepApplyLoop(XLogRecPtr last_received)
1099 * Init the ApplyMessageContext which we clean up after each replication
1102 ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1103 "ApplyMessageContext",
1104 ALLOCSET_DEFAULT_SIZES);
1106 /* mark as idle, before starting to loop */
1107 pgstat_report_activity(STATE_IDLE, NULL);
1111 pgsocket fd = PGINVALID_SOCKET;
1115 bool endofstream = false;
1116 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1117 bool ping_sent = false;
1120 CHECK_FOR_INTERRUPTS();
1122 MemoryContextSwitchTo(ApplyMessageContext);
1124 len = walrcv_receive(wrconn, &buf, &fd);
1128 /* Process the data */
1131 CHECK_FOR_INTERRUPTS();
1140 (errmsg("data stream from publisher has ended")));
1149 /* Reset timeout. */
1150 last_recv_timestamp = GetCurrentTimestamp();
1153 /* Ensure we are reading the data into our memory context. */
1154 MemoryContextSwitchTo(ApplyMessageContext);
1161 c = pq_getmsgbyte(&s);
1165 XLogRecPtr start_lsn;
1167 TimestampTz send_time;
1169 start_lsn = pq_getmsgint64(&s);
1170 end_lsn = pq_getmsgint64(&s);
1171 send_time = pq_getmsgint64(&s);
1173 if (last_received < start_lsn)
1174 last_received = start_lsn;
1176 if (last_received < end_lsn)
1177 last_received = end_lsn;
1179 UpdateWorkerStats(last_received, send_time, false);
1186 TimestampTz timestamp;
1187 bool reply_requested;
1189 end_lsn = pq_getmsgint64(&s);
1190 timestamp = pq_getmsgint64(&s);
1191 reply_requested = pq_getmsgbyte(&s);
1193 if (last_received < end_lsn)
1194 last_received = end_lsn;
1196 send_feedback(last_received, reply_requested, false);
1197 UpdateWorkerStats(last_received, timestamp, true);
1199 /* other message types are purposefully ignored */
1201 MemoryContextReset(ApplyMessageContext);
1204 len = walrcv_receive(wrconn, &buf, &fd);
1208 /* confirm all writes so far */
1209 send_feedback(last_received, false, false);
1211 if (!in_remote_transaction)
1214 * If we didn't get any transactions for a while there might be
1215 * unconsumed invalidation messages in the queue, consume them
1218 AcceptInvalidationMessages();
1219 maybe_reread_subscription();
1221 /* Process any table synchronization changes. */
1222 process_syncing_tables(last_received);
1225 /* Cleanup the memory. */
1226 MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1227 MemoryContextSwitchTo(TopMemoryContext);
1229 /* Check if we need to exit the streaming loop. */
1234 walrcv_endstreaming(wrconn, &tli);
1239 * Wait for more data or latch. If we have unflushed transactions,
1240 * wake up after WalWriterDelay to see if they've been flushed yet (in
1241 * which case we should send a feedback message). Otherwise, there's
1242 * no particular urgency about waking up unless we get data or a
1245 if (!dlist_is_empty(&lsn_mapping))
1246 wait_time = WalWriterDelay;
1248 wait_time = NAPTIME_PER_CYCLE;
1250 rc = WaitLatchOrSocket(MyLatch,
1251 WL_SOCKET_READABLE | WL_LATCH_SET |
1252 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1254 WAIT_EVENT_LOGICAL_APPLY_MAIN);
1256 if (rc & WL_LATCH_SET)
1258 ResetLatch(MyLatch);
1259 CHECK_FOR_INTERRUPTS();
1265 ProcessConfigFile(PGC_SIGHUP);
1268 if (rc & WL_TIMEOUT)
1271 * We didn't receive anything new. If we haven't heard anything
1272 * from the server for more than wal_receiver_timeout / 2, ping
1273 * the server. Also, if it's been longer than
1274 * wal_receiver_status_interval since the last update we sent,
1275 * send a status update to the master anyway, to report any
1276 * progress in applying WAL.
1278 bool requestReply = false;
1281 * Check if time since last receive from standby has reached the
1284 if (wal_receiver_timeout > 0)
1286 TimestampTz now = GetCurrentTimestamp();
1287 TimestampTz timeout;
1290 TimestampTzPlusMilliseconds(last_recv_timestamp,
1291 wal_receiver_timeout);
1295 (errmsg("terminating logical replication worker due to timeout")));
1298 * We didn't receive anything new, for half of receiver
1299 * replication timeout. Ping the server.
1303 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1304 (wal_receiver_timeout / 2));
1307 requestReply = true;
1313 send_feedback(last_received, requestReply, requestReply);
1319 * Send a Standby Status Update message to server.
1321 * 'recvpos' is the latest LSN we've received data to, force is set if we need
1322 * to send a response to avoid timeouts.
 * 'requestReply' is written into the message as its replyRequested byte.
1325 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
/*
 * The statics below persist across calls: a reusable message buffer,
 * send_time (checked against wal_receiver_status_interval to throttle
 * sends), and the highest positions reported so far.
 */
1327 static StringInfo reply_message = NULL;
1328 static TimestampTz send_time = 0;
1330 static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1331 static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1332 static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1334 XLogRecPtr writepos;
1335 XLogRecPtr flushpos;
1337 bool have_pending_txes;
1340 * If the user doesn't want status to be reported to the publisher, be
1341 * sure to exit before doing anything at all.
1343 if (!force && wal_receiver_status_interval <= 0)
1346 /* It's legal to not pass a recvpos */
1347 if (recvpos < last_recvpos)
1348 recvpos = last_recvpos;
1350 get_flush_position(&writepos, &flushpos, &have_pending_txes);
1353 * No outstanding transactions to flush, we can report the latest received
1354 * position. This is important for synchronous replication.
1356 if (!have_pending_txes)
1357 flushpos = writepos = recvpos;
/* Never report positions older than the ones already sent. */
1359 if (writepos < last_writepos)
1360 writepos = last_writepos;
1362 if (flushpos < last_flushpos)
1363 flushpos = last_flushpos;
1365 now = GetCurrentTimestamp();
1367 /* if we've already reported everything we're good */
1369 writepos == last_writepos &&
1370 flushpos == last_flushpos &&
1371 !TimestampDifferenceExceeds(send_time, now,
1372 wal_receiver_status_interval * 1000))
1378 MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1380 reply_message = makeStringInfo();
1381 MemoryContextSwitchTo(oldctx);
1384 resetStringInfo(reply_message);
/*
 * Message fields, in order: 'r' tag, recvpos sent as "write", flushpos as
 * "flush", writepos as "apply", then send time and the reply-request byte.
 */
1386 pq_sendbyte(reply_message, 'r');
1387 pq_sendint64(reply_message, recvpos); /* write */
1388 pq_sendint64(reply_message, flushpos); /* flush */
1389 pq_sendint64(reply_message, writepos); /* apply */
1390 pq_sendint64(reply_message, now); /* sendTime */
1391 pq_sendbyte(reply_message, requestReply); /* replyRequested */
1393 elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1395 (uint32) (recvpos >> 32), (uint32) recvpos,
1396 (uint32) (writepos >> 32), (uint32) writepos,
1397 (uint32) (flushpos >> 32), (uint32) flushpos
1400 walrcv_send(wrconn, reply_message->data, reply_message->len);
/* Remember the highest positions reported so later calls can clamp to them. */
1402 if (recvpos > last_recvpos)
1403 last_recvpos = recvpos;
1404 if (writepos > last_writepos)
1405 last_writepos = writepos;
1406 if (flushpos > last_flushpos)
1407 last_flushpos = flushpos;
1411 * Reread subscription info if needed. Most changes will cause the worker to exit.
 * Does nothing while MySubscriptionValid is set. Otherwise reloads the
 * subscription into ApplyContext and compares it with MySubscription. If the
 * subscription was removed, disabled, or had its conninfo, name, slot name or
 * publications changed, it logs that the worker will stop or restart.
 * Otherwise it swaps in the new copy and marks the cache valid again.
1414 maybe_reread_subscription(void)
1416 MemoryContext oldctx;
1417 Subscription *newsub;
1418 bool started_tx = false;
1420 /* When cache state is valid there is nothing to do here. */
1421 if (MySubscriptionValid)
1424 /* This function might be called inside or outside of a transaction. */
1425 if (!IsTransactionState())
1427 StartTransactionCommand();
1431 /* Ensure allocations in permanent context. */
1432 oldctx = MemoryContextSwitchTo(ApplyContext);
1434 newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1437 * Exit if the subscription was removed. This normally should not happen
1438 * as the worker gets killed during DROP SUBSCRIPTION.
1443 (errmsg("logical replication apply worker for subscription \"%s\" will "
1444 "stop because the subscription was removed",
1445 MySubscription->name)));
1451 * Exit if the subscription was disabled. This normally should not happen
1452 * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1454 if (!newsub->enabled)
1457 (errmsg("logical replication apply worker for subscription \"%s\" will "
1458 "stop because the subscription was disabled",
1459 MySubscription->name)));
1465 * Exit if connection string was changed. The launcher will start new
1468 if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1471 (errmsg("logical replication apply worker for subscription \"%s\" will "
1472 "restart because the connection information was changed",
1473 MySubscription->name)));
1479 * Exit if subscription name was changed (it's used for
1480 * fallback_application_name). The launcher will start new worker.
1482 if (strcmp(newsub->name, MySubscription->name) != 0)
1485 (errmsg("logical replication apply worker for subscription \"%s\" will "
1486 "restart because subscription was renamed",
1487 MySubscription->name)));
1492 /* !slotname should never happen when enabled is true. */
1493 Assert(newsub->slotname);
1496 * We need to make a new connection to the new slot if the slot name has changed,
1497 * so exit here as well if that's the case.
1499 if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1502 (errmsg("logical replication apply worker for subscription \"%s\" will "
1503 "restart because the replication slot name was changed",
1504 MySubscription->name)));
1510 * Exit if publication list was changed. The launcher will start new
1513 if (!equal(newsub->publications, MySubscription->publications))
1516 (errmsg("logical replication apply worker for subscription \"%s\" will "
1517 "restart because subscription's publications were changed",
1518 MySubscription->name)));
1523 /* Check for other changes that should never happen too. */
1524 if (newsub->dbid != MySubscription->dbid)
1526 elog(ERROR, "subscription %u changed unexpectedly",
1527 MyLogicalRepWorker->subid);
1530 /* Clean old subscription info and switch to new one. */
1531 FreeSubscription(MySubscription);
1532 MySubscription = newsub;
1534 MemoryContextSwitchTo(oldctx);
1536 /* Change synchronous commit according to the user's wishes */
1537 SetConfigOption("synchronous_commit", MySubscription->synccommit,
1538 PGC_BACKEND, PGC_S_OVERRIDE);
1541 CommitTransactionCommand();
/* The cache now matches the catalog; skip rereading until the next invalidation. */
1543 MySubscriptionValid = true;
1547 * Callback from subscription syscache invalidation.
 * It only marks the cached MySubscription as stale. The reload happens later,
 * in maybe_reread_subscription(). The callback arguments are not used.
1550 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1552 MySubscriptionValid = false;
1555 /* SIGHUP: set flag to reload configuration at next convenient time */
1557 logicalrep_worker_sighup(SIGNAL_ARGS)
1559 int save_errno = errno;
/* Preserve errno for the interrupted code; a signal handler must not change it. */
1563 /* Waken anything waiting on the process latch */
1569 /* Logical Replication Apply worker entry point */
/*
 * This entry point serves both the main apply worker and table
 * synchronization workers. am_tablesync_worker() selects the startup path
 * below; both paths then start streaming and run LogicalRepApplyLoop().
 */
1571 ApplyWorkerMain(Datum main_arg)
1573 int worker_slot = DatumGetInt32(main_arg);
1574 MemoryContext oldctx;
1575 char originname[NAMEDATALEN];
1576 XLogRecPtr origin_startpos;
1578 WalRcvStreamOptions options;
1580 /* Attach to slot */
1581 logicalrep_worker_attach(worker_slot);
1583 /* Setup signal handling */
1584 pqsignal(SIGHUP, logicalrep_worker_sighup);
1585 pqsignal(SIGTERM, die);
1586 BackgroundWorkerUnblockSignals();
1589 * We don't currently need any ResourceOwner in a walreceiver process, but
1590 * if we did, we could call CreateAuxProcessResourceOwner here.
1593 /* Initialise stats to a sane starting value (the current time) */
1594 MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
1595 MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
1597 /* Load the libpq-specific functions */
1598 load_file("libpqwalreceiver", false);
1600 /* Run as replica session replication role. */
1601 SetConfigOption("session_replication_role", "replica",
1602 PGC_SUSET, PGC_S_OVERRIDE);
1604 /* Connect to our database. */
1605 BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
1606 MyLogicalRepWorker->userid,
1609 /* Load the subscription into persistent memory context. */
1610 ApplyContext = AllocSetContextCreate(TopMemoryContext,
1612 ALLOCSET_DEFAULT_SIZES);
1613 StartTransactionCommand();
1614 oldctx = MemoryContextSwitchTo(ApplyContext);
1616 MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
1617 if (!MySubscription)
1620 (errmsg("logical replication apply worker for subscription %u will not "
1621 "start because the subscription was removed during startup",
1622 MyLogicalRepWorker->subid)));
1626 MySubscriptionValid = true;
1627 MemoryContextSwitchTo(oldctx);
1629 if (!MySubscription->enabled)
1632 (errmsg("logical replication apply worker for subscription \"%s\" will not "
1633 "start because the subscription was disabled during startup",
1634 MySubscription->name)));
1639 /* Setup synchronous commit according to the user's wishes */
1640 SetConfigOption("synchronous_commit", MySubscription->synccommit,
1641 PGC_BACKEND, PGC_S_OVERRIDE);
1643 /* Keep us informed about subscription changes. */
1644 CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
1645 subscription_change_cb,
1648 if (am_tablesync_worker())
1650 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1651 MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
1654 (errmsg("logical replication apply worker for subscription \"%s\" has started",
1655 MySubscription->name)));
1657 CommitTransactionCommand();
1659 /* Connect to the origin and start the replication. */
1660 elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1661 MySubscription->conninfo);
1663 if (am_tablesync_worker())
1667 /* This is a table synchronization worker, call initial sync. */
1668 syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1670 /* The slot name needs to be allocated in permanent memory context. */
1671 oldctx = MemoryContextSwitchTo(ApplyContext);
1672 myslotname = pstrdup(syncslotname);
1673 MemoryContextSwitchTo(oldctx);
1675 pfree(syncslotname);
1679 /* This is main apply worker */
1680 RepOriginId originid;
1681 TimeLineID startpointTLI;
1685 myslotname = MySubscription->slotname;
1688 * This shouldn't happen if the subscription is enabled, but guard
1689 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
1690 * crash if slot is NULL.)
1694 (errmsg("subscription has no replication slot set")));
1696 /* Setup replication origin tracking. */
1697 StartTransactionCommand();
/*
 * The origin is named "pg_<subscription oid>" and is created if it does not
 * exist yet. Its stored progress becomes the streaming start point.
 */
1698 snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1699 originid = replorigin_by_name(originname, true);
1700 if (!OidIsValid(originid))
1701 originid = replorigin_create(originname);
1702 replorigin_session_setup(originid);
1703 replorigin_session_origin = originid;
1704 origin_startpos = replorigin_session_get_progress(false);
1705 CommitTransactionCommand();
1707 wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1711 (errmsg("could not connect to the publisher: %s", err)));
1714 * We don't really use the output of identify_system for anything but it
1715 * does some initializations on the upstream so let's still call it.
1717 (void) walrcv_identify_system(wrconn, &startpointTLI,
1723 * Setup callback for syscache so that we know when something changes in
1724 * the subscription relation state.
1726 CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
1727 invalidate_syncing_table_states,
1730 /* Build logical replication streaming options. */
1731 options.logical = true;
1732 options.startpoint = origin_startpos;
1733 options.slotname = myslotname;
1734 options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1735 options.proto.logical.publication_names = MySubscription->publications;
1737 /* Start normal logical streaming replication. */
1738 walrcv_startstreaming(wrconn, &options);
1740 /* Run the main loop. */
1741 LogicalRepApplyLoop(origin_startpos);
1747 * Is current process a logical replication worker?
 * Returns true exactly when MyLogicalRepWorker is non-NULL in this process.
1750 IsLogicalWorker(void)
1752 return MyLogicalRepWorker != NULL;