From: Peter Eisentraut Date: Fri, 6 Apr 2018 14:00:26 +0000 (-0400) Subject: Split the SetSubscriptionRelState function into two X-Git-Tag: REL_11_BETA1~337 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=bcf79b5bb648d30696406034a61ce0ca3dcb0dea;p=postgresql Split the SetSubscriptionRelState function into two We don't actually need the insert-or-update logic, so it's clearer to have separate functions for the inserting and updating. Author: Petr Jelinek Reviewed-by: Masahiko Sawada --- diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 8e16d3b7bc..8705d8b1d3 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray) } /* - * Set the state of a subscription table. - * - * If update_only is true and the record for given table doesn't exist, do - * nothing. This can be used to avoid inserting a new record that was deleted - * by someone else. Generally, subscription DDL commands should use false, - * workers should use true. - * - * The insert-or-update logic in this function is not concurrency safe so it - * might raise an error in rare circumstances. But if we took a stronger lock - * such as ShareRowExclusiveLock, we would risk more deadlocks. + * Add new state record for a subscription table. */ Oid -SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn, bool update_only) +AddSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) { Relation rel; HeapTuple tup; - Oid subrelid = InvalidOid; + Oid subrelid; bool nulls[Natts_pg_subscription_rel]; Datum values[Natts_pg_subscription_rel]; @@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(subid)); + if (HeapTupleIsValid(tup)) + elog(ERROR, "subscription table %u in subscription %u already exists", + relid, subid); - /* - * If the record for given table does not exist yet create new record, - * otherwise update the existing one. - */ - if (!HeapTupleIsValid(tup) && !update_only) - { - /* Form the tuple. */ - memset(values, 0, sizeof(values)); - memset(nulls, false, sizeof(nulls)); - values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); - values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); - values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); - if (sublsn != InvalidXLogRecPtr) - values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); - else - nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; - - tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); - - /* Insert tuple into catalog. */ - subrelid = CatalogTupleInsert(rel, tup); - - heap_freetuple(tup); - } - else if (HeapTupleIsValid(tup)) - { - bool replaces[Natts_pg_subscription_rel]; + /* Form the tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); + values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + if (sublsn != InvalidXLogRecPtr) + values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); + else + nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; - /* Update the tuple. */ - memset(values, 0, sizeof(values)); - memset(nulls, false, sizeof(nulls)); - memset(replaces, false, sizeof(replaces)); + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); - replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; - values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + /* Insert tuple into catalog. */ + subrelid = CatalogTupleInsert(rel, tup); - replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; - if (sublsn != InvalidXLogRecPtr) - values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); - else - nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; + heap_freetuple(tup); - tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, - replaces); + /* Cleanup. */ + heap_close(rel, NoLock); - /* Update the catalog. */ - CatalogTupleUpdate(rel, &tup->t_self, tup); + return subrelid; +} - subrelid = HeapTupleGetOid(tup); - } +/* + * Update the state of a subscription table. + */ +Oid +UpdateSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) +{ + Relation rel; + HeapTuple tup; + Oid subrelid; + bool nulls[Natts_pg_subscription_rel]; + Datum values[Natts_pg_subscription_rel]; + bool replaces[Natts_pg_subscription_rel]; + + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock); + + /* Try finding existing mapping. */ + tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(subid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription table %u in subscription %u does not exist", + relid, subid); + + /* Update the tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; + values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + + replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; + if (sublsn != InvalidXLogRecPtr) + values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); + else + nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + subrelid = HeapTupleGetOid(tup); /* Cleanup. */ heap_close(rel, NoLock); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 2694e1b2d7..f138e61a8d 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) CheckSubscriptionRelkind(get_rel_relkind(relid), rv->schemaname, rv->relname); - SetSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr, false); + AddSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); } /* @@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) if (!bsearch(&relid, subrel_local_oids, list_length(subrel_states), sizeof(Oid), oid_cmp)) { - SetSubscriptionRelState(sub->oid, relid, + AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr, false); + InvalidXLogRecPtr); ereport(DEBUG1, (errmsg("table \"%s.%s\" added to subscription \"%s\"", rv->schemaname, rv->relname, sub->name))); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e50b9f7905..acc6498567 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) SpinLockRelease(&MyLogicalRepWorker->relmutex); - SetSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn, - true); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); walrcv_endstreaming(wrconn, &tli); finish_sync_worker(); @@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) StartTransactionCommand(); started_tx = true; } - SetSubscriptionRelState(MyLogicalRepWorker->subid, - rstate->relid, rstate->state, - rstate->lsn, true); + + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn); } } else @@ -870,11 +870,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) /* Update the state and make it visible to others. */ StartTransactionCommand(); - SetSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn, - true); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); CommitTransactionCommand(); pgstat_report_stat(false); @@ -961,11 +960,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * Update the new state in catalog. No need to bother * with the shmem state as we are exiting for good. */ - SetSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - SUBREL_STATE_SYNCDONE, - *origin_startpos, - true); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_SYNCDONE, + *origin_startpos); finish_sync_worker(); } break; diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index d936973a9d..5cf268f181 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -67,8 +67,10 @@ typedef struct SubscriptionRelState char state; } SubscriptionRelState; -extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn, bool update_only); +extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn); +extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok); extern void RemoveSubscriptionRel(Oid subid, Oid relid);