]> granicus.if.org Git - postgresql/commitdiff
Fix updating of pg_subscription_rel from workers
authorPeter Eisentraut <peter_e@gmx.net>
Wed, 7 Jun 2017 17:49:14 +0000 (13:49 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Wed, 7 Jun 2017 17:49:14 +0000 (13:49 -0400)
A logical replication worker should not insert new rows into
pg_subscription_rel, only update existing rows, so that there are no
races if a concurrent refresh removes rows.  Adjust the API to be able
to choose that behavior.

Author: Masahiko Sawada <sawada.mshk@gmail.com>
Reported-by: tushar <tushar.ahuja@enterprisedb.com>
src/backend/catalog/pg_subscription.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c
src/include/catalog/pg_subscription_rel.h

index ab5f3719fc397fdaba5f4f7fefc7dc7fd54b2a35..c5b2541319ee8fa13e01cd1d368e409316c64c74 100644 (file)
@@ -227,17 +227,22 @@ textarray_to_stringlist(ArrayType *textarray)
 /*
  * Set the state of a subscription table.
  *
+ * If update_only is true and the record for given table doesn't exist, do
+ * nothing.  This can be used to avoid inserting a new record that was deleted
+ * by someone else.  Generally, subscription DDL commands should use false,
+ * workers should use true.
+ *
  * The insert-or-update logic in this function is not concurrency safe so it
  * might raise an error in rare circumstances.  But if we took a stronger lock
  * such as ShareRowExclusiveLock, we would risk more deadlocks.
  */
 Oid
 SetSubscriptionRelState(Oid subid, Oid relid, char state,
-                                               XLogRecPtr sublsn)
+                                               XLogRecPtr sublsn, bool update_only)
 {
        Relation        rel;
        HeapTuple       tup;
-       Oid                     subrelid;
+       Oid                     subrelid = InvalidOid;
        bool            nulls[Natts_pg_subscription_rel];
        Datum           values[Natts_pg_subscription_rel];
 
@@ -252,7 +257,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
         * If the record for given table does not exist yet create new record,
         * otherwise update the existing one.
         */
-       if (!HeapTupleIsValid(tup))
+       if (!HeapTupleIsValid(tup) && !update_only)
        {
                /* Form the tuple. */
                memset(values, 0, sizeof(values));
@@ -272,7 +277,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 
                heap_freetuple(tup);
        }
-       else
+       else if (HeapTupleIsValid(tup))
        {
                bool            replaces[Natts_pg_subscription_rel];
 
index ad98b38efe8adc6d899f319dc171fd0eb1ac7959..49737a904207ed8c000e092ee9d95ee0b76b78bb 100644 (file)
@@ -451,7 +451,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                                                                                 rv->schemaname, rv->relname);
 
                                SetSubscriptionRelState(subid, relid, table_state,
-                                                                               InvalidXLogRecPtr);
+                                                                               InvalidXLogRecPtr, false);
                        }
 
                        ereport(NOTICE,
@@ -574,7 +574,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
                {
                        SetSubscriptionRelState(sub->oid, relid,
                                                  copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-                                                                       InvalidXLogRecPtr);
+                                                                       InvalidXLogRecPtr, false);
                        ereport(NOTICE,
                                        (errmsg("added subscription for table %s.%s",
                                                        quote_identifier(rv->schemaname),
index 6fe39d20237d3956819b3da64ffc997df21cd5bc..f57ae6ee2d560abc048abad2b7b345aff600c955 100644 (file)
@@ -287,7 +287,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                                                MyLogicalRepWorker->relid,
                                                                MyLogicalRepWorker->relstate,
-                                                               MyLogicalRepWorker->relstate_lsn);
+                                                               MyLogicalRepWorker->relstate_lsn,
+                                                               true);
 
                walrcv_endstreaming(wrconn, &tli);
                finish_sync_worker();
@@ -414,7 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                                }
                                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                                                                rstate->relid, rstate->state,
-                                                                               rstate->lsn);
+                                                                               rstate->lsn, true);
                        }
                }
                else
@@ -845,7 +846,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                                                                MyLogicalRepWorker->relid,
                                                                                MyLogicalRepWorker->relstate,
-                                                                               MyLogicalRepWorker->relstate_lsn);
+                                                                               MyLogicalRepWorker->relstate_lsn,
+                                                                               true);
                                CommitTransactionCommand();
                                pgstat_report_stat(false);
 
@@ -932,7 +934,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                        SetSubscriptionRelState(MyLogicalRepWorker->subid,
                                                                                        MyLogicalRepWorker->relid,
                                                                                        SUBREL_STATE_SYNCDONE,
-                                                                                       *origin_startpos);
+                                                                                       *origin_startpos,
+                                                                                       true);
                                        finish_sync_worker();
                                }
                                break;
index 391f96b76e4f4110e90336a6572d261fdab6d352..f5f6191676884af40a21070741e4120a8517455c 100644 (file)
@@ -71,7 +71,7 @@ typedef struct SubscriptionRelState
 } SubscriptionRelState;
 
 extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-                                               XLogRecPtr sublsn);
+                                               XLogRecPtr sublsn, bool update_only);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
                                                XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);