]> granicus.if.org Git - postgresql/commitdiff
Split the SetSubscriptionRelState function into two
authorPeter Eisentraut <peter_e@gmx.net>
Fri, 6 Apr 2018 14:00:26 +0000 (10:00 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Fri, 6 Apr 2018 14:00:26 +0000 (10:00 -0400)
We don't actually need the insert-or-update logic, so it's clearer to
have separate functions for the inserting and updating.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
src/backend/catalog/pg_subscription.c
src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c
src/include/catalog/pg_subscription_rel.h

index 8e16d3b7bcec3239fb151f0038e23966a1c92a87..8705d8b1d36d536e3ee33266b504a890f3208896 100644 (file)
@@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
 }
 
 /*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing.  This can be used to avoid inserting a new record that was deleted
- * by someone else.  Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances.  But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
  */
 Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
-                                               XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+                                               XLogRecPtr sublsn)
 {
        Relation        rel;
        HeapTuple       tup;
-       Oid                     subrelid = InvalidOid;
+       Oid                     subrelid;
        bool            nulls[Natts_pg_subscription_rel];
        Datum           values[Natts_pg_subscription_rel];
 
@@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
        tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
                                                          ObjectIdGetDatum(relid),
                                                          ObjectIdGetDatum(subid));
+       if (HeapTupleIsValid(tup))
+               elog(ERROR, "subscription table %u in subscription %u already exists",
+                        relid, subid);
 
-       /*
-        * If the record for given table does not exist yet create new record,
-        * otherwise update the existing one.
-        */
-       if (!HeapTupleIsValid(tup) && !update_only)
-       {
-               /* Form the tuple. */
-               memset(values, 0, sizeof(values));
-               memset(nulls, false, sizeof(nulls));
-               values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-               values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
-               values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
-               if (sublsn != InvalidXLogRecPtr)
-                       values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-               else
-                       nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
-               tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
-               /* Insert tuple into catalog. */
-               subrelid = CatalogTupleInsert(rel, tup);
-
-               heap_freetuple(tup);
-       }
-       else if (HeapTupleIsValid(tup))
-       {
-               bool            replaces[Natts_pg_subscription_rel];
+       /* Form the tuple. */
+       memset(values, 0, sizeof(values));
+       memset(nulls, false, sizeof(nulls));
+       values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+       values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+       values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+       if (sublsn != InvalidXLogRecPtr)
+               values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+       else
+               nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
-               /* Update the tuple. */
-               memset(values, 0, sizeof(values));
-               memset(nulls, false, sizeof(nulls));
-               memset(replaces, false, sizeof(replaces));
+       tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
-               replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-               values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+       /* Insert tuple into catalog. */
+       subrelid = CatalogTupleInsert(rel, tup);
 
-               replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-               if (sublsn != InvalidXLogRecPtr)
-                       values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-               else
-                       nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+       heap_freetuple(tup);
 
-               tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-                                                               replaces);
+       /* Cleanup. */
+       heap_close(rel, NoLock);
 
-               /* Update the catalog. */
-               CatalogTupleUpdate(rel, &tup->t_self, tup);
+       return subrelid;
+}
 
-               subrelid = HeapTupleGetOid(tup);
-       }
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+                                                  XLogRecPtr sublsn)
+{
+       Relation        rel;
+       HeapTuple       tup;
+       Oid                     subrelid;
+       bool            nulls[Natts_pg_subscription_rel];
+       Datum           values[Natts_pg_subscription_rel];
+       bool            replaces[Natts_pg_subscription_rel];
+
+       LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+       rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+       /* Try finding existing mapping. */
+       tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+                                                         ObjectIdGetDatum(relid),
+                                                         ObjectIdGetDatum(subid));
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "subscription table %u in subscription %u does not exist",
+                        relid, subid);
+
+       /* Update the tuple. */
+       memset(values, 0, sizeof(values));
+       memset(nulls, false, sizeof(nulls));
+       memset(replaces, false, sizeof(replaces));
+
+       replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+       values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+       replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+       if (sublsn != InvalidXLogRecPtr)
+               values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+       else
+               nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+       tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+                                                       replaces);
+
+       /* Update the catalog. */
+       CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+       subrelid = HeapTupleGetOid(tup);
 
        /* Cleanup. */
        heap_close(rel, NoLock);
