]> granicus.if.org Git - postgresql/blob - src/backend/replication/logical/worker.c
Refactor planner's header files.
[postgresql] / src / backend / replication / logical / worker.c
1 /*-------------------------------------------------------------------------
2  * worker.c
3  *         PostgreSQL logical replication worker (apply)
4  *
5  * Copyright (c) 2016-2019, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  *        src/backend/replication/logical/worker.c
9  *
10  * NOTES
11  *        This file contains the worker which applies logical changes as they come
12  *        from remote logical replication stream.
13  *
14  *        The main worker (apply) is started by logical replication worker
15  *        launcher for every enabled subscription in a database. It uses
16  *        walsender protocol to communicate with publisher.
17  *
18  *        This module includes server facing code and shares libpqwalreceiver
19  *        module with walreceiver for providing the libpq specific functionality.
20  *
21  *-------------------------------------------------------------------------
22  */
23
24 #include "postgres.h"
25
26 #include "access/table.h"
27 #include "access/xact.h"
28 #include "access/xlog_internal.h"
29 #include "catalog/catalog.h"
30 #include "catalog/namespace.h"
31 #include "catalog/pg_subscription.h"
32 #include "catalog/pg_subscription_rel.h"
33 #include "commands/tablecmds.h"
34 #include "commands/trigger.h"
35 #include "executor/executor.h"
36 #include "executor/nodeModifyTable.h"
37 #include "funcapi.h"
38 #include "libpq/pqformat.h"
39 #include "libpq/pqsignal.h"
40 #include "mb/pg_wchar.h"
41 #include "miscadmin.h"
42 #include "nodes/makefuncs.h"
43 #include "optimizer/optimizer.h"
44 #include "parser/parse_relation.h"
45 #include "pgstat.h"
46 #include "postmaster/bgworker.h"
47 #include "postmaster/postmaster.h"
48 #include "postmaster/walwriter.h"
49 #include "replication/decode.h"
50 #include "replication/logical.h"
51 #include "replication/logicalproto.h"
52 #include "replication/logicalrelation.h"
53 #include "replication/logicalworker.h"
54 #include "replication/origin.h"
55 #include "replication/reorderbuffer.h"
56 #include "replication/snapbuild.h"
57 #include "replication/walreceiver.h"
58 #include "replication/worker_internal.h"
59 #include "rewrite/rewriteHandler.h"
60 #include "storage/bufmgr.h"
61 #include "storage/ipc.h"
62 #include "storage/lmgr.h"
63 #include "storage/proc.h"
64 #include "storage/procarray.h"
65 #include "tcop/tcopprot.h"
66 #include "utils/builtins.h"
67 #include "utils/catcache.h"
68 #include "utils/datum.h"
69 #include "utils/fmgroids.h"
70 #include "utils/guc.h"
71 #include "utils/inval.h"
72 #include "utils/lsyscache.h"
73 #include "utils/memutils.h"
74 #include "utils/rel.h"
75 #include "utils/syscache.h"
76 #include "utils/timeout.h"
77
78 #define NAPTIME_PER_CYCLE 1000  /* max sleep time between cycles (1s) */
79
/*
 * One entry of the lsn_mapping list, pairing two WAL positions.
 *
 * NOTE(review): presumably remote_end is the publisher-side end LSN of an
 * applied transaction and local_end the matching local WAL position; the
 * code that fills these in (store_flush_position) is not visible in this
 * part of the file -- confirm there.
 */
typedef struct FlushPosition
{
	dlist_node	node;			/* link in the lsn_mapping list */
	XLogRecPtr	local_end;		/* local WAL position (see note above) */
	XLogRecPtr	remote_end;		/* remote WAL position (see note above) */
} FlushPosition;

/* List head for FlushPosition entries; starts out empty. */
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
88
/*
 * Argument handed to slot_store_error_callback through the error context
 * stack.  The attribute numbers are zero-based array indexes; both are set
 * to -1 whenever no column is currently being converted.
 */
typedef struct SlotErrCallbackArg
{
	LogicalRepRelMapEntry *rel; /* relation whose tuple is being converted */
	int			local_attnum;	/* index of the local column, or -1 */
	int			remote_attnum;	/* index of the remote column, or -1 */
} SlotErrCallbackArg;
95
/*
 * Memory context the apply handlers run in; ensure_transaction() switches
 * to it before a change is processed.
 */
static MemoryContext ApplyMessageContext = NULL;
/* NOTE(review): presumably the longer-lived parent context -- confirm. */
MemoryContext ApplyContext = NULL;

/* Connection to the publisher; not used in this part of the file. */
WalReceiverConn *wrconn = NULL;

/*
 * NOTE(review): presumably the subscription this worker serves and whether
 * the cached copy is still valid; maintained by maybe_reread_subscription()
 * -- confirm there.
 */
Subscription *MySubscription = NULL;
bool		MySubscriptionValid = false;

/* True between the remote BEGIN and COMMIT messages. */
bool		in_remote_transaction = false;
/* final_lsn taken from the BEGIN message of the remote transaction. */
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;

static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);

static void store_flush_position(XLogRecPtr remote_lsn);

static void maybe_reread_subscription(void);

