]> granicus.if.org Git - postgresql/commitdiff
Please apply this patch to contrib/dbmirror
authorBruce Momjian <bruce@momjian.us>
Tue, 17 Feb 2004 03:34:35 +0000 (03:34 +0000)
committerBruce Momjian <bruce@momjian.us>
Tue, 17 Feb 2004 03:34:35 +0000 (03:34 +0000)
In incorperates changes from myself and a number of contributors.

This update to dbmirror provides:

-replication of sequence operations via setval/nextval
-DBMirror.pl support for logging to syslog
-changed the names of the tables to dbmirror_*  (no quotes required)
-Support for writitng SQL statements to files instead of directly to
 a slave database
-More options for DBMirror.pl in the config files.

Steven Singer

contrib/dbmirror/DBMirror.pl
contrib/dbmirror/MirrorSetup.sql
contrib/dbmirror/README.dbmirror
contrib/dbmirror/pending.c

index 4a951d0a45d1c8d1826dc9a6fd6d8a95464bc9a2..1eb917bf180a2f49cf6fecb9734e2f78da92b7cb 100755 (executable)
@@ -33,7 +33,7 @@
 # 
 #
 ##############################################################################
-# $PostgreSQL: pgsql/contrib/dbmirror/DBMirror.pl,v 1.7 2003/11/29 22:39:19 pgsql Exp $ 
+# $PostgreSQL: pgsql/contrib/dbmirror/DBMirror.pl,v 1.8 2004/02/17 03:34:35 momjian Exp $ 
 #
 ##############################################################################
 
@@ -79,17 +79,17 @@ sub mirrorCommand($$$$$$);
 sub mirrorInsert($$$$$);
 sub mirrorDelete($$$$$);
 sub mirrorUpdate($$$$$);
-sub sendQueryToSlaves($$);
 sub logErrorMessage($);
-sub openSlaveConnection($);
+sub setupSlave($);
 sub updateMirrorHostTable($$);
-                       sub extractData($$);
+sub extractData($$);
 local $::masterHost;
 local $::masterDb; 
 local $::masterUser; 
 local $::masterPassword; 
 local $::errorThreshold=5;
 local $::errorEmailAddr=undef;
+local $::sleepInterval=60;
 
 my %slaveInfoHash;
 local $::slaveInfo = \%slaveInfoHash;
@@ -115,8 +115,25 @@ sub Main() {
     die;
   }
   
-  
-  my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword";
+  if (defined($::syslog))
+  {
+      # log with syslog
+      require Sys::Syslog; 
+      import Sys::Syslog qw(openlog syslog);
+      openlog($0, 'cons,pid', 'user');
+      syslog("info", '%s', "starting $0 script with $ARGV[0]");
+  }
+
+  my $connectString;
+  if(defined($::masterHost))
+  {
+      $connectString .= "host=$::masterHost ";
+  }
+  if(defined($::masterPort))
+  {
+      $connectString .= "port=$::masterPort ";
+  }
+  $connectString .= "dbname=$::masterDb user=$::masterUser password=$::masterPassword";
   
   $masterConn = Pg::connectdb($connectString);
   
