In incorperates changes from myself and a number of contributors.
This update to dbmirror provides:
-replication of sequence operations via setval/nextval
-DBMirror.pl support for logging to syslog
-changed the names of the tables to dbmirror_* (no quotes required)
-Support for writitng SQL statements to files instead of directly to
a slave database
-More options for DBMirror.pl in the config files.
Steven Singer
#
#
##############################################################################
-# $PostgreSQL: pgsql/contrib/dbmirror/DBMirror.pl,v 1.7 2003/11/29 22:39:19 pgsql Exp $
+# $PostgreSQL: pgsql/contrib/dbmirror/DBMirror.pl,v 1.8 2004/02/17 03:34:35 momjian Exp $
#
##############################################################################
sub mirrorInsert($$$$$);
sub mirrorDelete($$$$$);
sub mirrorUpdate($$$$$);
-sub sendQueryToSlaves($$);
sub logErrorMessage($);
-sub openSlaveConnection($);
+sub setupSlave($);
sub updateMirrorHostTable($$);
- sub extractData($$);
+sub extractData($$);
local $::masterHost;
local $::masterDb;
local $::masterUser;
local $::masterPassword;
local $::errorThreshold=5;
local $::errorEmailAddr=undef;
+local $::sleepInterval=60;
my %slaveInfoHash;
local $::slaveInfo = \%slaveInfoHash;
die;
}
-
- my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword";
+ if (defined($::syslog))
+ {
+ # log with syslog
+ require Sys::Syslog;
+ import Sys::Syslog qw(openlog syslog);
+ openlog($0, 'cons,pid', 'user');
+ syslog("info", '%s', "starting $0 script with $ARGV[0]");
+ }
+
+ my $connectString;
+ if(defined($::masterHost))
+ {
+ $connectString .= "host=$::masterHost ";
+ }
+ if(defined($::masterPort))
+ {
+ $connectString .= "port=$::masterPort ";
+ }
+ $connectString .= "dbname=$::masterDb user=$::masterUser password=$::masterPassword";
$masterConn = Pg::connectdb($connectString);
my $firstTime = 1;
while(1) {
if($firstTime == 0) {
- sleep 60;
+ sleep $::sleepInterval;
}
$firstTime = 0;
-# Open up the connection to the slave.
- if(! defined $::slaveInfo->{"status"} ||
- $::slaveInfo->{"status"} == -1) {
- openSlaveConnection($::slaveInfo);
- }
-
+ setupSlave($::slaveInfo);
+
- sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
- sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED");
#Obtain a list of pending transactions using ordering by our approximation
#to the commit time. The commit time approximation is taken to be the
#SeqId of the last row edit in the transaction.
- my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd";
- $pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN";
- $pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = ";
- $pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"=";
- $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' ";
- $pendingTransQuery .= " ON pd.\"XID\"";
- $pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null ";
- $pendingTransQuery .= " GROUP BY pd.\"XID\" ";
- $pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")";
+ my $pendingTransQuery = "SELECT pd.XID,MAX(SeqId) FROM dbmirror_Pending pd";
+ $pendingTransQuery .= " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN";
+ $pendingTransQuery .= " dbmirror_MirrorHost mh ON mt.MirrorHostId = ";
+ $pendingTransQuery .= " mh.MirrorHostId AND mh.SlaveName=";
+ $pendingTransQuery .= " '$::slaveInfo->{\"slaveName\"}' ";
+ $pendingTransQuery .= " ON pd.XID";
+ $pendingTransQuery .= " = mt.XID WHERE mt.XID is null ";
+
+
+ $pendingTransQuery .= " GROUP BY pd.XID";
+ $pendingTransQuery .= " ORDER BY MAX(pd.SeqId)";
my $pendingTransResults = $masterConn->exec($pendingTransQuery);
my $XID = $pendingTransResults->getvalue($curTransTuple,0);
my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1);
my $seqId;
-
- my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\",";
- $pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" ";
- $pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata ";
- $pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND ";
-
- $pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC";
+
+
+ if($::slaveInfo->{'status'} eq 'FileClosed')
+ {
+ openTransactionFile($::slaveInfo,$XID);
+ }
+
+
+
+ my $pendingQuery = "SELECT pnd.SeqId,pnd.TableName,";
+ $pendingQuery .= " pnd.Op,pnddata.IsKey, pnddata.Data AS Data ";
+ $pendingQuery .= " FROM dbmirror_Pending pnd, dbmirror_PendingData pnddata ";
+ $pendingQuery .= " WHERE pnd.SeqId = pnddata.SeqId ";
+
+ $pendingQuery .= " AND pnd.XID=$XID ORDER BY SeqId, IsKey DESC";
my $pendingResults = $masterConn->exec($pendingQuery);
die;
}
-
+ sendQueryToSlaves($XID,"BEGIN");
my $numPending = $pendingResults->ntuples;
my $curTuple = 0;
- sendQueryToSlaves(undef,"BEGIN");
while ($curTuple < $numPending) {
$seqId = $pendingResults->getvalue($curTuple,0);
my $tableName = $pendingResults->getvalue($curTuple,1);
my $op = $pendingResults->getvalue($curTuple,2);
-
$curTuple = mirrorCommand($seqId,$tableName,$op,$XID,
$pendingResults,$curTuple) +1;
- if($::slaveInfo->{"status"}==-1) {
- last;
- }
}
- #Now commit the transaction.
- if($::slaveInfo->{"status"}==-1) {
+
+ if($::slaveInfo->{'status'} ne 'DBOpen' &&
+ $::slaveInfo->{'status'} ne 'FileOpen')
+ {
last;
}
sendQueryToSlaves(undef,"COMMIT");
+ #Now commit the transaction.
updateMirrorHostTable($XID,$seqId);
- if($commandCount > 5000) {
- $commandCount = 0;
- $::slaveInfo->{"status"} = -1;
- $::slaveInfo->{"slaveConn"}->reset;
- #Open the connection right away.
- openSlaveConnection($::slaveInfo);
-
- }
$pendingResults = undef;
$curTransTuple = $curTransTuple +1;
+
+ if($::slaveInfo->{'status'} eq 'FileOpen')
+ {
+ close ($::slaveInfo->{'TransactionFile'});
+ }
+ elsif($::slaveInfo->{'status'} eq 'DBOpen')
+ {
+ if($commandCount > 5000) {
+ $commandCount = 0;
+ $::slaveInfo->{"status"} = 'DBClosed';
+ $::slaveInfo->{"slaveConn"}->reset;
+ #Open the connection right away.
+ openSlaveConnection($::slaveInfo);
+
+ }
+ }
+
}#while transactions left.
$pendingTransResults = undef;
my $pendingResults = $_[4];
my $currentTuple = $_[5];
+
if($op eq 'i') {
$currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults
,$currentTuple);
$currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults,
$currentTuple);
}
+ if($op eq 's') {
+ $currentTuple = mirrorSequence($seqId,$tableName,$transId,$pendingResults,
+ $currentTuple);
+ }
$commandCount = $commandCount +1;
if($commandCount % 100 == 0) {
# print "Sent 100 commmands on SeqId $seqId \n";
$firstIteration=0;
}
$valuesQuery .= ")";
- sendQueryToSlaves(undef,$insertQuery . $valuesQuery);
+ sendQueryToSlaves($transId,$insertQuery . $valuesQuery);
return $currentTuple;
}
$counter++;
$firstField=0;
}
-
sendQueryToSlaves($transId,$deleteQuery);
return $currentTuple;
}
my $transId = $_[2];
my $pendingResult = $_[3];
my $currentTuple = $_[4];
-
+
my $counter;
my $quotedValue;
my $updateQuery = "UPDATE $tableName SET ";
my $currentField;
-
-
my %keyValueHash;
my %dataValueHash;
my $firstIteration=1;
}
$firstIteration=0;
}
-
sendQueryToSlaves($transId,$updateQuery);
return $currentTuple+1;
}
+sub mirrorSequence($$$$$) {
+ my $seqId = $_[0];
+ my $sequenceName = $_[1];
+ my $transId = $_[2];
+ my $pendingResult = $_[3];
+ my $currentTuple = $_[4];
+
+
+ my $query;
+ my $sequenceValue = $pendingResult->getvalue($currentTuple,4);
+ $query = sprintf("select setval(%s,%s)",$sequenceName,$sequenceValue);
+
+ sendQueryToSlaves($transId,$query);
+ return $currentTuple;
+
+}
=item sendQueryToSlaves(seqId,sqlQuery)
my $seqId = $_[0];
my $sqlQuery = $_[1];
- if($::slaveInfo->{"status"} == 0) {
+ if($::slaveInfo->{"status"} eq 'DBOpen') {
my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery);
unless($queryResult->resultStatus == PGRES_COMMAND_OK) {
my $errorMessage;
$::slaveInfo->{"status"} = -1;
}
}
+ elsif($::slaveInfo->{"status"} eq 'FileOpen' ) {
+ my $xfile = $::slaveInfo->{'TransactionFile'};
+ print $xfile $sqlQuery . ";\n";
+ }
+
+
}
+
+
=item logErrorMessage(error)
Mails an error message to the users specified $errorEmailAddr
print mailPipe "\n\n\n=================================================\n";
close mailPipe;
}
+
+ if (defined($::syslog))
+ {
+ syslog('err', '%s (%m)', $error);
+ }
+
warn($error);
$lastErrorMsg = $error;
}
-sub openSlaveConnection($) {
+sub setupSlave($) {
my $slavePtr = $_[0];
- my $slaveConn;
- my $slaveConnString = "host=" . $slavePtr->{"slaveHost"};
- $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"};
- $slaveConnString .= " user=" . $slavePtr->{"slaveUser"};
- $slaveConnString .= " password=" . $slavePtr->{"slavePassword"};
-
- $slaveConn = Pg::connectdb($slaveConnString);
-
- if($slaveConn->status != PGRES_CONNECTION_OK) {
- my $errorMessage = "Can't connect to slave database " ;
- $errorMessage .= $slavePtr->{"slaveHost"} . "\n";
- $errorMessage .= $slaveConn->errorMessage;
- logErrorMessage($errorMessage);
- $slavePtr->{"status"} = -1;
- }
- else {
- $slavePtr->{"slaveConn"} = $slaveConn;
$slavePtr->{"status"} = 0;
#Determine the MirrorHostId for the slave from the master's database
- my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM '
- . ' "MirrorHost" WHERE "HostName"'
- . '=\'' . $slavePtr->{"slaveHost"}
+ my $resultSet = $masterConn->exec('SELECT MirrorHostId FROM '
+ . ' dbmirror_MirrorHost WHERE SlaveName'
+ . '=\'' . $slavePtr->{"slaveName"}
. '\'');
if($resultSet->ntuples !=1) {
- my $errorMessage .= $slavePtr->{"slaveHost"} ."\n";
+ my $errorMessage .= $slavePtr->{"slaveName"} ."\n";
$errorMessage .= "Has no MirrorHost entry on master\n";
logErrorMessage($errorMessage);
$slavePtr->{"status"}=-1;
}
$slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0);
-
-
-
+
+ if(defined($::slaveInfo->{'slaveDb'})) {
+ # We talk directly to a slave database.
+ #
+ if($::slaveInfo->{"status"} ne 'DBOpen')
+ {
+ openSlaveConnection($::slaveInfo);
+ }
+ sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+ sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED");
+ }
+ else {
+ $::slaveInfo->{"status"} = 'FileClosed';
}
+
}
-
=item updateMirrorHostTable(lastTransId,lastSeqId)
Updates the MirroredTransaction table to reflect the fact that
my $lastTransId = shift;
my $lastSeqId = shift;
- if($::slaveInfo->{"status"}==0) {
- my $deleteTransactionQuery;
- my $deleteResult;
- my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" ";
- $updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")";
- $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) ";
-
- my $updateResult = $masterConn->exec($updateMasterQuery);
- unless($updateResult->resultStatus == PGRES_COMMAND_OK) {
- my $errorMessage = $masterConn->errorMessage . "\n";
- $errorMessage .= $updateMasterQuery;
- logErrorMessage($errorMessage);
- die;
- }
+
+
+ my $deleteTransactionQuery;
+ my $deleteResult;
+ my $updateMasterQuery = "INSERT INTO dbmirror_MirroredTransaction ";
+ $updateMasterQuery .= " (XID,LastSeqId,MirrorHostId)";
+ $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) ";
+
+ my $updateResult = $masterConn->exec($updateMasterQuery);
+ unless($updateResult->resultStatus == PGRES_COMMAND_OK) {
+ my $errorMessage = $masterConn->errorMessage . "\n";
+ $errorMessage .= $updateMasterQuery;
+ logErrorMessage($errorMessage);
+ die;
+ }
# print "Updated slaves to transaction $lastTransId\n" ;
# flush STDOUT;
- #If this transaction has now been mirrored to all mirror hosts
- #then it can be deleted.
- $deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"='
- . $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"'
- . ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM'
- . ' "MirrorHost")';
-
- $deleteResult = $masterConn->exec($deleteTransactionQuery);
- if($deleteResult->resultStatus!=PGRES_COMMAND_OK) {
- logErrorMessage($masterConn->errorMessage . "\n" .
- $deleteTransactionQuery);
- die;
- }
-
+ #If this transaction has now been mirrored to all mirror hosts
+ #then it can be deleted.
+ $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID='
+ . $lastTransId . ' AND (SELECT COUNT(*) FROM dbmirror_MirroredTransaction'
+ . ' WHERE XID=' . $lastTransId . ')=(SELECT COUNT(*) FROM'
+ . ' dbmirror_MirrorHost)';
+
+ $deleteResult = $masterConn->exec($deleteTransactionQuery);
+ if($deleteResult->resultStatus!=PGRES_COMMAND_OK) {
+ logErrorMessage($masterConn->errorMessage . "\n" .
+ $deleteTransactionQuery);
+ die;
}
+
+
}
return %valuesHash;
}
+
+
+sub openTransactionFile($$)
+{
+ my $slaveInfo = shift;
+ my $XID =shift;
+# my $now_str = localtime;
+ my $nowsec;
+ my $nowmin;
+ my $nowhour;
+ my $nowmday;
+ my $nowmon;
+ my $nowyear;
+ my $nowwday;
+ my $nowyday;
+ my $nowisdst;
+ ($nowsec,$nowmin,$nowhour,$nowmday,$nowmon,$nowyear,$nowwday,$nowyday,$nowisdst) =
+ localtime;
+ my $fileName=sprintf(">%s/%s_%d-%d-%d_%d:%d:%dXID%d.sql", $::slaveInfo->{'TransactionFileDirectory'},
+ $::slaveInfo->{"MirrorHostId"},($nowyear+1900),($nowmon+1),$nowmday,$nowhour,$nowmin,
+ $nowsec,$XID);
+
+ my $xfile;
+ open($xfile,$fileName) or die "Can't open $fileName : $!";
+
+ $slaveInfo->{'TransactionFile'} = $xfile;
+ $slaveInfo->{'status'} = 'FileOpen';
+}
+
+
+
+sub openSlaveConnection($) {
+ my $slavePtr = $_[0];
+ my $slaveConn;
+
+
+ my $slaveConnString;
+ if(defined($slavePtr->{"slaveHost"}))
+ {
+ $slaveConnString .= "host=" . $slavePtr->{"slaveHost"} . " ";
+ }
+ if(defined($slavePtr->{"slavePort"}))
+ {
+ $slaveConnString .= "port=" . $slavePtr->{"slavePort"} . " ";
+ }
+
+ $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"};
+ $slaveConnString .= " user=" . $slavePtr->{"slaveUser"};
+ $slaveConnString .= " password=" . $slavePtr->{"slavePassword"};
+
+ $slaveConn = Pg::connectdb($slaveConnString);
+
+ if($slaveConn->status != PGRES_CONNECTION_OK) {
+ my $errorMessage = "Can't connect to slave database " ;
+ $errorMessage .= $slavePtr->{"slaveHost"} . "\n";
+ $errorMessage .= $slaveConn->errorMessage;
+ logErrorMessage($errorMessage);
+ $slavePtr->{"status"} = 'DBFailed';
+ }
+ else {
+ $slavePtr->{"slaveConn"} = $slaveConn;
+ $slavePtr->{"status"} = 'DBOpen';
+ }
+
+
+}
+BEGIN;
+
+SET autocommit TO 'on';
CREATE FUNCTION "recordchange" () RETURNS trigger AS
-'/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C';
+'$libdir/pending.so', 'recordchange' LANGUAGE 'C';
+
-CREATE TABLE "MirrorHost" (
-"MirrorHostId" serial,
-"HostName" varchar NOT NULL,
-PRIMARY KEY("MirrorHostId")
-);
+CREATE TABLE dbmirror_MirrorHost (
+MirrorHostId serial not null,
+SlaveName varchar NOT NULL,
+PRIMARY KEY(MirrorHostId)
+);
-CREATE TABLE "Pending" (
-"SeqId" serial,
-"TableName" varchar NOT NULL,
-"Op" character,
-"XID" int4 NOT NULL,
-PRIMARY KEY ("SeqId")
+CREATE TABLE dbmirror_Pending (
+SeqId serial,
+TableName Name NOT NULL,
+Op character,
+XID int4 NOT NULL,
+PRIMARY KEY (SeqId)
);
-CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID");
+CREATE INDEX "dbmirror_Pending_XID_Index" ON dbmirror_Pending (XID);
-CREATE TABLE "PendingData" (
-"SeqId" int4 NOT NULL,
-"IsKey" bool NOT NULL,
-"Data" varchar,
-PRIMARY KEY ("SeqId", "IsKey") ,
-FOREIGN KEY ("SeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE
+CREATE TABLE dbmirror_PendingData (
+SeqId int4 NOT NULL,
+IsKey bool NOT NULL,
+Data varchar,
+PRIMARY KEY (SeqId, IsKey) ,
+FOREIGN KEY (SeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE CASCADE ON DELETE CASCADE
);
-CREATE TABLE "MirroredTransaction" (
-"XID" int4 NOT NULL,
-"LastSeqId" int4 NOT NULL,
-"MirrorHostId" int4 NOT NULL,
-PRIMARY KEY ("XID","MirrorHostId"),
-FOREIGN KEY ("MirrorHostId") REFERENCES "MirrorHost" ("MirrorHostId") ON UPDATE CASCADE ON DELETE CASCADE,
-FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId") ON UPDATE
+CREATE TABLE dbmirror_MirroredTransaction (
+XID int4 NOT NULL,
+LastSeqId int4 NOT NULL,
+MirrorHostId int4 NOT NULL,
+PRIMARY KEY (XID,MirrorHostId),
+FOREIGN KEY (MirrorHostId) REFERENCES dbmirror_MirrorHost (MirrorHostId) ON UPDATE CASCADE ON DELETE CASCADE,
+FOREIGN KEY (LastSeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE
CASCADE ON DELETE CASCADE
);
+
+
+UPDATE pg_proc SET proname='nextval_pg' WHERE proname='nextval';
+
+CREATE FUNCTION pg_catalog.nextval(text) RETURNS int8 AS
+'/usr/local/postgresql-7.4/lib/pending.so', 'nextval' LANGUAGE 'C' STRICT;
+
+
+UPDATE pg_proc set proname='setval_pg' WHERE proname='setval';
+
+CREATE FUNCTION pg_catalog.setval(text,int4) RETURNS int8 AS
+'/usr/local/postgresql-7.4/lib/pending.so', 'setval' LANGUAGE 'C' STRICT;
+
+COMMIT;
\ No newline at end of file
database Written and maintained by Steven Singer(ssinger@navtechinc.com)
-(c) 2001-2002 Navtech Systems Support Inc.
+(c) 2001-2004 Navtech Systems Support Inc.
ALL RIGHTS RESERVED
Permission to use, copy, modify, and distribute this software and its
Requirments:
---------------------------------
-PostgreSQL-7.4 (Older versions are no longer supported)
--Perl 5.6(Other versions might work)
+-Perl 5.6 or 5.8 (Other versions might work)
-PgPerl (http://gborg.postgresql.org/project/pgperl/projdisplay.php)
You should now have a file named pending.so that contains the trigger.
-Install this file in /usr/local/pgsql/lib (or another suitable location).
+Install this file in your Postgresql lib directory (/usr/local/pgsql/lib)
-If you choose a different location the MirrorSetup.sql script will need
-to be modified to reflect your new location. The CREATE FUNCTION command
-in the MirrorSetup.sql script associates the trigger function with the
-pending.so shared library. Modify the arguments to this command if you
-choose to install the trigger elsewhere.
2) Run MirrorSetup.sql
This includes
-Telling PostgreSQL about the "recordchange" trigger function.
--Creating the Pending,PendingData, MirrorHost, MirroredTransaction tables
+-Creating the dbmirror_Pending,dbmirror_PendingData,dbmirror_MirrorHost,
+dbmirror_MirroredTransaction tables
To execute the script use psql as follows
The master settings refer to the master database(The one that is
being mirrored).
-The slave settings refer to the database that the data is being mirrored to.
-The slaveHost parameter must refer to the machine name of the slave (Either
-a resolvable hostname or an IP address). The value for slave host
-must match the Hostname field in the MirrorHost table(See step 6).
+The slave settings refer to the database that the data is being
+mirrored to.
-The master user must have sufficient permissions to modify the Pending
-tables and to read all of the tables being mirrored.
+The slaveName setting in the configuration file must match the slave
+name specified in the dbmirror_MirrorHost table.
+
+DBMirror.pl can be run in two modes of operation:
+
+ A) It can connect directly to the slave database. To do this specify
+ a slave database name and optional host and port along with a username
+ and password. See slaveDatabase.conf for details.
+
+
+ The master user must have sufficient permissions to modify the Pending
+ tables and to read all of the tables being mirrored.
+
+ The slave user must have enough permissions on the slave database to
+ modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being
+ mirrored.
+
+ B) The SQL statements that should be executed on the slave can be
+ written to files which can then be executed slave database through
+ psql. This would be suitable for setups where their is no direct
+ connection between the slave database and the master. A file is
+ generated for each transaction in the directory specified by
+ TransactionFileDirectory. The file name contains the date/time the
+ file was created along with the transaction id.
-The slave user must have enough permissions on the slave database to
-modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being
-mirrored.
4) Add the trigger to tables.
slaveHost variable for that slave in the configuration file.
For example
-INSERT INTO "MirrorHost" ("HostName") VALUES ('mySlaveMachine.mycompany.com');
+INSERT INTO "MirrorHost" ("SlaveName") VALUES ('backup_system');
6) Start DBMirror.pl
the configuration file.
DBMirror can be run from the master, the slave, or a third machine as long
-as it is able to access both the master and slave databases.
+as it is able to access both the master and slave databases(not
+required if SQL files are being generated)
7) Periodically run clean_pending.pl
clean_pending.pl cleans out any entries from the Pending tables that
----------
-Support for selective mirroring based on the content of data.
-Support for BLOB's.
--Support for conflict resolution.
--Batching SQL commands in DBMirror for better performance over WAN's.
+-Support for multi-master mirroring with conflict resolution.
-Better support for dealing with Schema changes.
+
+Significant Changes Since 7.4
+----------------
+-Support for mirroring SEQUENCE's
+-Support for unix domain sockets
+-Support for outputting slave SQL statements to a file
+-Changed the names of replication tables are now named
+dbmirror_pending etc..
+
+
+
+Credits
+-----------
+Achilleus Mantzios <achill@matrix.gatewaynet.com>
+
+
+
+
Steven Singer
Navtech Systems Support Inc.
ssinger@navtechinc.com
/****************************************************************************
* pending.c
- * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.15 2003/11/29 22:39:19 pgsql Exp $
+ * $Id: pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $
+ * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $
*
* This file contains a trigger for Postgresql-7.x to record changes to tables
* to a pending table for mirroring.
#include <executor/spi.h>
#include <commands/trigger.h>
#include <utils/lsyscache.h>
+#include <utils/array.h>
+
enum FieldUsage
{
PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE
};
int storePending(char *cpTableName, HeapTuple tBeforeTuple,
- HeapTuple tAfterTuple,
- TupleDesc tTupdesc,
- TriggerData *tpTrigdata, char cOp);
+ HeapTuple tAfterTuple,
+ TupleDesc tTupdesc,
+ Oid tableOid,
+ char cOp);
+
+
int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc,
- TriggerData *tpTrigdata);
-int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
- TriggerData *tpTrigData, int iIncludeKeyData);
+ Oid tableOid);
+int storeData(char *cpTableName, HeapTuple tTupleData,
+ TupleDesc tTupleDesc,Oid tableOid,int iIncludeKeyData);
int2vector *getPrimaryKey(Oid tblOid);
-char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs,
- TriggerData *tTrigData,
+char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid,
enum FieldUsage eKeyUsage);
+
#define BUFFER_SIZE 256
#define MAX_OID_LEN 10
-/*#define DEBUG_OUTPUT 1 */
+#define DEBUG_OUTPUT 1
extern Datum recordchange(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(recordchange);
+#if defined DEBUG_OUTPUT
+#define debug_msg2(x,y) elog(NOTICE,x,y)
+#define debug_msg(x) elog(NOTICE,x)
+#define debug_msg3(x,y,z) elog(NOTICE,x,y,z)
+#else
+#define debug_msg2(x,y)
+#define debug_msg(x)
+#define debug_msg(x,y,z)
+
+#endif
+
+
+
+extern Datum nextval(PG_FUNCTION_ARGS);
+extern Datum setval(PG_FUNCTION_ARGS);
+
+int saveSequenceUpdate(const text * sequenceName,
+ int nextSequenceValue);
+
+
/*****************************************************************************
* The entry point for the trigger function.
* The Trigger takes a single SQL 'text' argument indicating the name of the
char op = 0;
char *schemaname;
char *fullyqualtblname;
+ char *pkxpress=NULL;
if (fcinfo->context != NULL)
{
if (SPI_connect() < 0)
{
- elog(NOTICE, "storePending could not connect to SPI");
+ ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("dbmirror:recordchange could not connect to SPI")));
return -1;
}
trigdata = (TriggerData *) fcinfo->context;
beforeTuple = trigdata->tg_trigtuple;
op = 'd';
}
+ else
+ {
+ ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
+ errmsg("dbmirror:recordchange Unknown operation")));
+
+ }
- if (storePending(fullyqualtblname, beforeTuple, afterTuple, tupdesc, trigdata, op))
+ if (storePending(fullyqualtblname, beforeTuple, afterTuple,
+ tupdesc, retTuple->t_tableOid, op))
{
/* An error occoured. Skip the operation. */
ereport(ERROR,
return PointerGetDatum(NULL);
}
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "returning on success");
-#endif
+ debug_msg("dbmirror:recordchange returning on success");
+
SPI_pfree(fullyqualtblname);
+ if(pkxpress != NULL)
+ SPI_pfree(pkxpress);
SPI_finish();
return PointerGetDatum(retTuple);
}
storePending(char *cpTableName, HeapTuple tBeforeTuple,
HeapTuple tAfterTuple,
TupleDesc tTupDesc,
- TriggerData *tpTrigData, char cOp)
+ Oid tableOid,
+ char cOp)
{
- char *cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)";
+ char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID) VALUES ($1,$2,$3)";
int iResult = 0;
HeapTuple tCurTuple;
+ char nulls[3]=" ";
/* Points the current tuple(before or after) */
- Datum saPlanData[4];
- Oid taPlanArgTypes[3] = {NAMEOID, CHAROID, INT4OID};
+ Datum saPlanData[3];
+ Oid taPlanArgTypes[4] = {NAMEOID,
+ CHAROID,
+ INT4OID};
void *vpPlan;
tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
-
vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes);
if (vpPlan == NULL)
- elog(NOTICE, "error creating plan");
- /* SPI_saveplan(vpPlan); */
+ ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
+ errmsg("dbmirror:storePending error creating plan")));
+
saPlanData[0] = PointerGetDatum(cpTableName);
saPlanData[1] = CharGetDatum(cOp);
saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
-
- iResult = SPI_execp(vpPlan, saPlanData, NULL, 1);
+ iResult = SPI_execp(vpPlan, saPlanData, nulls, 1);
if (iResult < 0)
- elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult);
+ elog(NOTICE, "storedPending fired (%s) returned %d",
+ cpQueryBase, iResult);
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "row successfully stored in pending table");
-#endif
+
+ debug_msg("dbmirror:storePending row successfully stored in pending table");
+
if (cOp == 'd')
{
* This is a record of a delete operation.
* Just store the key data.
*/
- iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData);
+ iResult = storeKeyInfo(cpTableName,
+ tBeforeTuple, tTupDesc, tableOid);
}
else if (cOp == 'i')
{
* An Insert operation.
* Store all data
*/
- iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tpTrigData, TRUE);
+ iResult = storeData(cpTableName, tAfterTuple,
+ tTupDesc, tableOid,TRUE);
}
else
{
/* op must be an update. */
- iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData);
- iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc,
- tpTrigData, TRUE);
+ iResult = storeKeyInfo(cpTableName, tBeforeTuple,
+ tTupDesc, tableOid);
+ iResult = iResult ? iResult :
+ storeData(cpTableName, tAfterTuple, tTupDesc,
+ tableOid,TRUE);
}
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "done storing keyinfo");
-#endif
+
+ debug_msg("dbmirror:storePending done storing keyinfo");
return iResult;
int
storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
- TupleDesc tTupleDesc,
- TriggerData *tpTrigData)
+ TupleDesc tTupleDesc, Oid tableOid)
{
Oid saPlanArgTypes[1] = {NAMEOID};
- char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)";
+ char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)";
void *pplan;
Datum saPlanData[1];
char *cpKeyData;
}
/* pplan = SPI_saveplan(pplan); */
- cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, PRIMARY);
+ cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, PRIMARY);
if (cpKeyData == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("there is no PRIMARY KEY for table %s",
cpTableName)));
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "key data: %s", cpKeyData);
-#endif
+
+ debug_msg2("dbmirror:storeKeyInfo key data: %s", cpKeyData);
+
saPlanData[0] = PointerGetDatum(cpKeyData);
iRetCode = SPI_execp(pplan, saPlanData, NULL, 1);
if (iRetCode != SPI_OK_INSERT)
{
- elog(NOTICE, "error inserting row in pendingDelete");
+ ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION)
+ ,errmsg("error inserting row in pendingDelete")));
return -1;
}
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "insert successful");
-#endif
+
+ debug_msg("insert successful");
return 0;
* Stores a copy of the non-key data for the row.
*****************************************************************************/
int
-storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
- TriggerData *tpTrigData, int iIncludeKeyData)
+storeData(char *cpTableName, HeapTuple tTupleData,
+ TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData)
{
Oid planArgTypes[1] = {NAMEOID};
- char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)";
+ char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)";
void *pplan;
Datum planData[1];
char *cpKeyData;
/* pplan = SPI_saveplan(pplan); */
if (iIncludeKeyData == 0)
- cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, NONPRIMARY);
+ cpKeyData = packageData(tTupleData, tTupleDesc,
+ tableOid, NONPRIMARY);
else
- cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, ALL);
+ cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, ALL);
planData[0] = PointerGetDatum(cpKeyData);
iRetValue = SPI_execp(pplan, planData, NULL, 1);
elog(NOTICE, "error inserting row in pendingDelete");
return -1;
}
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "insert successful");
-#endif
+
+ debug_msg("dbmirror:storeKeyData insert successful");
+
return 0;
* ALL implies include all fields.
*/
char *
-packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
- TriggerData *tpTrigData,
+packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid,
enum FieldUsage eKeyUsage)
{
int iNumCols;
if (eKeyUsage != ALL)
{
- tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id);
+ tpPKeys = getPrimaryKey(tableOid);
if (tpPKeys == NULL)
return NULL;
}
-#if defined DEBUG_OUTPUT
+
if (tpPKeys != NULL)
- elog(NOTICE, "have primary keys");
-#endif
+ {
+ debug_msg("dbmirror:packageData have primary keys");
+
+ }
+
cpDataBlock = SPI_palloc(BUFFER_SIZE);
iDataBlockSize = BUFFER_SIZE;
iUsedDataBlock = 0; /* To account for the null */
{
/* Determine if this is a primary key or not. */
iIsPrimaryKey = 0;
- for (iPrimaryKeyIndex = 0; (*tpPKeys)[iPrimaryKeyIndex] != 0;
+ for (iPrimaryKeyIndex = 0;
+ (*tpPKeys)[iPrimaryKeyIndex] != 0;
iPrimaryKeyIndex++)
{
- if ((*tpPKeys)[iPrimaryKeyIndex] == iColumnCounter)
+ if ((*tpPKeys)[iPrimaryKeyIndex]
+ == iColumnCounter)
{
iIsPrimaryKey = 1;
break;
}
}
- if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY))
+ if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) :
+ (eKeyUsage != NONPRIMARY))
{
/**
* Don't use.
*/
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "skipping column");
-#endif
+
+ debug_msg("dbmirror:packageData skipping column");
+
continue;
}
} /* KeyUsage!=ALL */
-#ifndef NODROPCOLUMN
- if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
- {
- /**
- * This column has been dropped.
- * Do not mirror it.
- */
- continue;
- }
-#endif
- cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs
- [iColumnCounter - 1]->attname));
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "field name: %s", cpFieldName);
-#endif
- while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6)
+
+ if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
+ {
+ /**
+ * This column has been dropped.
+ * Do not mirror it.
+ */
+ continue;
+ }
+
+ cpFieldName = DatumGetPointer(NameGetDatum
+
+ (&tTupleDesc->attrs
+ [iColumnCounter - 1]->attname));
+
+ debug_msg2("dbmirror:packageData field name: %s", cpFieldName);
+
+ while (iDataBlockSize - iUsedDataBlock <
+ strlen(cpFieldName) + 6)
{
- cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
+ cpDataBlock = SPI_repalloc(cpDataBlock,
+ iDataBlockSize +
+ BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
}
sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;
- cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter);
+ cpFieldData = SPI_getvalue(tTupleData, tTupleDesc,
+ iColumnCounter);
cpUnFormatedPtr = cpFieldData;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
continue;
}
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "field data: \"%s\"", cpFieldData);
- elog(NOTICE, "starting format loop");
-#endif
+ debug_msg2("dbmirror:packageData field data: \"%s\"",
+ cpFieldData);
+ debug_msg("dbmirror:packageData starting format loop");
+
while (*cpUnFormatedPtr != 0)
{
while (iDataBlockSize - iUsedDataBlock < 2)
{
- cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
+ cpDataBlock = SPI_repalloc(cpDataBlock,
+ iDataBlockSize
+ + BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
}
while (iDataBlockSize - iUsedDataBlock < 3)
{
- cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
+ cpDataBlock = SPI_repalloc(cpDataBlock,
+ iDataBlockSize +
+ BUFFER_SIZE);
iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
cpFormatedPtr = cpDataBlock + iUsedDataBlock;
}
sprintf(cpFormatedPtr, "' ");
iUsedDataBlock = iUsedDataBlock + 2;
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "data block: \"%s\"", cpDataBlock);
-#endif
+
+ debug_msg2("dbmirror:packageData data block: \"%s\"",
+ cpDataBlock);
} /* for iColumnCounter */
if (tpPKeys != NULL)
SPI_pfree(tpPKeys);
-#if defined DEBUG_OUTPUT
- elog(NOTICE, "returning DataBlockSize:%d iUsedDataBlock:%d", iDataBlockSize,
- iUsedDataBlock);
-#endif
+
+ debug_msg3("dbmirror:packageData returning DataBlockSize:%d iUsedDataBlock:%d",
+ iDataBlockSize,
+ iUsedDataBlock);
+
memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock);
return cpDataBlock;
}
+
+
+PG_FUNCTION_INFO_V1(setval);
+
+Datum setval(PG_FUNCTION_ARGS)
+{
+
+
+ text * sequenceName;
+
+ Oid setvalArgTypes[2] = {TEXTOID,INT4OID};
+ int nextValue;
+ void * setvalPlan=NULL;
+ Datum setvalData[2];
+ const char * setvalQuery = "SELECT setval_pg($1,$2)";
+ int ret;
+
+ sequenceName = PG_GETARG_TEXT_P(0);
+ nextValue = PG_GETARG_INT32(1);
+
+ setvalData[0] = PointerGetDatum(sequenceName);
+ setvalData[1] = Int32GetDatum(nextValue);
+
+ if (SPI_connect() < 0)
+ {
+ ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+ errmsg("dbmirror:setval could not connect to SPI")));
+ return -1;
+ }
+
+ setvalPlan = SPI_prepare(setvalQuery,2,setvalArgTypes);
+ if(setvalPlan == NULL)
+ {
+ ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+ errmsg("dbmirror:setval could not prepare plan")));
+ return -1;
+ }
+
+ ret = SPI_execp(setvalPlan,setvalData,NULL,1);
+
+ if(ret != SPI_OK_SELECT || SPI_processed != 1)
+ return -1;
+
+ debug_msg2("dbmirror:setval: setval_pg returned ok:%d",nextValue);
+
+ ret = saveSequenceUpdate(sequenceName,nextValue);
+
+ SPI_pfree(setvalPlan);
+
+ SPI_finish();
+ debug_msg("dbmirror:setval about to return");
+ return Int64GetDatum(nextValue);
+
+}
+
+
+
+PG_FUNCTION_INFO_V1(nextval);
+
+Datum
+nextval(PG_FUNCTION_ARGS)
+{
+ text * sequenceName;
+
+ const char * nextvalQuery = "SELECT nextval_pg($1)";
+ Oid nextvalArgTypes[1] = {TEXTOID};
+ void * nextvalPlan=NULL;
+ Datum nextvalData[1];
+
+
+ int ret;
+ HeapTuple resTuple;
+ char isNull;
+ int nextSequenceValue;
+
+
+
+ debug_msg("dbmirror:nextval Starting pending.so:nextval");
+
+
+ sequenceName = PG_GETARG_TEXT_P(0);
+
+ if (SPI_connect() < 0)
+ {
+ ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+ errmsg("dbmirror:nextval could not connect to SPI")));
+ return -1;
+ }
+
+ nextvalPlan = SPI_prepare(nextvalQuery,1,nextvalArgTypes);
+
+
+ debug_msg("prepared plan to call nextval_pg");
+
+
+ if(nextvalPlan==NULL)
+ {
+ ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+ errmsg("dbmirror:nextval error creating plan")));
+ return -1;
+ }
+ nextvalData[0] = PointerGetDatum(sequenceName);
+
+ ret = SPI_execp(nextvalPlan,nextvalData,NULL,1);
+
+ debug_msg("dbmirror:Executed call to nextval_pg");
+
+
+ if(ret != SPI_OK_SELECT || SPI_processed != 1)
+ return -1;
+
+ resTuple = SPI_tuptable->vals[0];
+
+ debug_msg("dbmirror:nextval Set resTuple");
+
+ nextSequenceValue =*(DatumGetPointer(SPI_getbinval(resTuple,
+ SPI_tuptable->tupdesc,
+ 1,&isNull)));
+
+
+
+ debug_msg2("dbmirror:nextval Set SPI_getbinval:%d",nextSequenceValue);
+
+
+ saveSequenceUpdate(sequenceName,nextSequenceValue);
+ SPI_pfree(resTuple);
+ SPI_pfree(nextvalPlan);
+
+ SPI_finish();
+
+ return Int64GetDatum(nextSequenceValue);
+}
+
+
+int
+saveSequenceUpdate(const text * sequenceName,
+ int nextSequenceVal)
+{
+
+ Oid insertArgTypes[2] = {TEXTOID,INT4OID};
+ Oid insertDataArgTypes[1] = {NAMEOID};
+ void * insertPlan=NULL;
+ void * insertDataPlan=NULL;
+ Datum insertDatum[2];
+ Datum insertDataDatum[1];
+ char nextSequenceText[32];
+
+ const char * insertQuery =
+ "INSERT INTO dbmirror_Pending (TableName,Op,XID) VALUES" \
+ "($1,'s',$2)";
+ const char * insertDataQuery =
+ "INSERT INTO dbmirror_PendingData(SeqId,IsKey,Data) VALUES " \
+ "(currval('dbmirror_pending_seqid_seq'),'t',$1)";
+
+ int ret;
+
+
+ insertPlan = SPI_prepare(insertQuery,2,insertArgTypes);
+ insertDataPlan = SPI_prepare(insertDataQuery,1,insertDataArgTypes);
+
+ debug_msg("Prepared insert query");
+
+
+ if(insertPlan == NULL || insertDataPlan == NULL)
+ {
+ ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("dbmirror:nextval error creating plan")));
+ }
+
+ insertDatum[1] = Int32GetDatum(GetCurrentTransactionId());
+ insertDatum[0] = PointerGetDatum(sequenceName);
+
+ sprintf(nextSequenceText,"%d",nextSequenceVal);
+ insertDataDatum[0] = PointerGetDatum(nextSequenceText);
+ debug_msg2("dbmirror:savesequenceupdate: Setting value %s",
+ nextSequenceText);
+
+ debug_msg("dbmirror:About to execute insert query");
+
+ ret = SPI_execp(insertPlan,insertDatum,NULL,1);
+
+ ret = SPI_execp(insertDataPlan,insertDataDatum,NULL,1);
+
+ debug_msg("dbmirror:Insert query finished");
+ SPI_pfree(insertPlan);
+ SPI_pfree(insertDataPlan);
+
+ return ret;
+
+}
+