/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
115
116 /*
117  * Should this worker apply changes for given relation.
118  *
119  * This is mainly needed for initial relation data sync as that runs in
120  * separate worker process running in parallel and we need some way to skip
121  * changes coming to the main apply worker during the sync of a table.
122  *
123  * Note we need to do smaller or equals comparison for SYNCDONE state because
124  * it might hold position of end of initial slot consistent point WAL
125  * record + 1 (ie start of next record) and next record can be COMMIT of
126  * transaction we are now processing (which is what we set remote_final_lsn
127  * to in apply_handle_begin).
128  */
129 static bool
130 should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
131 {
132         if (am_tablesync_worker())
133                 return MyLogicalRepWorker->relid == rel->localreloid;
134         else
135                 return (rel->state == SUBREL_STATE_READY ||
136                                 (rel->state == SUBREL_STATE_SYNCDONE &&
137                                  rel->statelsn <= remote_final_lsn));
138 }
139
140 /*
141  * Make sure that we started local transaction.
142  *
143  * Also switches to ApplyMessageContext as necessary.
144  */
145 static bool
146 ensure_transaction(void)
147 {
148         if (IsTransactionState())
149         {
150                 SetCurrentStatementStartTimestamp();
151
152                 if (CurrentMemoryContext != ApplyMessageContext)
153                         MemoryContextSwitchTo(ApplyMessageContext);
154
155                 return false;
156         }
157
158         SetCurrentStatementStartTimestamp();
159         StartTransactionCommand();
160
161         maybe_reread_subscription();
162
163         MemoryContextSwitchTo(ApplyMessageContext);
164         return true;
165 }
166
167
168 /*
169  * Executor state preparation for evaluation of constraint expressions,
170  * indexes and triggers.
171  *
172  * This is based on similar code in copy.c
173  */
174 static EState *
175 create_estate_for_relation(LogicalRepRelMapEntry *rel)
176 {
177         EState     *estate;
178         ResultRelInfo *resultRelInfo;
179         RangeTblEntry *rte;
180
181         estate = CreateExecutorState();
182
183         rte = makeNode(RangeTblEntry);
184         rte->rtekind = RTE_RELATION;
185         rte->relid = RelationGetRelid(rel->localrel);
186         rte->relkind = rel->localrel->rd_rel->relkind;
187         rte->rellockmode = AccessShareLock;
188         ExecInitRangeTable(estate, list_make1(rte));
189
190         resultRelInfo = makeNode(ResultRelInfo);
191         InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
192
193         estate->es_result_relations = resultRelInfo;
194         estate->es_num_result_relations = 1;
195         estate->es_result_relation_info = resultRelInfo;
196
197         estate->es_output_cid = GetCurrentCommandId(true);
198
199         /* Triggers might need a slot */
200         if (resultRelInfo->ri_TrigDesc)
201                 estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL,
202                                                                                                                         &TTSOpsVirtual);
203
204         /* Prepare to catch AFTER triggers. */
205         AfterTriggerBeginQuery();
206
207         return estate;
208 }
209
210 /*
211  * Executes default values for columns for which we can't map to remote
212  * relation columns.
213  *
214  * This allows us to support tables which have more columns on the downstream
215  * than on the upstream.
216  */
217 static void
218 slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
219                                    TupleTableSlot *slot)
220 {
221         TupleDesc       desc = RelationGetDescr(rel->localrel);
222         int                     num_phys_attrs = desc->natts;
223         int                     i;
224         int                     attnum,
225                                 num_defaults = 0;
226         int                *defmap;
227         ExprState **defexprs;
228         ExprContext *econtext;
229
230         econtext = GetPerTupleExprContext(estate);
231
232         /* We got all the data via replication, no need to evaluate anything. */
233         if (num_phys_attrs == rel->remoterel.natts)
234                 return;
235
236         defmap = (int *) palloc(num_phys_attrs * sizeof(int));
237         defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
238
239         for (attnum = 0; attnum < num_phys_attrs; attnum++)
240         {
241                 Expr       *defexpr;
242
243                 if (TupleDescAttr(desc, attnum)->attisdropped)
244                         continue;
245
246                 if (rel->attrmap[attnum] >= 0)
247                         continue;
248
249                 defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
250
251                 if (defexpr != NULL)
252                 {
253                         /* Run the expression through planner */
254                         defexpr = expression_planner(defexpr);
255
256                         /* Initialize executable expression in copycontext */
257                         defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
258                         defmap[num_defaults] = attnum;
259                         num_defaults++;
260                 }
261
262         }
263
264         for (i = 0; i < num_defaults; i++)
265                 slot->tts_values[defmap[i]] =
266                         ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
267 }
268
269 /*
270  * Error callback to give more context info about type conversion failure.
271  */
272 static void
273 slot_store_error_callback(void *arg)
274 {
275         SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
276         LogicalRepRelMapEntry *rel;
277         char       *remotetypname;
278         Oid                     remotetypoid,
279                                 localtypoid;
280
281         /* Nothing to do if remote attribute number is not set */
282         if (errarg->remote_attnum < 0)
283                 return;
284
285         rel = errarg->rel;
286         remotetypoid = rel->remoterel.atttyps[errarg->remote_attnum];
287
288         /* Fetch remote type name from the LogicalRepTypMap cache */
289         remotetypname = logicalrep_typmap_gettypname(remotetypoid);
290
291         /* Fetch local type OID from the local sys cache */
292         localtypoid = get_atttype(rel->localreloid, errarg->local_attnum + 1);
293
294         errcontext("processing remote data for replication target relation \"%s.%s\" column \"%s\", "
295                            "remote type %s, local type %s",
296                            rel->remoterel.nspname, rel->remoterel.relname,
297                            rel->remoterel.attnames[errarg->remote_attnum],
298                            remotetypname,
299                            format_type_be(localtypoid));
300 }
301
302 /*
303  * Store data in C string form into slot.
304  * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
305  * use better.
306  */
307 static void
308 slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
309                                         char **values)
310 {
311         int                     natts = slot->tts_tupleDescriptor->natts;
312         int                     i;
313         SlotErrCallbackArg errarg;
314         ErrorContextCallback errcallback;
315
316         ExecClearTuple(slot);
317
318         /* Push callback + info on the error context stack */
319         errarg.rel = rel;
320         errarg.local_attnum = -1;
321         errarg.remote_attnum = -1;
322         errcallback.callback = slot_store_error_callback;
323         errcallback.arg = (void *) &errarg;
324         errcallback.previous = error_context_stack;
325         error_context_stack = &errcallback;
326
327         /* Call the "in" function for each non-dropped attribute */
328         for (i = 0; i < natts; i++)
329         {
330                 Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
331                 int                     remoteattnum = rel->attrmap[i];
332
333                 if (!att->attisdropped && remoteattnum >= 0 &&
334                         values[remoteattnum] != NULL)
335                 {
336                         Oid                     typinput;
337                         Oid                     typioparam;
338
339                         errarg.local_attnum = i;
340                         errarg.remote_attnum = remoteattnum;
341
342                         getTypeInputInfo(att->atttypid, &typinput, &typioparam);
343                         slot->tts_values[i] =
344                                 OidInputFunctionCall(typinput, values[remoteattnum],
345                                                                          typioparam, att->atttypmod);
346                         slot->tts_isnull[i] = false;
347
348                         errarg.local_attnum = -1;
349                         errarg.remote_attnum = -1;
350                 }
351                 else
352                 {
353                         /*
354                          * We assign NULL to dropped attributes, NULL values, and missing
355                          * values (missing values should be later filled using
356                          * slot_fill_defaults).
357                          */
358                         slot->tts_values[i] = (Datum) 0;
359                         slot->tts_isnull[i] = true;
360                 }
361         }
362
363         /* Pop the error context stack */
364         error_context_stack = errcallback.previous;
365
366         ExecStoreVirtualTuple(slot);
367 }
368
369 /*
370  * Modify slot with user data provided as C strings.
371  * This is somewhat similar to heap_modify_tuple but also calls the type
372  * input function on the user data as the input is the text representation
373  * of the types.
374  */
375 static void
376 slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
377                                          char **values, bool *replaces)
378 {
379         int                     natts = slot->tts_tupleDescriptor->natts;
380         int                     i;
381         SlotErrCallbackArg errarg;
382         ErrorContextCallback errcallback;
383
384         slot_getallattrs(slot);
385         ExecClearTuple(slot);
386
387         /* Push callback + info on the error context stack */
388         errarg.rel = rel;
389         errarg.local_attnum = -1;
390         errarg.remote_attnum = -1;
391         errcallback.callback = slot_store_error_callback;
392         errcallback.arg = (void *) &errarg;
393         errcallback.previous = error_context_stack;
394         error_context_stack = &errcallback;
395
396         /* Call the "in" function for each replaced attribute */
397         for (i = 0; i < natts; i++)
398         {
399                 Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
400                 int                     remoteattnum = rel->attrmap[i];
401
402                 if (remoteattnum < 0)
403                         continue;
404
405                 if (!replaces[remoteattnum])
406                         continue;
407
408                 if (values[remoteattnum] != NULL)
409                 {
410                         Oid                     typinput;
411                         Oid                     typioparam;
412
413                         errarg.local_attnum = i;
414                         errarg.remote_attnum = remoteattnum;
415
416                         getTypeInputInfo(att->atttypid, &typinput, &typioparam);
417                         slot->tts_values[i] =
418                                 OidInputFunctionCall(typinput, values[remoteattnum],
419                                                                          typioparam, att->atttypmod);
420                         slot->tts_isnull[i] = false;
421
422                         errarg.local_attnum = -1;
423                         errarg.remote_attnum = -1;
424                 }
425                 else
426                 {
427                         slot->tts_values[i] = (Datum) 0;
428                         slot->tts_isnull[i] = true;
429                 }
430         }
431
432         /* Pop the error context stack */
433         error_context_stack = errcallback.previous;
434
435         ExecStoreVirtualTuple(slot);
436 }
437
438 /*
439  * Handle BEGIN message.
440  */
441 static void
442 apply_handle_begin(StringInfo s)
443 {
444         LogicalRepBeginData begin_data;
445
446         logicalrep_read_begin(s, &begin_data);
447
448         remote_final_lsn = begin_data.final_lsn;
449
450         in_remote_transaction = true;
451
452         pgstat_report_activity(STATE_RUNNING, NULL);
453 }
454
455 /*
456  * Handle COMMIT message.
457  *
458  * TODO, support tracking of multiple origins
459  */
460 static void
461 apply_handle_commit(StringInfo s)
462 {
463         LogicalRepCommitData commit_data;
464
465         logicalrep_read_commit(s, &commit_data);
466
467         Assert(commit_data.commit_lsn == remote_final_lsn);
468
469         /* The synchronization worker runs in single transaction. */
470         if (IsTransactionState() && !am_tablesync_worker())
471         {
472                 /*
473                  * Update origin state so we can restart streaming from correct
474                  * position in case of crash.
475                  */
476                 replorigin_session_origin_lsn = commit_data.end_lsn;
477                 replorigin_session_origin_timestamp = commit_data.committime;
478
479                 CommitTransactionCommand();
480                 pgstat_report_stat(false);
481
482                 store_flush_position(commit_data.end_lsn);
483         }
484         else
485         {
486                 /* Process any invalidation messages that might have accumulated. */
487                 AcceptInvalidationMessages();
488                 maybe_reread_subscription();
489         }
490
491         in_remote_transaction = false;
492
493         /* Process any tables that are being synchronized in parallel. */
494         process_syncing_tables(commit_data.end_lsn);
495
496         pgstat_report_activity(STATE_IDLE, NULL);
497 }
498
499 /*
500  * Handle ORIGIN message.
501  *
502  * TODO, support tracking of multiple origins
503  */
504 static void
505 apply_handle_origin(StringInfo s)
506 {
507         /*
508          * ORIGIN message can only come inside remote transaction and before any
509          * actual writes.
510          */
511         if (!in_remote_transaction ||
512                 (IsTransactionState() && !am_tablesync_worker()))
513                 ereport(ERROR,
514                                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
515                                  errmsg("ORIGIN message sent out of order")));
516 }
517
518 /*
519  * Handle RELATION message.
520  *
521  * Note we don't do validation against local schema here. The validation
522  * against local schema is postponed until first change for given relation
523  * comes as we only care about it when applying changes for it anyway and we
524  * do less locking this way.
525  */
526 static void
527 apply_handle_relation(StringInfo s)
528 {
529         LogicalRepRelation *rel;
530
531         rel = logicalrep_read_rel(s);
532         logicalrep_relmap_update(rel);
533 }
534
535 /*
536  * Handle TYPE message.
537  *
538  * Note we don't do local mapping here, that's done when the type is
539  * actually used.
540  */
541 static void
542 apply_handle_type(StringInfo s)
543 {
544         LogicalRepTyp typ;
545
546         logicalrep_read_typ(s, &typ);
547         logicalrep_typmap_update(&typ);
548 }
549
550 /*
551  * Get replica identity index or if it is not defined a primary key.
552  *
553  * If neither is defined, returns InvalidOid
554  */
555 static Oid
556 GetRelationIdentityOrPK(Relation rel)
557 {
558         Oid                     idxoid;
559
560         idxoid = RelationGetReplicaIndex(rel);
561
562         if (!OidIsValid(idxoid))
563                 idxoid = RelationGetPrimaryKeyIndex(rel);
564
565         return idxoid;
566 }
567
568 /*
569  * Handle INSERT message.
570  */
571 static void
572 apply_handle_insert(StringInfo s)
573 {
574         LogicalRepRelMapEntry *rel;
575         LogicalRepTupleData newtup;
576         LogicalRepRelId relid;
577         EState     *estate;
578         TupleTableSlot *remoteslot;
579         MemoryContext oldctx;
580
581         ensure_transaction();
582
583         relid = logicalrep_read_insert(s, &newtup);
584         rel = logicalrep_rel_open(relid, RowExclusiveLock);
585         if (!should_apply_changes_for_rel(rel))
586         {
587                 /*
588                  * The relation can't become interesting in the middle of the
589                  * transaction so it's safe to unlock it.
590                  */
591                 logicalrep_rel_close(rel, RowExclusiveLock);
592                 return;
593         }
594
595         /* Initialize the executor state. */
596         estate = create_estate_for_relation(rel);
597         remoteslot = ExecInitExtraTupleSlot(estate,
598                                                                                 RelationGetDescr(rel->localrel),
599                                                                                 &TTSOpsHeapTuple);
600
601         /* Input functions may need an active snapshot, so get one */
602         PushActiveSnapshot(GetTransactionSnapshot());
603
604         /* Process and store remote tuple in the slot */
605         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
606         slot_store_cstrings(remoteslot, rel, newtup.values);
607         slot_fill_defaults(rel, estate, remoteslot);
608         MemoryContextSwitchTo(oldctx);
609
610         ExecOpenIndices(estate->es_result_relation_info, false);
611
612         /* Do the insert. */
613         ExecSimpleRelationInsert(estate, remoteslot);
614
615         /* Cleanup. */
616         ExecCloseIndices(estate->es_result_relation_info);
617         PopActiveSnapshot();
618
619         /* Handle queued AFTER triggers. */
620         AfterTriggerEndQuery(estate);
621
622         ExecResetTupleTable(estate->es_tupleTable, false);
623         FreeExecutorState(estate);
624
625         logicalrep_rel_close(rel, NoLock);
626
627         CommandCounterIncrement();
628 }
629
630 /*
631  * Check if the logical replication relation is updatable and throw
632  * appropriate error if it isn't.
633  */
634 static void
635 check_relation_updatable(LogicalRepRelMapEntry *rel)
636 {
637         /* Updatable, no error. */
638         if (rel->updatable)
639                 return;
640
641         /*
642          * We are in error mode so it's fine this is somewhat slow. It's better to
643          * give user correct error.
644          */
645         if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
646         {
647                 ereport(ERROR,
648                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
649                                  errmsg("publisher did not send replica identity column "
650                                                 "expected by the logical replication target relation \"%s.%s\"",
651                                                 rel->remoterel.nspname, rel->remoterel.relname)));
652         }
653
654         ereport(ERROR,
655                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
656                          errmsg("logical replication target relation \"%s.%s\" has "
657                                         "neither REPLICA IDENTITY index nor PRIMARY "
658                                         "KEY and published relation does not have "
659                                         "REPLICA IDENTITY FULL",
660                                         rel->remoterel.nspname, rel->remoterel.relname)));
661 }
662
663 /*
664  * Handle UPDATE message.
665  *
666  * TODO: FDW support
667  */
668 static void
669 apply_handle_update(StringInfo s)
670 {
671         LogicalRepRelMapEntry *rel;
672         LogicalRepRelId relid;
673         Oid                     idxoid;
674         EState     *estate;
675         EPQState        epqstate;
676         LogicalRepTupleData oldtup;
677         LogicalRepTupleData newtup;
678         bool            has_oldtup;
679         TupleTableSlot *localslot;
680         TupleTableSlot *remoteslot;
681         bool            found;
682         MemoryContext oldctx;
683
684         ensure_transaction();
685
686         relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
687                                                                    &newtup);
688         rel = logicalrep_rel_open(relid, RowExclusiveLock);
689         if (!should_apply_changes_for_rel(rel))
690         {
691                 /*
692                  * The relation can't become interesting in the middle of the
693                  * transaction so it's safe to unlock it.
694                  */
695                 logicalrep_rel_close(rel, RowExclusiveLock);
696                 return;
697         }
698
699         /* Check if we can do the update. */
700         check_relation_updatable(rel);
701
702         /* Initialize the executor state. */
703         estate = create_estate_for_relation(rel);
704         remoteslot = ExecInitExtraTupleSlot(estate,
705                                                                                 RelationGetDescr(rel->localrel),
706                                                                                 &TTSOpsHeapTuple);
707         localslot = ExecInitExtraTupleSlot(estate,
708                                                                            RelationGetDescr(rel->localrel),
709                                                                            &TTSOpsHeapTuple);
710         EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
711
712         PushActiveSnapshot(GetTransactionSnapshot());
713         ExecOpenIndices(estate->es_result_relation_info, false);
714
715         /* Build the search tuple. */
716         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
717         slot_store_cstrings(remoteslot, rel,
718                                                 has_oldtup ? oldtup.values : newtup.values);
719         MemoryContextSwitchTo(oldctx);
720
721         /*
722          * Try to find tuple using either replica identity index, primary key or
723          * if needed, sequential scan.
724          */
725         idxoid = GetRelationIdentityOrPK(rel->localrel);
726         Assert(OidIsValid(idxoid) ||
727                    (rel->remoterel.replident == REPLICA_IDENTITY_FULL && has_oldtup));
728
729         if (OidIsValid(idxoid))
730                 found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
731                                                                                          LockTupleExclusive,
732                                                                                          remoteslot, localslot);
733         else
734                 found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
735                                                                                  remoteslot, localslot);
736
737         ExecClearTuple(remoteslot);
738
739         /*
740          * Tuple found.
741          *
742          * Note this will fail if there are other conflicting unique indexes.
743          */
744         if (found)
745         {
746                 /* Process and store remote tuple in the slot */
747                 oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
748                 ExecCopySlot(remoteslot, localslot);
749                 slot_modify_cstrings(remoteslot, rel, newtup.values, newtup.changed);
750                 MemoryContextSwitchTo(oldctx);
751
752                 EvalPlanQualSetSlot(&epqstate, remoteslot);
753
754                 /* Do the actual update. */
755                 ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
756         }
757         else
758         {
759                 /*
760                  * The tuple to be updated could not be found.
761                  *
762                  * TODO what to do here, change the log level to LOG perhaps?
763                  */
764                 elog(DEBUG1,
765                          "logical replication did not find row for update "
766                          "in replication target relation \"%s\"",
767                          RelationGetRelationName(rel->localrel));
768         }
769
770         /* Cleanup. */
771         ExecCloseIndices(estate->es_result_relation_info);
772         PopActiveSnapshot();
773
774         /* Handle queued AFTER triggers. */
775         AfterTriggerEndQuery(estate);
776
777         EvalPlanQualEnd(&epqstate);
778         ExecResetTupleTable(estate->es_tupleTable, false);
779         FreeExecutorState(estate);
780
781         logicalrep_rel_close(rel, NoLock);
782
783         CommandCounterIncrement();
784 }
785
786 /*
787  * Handle DELETE message.
788  *
789  * TODO: FDW support
790  */
791 static void
792 apply_handle_delete(StringInfo s)
793 {
794         LogicalRepRelMapEntry *rel;
795         LogicalRepTupleData oldtup;
796         LogicalRepRelId relid;
797         Oid                     idxoid;
798         EState     *estate;
799         EPQState        epqstate;
800         TupleTableSlot *remoteslot;
801         TupleTableSlot *localslot;
802         bool            found;
803         MemoryContext oldctx;
804
805         ensure_transaction();
806
807         relid = logicalrep_read_delete(s, &oldtup);
808         rel = logicalrep_rel_open(relid, RowExclusiveLock);
809         if (!should_apply_changes_for_rel(rel))
810         {
811                 /*
812                  * The relation can't become interesting in the middle of the
813                  * transaction so it's safe to unlock it.
814                  */
815                 logicalrep_rel_close(rel, RowExclusiveLock);
816                 return;
817         }
818
819         /* Check if we can do the delete. */
820         check_relation_updatable(rel);
821
822         /* Initialize the executor state. */
823         estate = create_estate_for_relation(rel);
824         remoteslot = ExecInitExtraTupleSlot(estate,
825                                                                                 RelationGetDescr(rel->localrel),
826                                                                                 &TTSOpsVirtual);
827         localslot = ExecInitExtraTupleSlot(estate,
828                                                                            RelationGetDescr(rel->localrel),
829                                                                            &TTSOpsHeapTuple);
830         EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
831
832         PushActiveSnapshot(GetTransactionSnapshot());
833         ExecOpenIndices(estate->es_result_relation_info, false);
834
835         /* Find the tuple using the replica identity index. */
836         oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
837         slot_store_cstrings(remoteslot, rel, oldtup.values);
838         MemoryContextSwitchTo(oldctx);
839
840         /*
841          * Try to find tuple using either replica identity index, primary key or
842          * if needed, sequential scan.
843          */
844         idxoid = GetRelationIdentityOrPK(rel->localrel);
845         Assert(OidIsValid(idxoid) ||
846                    (rel->remoterel.replident == REPLICA_IDENTITY_FULL));
847
848         if (OidIsValid(idxoid))
849                 found = RelationFindReplTupleByIndex(rel->localrel, idxoid,
850                                                                                          LockTupleExclusive,
851                                                                                          remoteslot, localslot);
852         else
853                 found = RelationFindReplTupleSeq(rel->localrel, LockTupleExclusive,
854                                                                                  remoteslot, localslot);
855         /* If found delete it. */
856         if (found)
857         {
858                 EvalPlanQualSetSlot(&epqstate, localslot);
859
860                 /* Do the actual delete. */
861                 ExecSimpleRelationDelete(estate, &epqstate, localslot);
862         }
863         else
864         {
865                 /* The tuple to be deleted could not be found. */
866                 elog(DEBUG1,
867                          "logical replication could not find row for delete "
868                          "in replication target relation \"%s\"",
869                          RelationGetRelationName(rel->localrel));
870         }
871
872         /* Cleanup. */
873         ExecCloseIndices(estate->es_result_relation_info);
874         PopActiveSnapshot();
875
876         /* Handle queued AFTER triggers. */
877         AfterTriggerEndQuery(estate);
878
879         EvalPlanQualEnd(&epqstate);
880         ExecResetTupleTable(estate->es_tupleTable, false);
881         FreeExecutorState(estate);
882
883         logicalrep_rel_close(rel, NoLock);
884
885         CommandCounterIncrement();
886 }
887
888 /*
889  * Handle TRUNCATE message.
890  *
891  * TODO: FDW support
892  */
893 static void
894 apply_handle_truncate(StringInfo s)
895 {
896         bool            cascade = false;
897         bool            restart_seqs = false;
898         List       *remote_relids = NIL;
899         List       *remote_rels = NIL;
900         List       *rels = NIL;
901         List       *relids = NIL;
902         List       *relids_logged = NIL;
903         ListCell   *lc;
904
905         ensure_transaction();
906
907         remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
908
909         foreach(lc, remote_relids)
910         {
911                 LogicalRepRelId relid = lfirst_oid(lc);
912                 LogicalRepRelMapEntry *rel;
913
914                 rel = logicalrep_rel_open(relid, RowExclusiveLock);
915                 if (!should_apply_changes_for_rel(rel))
916                 {
917                         /*
918                          * The relation can't become interesting in the middle of the
919                          * transaction so it's safe to unlock it.
920                          */
921                         logicalrep_rel_close(rel, RowExclusiveLock);
922                         continue;
923                 }
924
925                 remote_rels = lappend(remote_rels, rel);
926                 rels = lappend(rels, rel->localrel);
927                 relids = lappend_oid(relids, rel->localreloid);
928                 if (RelationIsLogicallyLogged(rel->localrel))
929                         relids_logged = lappend_oid(relids_logged, rel->localreloid);
930         }
931
932         /*
933          * Even if we used CASCADE on the upstream master we explicitly default to
934          * replaying changes without further cascading. This might be later
935          * changeable with a user specified option.
936          */
937         ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs);
938
939         foreach(lc, remote_rels)
940         {
941                 LogicalRepRelMapEntry *rel = lfirst(lc);
942
943                 logicalrep_rel_close(rel, NoLock);
944         }
945
946         CommandCounterIncrement();
947 }
948
949
950 /*
951  * Logical replication protocol message dispatcher.
952  */
953 static void
954 apply_dispatch(StringInfo s)
955 {
956         char            action = pq_getmsgbyte(s);
957
958         switch (action)
959         {
960                         /* BEGIN */
961                 case 'B':
962                         apply_handle_begin(s);
963                         break;
964                         /* COMMIT */
965                 case 'C':
966                         apply_handle_commit(s);
967                         break;
968                         /* INSERT */
969                 case 'I':
970                         apply_handle_insert(s);
971                         break;
972                         /* UPDATE */
973                 case 'U':
974                         apply_handle_update(s);
975                         break;
976                         /* DELETE */
977                 case 'D':
978                         apply_handle_delete(s);
979                         break;
980                         /* TRUNCATE */
981                 case 'T':
982                         apply_handle_truncate(s);
983                         break;
984                         /* RELATION */
985                 case 'R':
986                         apply_handle_relation(s);
987                         break;
988                         /* TYPE */
989                 case 'Y':
990                         apply_handle_type(s);
991                         break;
992                         /* ORIGIN */
993                 case 'O':
994                         apply_handle_origin(s);
995                         break;
996                 default:
997                         ereport(ERROR,
998                                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
999                                          errmsg("invalid logical replication message type \"%c\"", action)));
1000         }
1001 }
1002
1003 /*
1004  * Figure out which write/flush positions to report to the walsender process.
1005  *
1006  * We can't simply report back the last LSN the walsender sent us because the
1007  * local transaction might not yet be flushed to disk locally. Instead we
1008  * build a list that associates local with remote LSNs for every commit. When
1009  * reporting back the flush position to the sender we iterate that list and
1010  * check which entries on it are already locally flushed. Those we can report
1011  * as having been flushed.
1012  *
1013  * The have_pending_txes is true if there are outstanding transactions that
1014  * need to be flushed.
1015  */
1016 static void
1017 get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
1018                                    bool *have_pending_txes)
1019 {
1020         dlist_mutable_iter iter;
1021         XLogRecPtr      local_flush = GetFlushRecPtr();
1022
1023         *write = InvalidXLogRecPtr;
1024         *flush = InvalidXLogRecPtr;
1025
1026         dlist_foreach_modify(iter, &lsn_mapping)
1027         {
1028                 FlushPosition *pos =
1029                 dlist_container(FlushPosition, node, iter.cur);
1030
1031                 *write = pos->remote_end;
1032
1033                 if (pos->local_end <= local_flush)
1034                 {
1035                         *flush = pos->remote_end;
1036                         dlist_delete(iter.cur);
1037                         pfree(pos);
1038                 }
1039                 else
1040                 {
1041                         /*
1042                          * Don't want to uselessly iterate over the rest of the list which
1043                          * could potentially be long. Instead get the last element and
1044                          * grab the write position from there.
1045                          */
1046                         pos = dlist_tail_element(FlushPosition, node,
1047                                                                          &lsn_mapping);
1048                         *write = pos->remote_end;
1049                         *have_pending_txes = true;
1050                         return;
1051                 }
1052         }
1053
1054         *have_pending_txes = !dlist_is_empty(&lsn_mapping);
1055 }
1056
1057 /*
1058  * Store current remote/local lsn pair in the tracking list.
1059  */
1060 static void
1061 store_flush_position(XLogRecPtr remote_lsn)
1062 {
1063         FlushPosition *flushpos;
1064
1065         /* Need to do this in permanent context */
1066         MemoryContextSwitchTo(ApplyContext);
1067
1068         /* Track commit lsn  */
1069         flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
1070         flushpos->local_end = XactLastCommitEnd;
1071         flushpos->remote_end = remote_lsn;
1072
1073         dlist_push_tail(&lsn_mapping, &flushpos->node);
1074         MemoryContextSwitchTo(ApplyMessageContext);
1075 }
1076
1077
1078 /* Update statistics of the worker. */
1079 static void
1080 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
1081 {
1082         MyLogicalRepWorker->last_lsn = last_lsn;
1083         MyLogicalRepWorker->last_send_time = send_time;
1084         MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
1085         if (reply)
1086         {
1087                 MyLogicalRepWorker->reply_lsn = last_lsn;
1088                 MyLogicalRepWorker->reply_time = send_time;
1089         }
1090 }
1091
1092 /*
1093  * Apply main loop.
1094  */
1095 static void
1096 LogicalRepApplyLoop(XLogRecPtr last_received)
1097 {
1098         /*
1099          * Init the ApplyMessageContext which we clean up after each replication
1100          * protocol message.
1101          */
1102         ApplyMessageContext = AllocSetContextCreate(ApplyContext,
1103                                                                                                 "ApplyMessageContext",
1104                                                                                                 ALLOCSET_DEFAULT_SIZES);
1105
1106         /* mark as idle, before starting to loop */
1107         pgstat_report_activity(STATE_IDLE, NULL);
1108
1109         for (;;)
1110         {
1111                 pgsocket        fd = PGINVALID_SOCKET;
1112                 int                     rc;
1113                 int                     len;
1114                 char       *buf = NULL;
1115                 bool            endofstream = false;
1116                 TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1117                 bool            ping_sent = false;
1118                 long            wait_time;
1119
1120                 CHECK_FOR_INTERRUPTS();
1121
1122                 MemoryContextSwitchTo(ApplyMessageContext);
1123
1124                 len = walrcv_receive(wrconn, &buf, &fd);
1125
1126                 if (len != 0)
1127                 {
1128                         /* Process the data */
1129                         for (;;)
1130                         {
1131                                 CHECK_FOR_INTERRUPTS();
1132
1133                                 if (len == 0)
1134                                 {
1135                                         break;
1136                                 }
1137                                 else if (len < 0)
1138                                 {
1139                                         ereport(LOG,
1140                                                         (errmsg("data stream from publisher has ended")));
1141                                         endofstream = true;
1142                                         break;
1143                                 }
1144                                 else
1145                                 {
1146                                         int                     c;
1147                                         StringInfoData s;
1148
1149                                         /* Reset timeout. */
1150                                         last_recv_timestamp = GetCurrentTimestamp();
1151                                         ping_sent = false;
1152
1153                                         /* Ensure we are reading the data into our memory context. */
1154                                         MemoryContextSwitchTo(ApplyMessageContext);
1155
1156                                         s.data = buf;
1157                                         s.len = len;
1158                                         s.cursor = 0;
1159                                         s.maxlen = -1;
1160
1161                                         c = pq_getmsgbyte(&s);
1162
1163                                         if (c == 'w')
1164                                         {
1165                                                 XLogRecPtr      start_lsn;
1166                                                 XLogRecPtr      end_lsn;
1167                                                 TimestampTz send_time;
1168
1169                                                 start_lsn = pq_getmsgint64(&s);
1170                                                 end_lsn = pq_getmsgint64(&s);
1171                                                 send_time = pq_getmsgint64(&s);
1172
1173                                                 if (last_received < start_lsn)
1174                                                         last_received = start_lsn;
1175
1176                                                 if (last_received < end_lsn)
1177                                                         last_received = end_lsn;
1178
1179                                                 UpdateWorkerStats(last_received, send_time, false);
1180
1181                                                 apply_dispatch(&s);
1182                                         }
1183                                         else if (c == 'k')
1184                                         {
1185                                                 XLogRecPtr      end_lsn;
1186                                                 TimestampTz timestamp;
1187                                                 bool            reply_requested;
1188
1189                                                 end_lsn = pq_getmsgint64(&s);
1190                                                 timestamp = pq_getmsgint64(&s);
1191                                                 reply_requested = pq_getmsgbyte(&s);
1192
1193                                                 if (last_received < end_lsn)
1194                                                         last_received = end_lsn;
1195
1196                                                 send_feedback(last_received, reply_requested, false);
1197                                                 UpdateWorkerStats(last_received, timestamp, true);
1198                                         }
1199                                         /* other message types are purposefully ignored */
1200
1201                                         MemoryContextReset(ApplyMessageContext);
1202                                 }
1203
1204                                 len = walrcv_receive(wrconn, &buf, &fd);
1205                         }
1206                 }
1207
1208                 /* confirm all writes so far */
1209                 send_feedback(last_received, false, false);
1210
1211                 if (!in_remote_transaction)
1212                 {
1213                         /*
1214                          * If we didn't get any transactions for a while there might be
1215                          * unconsumed invalidation messages in the queue, consume them
1216                          * now.
1217                          */
1218                         AcceptInvalidationMessages();
1219                         maybe_reread_subscription();
1220
1221                         /* Process any table synchronization changes. */
1222                         process_syncing_tables(last_received);
1223                 }
1224
1225                 /* Cleanup the memory. */
1226                 MemoryContextResetAndDeleteChildren(ApplyMessageContext);
1227                 MemoryContextSwitchTo(TopMemoryContext);
1228
1229                 /* Check if we need to exit the streaming loop. */
1230                 if (endofstream)
1231                 {
1232                         TimeLineID      tli;
1233
1234                         walrcv_endstreaming(wrconn, &tli);
1235                         break;
1236                 }
1237
1238                 /*
1239                  * Wait for more data or latch.  If we have unflushed transactions,
1240                  * wake up after WalWriterDelay to see if they've been flushed yet (in
1241                  * which case we should send a feedback message).  Otherwise, there's
1242                  * no particular urgency about waking up unless we get data or a
1243                  * signal.
1244                  */
1245                 if (!dlist_is_empty(&lsn_mapping))
1246                         wait_time = WalWriterDelay;
1247                 else
1248                         wait_time = NAPTIME_PER_CYCLE;
1249
1250                 rc = WaitLatchOrSocket(MyLatch,
1251                                                            WL_SOCKET_READABLE | WL_LATCH_SET |
1252                                                            WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1253                                                            fd, wait_time,
1254                                                            WAIT_EVENT_LOGICAL_APPLY_MAIN);
1255
1256                 if (rc & WL_LATCH_SET)
1257                 {
1258                         ResetLatch(MyLatch);
1259                         CHECK_FOR_INTERRUPTS();
1260                 }
1261
1262                 if (got_SIGHUP)
1263                 {
1264                         got_SIGHUP = false;
1265                         ProcessConfigFile(PGC_SIGHUP);
1266                 }
1267
1268                 if (rc & WL_TIMEOUT)
1269                 {
1270                         /*
1271                          * We didn't receive anything new. If we haven't heard anything
1272                          * from the server for more than wal_receiver_timeout / 2, ping
1273                          * the server. Also, if it's been longer than
1274                          * wal_receiver_status_interval since the last update we sent,
1275                          * send a status update to the master anyway, to report any
1276                          * progress in applying WAL.
1277                          */
1278                         bool            requestReply = false;
1279
1280                         /*
1281                          * Check if time since last receive from standby has reached the
1282                          * configured limit.
1283                          */
1284                         if (wal_receiver_timeout > 0)
1285                         {
1286                                 TimestampTz now = GetCurrentTimestamp();
1287                                 TimestampTz timeout;
1288
1289                                 timeout =
1290                                         TimestampTzPlusMilliseconds(last_recv_timestamp,
1291                                                                                                 wal_receiver_timeout);
1292
1293                                 if (now >= timeout)
1294                                         ereport(ERROR,
1295                                                         (errmsg("terminating logical replication worker due to timeout")));
1296
1297                                 /*
1298                                  * We didn't receive anything new, for half of receiver
1299                                  * replication timeout. Ping the server.
1300                                  */
1301                                 if (!ping_sent)
1302                                 {
1303                                         timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
1304                                                                                                                   (wal_receiver_timeout / 2));
1305                                         if (now >= timeout)
1306                                         {
1307                                                 requestReply = true;
1308                                                 ping_sent = true;
1309                                         }
1310                                 }
1311                         }
1312
1313                         send_feedback(last_received, requestReply, requestReply);
1314                 }
1315         }
1316 }
1317
1318 /*
1319  * Send a Standby Status Update message to server.
1320  *
1321  * 'recvpos' is the latest LSN we've received data to, force is set if we need
1322  * to send a response to avoid timeouts.
1323  */
1324 static void
1325 send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
1326 {
1327         static StringInfo reply_message = NULL;
1328         static TimestampTz send_time = 0;
1329
1330         static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
1331         static XLogRecPtr last_writepos = InvalidXLogRecPtr;
1332         static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
1333
1334         XLogRecPtr      writepos;
1335         XLogRecPtr      flushpos;
1336         TimestampTz now;
1337         bool            have_pending_txes;
1338
1339         /*
1340          * If the user doesn't want status to be reported to the publisher, be
1341          * sure to exit before doing anything at all.
1342          */
1343         if (!force && wal_receiver_status_interval <= 0)
1344                 return;
1345
1346         /* It's legal to not pass a recvpos */
1347         if (recvpos < last_recvpos)
1348                 recvpos = last_recvpos;
1349
1350         get_flush_position(&writepos, &flushpos, &have_pending_txes);
1351
1352         /*
1353          * No outstanding transactions to flush, we can report the latest received
1354          * position. This is important for synchronous replication.
1355          */
1356         if (!have_pending_txes)
1357                 flushpos = writepos = recvpos;
1358
1359         if (writepos < last_writepos)
1360                 writepos = last_writepos;
1361
1362         if (flushpos < last_flushpos)
1363                 flushpos = last_flushpos;
1364
1365         now = GetCurrentTimestamp();
1366
1367         /* if we've already reported everything we're good */
1368         if (!force &&
1369                 writepos == last_writepos &&
1370                 flushpos == last_flushpos &&
1371                 !TimestampDifferenceExceeds(send_time, now,
1372                                                                         wal_receiver_status_interval * 1000))
1373                 return;
1374         send_time = now;
1375
1376         if (!reply_message)
1377         {
1378                 MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
1379
1380                 reply_message = makeStringInfo();
1381                 MemoryContextSwitchTo(oldctx);
1382         }
1383         else
1384                 resetStringInfo(reply_message);
1385
1386         pq_sendbyte(reply_message, 'r');
1387         pq_sendint64(reply_message, recvpos);   /* write */
1388         pq_sendint64(reply_message, flushpos);  /* flush */
1389         pq_sendint64(reply_message, writepos);  /* apply */
1390         pq_sendint64(reply_message, now);       /* sendTime */
1391         pq_sendbyte(reply_message, requestReply);       /* replyRequested */
1392
1393         elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
1394                  force,
1395                  (uint32) (recvpos >> 32), (uint32) recvpos,
1396                  (uint32) (writepos >> 32), (uint32) writepos,
1397                  (uint32) (flushpos >> 32), (uint32) flushpos
1398                 );
1399
1400         walrcv_send(wrconn, reply_message->data, reply_message->len);
1401
1402         if (recvpos > last_recvpos)
1403                 last_recvpos = recvpos;
1404         if (writepos > last_writepos)
1405                 last_writepos = writepos;
1406         if (flushpos > last_flushpos)
1407                 last_flushpos = flushpos;
1408 }
1409
1410 /*
1411  * Reread subscription info if needed. Most changes will be exit.
1412  */
1413 static void
1414 maybe_reread_subscription(void)
1415 {
1416         MemoryContext oldctx;
1417         Subscription *newsub;
1418         bool            started_tx = false;
1419
1420         /* When cache state is valid there is nothing to do here. */
1421         if (MySubscriptionValid)
1422                 return;
1423
1424         /* This function might be called inside or outside of transaction. */
1425         if (!IsTransactionState())
1426         {
1427                 StartTransactionCommand();
1428                 started_tx = true;
1429         }
1430
1431         /* Ensure allocations in permanent context. */
1432         oldctx = MemoryContextSwitchTo(ApplyContext);
1433
1434         newsub = GetSubscription(MyLogicalRepWorker->subid, true);
1435
1436         /*
1437          * Exit if the subscription was removed. This normally should not happen
1438          * as the worker gets killed during DROP SUBSCRIPTION.
1439          */
1440         if (!newsub)
1441         {
1442                 ereport(LOG,
1443                                 (errmsg("logical replication apply worker for subscription \"%s\" will "
1444                                                 "stop because the subscription was removed",
1445                                                 MySubscription->name)));
1446
1447                 proc_exit(0);
1448         }
1449
1450         /*
1451          * Exit if the subscription was disabled. This normally should not happen
1452          * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
1453          */
1454         if (!newsub->enabled)
1455         {
1456                 ereport(LOG,
1457                                 (errmsg("logical replication apply worker for subscription \"%s\" will "
1458                                                 "stop because the subscription was disabled",
1459                                                 MySubscription->name)));
1460
1461                 proc_exit(0);
1462         }
1463
1464         /*
1465          * Exit if connection string was changed. The launcher will start new
1466          * worker.
1467          */
1468         if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
1469         {
1470                 ereport(LOG,
1471                                 (errmsg("logical replication apply worker for subscription \"%s\" will "
1472                                                 "restart because the connection information was changed",
1473                                                 MySubscription->name)));
1474
1475                 proc_exit(0);
1476         }
1477
1478         /*
1479          * Exit if subscription name was changed (it's used for
1480          * fallback_application_name). The launcher will start new worker.
1481          */
1482         if (strcmp(newsub->name, MySubscription->name) != 0)
1483         {
1484                 ereport(LOG,
1485                                 (errmsg("logical replication apply worker for subscription \"%s\" will "
1486                                                 "restart because subscription was renamed",
1487                                                 MySubscription->name)));
1488
1489                 proc_exit(0);
1490         }
1491
1492         /* !slotname should never happen when enabled is true. */
1493         Assert(newsub->slotname);
1494
1495         /*
1496          * We need to make new connection to new slot if slot name has changed so
1497          * exit here as well if that's the case.
1498          */
1499         if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
1500         {
1501                 ereport(LOG,
1502                                 (errmsg("logical replication apply worker for subscription \"%s\" will "
1503                                                 "restart because the replication slot name was changed",
1504                                                 MySubscription->name)));
1505
1506                 proc_exit(0);
1507         }
1508
1509         /*
1510          * Exit if publication list was changed. The launcher will start new
1511          * worker.
1512          */
1513         if (!equal(newsub->publications, MySubscription->publications))
1514         {
1515                 ereport(LOG,
1516                                 (errmsg("logical replication apply worker for subscription \"%s\" will "
1517                                                 "restart because subscription's publications were changed",
1518                                                 MySubscription->name)));
1519
1520                 proc_exit(0);
1521         }
1522
1523         /* Check for other changes that should never happen too. */
1524         if (newsub->dbid != MySubscription->dbid)
1525         {
1526                 elog(ERROR, "subscription %u changed unexpectedly",
1527                          MyLogicalRepWorker->subid);
1528         }
1529
1530         /* Clean old subscription info and switch to new one. */
1531         FreeSubscription(MySubscription);
1532         MySubscription = newsub;
1533
1534         MemoryContextSwitchTo(oldctx);
1535
1536         /* Change synchronous commit according to the user's wishes */
1537         SetConfigOption("synchronous_commit", MySubscription->synccommit,
1538                                         PGC_BACKEND, PGC_S_OVERRIDE);
1539
1540         if (started_tx)
1541                 CommitTransactionCommand();
1542
1543         MySubscriptionValid = true;
1544 }
1545
1546 /*
1547  * Callback from subscription syscache invalidation.
1548  */
1549 static void
1550 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
1551 {
1552         MySubscriptionValid = false;
1553 }
1554
1555 /* SIGHUP: set flag to reload configuration at next convenient time */
1556 static void
1557 logicalrep_worker_sighup(SIGNAL_ARGS)
1558 {
1559         int                     save_errno = errno;
1560
1561         got_SIGHUP = true;
1562
1563         /* Waken anything waiting on the process latch */
1564         SetLatch(MyLatch);
1565
1566         errno = save_errno;
1567 }
1568
1569 /* Logical Replication Apply worker entry point */
1570 void
1571 ApplyWorkerMain(Datum main_arg)
1572 {
1573         int                     worker_slot = DatumGetInt32(main_arg);
1574         MemoryContext oldctx;
1575         char            originname[NAMEDATALEN];
1576         XLogRecPtr      origin_startpos;
1577         char       *myslotname;
1578         WalRcvStreamOptions options;
1579
1580         /* Attach to slot */
1581         logicalrep_worker_attach(worker_slot);
1582
1583         /* Setup signal handling */
1584         pqsignal(SIGHUP, logicalrep_worker_sighup);
1585         pqsignal(SIGTERM, die);
1586         BackgroundWorkerUnblockSignals();
1587
1588         /*
1589          * We don't currently need any ResourceOwner in a walreceiver process, but
1590          * if we did, we could call CreateAuxProcessResourceOwner here.
1591          */
1592
1593         /* Initialise stats to a sanish value */
1594         MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
1595                 MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
1596
1597         /* Load the libpq-specific functions */
1598         load_file("libpqwalreceiver", false);
1599
1600         /* Run as replica session replication role. */
1601         SetConfigOption("session_replication_role", "replica",
1602                                         PGC_SUSET, PGC_S_OVERRIDE);
1603
1604         /* Connect to our database. */
1605         BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
1606                                                                                           MyLogicalRepWorker->userid,
1607                                                                                           0);
1608
1609         /* Load the subscription into persistent memory context. */
1610         ApplyContext = AllocSetContextCreate(TopMemoryContext,
1611                                                                                  "ApplyContext",
1612                                                                                  ALLOCSET_DEFAULT_SIZES);
1613         StartTransactionCommand();
1614         oldctx = MemoryContextSwitchTo(ApplyContext);
1615
1616         MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
1617         if (!MySubscription)
1618         {
1619                 ereport(LOG,
1620                                 (errmsg("logical replication apply worker for subscription %u will not "
1621                                                 "start because the subscription was removed during startup",
1622                                                 MyLogicalRepWorker->subid)));
1623                 proc_exit(0);
1624         }
1625
1626         MySubscriptionValid = true;
1627         MemoryContextSwitchTo(oldctx);
1628
1629         if (!MySubscription->enabled)
1630         {
1631                 ereport(LOG,
1632                                 (errmsg("logical replication apply worker for subscription \"%s\" will not "
1633                                                 "start because the subscription was disabled during startup",
1634                                                 MySubscription->name)));
1635
1636                 proc_exit(0);
1637         }
1638
1639         /* Setup synchronous commit according to the user's wishes */
1640         SetConfigOption("synchronous_commit", MySubscription->synccommit,
1641                                         PGC_BACKEND, PGC_S_OVERRIDE);
1642
1643         /* Keep us informed about subscription changes. */
1644         CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
1645                                                                   subscription_change_cb,
1646                                                                   (Datum) 0);
1647
1648         if (am_tablesync_worker())
1649                 ereport(LOG,
1650                                 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
1651                                                 MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
1652         else
1653                 ereport(LOG,
1654                                 (errmsg("logical replication apply worker for subscription \"%s\" has started",
1655                                                 MySubscription->name)));
1656
1657         CommitTransactionCommand();
1658
1659         /* Connect to the origin and start the replication. */
1660         elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
1661                  MySubscription->conninfo);
1662
1663         if (am_tablesync_worker())
1664         {
1665                 char       *syncslotname;
1666
1667                 /* This is table synchroniation worker, call initial sync. */
1668                 syncslotname = LogicalRepSyncTableStart(&origin_startpos);
1669
1670                 /* The slot name needs to be allocated in permanent memory context. */
1671                 oldctx = MemoryContextSwitchTo(ApplyContext);
1672                 myslotname = pstrdup(syncslotname);
1673                 MemoryContextSwitchTo(oldctx);
1674
1675                 pfree(syncslotname);
1676         }
1677         else
1678         {
1679                 /* This is main apply worker */
1680                 RepOriginId originid;
1681                 TimeLineID      startpointTLI;
1682                 char       *err;
1683                 int                     server_version;
1684
1685                 myslotname = MySubscription->slotname;
1686
1687                 /*
1688                  * This shouldn't happen if the subscription is enabled, but guard
1689                  * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
1690                  * crash if slot is NULL.)
1691                  */
1692                 if (!myslotname)
1693                         ereport(ERROR,
1694                                         (errmsg("subscription has no replication slot set")));
1695
1696                 /* Setup replication origin tracking. */
1697                 StartTransactionCommand();
1698                 snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
1699                 originid = replorigin_by_name(originname, true);
1700                 if (!OidIsValid(originid))
1701                         originid = replorigin_create(originname);
1702                 replorigin_session_setup(originid);
1703                 replorigin_session_origin = originid;
1704                 origin_startpos = replorigin_session_get_progress(false);
1705                 CommitTransactionCommand();
1706
1707                 wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1708                                                                 &err);
1709                 if (wrconn == NULL)
1710                         ereport(ERROR,
1711                                         (errmsg("could not connect to the publisher: %s", err)));
1712
1713                 /*
1714                  * We don't really use the output identify_system for anything but it
1715                  * does some initializations on the upstream so let's still call it.
1716                  */
1717                 (void) walrcv_identify_system(wrconn, &startpointTLI,
1718                                                                           &server_version);
1719
1720         }
1721
1722         /*
1723          * Setup callback for syscache so that we know when something changes in
1724          * the subscription relation state.
1725          */
1726         CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
1727                                                                   invalidate_syncing_table_states,
1728                                                                   (Datum) 0);
1729
1730         /* Build logical replication streaming options. */
1731         options.logical = true;
1732         options.startpoint = origin_startpos;
1733         options.slotname = myslotname;
1734         options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
1735         options.proto.logical.publication_names = MySubscription->publications;
1736
1737         /* Start normal logical streaming replication. */
1738         walrcv_startstreaming(wrconn, &options);
1739
1740         /* Run the main loop. */
1741         LogicalRepApplyLoop(origin_startpos);
1742
1743         proc_exit(0);
1744 }
1745
1746 /*
1747  * Is current process a logical replication worker?
1748  */
1749 bool
1750 IsLogicalWorker(void)
1751 {
1752         return MyLogicalRepWorker != NULL;
1753 }