@@ -138,33 +155,29 @@ sub Main() {
   my $firstTime = 1;
   while(1) {
     if($firstTime == 0) {
-      sleep 60
+      sleep $::sleepInterval
     } 
     $firstTime = 0;
-# Open up the connection to the slave.
-    if(! defined $::slaveInfo->{"status"} ||
-       $::slaveInfo->{"status"} == -1) {
-      openSlaveConnection($::slaveInfo);           
-    }
-    
     
+    setupSlave($::slaveInfo);
+   
    
-    sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
-    sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED");
     
     
     #Obtain a list of pending transactions using ordering by our approximation
     #to the commit time.  The commit time approximation is taken to be the
     #SeqId of the last row edit in the transaction.
-    my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd";
-    $pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN";
-    $pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = ";
-    $pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"=";
-    $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; 
-    $pendingTransQuery .= " ON pd.\"XID\"";
-    $pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null  ";
-    $pendingTransQuery .= " GROUP BY pd.\"XID\" ";
-    $pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")";
+    my $pendingTransQuery = "SELECT pd.XID,MAX(SeqId) FROM dbmirror_Pending pd";
+    $pendingTransQuery .= " LEFT JOIN dbmirror_MirroredTransaction mt INNER JOIN";
+    $pendingTransQuery .= " dbmirror_MirrorHost mh ON mt.MirrorHostId = ";
+    $pendingTransQuery .= " mh.MirrorHostId AND mh.SlaveName=";
+    $pendingTransQuery .= " '$::slaveInfo->{\"slaveName\"}' "; 
+    $pendingTransQuery .= " ON pd.XID";
+    $pendingTransQuery .= " = mt.XID WHERE mt.XID is null ";
+   
+
+    $pendingTransQuery .= " GROUP BY pd.XID";
+    $pendingTransQuery .= " ORDER BY MAX(pd.SeqId)";
     
     
     my $pendingTransResults = $masterConn->exec($pendingTransQuery);
@@ -185,13 +198,21 @@ sub Main() {
       my $XID = $pendingTransResults->getvalue($curTransTuple,0);
       my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1);
       my $seqId;
-      
-      my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\",";
-      $pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" ";
-      $pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata ";
-      $pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND ";
-      
-      $pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC";
+
+     
+      if($::slaveInfo->{'status'} eq 'FileClosed')
+      {
+         openTransactionFile($::slaveInfo,$XID);
+      }
+
+
+      my $pendingQuery = "SELECT pnd.SeqId,pnd.TableName,";
+      $pendingQuery .= " pnd.Op,pnddata.IsKey, pnddata.Data AS Data ";
+      $pendingQuery .= " FROM dbmirror_Pending pnd, dbmirror_PendingData pnddata ";
+      $pendingQuery .= " WHERE pnd.SeqId = pnddata.SeqId ";
+     
+      $pendingQuery .= " AND pnd.XID=$XID ORDER BY SeqId, IsKey DESC";
       
       
       my $pendingResults = $masterConn->exec($pendingQuery);
@@ -200,40 +221,47 @@ sub Main() {
        die;
       }
       
-           
+      sendQueryToSlaves($XID,"BEGIN");
            
       my $numPending = $pendingResults->ntuples;
       my $curTuple = 0;
-      sendQueryToSlaves(undef,"BEGIN");
       while ($curTuple < $numPending) {
        $seqId = $pendingResults->getvalue($curTuple,0);
        my $tableName = $pendingResults->getvalue($curTuple,1);
        my $op = $pendingResults->getvalue($curTuple,2);
-       
        $curTuple = mirrorCommand($seqId,$tableName,$op,$XID,
                                  $pendingResults,$curTuple) +1;
-       if($::slaveInfo->{"status"}==-1) {
-           last;
-       }
        
       }
-      #Now commit the transaction.
-      if($::slaveInfo->{"status"}==-1) {
+
+      if($::slaveInfo->{'status'} ne 'DBOpen' &&
+        $::slaveInfo->{'status'} ne 'FileOpen')
+      {
          last;
       }
       sendQueryToSlaves(undef,"COMMIT");
+      #Now commit the transaction.
       updateMirrorHostTable($XID,$seqId);
-      if($commandCount > 5000) {
-       $commandCount = 0;
-       $::slaveInfo->{"status"} = -1;
-       $::slaveInfo->{"slaveConn"}->reset;
-       #Open the connection right away.
-       openSlaveConnection($::slaveInfo);
-       
-      }
       
       $pendingResults = undef;
       $curTransTuple = $curTransTuple +1;
+
+      if($::slaveInfo->{'status'} eq 'FileOpen')
+      {
+         close ($::slaveInfo->{'TransactionFile'});
+      }
+      elsif($::slaveInfo->{'status'} eq 'DBOpen')
+      {
+         if($commandCount > 5000) {
+             $commandCount = 0;
+             $::slaveInfo->{"status"} = 'DBClosed';
+             $::slaveInfo->{"slaveConn"}->reset;
+             #Open the connection right away.
+             openSlaveConnection($::slaveInfo);
+             
+         }
+      }
+
     }#while transactions left.
        
        $pendingTransResults = undef;
@@ -303,6 +331,7 @@ sub mirrorCommand($$$$$$) {
     my $pendingResults = $_[4];
     my $currentTuple = $_[5];
 
+
     if($op eq 'i') {
       $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults
                               ,$currentTuple);
@@ -315,6 +344,10 @@ sub mirrorCommand($$$$$$) {
       $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults,
                   $currentTuple);
     }
+    if($op eq 's')  {
+       $currentTuple = mirrorSequence($seqId,$tableName,$transId,$pendingResults,
+                                      $currentTuple);
+    }
     $commandCount = $commandCount +1;
     if($commandCount % 100 == 0) {
     #  print "Sent 100 commmands on SeqId $seqId \n";
@@ -411,7 +444,7 @@ sub mirrorInsert($$$$$) {
        $firstIteration=0;
     }
     $valuesQuery .= ")";
-    sendQueryToSlaves(undef,$insertQuery . $valuesQuery);
+    sendQueryToSlaves($transId,$insertQuery . $valuesQuery);
     return $currentTuple;
 }
 
@@ -491,7 +524,6 @@ sub mirrorDelete($$$$$) {
       $counter++;
       $firstField=0;
     }
-    
     sendQueryToSlaves($transId,$deleteQuery);
     return $currentTuple;
 }
@@ -554,14 +586,12 @@ sub mirrorUpdate($$$$$) {
     my $transId = $_[2];
     my $pendingResult = $_[3];
     my $currentTuple = $_[4];
-
+  
     my $counter;
     my $quotedValue;
     my $updateQuery = "UPDATE $tableName SET ";
     my $currentField;
 
-
-
     my %keyValueHash;
     my %dataValueHash;
     my $firstIteration=1;
@@ -615,12 +645,27 @@ sub mirrorUpdate($$$$$) {
       }
       $firstIteration=0;
     }
-    
     sendQueryToSlaves($transId,$updateQuery);
     return $currentTuple+1;
 }
 
 
+sub mirrorSequence($$$$$) {
+    my $seqId = $_[0];
+    my $sequenceName = $_[1];
+    my $transId = $_[2];
+    my $pendingResult = $_[3];
+    my $currentTuple = $_[4];
+
+    my $query;
+    my $sequenceValue = $pendingResult->getvalue($currentTuple,4);
+    $query = sprintf("select setval(%s,%s)",$sequenceName,$sequenceValue);
+
+    sendQueryToSlaves($transId,$query);
+    return $currentTuple;
+
+}
 
 =item sendQueryToSlaves(seqId,sqlQuery)
 
@@ -647,7 +692,7 @@ sub sendQueryToSlaves($$) {
     my $seqId = $_[0];
     my  $sqlQuery = $_[1];
        
-   if($::slaveInfo->{"status"} == 0) {
+   if($::slaveInfo->{"status"} eq 'DBOpen') {
        my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery);
        unless($queryResult->resultStatus == PGRES_COMMAND_OK) {
           my $errorMessage;
@@ -660,10 +705,18 @@ sub sendQueryToSlaves($$) {
           $::slaveInfo->{"status"} = -1;
        }
    }
+    elsif($::slaveInfo->{"status"} eq 'FileOpen' ) {
+       my $xfile = $::slaveInfo->{'TransactionFile'};
+       print $xfile  $sqlQuery . ";\n";
+    }
+    
+    
 
 }
 
 
+
+
 =item logErrorMessage(error)
 
 Mails an error message to the users specified $errorEmailAddr
@@ -707,41 +760,30 @@ sub logErrorMessage($) {
       print mailPipe "\n\n\n=================================================\n";
       close mailPipe;
     }
+
+    if (defined($::syslog))
+    {
+       syslog('err', '%s (%m)', $error);
+    }
+
     warn($error);    
     
     $lastErrorMsg = $error;
 
 }
 
