static Datum
pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool binary)
{
- Name name = PG_GETARG_NAME(0);
+ Name name;
XLogRecPtr upto_lsn;
int32 upto_nchanges;
-
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
-
XLogRecPtr end_of_wal;
XLogRecPtr startptr;
-
LogicalDecodingContext *ctx;
-
ResourceOwner old_resowner = CurrentResourceOwner;
ArrayType *arr;
Size ndim;
List *options = NIL;
DecodingOutputState *p;
+ check_permissions();
+
+ CheckLogicalDecodingRequirements();
+
+ if (PG_ARGISNULL(0))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("slot name must not be null")));
+ name = PG_GETARG_NAME(0);
+
if (PG_ARGISNULL(1))
upto_lsn = InvalidXLogRecPtr;
else
else
upto_nchanges = PG_GETARG_INT32(2);
+ if (PG_ARGISNULL(3))
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("options array must not be null")));
+ arr = PG_GETARG_ARRAYTYPE_P(3);
+
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
if (get_call_result_type(fcinfo, NULL, &p->tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
- check_permissions();
-
- CheckLogicalDecodingRequirements();
-
- arr = PG_GETARG_ARRAYTYPE_P(3);
- ndim = ARR_NDIM(arr);
-
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
+ /* Deconstruct options array */
+ ndim = ARR_NDIM(arr);
if (ndim > 1)
{
ereport(ERROR,
else
end_of_wal = GetXLogReplayRecPtr(NULL);
- CheckLogicalDecodingRequirements();
ReplicationSlotAcquire(NameStr(*name));
PG_TRY();
break;
CHECK_FOR_INTERRUPTS();
}
+
+ tuplestore_donestoring(tupstore);
+
+ CurrentResourceOwner = old_resowner;
+
+ /*
+ * Next time, start where we left off. (Hunting things, the family
+ * business..)
+ */
+ if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
+ LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
+
+ /* free context, call shutdown callback */
+ FreeDecodingContext(ctx);
+
+ ReplicationSlotRelease();
+ InvalidateSystemCaches();
}
PG_CATCH();
{
}
PG_END_TRY();
- tuplestore_donestoring(tupstore);
-
- CurrentResourceOwner = old_resowner;
-
- /*
- * Next time, start where we left off. (Hunting things, the family
- * business..)
- */
- if (ctx->reader->EndRecPtr != InvalidXLogRecPtr && confirm)
- LogicalConfirmReceivedLocation(ctx->reader->EndRecPtr);
-
- /* free context, call shutdown callback */
- FreeDecodingContext(ctx);
-
- ReplicationSlotRelease();
- InvalidateSystemCaches();
-
return (Datum) 0;
}
Datum
pg_logical_slot_get_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false);
-
- return ret;
+ return pg_logical_slot_get_changes_guts(fcinfo, true, false);
}
/*
Datum
pg_logical_slot_peek_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false);
-
- return ret;
+ return pg_logical_slot_get_changes_guts(fcinfo, false, false);
}
/*
Datum
pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true);
-
- return ret;
+ return pg_logical_slot_get_changes_guts(fcinfo, true, true);
}
/*
Datum
pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
{
- Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true);
-
- return ret;
+ return pg_logical_slot_get_changes_guts(fcinfo, false, true);
}
DESCR("brin(internal)");
DATA(insert OID = 3801 ( brinoptions PGNSP PGUID 12 1 0 0 0 f f f f t f s s 2 0 17 "1009 16" _null_ _null_ _null_ _null_ _null_ brinoptions _null_ _null_ _null_ ));
DESCR("brin(internal)");
-DATA(insert OID = 3952 ( brin_summarize_new_values PGNSP PGUID 12 1 0 0 0 f f f f f f v s 1 0 23 "2205" _null_ _null_ _null_ _null_ _null_ brin_summarize_new_values _null_ _null_ _null_ ));
+DATA(insert OID = 3952 ( brin_summarize_new_values PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 23 "2205" _null_ _null_ _null_ _null_ _null_ brin_summarize_new_values _null_ _null_ _null_ ));
DESCR("brin: standalone scan new table pages");
DATA(insert OID = 339 ( poly_same PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 16 "604 604" _null_ _null_ _null_ _null_ _null_ poly_same _null_ _null_ _null_ ));
DESCR("statistics: reset collected statistics for current database");
DATA(insert OID = 3775 ( pg_stat_reset_shared PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_shared _null_ _null_ _null_ ));
DESCR("statistics: reset collected statistics shared across the cluster");
-DATA(insert OID = 3776 ( pg_stat_reset_single_table_counters PGNSP PGUID 12 1 0 0 0 f f f f f f v s 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_single_table_counters _null_ _null_ _null_ ));
+DATA(insert OID = 3776 ( pg_stat_reset_single_table_counters PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_single_table_counters _null_ _null_ _null_ ));
DESCR("statistics: reset collected statistics for a single table or index in the current database");
-DATA(insert OID = 3777 ( pg_stat_reset_single_function_counters PGNSP PGUID 12 1 0 0 0 f f f f f f v s 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_single_function_counters _null_ _null_ _null_ ));
+DATA(insert OID = 3777 ( pg_stat_reset_single_function_counters PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 2278 "26" _null_ _null_ _null_ _null_ _null_ pg_stat_reset_single_function_counters _null_ _null_ _null_ ));
DESCR("statistics: reset collected statistics for a single function in the current database");
DATA(insert OID = 3163 ( pg_trigger_depth PGNSP PGUID 12 1 0 0 0 f f f f t f s s 0 0 23 "" _null_ _null_ _null_ _null_ _null_ pg_trigger_depth _null_ _null_ _null_ ));
DESCR("SP-GiST support for quad tree over range");
/* replication slots */
-DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
DESCR("create a physical replication slot");
-DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot");
DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
DESCR("information about replication slots currently in use");
-DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
+DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
DESCR("set up a logical replication slot");
DATA(insert OID = 3782 ( pg_logical_slot_get_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,25}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_get_changes _null_ _null_ _null_ ));
DESCR("get changes from replication slot");