From bbd8550bce146f86e5e883f1232292a975c314fb Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Wed, 1 Feb 2017 13:42:41 -0500 Subject: [PATCH] Refactor other replication commands to use DestRemoteSimple. Commit a84069d9350400c860d5e932b50dfd337aa407b0 added a new type of DestReceiver to avoid duplicating the existing code for the SHOW command, but it turns out we can leverage that new DestReceiver type in a few more places, saving some code. Michael Paquier, reviewed by Andres Freund and by me. Discussion: http://postgr.es/m/CAB7nPqSdFOQC0evc0r1nJeQyGBqjBrR41MC4rcMqUUpoJaZbtQ%40mail.gmail.com Discussion: http://postgr.es/m/CAB7nPqT2K4XFT1JgqufFBjsOc-NUKXg5qBDucHPMbk6Xi1kYaA@mail.gmail.com --- src/backend/access/common/printsimple.c | 21 ++ src/backend/access/common/tupdesc.c | 8 + src/backend/replication/walsender.c | 247 ++++++++---------------- 3 files changed, 114 insertions(+), 162 deletions(-) diff --git a/src/backend/access/common/printsimple.c b/src/backend/access/common/printsimple.c index 420de65e20..5fe1c72da8 100644 --- a/src/backend/access/common/printsimple.c +++ b/src/backend/access/common/printsimple.c @@ -22,6 +22,7 @@ #include "catalog/pg_type.h" #include "fmgr.h" #include "libpq/pqformat.h" +#include "utils/builtins.h" /* * At startup time, send a RowDescription message. @@ -99,6 +100,26 @@ printsimple(TupleTableSlot *slot, DestReceiver *self) } break; + case INT4OID: + { + int32 num = DatumGetInt32(value); + char str[12]; /* sign, 10 digits and '\0' */ + + pg_ltoa(num, str); + pq_sendcountedtext(&buf, str, strlen(str), false); + } + break; + + case INT8OID: + { + int64 num = DatumGetInt64(value); + char str[23]; /* sign, 21 digits and '\0' */ + + pg_lltoa(num, str); + pq_sendcountedtext(&buf, str, strlen(str), false); + } + break; + default: elog(ERROR, "unsupported type OID: %u", attr->atttypid); } diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c index 083c0303dc..4e2ebe1ae7 100644 --- a/src/backend/access/common/tupdesc.c +++ b/src/backend/access/common/tupdesc.c @@ -629,6 +629,14 @@ TupleDescInitBuiltinEntry(TupleDesc desc, att->attstorage = 'p'; att->attcollation = InvalidOid; break; + + case INT8OID: + att->attlen = 8; + att->attbyval = FLOAT8PASSBYVAL; + att->attalign = 'd'; + att->attstorage = 'p'; + att->attcollation = InvalidOid; + break; } } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5909b7dd8c..76f09fbdbf 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -302,13 +302,15 @@ WalSndShutdown(void) static void IdentifySystem(void) { - StringInfoData buf; char sysid[32]; - char tli[11]; char xpos[MAXFNAMELEN]; XLogRecPtr logptr; char *dbname = NULL; - Size len; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[4]; + bool nulls[4]; /* * Reply with a result set with one row, four columns. First col is system @@ -328,8 +330,6 @@ IdentifySystem(void) else logptr = GetFlushRecPtr(); - snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); if (MyDatabaseId != InvalidOid) @@ -346,79 +346,42 @@ IdentifySystem(void) MemoryContextSwitchTo(cur); } - /* Send a RowDescription message */ - pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 4, 2); /* 4 fields */ - - /* first field */ - pq_sendstring(&buf, "systemid"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - - /* second field */ - pq_sendstring(&buf, "timeline"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, INT4OID, 4); /* type oid */ - pq_sendint(&buf, 4, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - - /* third field */ - pq_sendstring(&buf, "xlogpos"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ + dest = CreateDestReceiver(DestRemoteSimple); + MemSet(nulls, false, sizeof(nulls)); - /* fourth field */ - pq_sendstring(&buf, "dbname"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - pq_endmessage(&buf); + /* need a tuple descriptor representing four columns */ + tupdesc = CreateTemplateTupleDesc(4, false); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline", + INT4OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname", + TEXTOID, -1, 0); - /* Send a DataRow message */ - pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 4, 2); /* # of columns */ + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc); /* column 1: system identifier */ - len = strlen(sysid); - pq_sendint(&buf, len, 4); - pq_sendbytes(&buf, (char *) &sysid, len); + values[0] = CStringGetTextDatum(sysid); /* column 2: timeline */ - len = strlen(tli); - pq_sendint(&buf, len, 4); - pq_sendbytes(&buf, (char *) tli, len); + values[1] = Int32GetDatum(ThisTimeLineID); /* column 3: xlog position */ - len = strlen(xpos); - pq_sendint(&buf, len, 4); - pq_sendbytes(&buf, (char *) xpos, len); + values[2] = CStringGetTextDatum(xpos); /* column 4: database name, or NULL if none */ if (dbname) - { - len = strlen(dbname); - pq_sendint(&buf, len, 4); - pq_sendbytes(&buf, (char *) dbname, len); - } + values[3] = CStringGetTextDatum(dbname); else - { - pq_sendint(&buf, -1, 4); - } + nulls[3] = true; - pq_endmessage(&buf); + /* send it to dest */ + do_tup_output(tstate, values, nulls); + + end_tup_output(tstate); } @@ -695,54 +658,41 @@ StartReplication(StartReplicationCmd *cmd) */ if (sendTimeLineIsHistoric) { - char tli_str[11]; char startpos_str[8 + 1 + 8 + 1]; - Size len; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[2]; + bool nulls[2]; - snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI); snprintf(startpos_str, sizeof(startpos_str), "%X/%X", (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto); - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint(&buf, 2, 2); /* 2 fields */ - - /* Field header */ - pq_sendstring(&buf, "next_tli"); - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ + dest = CreateDestReceiver(DestRemoteSimple); + MemSet(nulls, false, sizeof(nulls)); /* + * Need a tuple descriptor representing two columns. * int8 may seem like a surprising data type for this, but in theory * int4 would not be wide enough for this, as TimeLineID is unsigned. */ - pq_sendint(&buf, INT8OID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); - pq_sendint(&buf, 0, 4); - pq_sendint(&buf, 0, 2); - - pq_sendstring(&buf, "next_tli_startpos"); - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); - pq_sendint(&buf, 0, 4); - pq_sendint(&buf, 0, 2); - pq_endmessage(&buf); + tupdesc = CreateTemplateTupleDesc(2, false); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli", + INT8OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos", + TEXTOID, -1, 0); - /* Data row */ - pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 2, 2); /* number of columns */ + /* prepare for projection of tuple */ + tstate = begin_tup_output_tupdesc(dest, tupdesc); - len = strlen(tli_str); - pq_sendint(&buf, len, 4); /* length */ - pq_sendbytes(&buf, tli_str, len); + values[0] = Int64GetDatum((int64) sendTimeLineNextTLI); + values[1] = CStringGetTextDatum(startpos_str); - len = strlen(startpos_str); - pq_sendint(&buf, len, 4); /* length */ - pq_sendbytes(&buf, startpos_str, len); + /* send it to dest */ + do_tup_output(tstate, values, nulls); - pq_endmessage(&buf); + end_tup_output(tstate); } /* Send CommandComplete message */ @@ -790,8 +740,12 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { const char *snapshot_name = NULL; char xpos[MAXFNAMELEN]; - StringInfoData buf; - Size len; + char *slot_name; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[4]; + bool nulls[4]; Assert(!MyReplicationSlot); @@ -868,82 +822,51 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) (uint32) (MyReplicationSlot->data.confirmed_flush >> 32), (uint32) MyReplicationSlot->data.confirmed_flush); - pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 4, 2); /* 4 fields */ - - /* first field: slot name */ - pq_sendstring(&buf, "slot_name"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - - /* second field: LSN at which we became consistent */ - pq_sendstring(&buf, "consistent_point"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - - /* third field: exported snapshot's name */ - pq_sendstring(&buf, "snapshot_name"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - - /* fourth field: output plugin */ - pq_sendstring(&buf, "output_plugin"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ + dest = CreateDestReceiver(DestRemoteSimple); + MemSet(nulls, false, sizeof(nulls)); - pq_endmessage(&buf); - - /* Send a DataRow message */ - pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 4, 2); /* # of columns */ + /* + * Need a tuple descriptor representing four columns: + * - first field: the slot name + * - second field: LSN at which we became consistent + * - third field: exported snapshot's name + * - fourth field: output plugin + */ + tupdesc = CreateTemplateTupleDesc(4, false); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name", + TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin", + TEXTOID, -1, 0); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc); /* slot_name */ - len = strlen(NameStr(MyReplicationSlot->data.name)); - pq_sendint(&buf, len, 4); /* col1 len */ - pq_sendbytes(&buf, NameStr(MyReplicationSlot->data.name), len); + slot_name = NameStr(MyReplicationSlot->data.name); + values[0] = CStringGetTextDatum(slot_name); /* consistent wal location */ - len = strlen(xpos); - pq_sendint(&buf, len, 4); - pq_sendbytes(&buf, xpos, len); + values[1] = CStringGetTextDatum(xpos); /* snapshot name, or NULL if none */ if (snapshot_name != NULL) - { - len = strlen(snapshot_name); - pq_sendint(&buf, len, 4); - pq_sendbytes(&buf, snapshot_name, len); - } + values[2] = CStringGetTextDatum(snapshot_name); else - pq_sendint(&buf, -1, 4); + nulls[2] = true; /* plugin, or NULL if none */ if (cmd->plugin != NULL) - { - len = strlen(cmd->plugin); - pq_sendint(&buf, len, 4); - pq_sendbytes(&buf, cmd->plugin, len); - } + values[3] = CStringGetTextDatum(cmd->plugin); else - pq_sendint(&buf, -1, 4); + nulls[3] = true; - pq_endmessage(&buf); + /* send it to dest */ + do_tup_output(tstate, values, nulls); + end_tup_output(tstate); ReplicationSlotRelease(); } -- 2.40.0