-sub openSlaveConnection($) {
+sub setupSlave($) {
     my $slavePtr = $_[0];
-    my $slaveConn;
     
     
-    my $slaveConnString = "host=" . $slavePtr->{"slaveHost"};    
-    $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"};
-    $slaveConnString .= " user=" . $slavePtr->{"slaveUser"};
-    $slaveConnString .= " password=" . $slavePtr->{"slavePassword"};
-    
-    $slaveConn = Pg::connectdb($slaveConnString);
-    
-    if($slaveConn->status != PGRES_CONNECTION_OK) {
-       my $errorMessage = "Can't connect to slave database " ;
-       $errorMessage .= $slavePtr->{"slaveHost"} . "\n";
-       $errorMessage .= $slaveConn->errorMessage;
-       logErrorMessage($errorMessage);    
-       $slavePtr->{"status"} = -1;
-    }
-    else {
-       $slavePtr->{"slaveConn"} = $slaveConn;
        $slavePtr->{"status"} = 0;
        #Determine the MirrorHostId for the slave from the master's database
-       my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM '
-                                         . ' "MirrorHost" WHERE "HostName"'
-                                         . '=\'' . $slavePtr->{"slaveHost"}
+       my $resultSet = $masterConn->exec('SELECT MirrorHostId FROM '
+                                         . ' dbmirror_MirrorHost WHERE SlaveName'
+                                         . '=\'' . $slavePtr->{"slaveName"}
                                          . '\'');
        if($resultSet->ntuples !=1) {
-           my $errorMessage .= $slavePtr->{"slaveHost"} ."\n";
+           my $errorMessage .= $slavePtr->{"slaveName"} ."\n";
            $errorMessage .= "Has no MirrorHost entry on master\n";
            logErrorMessage($errorMessage);
            $slavePtr->{"status"}=-1;
@@ -749,14 +791,24 @@ sub openSlaveConnection($) {
            
        }
        $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0);
-       
-       
-       
+
+    if(defined($::slaveInfo->{'slaveDb'})) {
+       # We talk directly to a slave database.
+        #
+       if($::slaveInfo->{"status"} ne 'DBOpen')
+       {
+           openSlaveConnection($::slaveInfo);
+       }
+       sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+       sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED");
+    }
+    else {
+       $::slaveInfo->{"status"} = 'FileClosed';
     }
+       
 
 }
 
-
 =item updateMirrorHostTable(lastTransId,lastSeqId)
 
 Updates the MirroredTransaction table to reflect the fact that
@@ -783,39 +835,40 @@ sub updateMirrorHostTable($$) {
     my $lastTransId = shift;
     my $lastSeqId = shift;
 
-    if($::slaveInfo->{"status"}==0) {
-       my $deleteTransactionQuery;
-       my $deleteResult;
-       my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" ";
-       $updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")";
-       $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) ";
-       
-       my $updateResult = $masterConn->exec($updateMasterQuery);
-       unless($updateResult->resultStatus == PGRES_COMMAND_OK) {
-           my $errorMessage = $masterConn->errorMessage . "\n";
-           $errorMessage .= $updateMasterQuery;
-           logErrorMessage($errorMessage);
-           die;
-       }
+
+    
+    my $deleteTransactionQuery;
+    my $deleteResult;
+    my $updateMasterQuery = "INSERT INTO dbmirror_MirroredTransaction ";
+    $updateMasterQuery .= " (XID,LastSeqId,MirrorHostId)";
+    $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) ";
+    
+    my $updateResult = $masterConn->exec($updateMasterQuery);
+    unless($updateResult->resultStatus == PGRES_COMMAND_OK) {
+       my $errorMessage = $masterConn->errorMessage . "\n";
+       $errorMessage .= $updateMasterQuery;
+       logErrorMessage($errorMessage);
+       die;
+    }
 #      print "Updated slaves to transaction $lastTransId\n" ;   
 #        flush STDOUT;  
 
-       #If this transaction has now been mirrored to all mirror hosts
-       #then it can be deleted.
-       $deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"='
-           . $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"'
-               . ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM'
-                   . ' "MirrorHost")';
-       
-       $deleteResult = $masterConn->exec($deleteTransactionQuery);
-       if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { 
-           logErrorMessage($masterConn->errorMessage . "\n" . 
-                           $deleteTransactionQuery);
-           die;
-       }
-       
+    #If this transaction has now been mirrored to all mirror hosts
+    #then it can be deleted.
+    $deleteTransactionQuery = 'DELETE FROM dbmirror_Pending WHERE XID='
+       . $lastTransId . ' AND (SELECT COUNT(*) FROM dbmirror_MirroredTransaction'
+       . ' WHERE XID=' . $lastTransId . ')=(SELECT COUNT(*) FROM'
+       . ' dbmirror_MirrorHost)';
+    
+    $deleteResult = $masterConn->exec($deleteTransactionQuery);
+    if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { 
+       logErrorMessage($masterConn->errorMessage . "\n" . 
+                       $deleteTransactionQuery);
+       die;
     }
     
+  
+
 }
 
 
@@ -889,3 +942,69 @@ sub extractData($$) {
   return %valuesHash;
     
 }
+
+
+sub openTransactionFile($$)
+{
+    my $slaveInfo = shift;
+    my $XID =shift;
+#      my $now_str = localtime;
+    my $nowsec;
+    my $nowmin;
+    my $nowhour;
+    my $nowmday;
+    my $nowmon;
+    my $nowyear;
+    my $nowwday;
+    my $nowyday;
+    my $nowisdst;
+    ($nowsec,$nowmin,$nowhour,$nowmday,$nowmon,$nowyear,$nowwday,$nowyday,$nowisdst) =
+       localtime;
+    my $fileName=sprintf(">%s/%s_%d-%d-%d_%d:%d:%dXID%d.sql", $::slaveInfo->{'TransactionFileDirectory'},
+                        $::slaveInfo->{"MirrorHostId"},($nowyear+1900),($nowmon+1),$nowmday,$nowhour,$nowmin,
+                        $nowsec,$XID);
+    
+    my $xfile;
+    open($xfile,$fileName) or die "Can't open $fileName : $!";
+    
+    $slaveInfo->{'TransactionFile'} = $xfile;
+    $slaveInfo->{'status'} = 'FileOpen';
+}
+
+
+
+sub openSlaveConnection($) {
+    my $slavePtr = $_[0];
+    my $slaveConn;
+    
+    
+    my $slaveConnString;
+    if(defined($slavePtr->{"slaveHost"}))
+    {
+       $slaveConnString .= "host=" . $slavePtr->{"slaveHost"} . " ";    
+    }
+    if(defined($slavePtr->{"slavePort"}))
+    {
+       $slaveConnString .= "port=" . $slavePtr->{"slavePort"} . " ";
+    }
+
+    $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"};
+    $slaveConnString .= " user=" . $slavePtr->{"slaveUser"};
+    $slaveConnString .= " password=" . $slavePtr->{"slavePassword"};
+    
+    $slaveConn = Pg::connectdb($slaveConnString);
+    
+    if($slaveConn->status != PGRES_CONNECTION_OK) {
+       my $errorMessage = "Can't connect to slave database " ;
+       $errorMessage .= $slavePtr->{"slaveHost"} . "\n";
+       $errorMessage .= $slaveConn->errorMessage;
+       logErrorMessage($errorMessage);    
+       $slavePtr->{"status"} = 'DBFailed';
+    }
+    else {
+       $slavePtr->{"slaveConn"} = $slaveConn;
+       $slavePtr->{"status"} = 'DBOpen';       
+    }
+              
+
+}
index 4227ca5f3965e993bced08ba07b85853ae380b31..cae686f929b2d0e9b8bf9309ceaf01f52983aead 100644 (file)
@@ -1,43 +1,61 @@
+BEGIN;
+
+SET autocommit TO 'on';
 
 CREATE FUNCTION "recordchange" () RETURNS trigger AS
-'/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C';
+'$libdir/pending.so', 'recordchange' LANGUAGE 'C';
+
 
-CREATE TABLE "MirrorHost" (
-"MirrorHostId" serial,
-"HostName" varchar NOT NULL,
-PRIMARY KEY("MirrorHostId")
-);
 
+CREATE TABLE dbmirror_MirrorHost (
+MirrorHostId serial not null,
+SlaveName varchar NOT NULL,
+PRIMARY KEY(MirrorHostId)
+);
 
 
 
 
-CREATE TABLE "Pending" (
-"SeqId" serial,
-"TableName" varchar NOT NULL,
-"Op" character,
-"XID" int4 NOT NULL,
-PRIMARY KEY ("SeqId")
 
+CREATE TABLE dbmirror_Pending (
+SeqId serial,
+TableName Name NOT NULL,
+Op character,
+XID int4 NOT NULL,
+PRIMARY KEY (SeqId)
 );
 
-CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID");
+CREATE INDEX "dbmirror_Pending_XID_Index" ON dbmirror_Pending (XID);
 
-CREATE TABLE "PendingData" (
-"SeqId" int4 NOT NULL,
-"IsKey" bool NOT NULL,
-"Data" varchar,
-PRIMARY KEY ("SeqId", "IsKey") ,
-FOREIGN KEY ("SeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE  ON DELETE CASCADE
+CREATE TABLE dbmirror_PendingData (
+SeqId int4 NOT NULL,
+IsKey bool NOT NULL,
+Data varchar,
+PRIMARY KEY (SeqId, IsKey) ,
+FOREIGN KEY (SeqId) REFERENCES dbmirror_Pending (SeqId) ON UPDATE CASCADE  ON DELETE CASCADE
 );
 
 
-CREATE TABLE "MirroredTransaction" (
-"XID" int4 NOT NULL,
-"LastSeqId" int4 NOT NULL,
-"MirrorHostId" int4 NOT NULL,
-PRIMARY KEY  ("XID","MirrorHostId"),
-FOREIGN KEY ("MirrorHostId") REFERENCES "MirrorHost" ("MirrorHostId") ON UPDATE CASCADE ON DELETE CASCADE,
-FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId")  ON UPDATE
+CREATE TABLE dbmirror_MirroredTransaction (
+XID int4 NOT NULL,
+LastSeqId int4 NOT NULL,
+MirrorHostId int4 NOT NULL,
+PRIMARY KEY  (XID,MirrorHostId),
+FOREIGN KEY (MirrorHostId) REFERENCES dbmirror_MirrorHost (MirrorHostId) ON UPDATE CASCADE ON DELETE CASCADE,
+FOREIGN KEY (LastSeqId) REFERENCES dbmirror_Pending (SeqId)  ON UPDATE
 CASCADE ON DELETE CASCADE
 );
+
+
+UPDATE pg_proc SET proname='nextval_pg' WHERE proname='nextval';
+
+CREATE FUNCTION pg_catalog.nextval(text) RETURNS int8  AS
+'/usr/local/postgresql-7.4/lib/pending.so', 'nextval' LANGUAGE 'C' STRICT;
+
+
+UPDATE pg_proc set proname='setval_pg' WHERE proname='setval';
+
+CREATE FUNCTION pg_catalog.setval(text,int4) RETURNS int8  AS
+'/usr/local/postgresql-7.4/lib/pending.so', 'setval' LANGUAGE 'C' STRICT;
+
+COMMIT;
\ No newline at end of file
index 993bbb1f9401e617292707fb9bda494bea81aa30..ea38e3b3acd93ca86367384bd4c575946083b006 100644 (file)
@@ -6,7 +6,7 @@ DBMirror is a database mirroring system developed for the PostgreSQL
 database Written and maintained by Steven Singer(ssinger@navtechinc.com)
 
 
-(c) 2001-2002 Navtech Systems Support Inc.
+(c) 2001-2004 Navtech Systems Support Inc.
 ALL RIGHTS RESERVED
 
   Permission to use, copy, modify, and distribute this software and its
@@ -57,7 +57,7 @@ Pending tables.
 Requirments:
 ---------------------------------
 -PostgreSQL-7.4 (Older versions are no longer supported)
--Perl 5.6(Other versions might work)
+-Perl 5.6 or 5.8 (Other versions might work)
 -PgPerl (http://gborg.postgresql.org/project/pgperl/projdisplay.php)
 
 
@@ -81,13 +81,8 @@ PostgreSQL-7.4  Make Instructions:
 
 You should now have a file named pending.so that contains the trigger.
 
-Install this file in /usr/local/pgsql/lib  (or another suitable location).
+Install this file in your Postgresql lib directory (/usr/local/pgsql/lib)
 
-If you choose a different location the MirrorSetup.sql script will need
-to be modified to reflect your new location.   The CREATE FUNCTION command
-in the MirrorSetup.sql script associates the trigger function with the
-pending.so shared library.  Modify the arguments to this command if you
-choose to install the trigger elsewhere.
 
 2) Run MirrorSetup.sql
 
@@ -95,7 +90,8 @@ This file contains SQL commands to setup the Mirroring environment.
 This includes
 
 -Telling PostgreSQL about the "recordchange" trigger function.
--Creating the Pending,PendingData, MirrorHost, MirroredTransaction tables
+-Creating the dbmirror_Pending,dbmirror_PendingData,dbmirror_MirrorHost, 
+dbmirror_MirroredTransaction tables
 
 
 To execute the script use psql as follows 
@@ -114,17 +110,34 @@ DBMirror.pl script.  See slaveDatabase.conf for a sample.
 The master settings refer to the master database(The one that is
 being mirrored).
 
-The slave settings refer to the database that the data is being mirrored to.
-The slaveHost parameter must refer to the machine name of the slave (Either
-a resolvable hostname or an IP address).  The value for slave host
-must match the Hostname field in the MirrorHost table(See step 6).
+The slave settings refer to the database that the data is being
+mirrored to.
 
-The master user must have sufficient permissions to modify the Pending
-tables and to read all of the tables being mirrored.
+The slaveName setting in the configuration file must match the slave
+name specified in the dbmirror_MirrorHost table.
+
+DBMirror.pl can be run in two modes of operation: 
+
+ A) It can connect directly to the slave database.  To do this specify
+ a slave database name and optional host and port along with a username
+ and password.  See slaveDatabase.conf for details.
+
+
+ The master user must have sufficient permissions to modify the Pending
+ tables and to read all of the tables being mirrored.
+
+ The slave user must have enough permissions on the slave database to
+ modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being
+ mirrored. 
+
+ B) The SQL statements that should be executed on the slave can be
+ written to files which can then be executed slave database through
+ psql.  This would be suitable for setups where their is no direct
+ connection between the slave database and the master. A file is
+ generated for each transaction in the directory specified by
+ TransactionFileDirectory. The file name contains the date/time the
+ file was created along with the transaction id.
 
-The slave user must have enough permissions on the slave database to
-modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being
-mirrored. 
 
 4) Add the trigger to tables.
 
@@ -153,7 +166,7 @@ The name of the host in the MirrorHost table must exactly match the
 slaveHost variable for that slave in the configuration file.
 
 For example
-INSERT INTO "MirrorHost" ("HostName") VALUES ('mySlaveMachine.mycompany.com');
+INSERT INTO "MirrorHost" ("SlaveName") VALUES ('backup_system');
 
 
 6)  Start DBMirror.pl
@@ -171,7 +184,8 @@ Any errors are printed to standard out and emailed to the address specified in
 the configuration file. 
 
 DBMirror can be run from the master, the slave, or a third machine as long
-as it is able to access both the master and slave databases.
+as it is able to access both the master and slave databases(not
+required if SQL files are being generated)
 
 7) Periodically run clean_pending.pl 
 clean_pending.pl cleans out any entries from the Pending tables that