index 2694e1b2d746ff714b25fdcbd8bc38fd3717b31c..f138e61a8d3d9d111e52b5fe59742df1967e9042 100644 (file)
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                                CheckSubscriptionRelkind(get_rel_relkind(relid),
                                                                                 rv->schemaname, rv->relname);
 
-                               SetSubscriptionRelState(subid, relid, table_state,
-                                                                               InvalidXLogRecPtr, false);
+                               AddSubscriptionRelState(subid, relid, table_state,
+                                                                               InvalidXLogRecPtr);
                        }
 
                        /*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
                if (!bsearch(&relid, subrel_local_oids,
                                         list_length(subrel_states), sizeof(Oid), oid_cmp))
                {
-                       SetSubscriptionRelState(sub->oid, relid,
+                       AddSubscriptionRelState(sub->oid, relid,
                                                                        copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-                                                                       InvalidXLogRecPtr, false);
+                                                                       InvalidXLogRecPtr);
                        ereport(DEBUG1,
                                        (errmsg("table \"%s.%s\" added to subscription \"%s\"",
                                                        rv->schemaname, rv->relname, sub->name)));
index e50b9f790565b912594995b1189fe0573f2fb575..acc6498567d07c3e93d0adf5039a10c693e38f64 100644 (file)
@@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
                SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-               SetSubscriptionRelState(MyLogicalRepWorker->subid,
-                                                               MyLogicalRepWorker->relid,
-                                                               MyLogicalRepWorker->relstate,
-                                                               MyLogicalRepWorker->relstate_lsn,
-                                                               true);
+               UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                                                                  MyLogicalRepWorker->relid,
+                                                                  MyLogicalRepWorker->relstate,
+                                                                  MyLogicalRepWorker->relstate_lsn);
 
                walrcv_endstreaming(wrconn, &tli);
                finish_sync_worker();
@@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                                        StartTransactionCommand();
                                        started_tx = true;
                                }
-                               SetSubscriptionRelState(MyLogicalRepWorker->subid,
-                                                                               rstate->relid, rstate->state,
-                                                                               rstate->lsn, true);
+
+                               UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                                                                                  rstate->relid, rstate->state,
+                                                                                  rstate->lsn);
                        }
                }
                else
@@ -870,11 +870,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
                                /* Update the state and make it visible to others. */
                                StartTransactionCommand();
-                               SetSubscriptionRelState(MyLogicalRepWorker->subid,
-                                                                               MyLogicalRepWorker->relid,
-                                                                               MyLogicalRepWorker->relstate,
-                                                                               MyLogicalRepWorker->relstate_lsn,
-                                                                               true);
+                               UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                                                                                  MyLogicalRepWorker->relid,
+                                                                                  MyLogicalRepWorker->relstate,
+                                                                                  MyLogicalRepWorker->relstate_lsn);
                                CommitTransactionCommand();
                                pgstat_report_stat(false);
 
@@ -961,11 +960,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                         * Update the new state in catalog.  No need to bother
                                         * with the shmem state as we are exiting for good.
                                         */
-                                       SetSubscriptionRelState(MyLogicalRepWorker->subid,
-                                                                                       MyLogicalRepWorker->relid,
-                                                                                       SUBREL_STATE_SYNCDONE,
-                                                                                       *origin_startpos,
-                                                                                       true);
+                                       UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+                                                                                          MyLogicalRepWorker->relid,
+                                                                                          SUBREL_STATE_SYNCDONE,
+                                                                                          *origin_startpos);
                                        finish_sync_worker();
                                }
                                break;
index d936973a9da091f768037fa1f4e543daf500356f..5cf268f181451e894be293e1feeaeb7f699db430 100644 (file)
@@ -67,8 +67,10 @@ typedef struct SubscriptionRelState
        char            state;
 } SubscriptionRelState;
 
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-                                               XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+                                               XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+                                                  XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
                                                XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);