}
/*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing. This can be used to avoid inserting a new record that was deleted
- * by someone else. Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances. But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
*/
Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
{
Relation rel;
HeapTuple tup;
- Oid subrelid = InvalidOid;
+ Oid subrelid;
bool nulls[Natts_pg_subscription_rel];
Datum values[Natts_pg_subscription_rel];
tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid),
ObjectIdGetDatum(subid));
+ if (HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u already exists",
+ relid, subid);
- /*
- * If the record for given table does not exist yet create new record,
- * otherwise update the existing one.
- */
- if (!HeapTupleIsValid(tup) && !update_only)
- {
- /* Form the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
- values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
- tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
- /* Insert tuple into catalog. */
- subrelid = CatalogTupleInsert(rel, tup);
-
- heap_freetuple(tup);
- }
- else if (HeapTupleIsValid(tup))
- {
- bool replaces[Natts_pg_subscription_rel];
+ /* Form the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+ values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
- /* Update the tuple. */
- memset(values, 0, sizeof(values));
- memset(nulls, false, sizeof(nulls));
- memset(replaces, false, sizeof(replaces));
+ tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
- replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
- values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+ /* Insert tuple into catalog. */
+ subrelid = CatalogTupleInsert(rel, tup);
- replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
- if (sublsn != InvalidXLogRecPtr)
- values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
- else
- nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ heap_freetuple(tup);
- tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
- replaces);
+ /* Cleanup. */
+ heap_close(rel, NoLock);
- /* Update the catalog. */
- CatalogTupleUpdate(rel, &tup->t_self, tup);
+ return subrelid;
+}
- subrelid = HeapTupleGetOid(tup);
- }
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+ XLogRecPtr sublsn)
+{
+ Relation rel;
+ HeapTuple tup;
+ Oid subrelid;
+ bool nulls[Natts_pg_subscription_rel];
+ Datum values[Natts_pg_subscription_rel];
+ bool replaces[Natts_pg_subscription_rel];
+
+ LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+ rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+ /* Try finding existing mapping. */
+ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(subid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "subscription table %u in subscription %u does not exist",
+ relid, subid);
+
+ /* Update the tuple. */
+ memset(values, 0, sizeof(values));
+ memset(nulls, false, sizeof(nulls));
+ memset(replaces, false, sizeof(replaces));
+
+ replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+ values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+ replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+ if (sublsn != InvalidXLogRecPtr)
+ values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+ else
+ nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+ tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+ replaces);
+
+ /* Update the catalog. */
+ CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+ subrelid = HeapTupleGetOid(tup);
/* Cleanup. */
heap_close(rel, NoLock);
SpinLockRelease(&MyLogicalRepWorker->relmutex);
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
walrcv_endstreaming(wrconn, &tli);
finish_sync_worker();
StartTransactionCommand();
started_tx = true;
}
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- rstate->relid, rstate->state,
- rstate->lsn, true);
+
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ rstate->relid, rstate->state,
+ rstate->lsn);
}
}
else
/* Update the state and make it visible to others. */
StartTransactionCommand();
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
* Update the new state in catalog. No need to bother
* with the shmem state as we are exiting for good.
*/
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- SUBREL_STATE_SYNCDONE,
- *origin_startpos,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ SUBREL_STATE_SYNCDONE,
+ *origin_startpos);
finish_sync_worker();
}
break;