@@ -194,11 +208,28 @@ TODO(Current Limitations)
 ----------
 -Support for selective mirroring based on the content of data.
 -Support for BLOB's.
--Support for conflict resolution.
--Batching SQL commands in DBMirror for better performance over WAN's.
+-Support for multi-master mirroring with conflict resolution.
 -Better support for dealing with Schema changes.
 
 
+
+Significant Changes Since 7.4
+----------------
+-Support for mirroring SEQUENCE's
+-Support for unix domain sockets
+-Support for outputting slave SQL statements to a file
+-Changed the names of replication tables are now named
+dbmirror_pending etc..
+
+
+
+Credits
+-----------
+Achilleus Mantzios <achill@matrix.gatewaynet.com>
+
+
+
+
 Steven Singer
 Navtech Systems Support Inc.
 ssinger@navtechinc.com
index 4703d30f3c83f1457bbb0d59d1dbbc23707e16ee..24fb71b9e2db93e2b8e2c00644c98dbc24eb2942 100644 (file)
@@ -1,6 +1,7 @@
 /****************************************************************************
  * pending.c
- * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.15 2003/11/29 22:39:19 pgsql Exp $
+ * $Id: pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $
+ * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $
  *
  * This file contains a trigger for Postgresql-7.x to record changes to tables
  * to a pending table for mirroring.
 #include <executor/spi.h>
 #include <commands/trigger.h>
 #include <utils/lsyscache.h>
+#include <utils/array.h>
+
 enum FieldUsage
 {
        PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE
 };
 
 int storePending(char *cpTableName, HeapTuple tBeforeTuple,
-                        HeapTuple tAfterTuple,
-                        TupleDesc tTupdesc,
-                        TriggerData *tpTrigdata, char cOp);
+                HeapTuple tAfterTuple,
+                TupleDesc tTupdesc,
+                Oid tableOid,
+                char cOp);
+
+
 
 int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc,
-                        TriggerData *tpTrigdata);
-int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
-                 TriggerData *tpTrigData, int iIncludeKeyData);
+                Oid tableOid);
+int storeData(char *cpTableName, HeapTuple tTupleData, 
+             TupleDesc tTupleDesc,Oid tableOid,int iIncludeKeyData);
 
 int2vector *getPrimaryKey(Oid tblOid);
 
-char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs,
-                       TriggerData *tTrigData,
+char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid,
                        enum FieldUsage eKeyUsage);
 
+
 #define BUFFER_SIZE 256
 #define MAX_OID_LEN 10
-/*#define DEBUG_OUTPUT 1 */
+#define DEBUG_OUTPUT 1
 extern Datum recordchange(PG_FUNCTION_ARGS);
 
 PG_FUNCTION_INFO_V1(recordchange);
 
 
+#if defined DEBUG_OUTPUT
+#define debug_msg2(x,y) elog(NOTICE,x,y)
+#define debug_msg(x) elog(NOTICE,x)
+#define debug_msg3(x,y,z) elog(NOTICE,x,y,z)
+#else
+#define debug_msg2(x,y) 
+#define debug_msg(x)
+#define debug_msg(x,y,z)
+
+#endif
+
+
+
+extern Datum nextval(PG_FUNCTION_ARGS);
+extern Datum setval(PG_FUNCTION_ARGS);
+
+int saveSequenceUpdate(const text * sequenceName,
+                      int nextSequenceValue);
+
+
 /*****************************************************************************
  * The entry point for the trigger function.
  * The Trigger takes a single SQL 'text' argument indicating the name of the
@@ -81,13 +107,15 @@ recordchange(PG_FUNCTION_ARGS)
        char            op = 0;
        char       *schemaname;
        char       *fullyqualtblname;
+       char *pkxpress=NULL;
 
        if (fcinfo->context != NULL)
        {
 
                if (SPI_connect() < 0)
                {
-                       elog(NOTICE, "storePending could not connect to SPI");
+                 ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE),
+                                              errmsg("dbmirror:recordchange could not connect to SPI")));
                        return -1;
                }
                trigdata = (TriggerData *) fcinfo->context;
@@ -124,8 +152,15 @@ recordchange(PG_FUNCTION_ARGS)
                        beforeTuple = trigdata->tg_trigtuple;
                        op = 'd';
                }
+               else
+               {
+                 ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
+                                       errmsg("dbmirror:recordchange Unknown operation")));
+                                
+               }
 
-               if (storePending(fullyqualtblname, beforeTuple, afterTuple, tupdesc, trigdata, op))
+               if (storePending(fullyqualtblname, beforeTuple, afterTuple, 
+                                tupdesc, retTuple->t_tableOid, op))
                {
                        /* An error occoured. Skip the operation. */
                        ereport(ERROR,
@@ -135,10 +170,11 @@ recordchange(PG_FUNCTION_ARGS)
                        return PointerGetDatum(NULL);
 
                }
-#if defined DEBUG_OUTPUT
-               elog(NOTICE, "returning on success");
-#endif
+               debug_msg("dbmirror:recordchange returning on success");
+
                SPI_pfree(fullyqualtblname);
+               if(pkxpress != NULL)
+                 SPI_pfree(pkxpress);
                SPI_finish();
                return PointerGetDatum(retTuple);
        }
@@ -160,41 +196,45 @@ int
 storePending(char *cpTableName, HeapTuple tBeforeTuple,
                         HeapTuple tAfterTuple,
                         TupleDesc tTupDesc,
-                        TriggerData *tpTrigData, char cOp)
+            Oid tableOid,
+                        char cOp)
 {
-       char       *cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)";
+       char       *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID) VALUES ($1,$2,$3)";
 
        int                     iResult = 0;
        HeapTuple       tCurTuple;
+       char nulls[3]="   ";
 
        /* Points the current tuple(before or after) */
-       Datum           saPlanData[4];
-       Oid                     taPlanArgTypes[3] = {NAMEOID, CHAROID, INT4OID};
+       Datum           saPlanData[3];
+       Oid                     taPlanArgTypes[4] = {NAMEOID, 
+                                                    CHAROID, 
+                                                    INT4OID};
        void       *vpPlan;
 
        tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
 
 
 
-
        vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes);
        if (vpPlan == NULL)
-               elog(NOTICE, "error creating plan");
-       /* SPI_saveplan(vpPlan); */
+         ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION),
+                        errmsg("dbmirror:storePending error creating plan")));
+
 
        saPlanData[0] = PointerGetDatum(cpTableName);
        saPlanData[1] = CharGetDatum(cOp);
        saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
 
-
-       iResult = SPI_execp(vpPlan, saPlanData, NULL, 1);
+       iResult = SPI_execp(vpPlan, saPlanData, nulls, 1);
        if (iResult < 0)
-               elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult);
+               elog(NOTICE, "storedPending fired (%s) returned %d", 
+                    cpQueryBase, iResult);
 
 
-#if defined DEBUG_OUTPUT
-       elog(NOTICE, "row successfully stored in pending table");
-#endif
+
+       debug_msg("dbmirror:storePending row successfully stored in pending table");
+
 
        if (cOp == 'd')
        {
@@ -202,7 +242,8 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple,
                 * This is a record of a delete operation.
                 * Just store the key data.
                 */
-               iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData);
+               iResult = storeKeyInfo(cpTableName, 
+                                      tBeforeTuple, tTupDesc, tableOid);
        }
        else if (cOp == 'i')
        {
@@ -210,20 +251,22 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple,
                 * An Insert operation.
                 * Store all data
                 */
-               iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tpTrigData, TRUE);
+               iResult = storeData(cpTableName, tAfterTuple, 
+                                   tTupDesc, tableOid,TRUE);
 
        }
        else
        {
                /* op must be an update. */
-               iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData);
-               iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc,
-                                                                                               tpTrigData, TRUE);
+               iResult = storeKeyInfo(cpTableName, tBeforeTuple, 
+                                      tTupDesc, tableOid);
+               iResult = iResult ? iResult : 
+                 storeData(cpTableName, tAfterTuple, tTupDesc,
+                           tableOid,TRUE);
        }
 
-#if defined DEBUG_OUTPUT
-       elog(NOTICE, "done storing keyinfo");
-#endif
+
+       debug_msg("dbmirror:storePending done storing keyinfo");
 
        return iResult;
 
@@ -231,12 +274,11 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple,
 
 int
 storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
-                        TupleDesc tTupleDesc,
-                        TriggerData *tpTrigData)
+                        TupleDesc tTupleDesc, Oid tableOid)
 {
 
        Oid                     saPlanArgTypes[1] = {NAMEOID};
-       char       *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)";
+       char       *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)";
        void       *pplan;
        Datum           saPlanData[1];
        char       *cpKeyData;
@@ -250,7 +292,7 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
        }
 
        /* pplan = SPI_saveplan(pplan); */
-       cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, PRIMARY);
+       cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, PRIMARY);
        if (cpKeyData == NULL)
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
@@ -258,9 +300,9 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
                                 errmsg("there is no PRIMARY KEY for table %s",
                                                cpTableName)));
 
-#if defined DEBUG_OUTPUT
-       elog(NOTICE, "key data: %s", cpKeyData);
-#endif
+
+       debug_msg2("dbmirror:storeKeyInfo key data: %s", cpKeyData);
+
        saPlanData[0] = PointerGetDatum(cpKeyData);
 
        iRetCode = SPI_execp(pplan, saPlanData, NULL, 1);
@@ -270,12 +312,12 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData,
 
        if (iRetCode != SPI_OK_INSERT)
        {
-               elog(NOTICE, "error inserting row in pendingDelete");
+               ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION)
+                              ,errmsg("error inserting row in pendingDelete")));
                return -1;
        }
-#if defined DEBUG_OUTPUT
-       elog(NOTICE, "insert successful");
-#endif
+
+       debug_msg("insert successful");
 
        return 0;
 
@@ -318,12 +360,12 @@ getPrimaryKey(Oid tblOid)
  * Stores a copy of the non-key data for the row.
  *****************************************************************************/
 int
-storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
-                 TriggerData *tpTrigData, int iIncludeKeyData)
+storeData(char *cpTableName, HeapTuple tTupleData, 
+         TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData)
 {
 
        Oid                     planArgTypes[1] = {NAMEOID};
-       char       *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)";
+       char       *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)";
        void       *pplan;
        Datum           planData[1];
        char       *cpKeyData;
@@ -338,9 +380,10 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
 
        /* pplan = SPI_saveplan(pplan); */
        if (iIncludeKeyData == 0)
-               cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, NONPRIMARY);
+               cpKeyData = packageData(tTupleData, tTupleDesc, 
+                                       tableOid, NONPRIMARY);
        else
-               cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, ALL);
+               cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, ALL);
 
        planData[0] = PointerGetDatum(cpKeyData);
        iRetValue = SPI_execp(pplan, planData, NULL, 1);
@@ -353,9 +396,9 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
                elog(NOTICE, "error inserting row in pendingDelete");
                return -1;
        }
-#if defined DEBUG_OUTPUT
-       elog(NOTICE, "insert successful");
-#endif
+
+       debug_msg("dbmirror:storeKeyData insert successful");
+
 
        return 0;
 
@@ -376,8 +419,7 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc,
  *     ALL implies include all fields.
  */
 char *
-packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
-                       TriggerData *tpTrigData,
+packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid,
                        enum FieldUsage eKeyUsage)
 {
        int                     iNumCols;
@@ -391,14 +433,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
 
        if (eKeyUsage != ALL)
        {
-               tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id);
+               tpPKeys = getPrimaryKey(tableOid);
                if (tpPKeys == NULL)
                        return NULL;
        }
-#if defined DEBUG_OUTPUT
+
        if (tpPKeys != NULL)
-               elog(NOTICE, "have primary keys");
-#endif
+         {
+              debug_msg("dbmirror:packageData have primary keys");
+
+         }
+
        cpDataBlock = SPI_palloc(BUFFER_SIZE);
        iDataBlockSize = BUFFER_SIZE;
        iUsedDataBlock = 0;                     /* To account for the null */
@@ -417,49 +462,58 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
                {
                        /* Determine if this is a primary key or not. */
                        iIsPrimaryKey = 0;
-                       for (iPrimaryKeyIndex = 0; (*tpPKeys)[iPrimaryKeyIndex] != 0;
+                       for (iPrimaryKeyIndex = 0;
+                            (*tpPKeys)[iPrimaryKeyIndex] != 0;
                                 iPrimaryKeyIndex++)
                        {
-                               if ((*tpPKeys)[iPrimaryKeyIndex] == iColumnCounter)
+                               if ((*tpPKeys)[iPrimaryKeyIndex] 
+                                   == iColumnCounter)
                                {
                                        iIsPrimaryKey = 1;
                                        break;
                                }
                        }
-                       if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY))
+                       if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : 
+                           (eKeyUsage != NONPRIMARY))
                        {
                                /**
                                 * Don't use.
                                 */
-#if defined DEBUG_OUTPUT
-                               elog(NOTICE, "skipping column");
-#endif
+
+                               debug_msg("dbmirror:packageData skipping column");
+
                                continue;
                        }
                }                                               /* KeyUsage!=ALL */
-#ifndef  NODROPCOLUMN
-               if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
-                 {
-                   /**
-                    * This column has been dropped.
-                    * Do not mirror it.
-                    */
-                   continue;
-                 }
-#endif
-               cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs
-                                                                                [iColumnCounter - 1]->attname));
-#if defined DEBUG_OUTPUT
-               elog(NOTICE, "field name: %s", cpFieldName);
-#endif
-               while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6)
+
+               if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped)
+                 {
+                   /**
+                    * This column has been dropped.
+                    * Do not mirror it.
+                    */
+                   continue;
+                 }
+
+               cpFieldName = DatumGetPointer(NameGetDatum
+                                             
+                                             (&tTupleDesc->attrs
+                                              [iColumnCounter - 1]->attname));
+
+               debug_msg2("dbmirror:packageData field name: %s", cpFieldName);
+
+               while (iDataBlockSize - iUsedDataBlock < 
+                      strlen(cpFieldName) + 6)
                {
-                       cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
+                       cpDataBlock = SPI_repalloc(cpDataBlock, 
+                                                  iDataBlockSize + 
+                                                  BUFFER_SIZE);
                        iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
                }
                sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName);
                iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3;
-               cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter);
+               cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, 
+                                          iColumnCounter);
 
                cpUnFormatedPtr = cpFieldData;
                cpFormatedPtr = cpDataBlock + iUsedDataBlock;
@@ -477,15 +531,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
                        continue;
 
                }
-#if defined DEBUG_OUTPUT
-               elog(NOTICE, "field data: \"%s\"", cpFieldData);
-               elog(NOTICE, "starting format loop");
-#endif
+               debug_msg2("dbmirror:packageData field data: \"%s\"", 
+                          cpFieldData);
+               debug_msg("dbmirror:packageData starting format loop");
+
                while (*cpUnFormatedPtr != 0)
                {
                        while (iDataBlockSize - iUsedDataBlock < 2)
                        {
-                               cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
+                               cpDataBlock = SPI_repalloc(cpDataBlock, 
+                                                          iDataBlockSize 
+                                                          + BUFFER_SIZE);
                                iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
                                cpFormatedPtr = cpDataBlock + iUsedDataBlock;
                        }
@@ -505,25 +561,218 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
 
                while (iDataBlockSize - iUsedDataBlock < 3)
                {
-                       cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE);
+                       cpDataBlock = SPI_repalloc(cpDataBlock, 
+                                                  iDataBlockSize + 
+                                                  BUFFER_SIZE);
                        iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
                        cpFormatedPtr = cpDataBlock + iUsedDataBlock;
                }
                sprintf(cpFormatedPtr, "' ");
                iUsedDataBlock = iUsedDataBlock + 2;
-#if defined DEBUG_OUTPUT
-               elog(NOTICE, "data block: \"%s\"", cpDataBlock);
-#endif
+
+               debug_msg2("dbmirror:packageData data block: \"%s\"", 
+                          cpDataBlock);
 
        }                                                       /* for iColumnCounter  */
        if (tpPKeys != NULL)
                SPI_pfree(tpPKeys);
-#if defined DEBUG_OUTPUT
-       elog(NOTICE, "returning DataBlockSize:%d iUsedDataBlock:%d", iDataBlockSize,
-                iUsedDataBlock);
-#endif
+
+       debug_msg3("dbmirror:packageData returning DataBlockSize:%d iUsedDataBlock:%d", 
+                  iDataBlockSize,
+                  iUsedDataBlock);
+
        memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock);
 
        return cpDataBlock;
 
 }
+
+
+PG_FUNCTION_INFO_V1(setval);
+
+Datum setval(PG_FUNCTION_ARGS)
+{
+
+
+  text * sequenceName;
+
+  Oid setvalArgTypes[2] = {TEXTOID,INT4OID};
+  int nextValue;
+  void * setvalPlan=NULL;
+  Datum setvalData[2];
+  const char * setvalQuery = "SELECT setval_pg($1,$2)";
+  int ret;
+    
+  sequenceName = PG_GETARG_TEXT_P(0);
+  nextValue = PG_GETARG_INT32(1);
+
+  setvalData[0] = PointerGetDatum(sequenceName);
+  setvalData[1] = Int32GetDatum(nextValue);
+
+  if (SPI_connect() < 0)
+    {
+      ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), 
+                    errmsg("dbmirror:setval could not connect to SPI")));
+      return -1;
+    }
+
+  setvalPlan = SPI_prepare(setvalQuery,2,setvalArgTypes);
+  if(setvalPlan == NULL)
+    {
+      ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+                    errmsg("dbmirror:setval could not prepare plan")));
+      return -1;
+    }
+
+  ret = SPI_execp(setvalPlan,setvalData,NULL,1);
+  
+  if(ret != SPI_OK_SELECT || SPI_processed != 1)
+    return -1;
+
+  debug_msg2("dbmirror:setval: setval_pg returned ok:%d",nextValue);
+  
+ ret =  saveSequenceUpdate(sequenceName,nextValue);
+  
+ SPI_pfree(setvalPlan);
+
+ SPI_finish();
+ debug_msg("dbmirror:setval about to return");
+ return  Int64GetDatum(nextValue);
+
+}
+
+
+
+PG_FUNCTION_INFO_V1(nextval);
+
+Datum 
+nextval(PG_FUNCTION_ARGS)
+{
+  text *  sequenceName;
+  const char * nextvalQuery = "SELECT nextval_pg($1)";
+  Oid  nextvalArgTypes[1] = {TEXTOID};  
+  void * nextvalPlan=NULL;
+  Datum nextvalData[1];
+  
+  
+  int ret;
+  HeapTuple resTuple;
+  char isNull;
+  int nextSequenceValue;
+  
+
+
+  debug_msg("dbmirror:nextval Starting pending.so:nextval");
+
+
+  sequenceName = PG_GETARG_TEXT_P(0); 
+
+  if (SPI_connect() < 0)
+    {
+      ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), 
+                    errmsg("dbmirror:nextval could not connect to SPI")));
+      return -1;
+    }
+  
+  nextvalPlan = SPI_prepare(nextvalQuery,1,nextvalArgTypes);
+  
+
+  debug_msg("prepared plan to call nextval_pg");
+
+
+  if(nextvalPlan==NULL)
+    {
+      ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+                    errmsg("dbmirror:nextval error creating plan")));
+      return -1;
+    }
+  nextvalData[0] = PointerGetDatum(sequenceName);
+
+  ret = SPI_execp(nextvalPlan,nextvalData,NULL,1);
+
+  debug_msg("dbmirror:Executed call to nextval_pg");
+
+
+  if(ret != SPI_OK_SELECT || SPI_processed != 1)
+    return -1;
+
+  resTuple = SPI_tuptable->vals[0];
+
+  debug_msg("dbmirror:nextval Set resTuple");
+
+  nextSequenceValue =*(DatumGetPointer(SPI_getbinval(resTuple,
+                                                    SPI_tuptable->tupdesc,
+                                                    1,&isNull)));
+
+  
+
+  debug_msg2("dbmirror:nextval Set SPI_getbinval:%d",nextSequenceValue);
+
+  saveSequenceUpdate(sequenceName,nextSequenceValue);
+  SPI_pfree(resTuple);
+  SPI_pfree(nextvalPlan);
+
+  SPI_finish();
+
+  return Int64GetDatum(nextSequenceValue);
+}
+
+
+int
+saveSequenceUpdate(const text * sequenceName,
+                  int nextSequenceVal)
+{
+
+  Oid  insertArgTypes[2] = {TEXTOID,INT4OID};
+  Oid  insertDataArgTypes[1] = {NAMEOID};
+  void * insertPlan=NULL;
+  void * insertDataPlan=NULL;
+  Datum insertDatum[2];
+  Datum insertDataDatum[1];
+  char nextSequenceText[32];
+
+  const char * insertQuery = 
+    "INSERT INTO dbmirror_Pending (TableName,Op,XID) VALUES"  \
+    "($1,'s',$2)";
+  const char * insertDataQuery =
+    "INSERT INTO dbmirror_PendingData(SeqId,IsKey,Data) VALUES " \
+    "(currval('dbmirror_pending_seqid_seq'),'t',$1)";
+  
+  int ret;
+
+
+  insertPlan = SPI_prepare(insertQuery,2,insertArgTypes);
+  insertDataPlan = SPI_prepare(insertDataQuery,1,insertDataArgTypes);
+
+  debug_msg("Prepared insert query");
+
+
+  if(insertPlan == NULL || insertDataPlan == NULL)
+    {
+      ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("dbmirror:nextval error creating plan")));
+    }
+
+  insertDatum[1] = Int32GetDatum(GetCurrentTransactionId());
+  insertDatum[0] = PointerGetDatum(sequenceName);
+
+  sprintf(nextSequenceText,"%d",nextSequenceVal);
+  insertDataDatum[0] = PointerGetDatum(nextSequenceText);
+  debug_msg2("dbmirror:savesequenceupdate: Setting value %s",
+            nextSequenceText);
+
+  debug_msg("dbmirror:About to execute insert query");
+
+  ret = SPI_execp(insertPlan,insertDatum,NULL,1);
+  
+  ret = SPI_execp(insertDataPlan,insertDataDatum,NULL,1);
+
+  debug_msg("dbmirror:Insert query finished");
+  SPI_pfree(insertPlan);
+  SPI_pfree(insertDataPlan);
+  
+  return ret;
+
+}
+