Add dbmirror to /contrib. Minor C cleanups and Makefile.
Bruce Momjian <bruce@momjian.us>
Sun, 23 Jun 2002 21:58:08 +0000 (21:58 +0000)
committerBruce Momjian <bruce@momjian.us>
Sun, 23 Jun 2002 21:58:08 +0000 (21:58 +0000)
Steven Singer

@@ -0,0 +1,869 @@
+# DBMirror.pl
+# Contains the Database mirroring script.
+# This script queries the pending table off the database specified
+# (along with the associated schema) for updates that are pending on a 
+# specific host.  The database on that host is then updated with the changes.
+#    Written by Steven Singer (ssinger@navtechinc.com)
+#    (c) 2001-2002 Navtech Systems Support Inc.
+#    Released under the GNU Public License version 2. See COPYING.
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    GNU General Public License for more details.
+# $Id: DBMirror.pl,v 1.1 2002/06/23 21:58:07 momjian Exp $ 
+=head1 NAME
+DBMirror.pl - A Perl module to mirror database changes from a master database
+to a slave.
+=head1 SYNPOSIS
+DBMirror.pl slaveConfigfile.conf
+This Perl script will connect to the master database and query its pending 
+table for a list of pending changes.
+The transactions of the original changes to the master will be preserved
+when sending things to the slave.
+=head1 METHODS
+=over 4
+  # add in a global path to files
+  # Pg should be included. 
+use strict;
+use Pg;
+use IO::Handle;
+sub mirrorCommand($$$$$$);
+sub mirrorInsert($$$$$);
+sub mirrorDelete($$$$$);
+sub mirrorUpdate($$$$$);
+sub sendQueryToSlaves($$);
+sub logErrorMessage($);
+sub openSlaveConnection($);
+sub updateMirrorHostTable($$);
+                       sub extractData($$);
+local $::masterHost;
+local $::masterDb; 
+local $::masterUser; 
+local $::masterPassword; 
+local $::errorThreshold=5;
+local $::errorEmailAddr=undef;
+my %slaveInfoHash;
+local $::slaveInfo = \%slaveInfoHash;
+my $lastErrorMsg;
+my $repeatErrorCount=0;
+my $lastXID;
+my $commandCount=0;
+my $masterConn;
+sub Main() {
+#run the configuration file.
+  if ($#ARGV != 0) {
+    die "usage: DBMirror.pl configFile\n";
+  }
+  if( ! defined do $ARGV[0]) {
+    logErrorMessage("Invalid Configuration file $ARGV[0]");
+    die;
+  }
+  my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword";
+  $masterConn = Pg::connectdb($connectString);
+  unless($masterConn->status == PGRES_CONNECTION_OK) {
+    logErrorMessage("Can't connect to master database\n" .
+                   $masterConn->errorMessage);
+    die;
+  }
+  my $firstTime = 1;
+  while(1) {
+    if($firstTime == 0) {
+      sleep 60; 
+    } 
+    $firstTime = 0;
+# Open up the connection to the slave.
+    if(! defined $::slaveInfo->{"status"} ||
+       $::slaveInfo->{"status"} == -1) {
+      openSlaveConnection($::slaveInfo);           
+    }
+    sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED");
+    #Obtain a list of pending transactions using ordering by our approximation
+    #to the commit time.  The commit time approximation is taken to be the
+    #SeqId of the last row edit in the transaction.
+    my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd";
+    $pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN";
+    $pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = ";
+    $pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"=";
+    $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' "; 
+    $pendingTransQuery .= " ON pd.\"XID\"";
+    $pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null  ";
+    $pendingTransQuery .= " GROUP BY pd.\"XID\" ";
+    $pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")";
+    my $pendingTransResults = $masterConn->exec($pendingTransQuery);
+    unless($pendingTransResults->resultStatus==PGRES_TUPLES_OK) {
+      logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage);
+      die;
+    }
+    my $numPendingTrans = $pendingTransResults->ntuples;
+    my $curTransTuple = 0;
+    #
+    # This loop loops through each pending transaction in the proper order.
+    # The Pending row edits for that transaction will be queried from the 
+    # master and sent + committed to the slaves.
+    while($curTransTuple < $numPendingTrans) {
+      my $XID = $pendingTransResults->getvalue($curTransTuple,0);
+      my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1);
+      my $seqId;
+      my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\",";
+      $pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" ";
+      $pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata ";
+      $pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND ";
+      $pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC";
+      my $pendingResults = $masterConn->exec($pendingQuery);
+      unless($pendingResults->resultStatus==PGRES_TUPLES_OK) {
+       logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage);
+       die;
+      }
+      my $numPending = $pendingResults->ntuples;
+      my $curTuple = 0;
+      sendQueryToSlaves(undef,"BEGIN");
+      while ($curTuple < $numPending) {
+       $seqId = $pendingResults->getvalue($curTuple,0);
+       my $tableName = $pendingResults->getvalue($curTuple,1);
+       my $op = $pendingResults->getvalue($curTuple,2);
+       $curTuple = mirrorCommand($seqId,$tableName,$op,$XID,
+                                 $pendingResults,$curTuple) +1;
+       if($::slaveInfo->{"status"}==-1) {
+           last;
+       }
+      }
+      #Now commit the transaction.
+      if($::slaveInfo->{"status"}==-1) {
+         last;
+      }
+      sendQueryToSlaves(undef,"COMMIT");
+      updateMirrorHostTable($XID,$seqId);
+      if($commandCount > 5000) {
+       $commandCount = 0;
+       $::slaveInfo->{"status"} = -1;
+       $::slaveInfo->{"slaveConn"}->reset;
+       #Open the connection right away.
+       openSlaveConnection($::slaveInfo);
+      }
+      $pendingResults = undef;
+      $curTransTuple = $curTransTuple +1;
+    }#while transactions left.
+       $pendingTransResults = undef;
+  }#while(1)
+=item mirrorCommand(SeqId,tableName,op,transId,pendingResults,curTuple)
+Mirrors a single SQL Command(change to a single row) to the slave.
+=over 4
+=item * SeqId
+The id number of the change to mirror.  This is the
+primary key of the pending table.
+=item * tableName
+The name of the table the transaction takes place on.
+=item * op
+The type of operation this transaction is.  'i' for insert, 'u' for update or
+'d' for delete.
+=item * transId
+The Transaction of of the Transaction that this command is part of.
+=item * pendingResults
+A Results set structure returned from Pg::execute that contains the 
+join of the Pending and PendingData tables for all of the pending row
+edits in this transaction. 
+=item * currentTuple 
+The tuple(or row) number of the pendingRow for the command that is about
+to be edited.   If the command is an update then this points to the row
+with IsKey equal to true.  The next row, curTuple+1 is the contains the
+PendingData with IsKey false for the update.
+=item returns
+The tuple number of last tuple for this command.  This might be equal to
+currentTuple or it might be larger (+1 in the case of an Update).
+sub mirrorCommand($$$$$$) {
+    my $seqId = $_[0];
+    my $tableName = $_[1];
+    my $op = $_[2];
+    my $transId = $_[3];
+    my $pendingResults = $_[4];
+    my $currentTuple = $_[5];
+    if($op eq 'i') {
+      $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults
+                              ,$currentTuple);
+    }
+    if($op eq 'd') {
+      $currentTuple = mirrorDelete($seqId,$tableName,$transId,$pendingResults,
+                              $currentTuple);
+    }
+    if($op eq 'u') {
+      $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults,
+                  $currentTuple);
+    }
+    $commandCount = $commandCount +1;
+    if($commandCount % 100 == 0) {
+    #  print "Sent 100 commmands on SeqId $seqId \n";
+    #  flush STDOUT;
+    }
+    return $currentTuple
+  }
+=item mirrorInsert(transId,tableName,transId,pendingResults,currentTuple)
+Mirrors an INSERT operation to the slave database.  A new row is placed
+in the slave database containing the primary key from pendingKeys along with
+the data fields contained in the row identified by sourceOid.
+=over 4
+=item * transId
+The sequence id of the INSERT operation being mirrored. This is the primary
+key of the pending table.
+=item * tableName
+The name of the table the transaction takes place on.
+=item * sourceOid
+The OID of the row in the master database for which this transaction effects.
+If the transaction is a delete then the operation is not valid.
+=item * transId 
+The Transaction Id of transaction that this insert is part of.
+=item * pendingResults
+A Results set structure returned from Pg::execute that contains the 
+join of the Pending and PendingData tables for all of the pending row
+edits in this transaction. 
+=item * currentTuple 
+The tuple(or row) number of the pendingRow for the command that is about
+to be edited.   In the case of an insert this should point to the one 
+row for the row edit.
+=item returns
+The tuple number of the last tuple for the row edit.  This should be 
+sub mirrorInsert($$$$$) {
+    my $seqId = $_[0];
+    my $tableName = $_[1];
+    my $transId = $_[2];
+    my $pendingResults = $_[3];
+    my $currentTuple = $_[4];
+    my $counter;
+    my $column;
+    my $firstIteration=1;
+    my %recordValues = extractData($pendingResults,$currentTuple);
+    #Now build the insert query.
+    my $insertQuery = "INSERT INTO \"$tableName\" (";
+    my $valuesQuery = ") VALUES (";
+    foreach $column (keys (%recordValues)) {
+       if($firstIteration==0) {
+           $insertQuery .= " ,";
+           $valuesQuery .= " ,";
+       }
+      $insertQuery .= "\"$column\"";
+      if(defined $recordValues{$column}) {
+       my $quotedValue = $recordValues{$column};
+       $quotedValue =~ s/\\/\\\\/g;
+       $quotedValue =~ s/'/\\'/g;
+       $valuesQuery .= "'$quotedValue'";
+      }
+      else {
+       $valuesQuery .= "null";
+      }
+       $firstIteration=0;
+    }
+    $valuesQuery .= ")";
+    sendQueryToSlaves(undef,$insertQuery . $valuesQuery);
+    return $currentTuple;
+=item mirrorDelete(SeqId,tableName,transId,pendingResult,currentTuple)
+Deletes a single row from the slave database.  The row is identified by the
+primary key for the transaction in the pendingKeys table.
+=over 4
+=item * SeqId
+The Sequence id for this delete request.
+=item * tableName
+The name of the table to delete the row from.
+=item * transId 
+The Transaction Id of the transaction that this command is part of.
+=item * pendingResults
+A Results set structure returned from Pg::execute that contains the 
+join of the Pending and PendingData tables for all of the pending row
+edits in this transaction. 
+=item * currentTuple 
+The tuple(or row) number of the pendingRow for the command that is about
+to be edited.   In the case of a  delete this should point to the one 
+row for the row edit.
+=item returns
+The tuple number of the last tuple for the row edit.  This should be 
+sub mirrorDelete($$$$$) {
+    my $seqId = $_[0];
+    my $tableName = $_[1];
+    my $transId = $_[2];
+    my $pendingResult = $_[3];
+    my $currentTuple = $_[4];
+    my %dataHash;
+    my $currentField;
+    my $firstField=1;
+    %dataHash = extractData($pendingResult,$currentTuple);
+    my $counter=0;
+    my $deleteQuery = "DELETE FROM \"$tableName\" WHERE ";
+    foreach $currentField (keys %dataHash) {
+      if($firstField==0) {
+       $deleteQuery .= " AND ";
+      }
+      my $currentValue = $dataHash{$currentField};
+      $deleteQuery .= "\"";
+      $deleteQuery .= $currentField;
+      if(defined $currentValue) {
+       $deleteQuery .= "\"='";
+       $deleteQuery .= $currentValue;
+       $deleteQuery .= "'";
+      }
+      else {
+       $deleteQuery .= " is null ";
+      }
+      $counter++;
+      $firstField=0;
+    }
+    sendQueryToSlaves($transId,$deleteQuery);
+    return $currentTuple;
+=item mirrorUpdate(seqId,tableName,transId,pendingResult,currentTuple)
+Mirrors over an edit request to a single row of the database.
+The primary key from before the edit is used to determine which row in the
+slave should be changed.  
+After the edit takes place on the slave its primary key will match the primary 
+key the master had immediatly following the edit.  All other fields will be set
+to the current values.   
+Data integrity is maintained because the mirroring is performed in an 
+SQL transcation so either all pending changes are made or none are.
+=over 4
+=item * seqId 
+The Sequence id of the update.
+=item * tableName
+The name of the table to perform the update on.
+=item * transId
+The transaction Id for the transaction that this command is part of.
+=item * pendingResults
+A Results set structure returned from Pg::execute that contains the 
+join of the Pending and PendingData tables for all of the pending row
+edits in this transaction. 
+=item * currentTuple 
+The tuple(or row) number of the pendingRow for the command that is about
+to be edited.   In the case of a  delete this should point to the one 
+row for the row edit.
+=item returns
+The tuple number of the last tuple for the row edit.  This should be 
+currentTuple +1.  Which points to the non key row of the update.
+sub mirrorUpdate($$$$$) {
+    my $seqId = $_[0];
+    my $tableName = $_[1];
+    my $transId = $_[2];
+    my $pendingResult = $_[3];
+    my $currentTuple = $_[4];
+    my $counter;
+    my $quotedValue;
+    my $updateQuery = "UPDATE \"$tableName\" SET ";
+    my $currentField;
+    my %keyValueHash;
+    my %dataValueHash;
+    my $firstIteration=1;
+    #Extract the Key values. This row contains the values of the
+    # key fields before the update occours(the WHERE clause)
+    %keyValueHash = extractData($pendingResult,$currentTuple);
+    #Extract the data values.  This is a SET clause that contains 
+    #values for the entire row AFTER the update.    
+    %dataValueHash = extractData($pendingResult,$currentTuple+1);
+    $firstIteration=1;
+    foreach $currentField (keys (%dataValueHash)) {
+      if($firstIteration==0) {
+       $updateQuery .= ", ";
+      }
+      $updateQuery .= " \"$currentField\"=";
+      my $currentValue = $dataValueHash{$currentField};
+      if(defined $currentValue ) {
+       $quotedValue = $currentValue;
+       $quotedValue =~ s/\\/\\\\/g;
+       $quotedValue =~ s/'/\\'/g;
+       $updateQuery .= "'$quotedValue'";
+       }
+      else {
+       $updateQuery .= "null ";
+      }
+      $firstIteration=0;
+    }
+    $updateQuery .= " WHERE ";
+    $firstIteration=1;
+    foreach $currentField (keys (%keyValueHash)) {   
+      my $currentValue;
+      if($firstIteration==0) {
+       $updateQuery .= " AND ";
+      }
+      $updateQuery .= "\"$currentField\"=";
+      $currentValue = $keyValueHash{$currentField};
+      if(defined $currentValue) {
+       $quotedValue = $currentValue;
+       $quotedValue =~ s/\\/\\\\/g;
+        $quotedValue =~ s/'/\\'/g;
+       $updateQuery .= "'$quotedValue'";
+      }
+      else {
+       $updateQuery .= " null ";
+      }
+      $firstIteration=0;
+    }
+    sendQueryToSlaves($transId,$updateQuery);
+    return $currentTuple+1;
+=item sendQueryToSlaves(seqId,sqlQuery)
+Sends an SQL query to the slave.
+=over 4
+=item * seqId
+The sequence Id of the command being sent. Undef if no command is associated 
+with the query being sent.
+=item * sqlQuery
+SQL operation to perform on the slave.
+sub sendQueryToSlaves($$) {
+    my $seqId = $_[0];
+    my  $sqlQuery = $_[1];
+   if($::slaveInfo->{"status"} == 0) {
+       my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery);
+       unless($queryResult->resultStatus == PGRES_COMMAND_OK) {
+          my $errorMessage;
+          $errorMessage = "Error sending query  $seqId to " ;
+          $errorMessage .= $::slaveInfo->{"slaveHost"};
+          $errorMessage .=$::slaveInfo->{"slaveConn"}->errorMessage;
+          $errorMessage .= "\n" . $sqlQuery;
+          logErrorMessage($errorMessage);
+          $::slaveInfo->{"slaveConn"}->exec("ROLLBACK");
+          $::slaveInfo->{"status"} = -1;
+       }
+   }
+=item logErrorMessage(error)
+Mails an error message to the users specified $errorEmailAddr
+The error message is also printed to STDERR.
+=over 4
+=item * error
+The error message to log.
+sub logErrorMessage($) {
+    my $error = $_[0];
+    if(defined $lastErrorMsg and $error eq $lastErrorMsg) {
+       if($repeatErrorCount<$::errorThreshold) {
+           $repeatErrorCount++;
+           warn($error);
+           return;
+       }
+    }
+    $repeatErrorCount=0;
+    if(defined $::errorEmailAddr) {
+      my $mailPipe;
+      open (mailPipe, "|/bin/mail -s DBMirror.pl $::errorEmailAddr");
+      print mailPipe "=====================================================\n";
+      print mailPipe "         DBMirror.pl                                 \n";
+      print mailPipe "\n";
+      print mailPipe " The DBMirror.pl script has encountred an error.     \n";
+      print mailPipe " It might indicate that either the master database has\n";
+      print mailPipe " gone down or that the connection to a slave database can\n";
+      print mailPipe " not be made.                                         \n";
+      print mailPipe " Process-Id: $$ on $::masterHost database $::masterDb\n";
+      print mailPipe  "\n";
+      print mailPipe $error;
+      print mailPipe "\n\n\n=================================================\n";
+      close mailPipe;
+    }
+    warn($error);    
+    $lastErrorMsg = $error;
+sub openSlaveConnection($) {
+    my $slavePtr = $_[0];
+    my $slaveConn;
+    my $slaveConnString = "host=" . $slavePtr->{"slaveHost"};    
+    $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"};
+    $slaveConnString .= " user=" . $slavePtr->{"slaveUser"};
+    $slaveConnString .= " password=" . $slavePtr->{"slavePassword"};
+    $slaveConn = Pg::connectdb($slaveConnString);
+    if($slaveConn->status !=PGRES_CONNECTION_OK) {
+       my $errorMessage = "Can't connect to slave database " ;
+       $errorMessage .= $slavePtr->{"slaveHost"} . "\n";
+       $errorMessage .= $slaveConn->errorMessage;
+       logErrorMessage($errorMessage);    
+       $slavePtr->{"status"} = -1;
+    }
+    else {
+       $slavePtr->{"slaveConn"} = $slaveConn;
+       $slavePtr->{"status"} = 0;
+       #Determine the MirrorHostId for the slave from the master's database
+       my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM '
+                                         . ' "MirrorHost" WHERE "HostName"'
+                                         . '=\'' . $slavePtr->{"slaveHost"}
+                                         . '\'');
+       if($resultSet->ntuples !=1) {
+           my $errorMessage .= $slavePtr->{"slaveHost"} ."\n";
+           $errorMessage .= "Has no MirrorHost entry on master\n";
+           logErrorMessage($errorMessage);
+           $slavePtr->{"status"}=-1;
+           return;
+       }
+       $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0);
+    }
+=item updateMirrorHostTable(lastTransId,lastSeqId)
+Updates the MirroredTransaction table to reflect the fact that
+this transaction has been sent to the current slave.
+=over 4 
+=item * lastTransId
+The Transaction id for the last transaction that has been succesfully mirrored to
+the currently open slaves.
+=item * lastSeqId 
+The Sequence Id of the last command that has been succefully mirrored
+sub updateMirrorHostTable($$) {
+    my $lastTransId = shift;
+    my $lastSeqId = shift;
+    if($::slaveInfo->{"status"}==0) {
+       my $deleteTransactionQuery;
+       my $deleteResult;
+       my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" ";
+       $updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")";
+       $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) ";
+       my $updateResult = $masterConn->exec($updateMasterQuery);
+       unless($updateResult->resultStatus == PGRES_COMMAND_OK) {
+           my $errorMessage = $masterConn->errorMessage . "\n";
+           $errorMessage .= $updateMasterQuery;
+           logErrorMessage($errorMessage);
+           die;
+       }
+#      print "Updated slaves to transaction $lastTransId\n" ;   
+#        flush STDOUT;  
+       #If this transaction has now been mirrored to all mirror hosts
+       #then it can be deleted.
+       $deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"='
+           . $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"'
+               . ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM'
+                   . ' "MirrorHost")';
+       $deleteResult = $masterConn->exec($deleteTransactionQuery);
+       if($deleteResult->resultStatus!=PGRES_COMMAND_OK) { 
+           logErrorMessage($masterConn->errorMessage . "\n" . 
+                           $deleteTransactionQuery);
+           die;
+       }
+    }
+sub extractData($$) {
+  my $pendingResult = $_[0];
+  my $currentTuple = $_[1];
+  my $fnumber;
+  my %valuesHash;
+  $fnumber = 4;
+  my $dataField = $pendingResult->getvalue($currentTuple,$fnumber);
+  while(length($dataField)>0) {
+    # Extract the field name that is surronded by double quotes
+    $dataField =~ m/(\".*?\")/s;
+    my $fieldName = $1;
+    $dataField = substr $dataField ,length($fieldName);
+    $fieldName =~ s/\"//g; #Remove the surronding " signs.
+    if($dataField =~ m/(^= )/s) {
+      #Matched null
+       $dataField = substr $dataField , length($1);
+      $valuesHash{$fieldName}=undef;
+    }
+    elsif ($dataField =~ m/(^=\')/s) {
+      #Has data.
+      my $value;
+      $dataField = substr $dataField ,2; #Skip the ='
+    LOOP: {  #This is to allow us to use last from a do loop.
+            #Recommended in perlsyn manpage.
+      do {
+       my $matchString;
+       #Find the substring ending with the first ' or first \
+       $dataField =~ m/(.*?[\'\\])?/s; 
+       $matchString = $1;
+       $value .= substr $matchString,0,length($matchString)-1;
+       if($matchString =~ m/(\'$)/s) {
+         # $1 runs to the end of the field value.
+           $dataField = substr $dataField,length($matchString)+1;
+           last;
+       }
+       else {
+         #deal with the escape character.
+         #It The character following the escape gets appended.
+           $dataField = substr $dataField,length($matchString);            
+           $dataField =~ s/(^.)//s;        
+           $value .=  $1;
+       }
+      } until(length($dataField)==0);
+  }
+      $valuesHash{$fieldName} = $value;
+      }#else if 
+         else {
+           logErrorMessage "Error in PendingData Sequence Id " .
+               $pendingResult->getvalue($currentTuple,0);
+           die;
+         }
+  } #while
+  return %valuesHash;
@@ -0,0 +1,10 @@
+# $Header: /cvsroot/pgsql/contrib/dbmirror/Attic/Makefile,v 1.1 2002/06/23 21:58:07 momjian Exp $
+subdir = contrib/dbmirror
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+MODULES = pending
+DOCS = README.dbmirror
+include $(top_srcdir)/contrib/contrib-global.mk
+CREATE FUNCTION "recordchange" () RETURNS opaque AS
+'/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C';
+CREATE TABLE "MirrorHost" (
+"MirrorHostId" serial,
+"HostName" varchar NOT NULL
+CREATE TABLE "Pending" (
+"SeqId" serial,
+"TableName" varchar NOT NULL,
+"Op" character,
+"XID" int4 NOT NULL,
+CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID");
+CREATE TABLE "PendingData" (
+"SeqId" int4 NOT NULL,
+"IsKey" bool NOT NULL,
+"Data" varchar,
+PRIMARY KEY ("SeqId", "IsKey") ,
+CREATE TABLE "MirroredTransaction" (
+"XID" int4 NOT NULL,
+"LastSeqId" int4 NOT NULL,
+"MirrorHostId" int4 NOT NULL,
+PRIMARY KEY  ("XID","MirrorHostId"),
+FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId")  ON UPDATE
+DBMirror - Postgres Database Mirroring
+DBMirror is a database mirroring system developed for the Postgres
+database Written and maintened by Steven Singer(ssinger@navtechinc.com)
+(c) 2001-2002 Navtech Systems Support Inc.
+Released under the GNU Public License version 2. See COPYING.
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    GNU General Public License for more details.
+The mirroring system is trigger based and provides the following key features:
+-Support for multiple mirror slaves
+-Transactions are maintained
+-Per table selection of what gets mirrored.
+The system is based on the idea that a master database exist where all
+edits are made to the tables being mirrored.   A trigger attatched to the
+tables being mirrored runs logging information about the edit to 
+the Pending table and  PendingData table. 
+A perl script(DBMirror.pl) runs continiously for each slave database(A database
+that the change is supposed to be mirrored to) examining the Pending
+table; searching for transactions that need to be sent to that particular slave 
+database.  Those transactions are then mirrored to the slave database and
+the MirroredTransaction table is updated to reflect that the transaction has
+been sent.
+If the transaction has been sent to all know slave hosts (All entries
+in the MirrorHost table) then all records of it are purged from the
+Pending tables.
+Installation Instructions
+1) Compile pending.c
+The file pending.c contains the recordchange trigger.  This runs every
+time a row inside of a table being mirrored changes.
+To build the trigger run  make on the "Makefile" in the DBMirror directory.
+The Makefile supplied assumes that the postgres include files are in 
+Postgres-7.1.x installations should change this to
+/usr/local/pgsql/include  (The server part is for 7.2+)
+If you have installed the postgres include files to another location then
+modify the Makefile to reflect this.
+The trigger requires that all postgres headers be installed, this is
+accomplished in postgresql(7.1 or 7.2) by running "make install-all-headers"
+in the postgres source directory.
+The Makefile should create a file named pending.so that contains the trigger.
+Install this file in /usr/local/pgsql/lib  (or another suitable location).
+If you choose a different location the MirrorSetup.sql script will need
+to be modified to reflect your new location.   The CREATE FUNCTION command
+in the MirrorSetup.sql script associates the trigger function with the
+pending.so shared library.  Modify the arguments to this command if you
+choose to install the trigger elsewhere.
+2) Run MirroSetup.sql
+This file contains SQL commands to setup the Mirroring environment.  
+This includes
+-Telling Postgres about the "recordchange" trigger function.
+-Creating the Pending,PendingData, MirrorHost, MirroredTransaction tables
+To execute the script use psql as follows 
+"psql -f MirrorSetup.sql  MyDatabaseName"
+where MyDatabaseName is the name of the database you wish to install mirroring
+on(Your master).
+3) Create slaveDatabase.conf files.
+Each slave database needs its own configuration file for the 
+DBMirror.pl script.  See slaveDatabase.conf for a sample.
+The master settings refer to the master database(The one that is
+being mirrored).
+The slave settings refer to the database that the data is being mirrored to.
+The slaveHost parameter must refer to the machine name of the slave (Either
+a resolvable hostname or an IP address).  The value for slave host
+must match the Hostname field in the MirrorHost table(See step 6).
+The master user must have sufficient permissions to modify the Pending
+tables and to read all of the tables being mirrored.
+The slave user must have enough permissions on the slave database to
+modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being
+4) Add the trigger to tables.
+Execute the SQL code in AddTrigger.sql once for each table that should
+be mirrored.   Replace MyTableName with the name of the table that should
+be mirrored.
+5)  Create the slave database.
+The DBMirror system keeps the contents of mirrored tables identical on the
+master and slave databases.  When you first install the mirror triggers the
+master and slave databases must be the same.
+If you are starting with an empty master database then the slave should
+be empty as well.  Otherwise use pg_dump to ensure that the slave database
+tables are initially identical to the master.
+6) Add entries in the MirrorHost table.
+Each slave database must have an entry in the MirrorHost table.
+The name of the host in the MirrorHost table must exactly match the
+slaveHost variable for that slave in the configuration file.
+For example
+INSERT INTO "MirrorHost" ("HostName") VALUES ('mySlaveMachine.mycompany.com');
+6)  Start DBMirror.pl
+DBMirror.pl is the perl script that handles the mirroring.  
+It requires the Perl library Pg(See src/interfaces/perl5 in the postgres 
+source distribution).
+It takes its configuration file as an argument(The one from step 3)
+One instance of DBMirror.pl runs for each slave machine that is receiving
+mirrored data.
+Any errors are printed to standard out and emailed to the address specified in
+the configuration file. 
+DBMirror can be run from the master, the slave, or a third machine as long
+as it is able to access both the master and slave databases.
+7) Periodically run clean_pending.pl 
+clean_pending.pl cleans out any entries from the Pending tables that
+have already been mirrored to all hosts in the MirrorHost table.
+It uses the same configuration file as DBMirror.pl.
+Normally DBMirror.pl will clean these tables as it goes but in some 
+circumstances this will not happen.
+For example if a transaction has been mirrored to all slaves except for
+one, then that host is removed from the MirrorHost table(It stops being
+a mirror slave) the transactions that had already been mirrored to 
+all the other hosts will not be deleted from the Pending tables by 
+DBMirror.pl since DBMirror.pl will run against these transactions again
+since they have already been sent to all the other hosts.
+clean_pending.pl will remove these transactions.
+TODO(Current Limitations)
+-Support for selective mirroring based on the content of data.
+-Support for BLOB's.
+-Support for conflict resolution.
+-Batching SQL commands in DBMirror for better performance over WAN's.
+-Better support for dealing with Schema changes.
+Tested Platforms:
+DBMirror has been tested on the following configurations but should
+work on any platform with Postgres >= 7.1 and Perl 5.6. 
+RedHat Linux 7.1 & 6.2
+ -Postgres 7.1.2
+ -Perl 5.6
+Mandrake Linux 8.0(Limited Testing)
+ -Postgres 7.2
+ -Perl 5.6
+Steven Singer
+Navtech Systems Support Inc.
+# clean_pending.pl
+# This perl script removes entries from the pending,pendingKeys,
+# pendingDeleteData tables that have already been mirrored to all hosts.
+#    Written by Steven Singer (ssinger@navtechinc.com)
+#    (c) 2001-2002 Navtech Systems Support Inc.
+#    Released under the GNU Public License version 2. See COPYING.
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    GNU General Public License for more details.
+# $Id: clean_pending.pl,v 1.1 2002/06/23 21:58:08 momjian Exp $
+=head1 NAME
+clean_pending.pl - A Perl script to remove old entries from the 
+pending, pendingKeys, and pendingDeleteData tables.
+=head1 SYNPOSIS
+clean_pending.pl databasename
+This Perl script connects to the database specified as a command line argument
+on the local system.  It uses a hard-coded username and password.
+It then removes any entries from the pending, pendingDeleteData, and 
+pendingKeys tables that have already been sent to all hosts in mirrorHosts.
+    # add in a global path to files
+    #Ensure that Pg is in the path.
+use strict;
+use Pg;
+if ($#ARGV != 0) {
+   die "usage: clean_pending.pl configFile\n";
+if( ! defined do $ARGV[0]) {
+    die("Invalid Configuration file $ARGV[0]");
+#connect to the database.
+my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword";
+my $dbConn = Pg::connectdb($connectString);
+unless($dbConn->status == PGRES_CONNECTION_OK) {
+    printf("Can't connect to database\n");
+    die;
+my $result = $dbConn->exec("BEGIN");
+unless($result->resultStatus == PGRES_COMMAND_OK) {
+   die $dbConn->errorMessage;
+#delete all transactions that have been sent to all mirrorhosts
+#or delete everything if no mirror hosts are defined.
+# Postgres takes the "SELECT COUNT(*) FROM "MirrorHost"  and makes it into
+# an InitPlan.  EXPLAIN show's this.  
+my $deletePendingQuery = 'DELETE FROM "Pending" WHERE (SELECT ';
+$deletePendingQuery .= ' COUNT(*) FROM "MirroredTransaction" WHERE ';
+$deletePendingQuery .= ' "XID"="Pending"."XID") = (SELECT COUNT(*) FROM ';
+$deletePendingQuery .= ' "MirrorHost") OR (SELECT COUNT(*) FROM ';
+$deletePendingQuery .= ' "MirrorHost") = 0';
+my $result = $dbConn->exec($deletePendingQuery);
+unless ($result->resultStatus == PGRES_COMMAND_OK ) {
+    printf($dbConn->errorMessage);
+    die;
+$result = $dbConn->exec('VACUUM "Pending"');
+unless ($result->resultStatus == PGRES_COMMAND_OK) {
+   printf($dbConn->errorMessage);
+$result = $dbConn->exec('VACUUM "PendingData"');
+unless($result->resultStatus == PGRES_COMMAND_OK) {
+   printf($dbConn->errorMessage);
+$result = $dbConn->exec('VACUUM "MirroredTransaction"');
+unless($result->resultStatus == PGRES_COMMAND_OK) {
+  printf($dbConn->errorMessage);
+ * pending.c
+ * $Id: pending.c,v 1.1 2002/06/23 21:58:08 momjian Exp $ 
+ *
+ * This file contains a trigger for Postgresql-7.x to record changes to tables
+ * to a pending table for mirroring.
+ * All tables that should be mirrored should have this trigger hooked up to it.
+ *
+ *   Written by Steven Singer (ssinger@navtechinc.com)
+ *   (c) 2001-2002 Navtech Systems Support Inc.
+ *   Released under the GNU Public License version 2. See COPYING.
+ *
+ *
+ *   This program is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   GNU General Public License for more details.
+ *
+ *
+ ***************************************************************************/
+#include <executor/spi.h>
+#include <commands/trigger.h>
+#include <postgres.h>
+int storePending(char * cpTableName, HeapTuple  tBeforeTuple, 
+                HeapTuple tAfterTuple,
+                 TupleDesc tTupdesc,
+                TriggerData * tpTrigdata,char cOp);
+int storeKeyInfo(char * cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc,
+                TriggerData * tpTrigdata);
+int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc,
+             TriggerData * tpTrigData,int iIncludeKeyData);
+int2vector * getPrimaryKey(Oid tblOid);
+char * packageData(HeapTuple tTupleData, TupleDesc tTupleDecs,
+                  TriggerData * tTrigData,
+                  enum FieldUsage eKeyUsage );
+#define BUFFER_SIZE 256
+#define MAX_OID_LEN 10
+extern Datum recordchange(PG_FUNCTION_ARGS);
+ * The entry point for the trigger function.
+ * The Trigger takes a single SQL 'text' argument indicating the name of the
+ * table the trigger was applied to.  If this name is incorrect so will the
+ * mirroring.
+ ****************************************************************************/
+Datum recordchange(PG_FUNCTION_ARGS) {
+  TriggerData * trigdata; 
+  TupleDesc tupdesc;
+  HeapTuple beforeTuple=NULL;
+  HeapTuple afterTuple=NULL;
+  HeapTuple retTuple=NULL;
+  char * tblname;
+  char op;
+  if(fcinfo->context!=NULL) {
+    if(SPI_connect() < 0) {
+      elog(NOTICE,"storePending could not connect to SPI");
+      return -1;
+    }
+    trigdata = (TriggerData*)fcinfo->context;
+    /* Extract the table name */
+    tblname = SPI_getrelname(trigdata->tg_relation);
+    tupdesc = trigdata->tg_relation->rd_att;
+    if(TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) {
+      retTuple = trigdata->tg_newtuple;
+      beforeTuple = trigdata->tg_trigtuple;
+      afterTuple = trigdata->tg_newtuple;
+      op='u';
+    }
+    else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) {
+      retTuple = trigdata->tg_trigtuple;
+      afterTuple = trigdata->tg_trigtuple;
+      op = 'i';
+    }
+    else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) {
+      retTuple = trigdata->tg_trigtuple;
+      beforeTuple = trigdata->tg_trigtuple;
+      op = 'd';
+    }
+    if(storePending(tblname,beforeTuple,afterTuple,tupdesc,trigdata,op)) {
+      /* An error occoured. Skip the operation. */
+      elog(ERROR,"Operation could not be mirrored");
+      return PointerGetDatum(NULL);
+    }
+#if defined DEBUG_OUTPUT
+    elog(NOTICE,"Returning on success");
+    SPI_finish();
+    return PointerGetDatum(retTuple);
+  }
+  else {
+    /*
+     * Not being called as a trigger.
+     */
+    return PointerGetDatum(NULL);
+  }
+ * Constructs and executes an SQL query to write a record of this tuple change
+ * to the pending table. 
+ *****************************************************************************/
+int storePending(char * cpTableName, HeapTuple  tBeforeTuple, 
+                HeapTuple tAfterTuple,
+                 TupleDesc tTupDesc,
+                 TriggerData * tpTrigData,char cOp) {
+  char * cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)";
+  int iResult=0;
+  HeapTuple tCurTuple; // Points the current tuple(before or after)
+  Datum saPlanData[4];
+  Oid taPlanArgTypes[3] = {NAMEOID,CHAROID,INT4OID};
+  void * vpPlan;
+  tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
+  vpPlan = SPI_prepare(cpQueryBase,3,taPlanArgTypes);
+  if(vpPlan==NULL) {
+    elog(NOTICE,"Error creating plan");
+  }
+  //  SPI_saveplan(vpPlan);
+  saPlanData[0] = PointerGetDatum(cpTableName);
+  saPlanData[1] = CharGetDatum(cOp);
+  saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
+  iResult = SPI_execp(vpPlan,saPlanData,NULL,1);
+  if(iResult < 0) {
+    elog(NOTICE,"storedPending fired (%s) returned %d",cpQueryBase,iResult);
+  }
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,"row successfully stored in pending table");
+  if(cOp=='d') {
+    /**
+     * This is a record of a delete operation.
+     * Just store the key data.
+     */
+    iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData);
+  }
+  else if (cOp=='i') {
+    /**
+     * An Insert operation.
+     * Store all data
+     */
+    iResult = storeData(cpTableName,tAfterTuple,tTupDesc,tpTrigData,TRUE);
+  }
+  else {
+    /* op must be an update. */
+    iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData);
+    iResult = iResult ? iResult : storeData(cpTableName,tAfterTuple,tTupDesc,
+                                           tpTrigData,TRUE);
+  }
+#if defined DEBUG_OUTPUT
+  elog(NOTICE,"DOne storing keyinfo");
+  return iResult;
+int storeKeyInfo(char * cpTableName, HeapTuple tTupleData, 
+                TupleDesc tTupleDesc,
+                TriggerData * tpTrigData) {
+  Oid saPlanArgTypes[1] = {NAMEOID};
+  char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)";
+  void * pplan;
+  Datum saPlanData[1];
+  char * cpKeyData;
+  int iRetCode;
+  pplan = SPI_prepare(insQuery,1,saPlanArgTypes);
+    if(pplan==NULL) {
+      elog(NOTICE,"Could not prepare INSERT plan");
+      return -1;
+    }
+    //    pplan = SPI_saveplan(pplan);
+    cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,PRIMARY);
+#if defined DEBUG_OUTPUT
+    elog(NOTICE,cpKeyData);
+    saPlanData[0] = PointerGetDatum(cpKeyData);
+    iRetCode = SPI_execp(pplan,saPlanData,NULL,1);    
+    if(cpKeyData!=NULL) {
+      SPI_pfree(cpKeyData);
+    }
+    if(iRetCode != SPI_OK_INSERT ) {
+      elog(NOTICE,"Error inserting row in pendingDelete");
+      return -1;
+    }
+#if defined DEBUG_OUTPUT
+    return 0;
+int2vector * getPrimaryKey(Oid tblOid) {
+  char * queryBase;
+  char * query;
+  bool isNull;
+  int2vector * resultKey;
+  int2vector * tpResultKey;
+  HeapTuple resTuple;
+  Datum resDatum;
+  int ret;
+  queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=";
+  query = SPI_palloc(strlen(queryBase) + MAX_OID_LEN+1);
+  sprintf(query,"%s%d",queryBase,tblOid);
+  ret = SPI_exec(query,1);
+  if(ret != SPI_OK_SELECT || SPI_processed != 1 ) {
+    elog(NOTICE,"Could not select primary index key");
+    return NULL;
+  }
+  resTuple = SPI_tuptable->vals[0];
+  resDatum = SPI_getbinval(resTuple,SPI_tuptable->tupdesc,1,&isNull);
+  tpResultKey = (int2vector*) DatumGetPointer(resDatum);
+  resultKey = SPI_palloc(sizeof(int2vector));
+  memcpy(resultKey,tpResultKey,sizeof(int2vector));
+  SPI_pfree(query);
+  return resultKey;
+ * Stores a copy of the non-key data for the row.
+ *****************************************************************************/
+int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc,
+                    TriggerData * tpTrigData,int iIncludeKeyData) {
+  Oid planArgTypes[1] = {NAMEOID};
+  char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)";
+  void * pplan;
+  Datum planData[1];
+  char * cpKeyData;
+  int iRetValue;
+  pplan = SPI_prepare(insQuery,1,planArgTypes);
+    if(pplan==NULL) {
+      elog(NOTICE,"Could not prepare INSERT plan");
+      return -1;
+    }
+    //    pplan = SPI_saveplan(pplan);
+    if(iIncludeKeyData==0) {
+      cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,NONPRIMARY); 
+    }
+    else {
+      cpKeyData = packageData(tTupleData,tTupleDesc,tpTrigData,ALL);
+    }
+    planData[0] = PointerGetDatum(cpKeyData);
+    iRetValue = SPI_execp(pplan,planData,NULL,1);
+    if(cpKeyData!=0) {
+      SPI_pfree(cpKeyData);
+    }
+    if(iRetValue != SPI_OK_INSERT ) {
+      elog(NOTICE,"Error inserting row in pendingDelete");
+      return -1;
+    }
+#if defined DEBUG_OUTPUT
+    return 0;  
+ * Packages the data in tTupleData into a string of the format 
+ * FieldName='value text'  where any quotes inside of value text
+ * are escaped with a backslash and any backslashes in value text
+ * are esacped by a second back slash.
+ *
+ * tTupleDesc should be a description of the tuple stored in 
+ * tTupleData.  
+ *
+ * eFieldUsage specifies which fields to use.
+ *  PRIMARY implies include only primary key fields.
+ *  NONPRIMARY implies include only non-primary key fields.
+ *  ALL implies include all fields.
+ */
+char * packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
+                  TriggerData *  tpTrigData,
+                  enum FieldUsage eKeyUsage ) {
+  int iNumCols;
+  int2vector * tpPKeys=NULL;
+  int iColumnCounter;
+  char * cpDataBlock;
+  int iDataBlockSize;
+  int  iUsedDataBlock;
+  iNumCols = tTupleDesc->natts;
+  if(eKeyUsage!=ALL) {
+    tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id);
+    if(tpPKeys==NULL) {
+      return NULL;
+    }
+  }
+#if defined DEBUG_OUTPUT
+  if(tpPKeys!=NULL) {
+    elog(NOTICE,"Have primary keys");
+  }
+  cpDataBlock = SPI_palloc(BUFFER_SIZE);
+  iDataBlockSize = BUFFER_SIZE;
+  iUsedDataBlock = 0;                   /* To account for the null */
+  for(iColumnCounter=1; iColumnCounter <=iNumCols; iColumnCounter++) {
+    int iIsPrimaryKey;
+    int iPrimaryKeyIndex;
+    char * cpUnFormatedPtr;
+    char * cpFormatedPtr;
+    char * cpFieldName;
+    char * cpFieldData;
+    if(eKeyUsage!=ALL) {
+      //Determine if this is a primary key or not.
+      iIsPrimaryKey=0;
+      for(iPrimaryKeyIndex=0; (*tpPKeys)[iPrimaryKeyIndex]!=0;
+         iPrimaryKeyIndex++) {
+       if((*tpPKeys)[iPrimaryKeyIndex]==iColumnCounter) {      
+         iIsPrimaryKey=1;
+         break;
+       }
+      }
+      if( iIsPrimaryKey ? (eKeyUsage!=PRIMARY) : (eKeyUsage!=NONPRIMARY)) {
+       /**
+        * Don't use.
+        */
+#if defined DEBUG_OUTPUT
+       elog(NOTICE,"Skipping column");
+       continue;
+      }
+    }                                   /* KeyUsage!=ALL */
+    cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs
+                                              [iColumnCounter-1]->attname));
+#if defined DEBUG_OUTPUT
+    elog(NOTICE,cpFieldName);
+    while(iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) +4) {
+      cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize + BUFFER_SIZE);
+      iDataBlockSize = iDataBlockSize + BUFFER_SIZE;      
+    }
+    sprintf(cpDataBlock+iUsedDataBlock,"\"%s\"=",cpFieldName);
+    iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName)+3;
+    cpFieldData=SPI_getvalue(tTupleData,tTupleDesc,iColumnCounter);
+    cpUnFormatedPtr = cpFieldData;
+    cpFormatedPtr = cpDataBlock + iUsedDataBlock;
+    if(cpFieldData!=NULL) {
+      *cpFormatedPtr='\'';
+      iUsedDataBlock++;
+      cpFormatedPtr++;
+    }
+    else {
+      *cpFormatedPtr=' ';
+      iUsedDataBlock++;
+      cpFormatedPtr++;
+      continue;
+    }
+#if defined DEBUG_OUTPUT
+    elog(NOTICE,cpFieldData);
+    elog(NOTICE,"Starting format loop");
+    while(*cpUnFormatedPtr!=0) {
+      while(iDataBlockSize - iUsedDataBlock < 2) {
+       cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE);
+       iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
+       cpFormatedPtr = cpDataBlock + iUsedDataBlock;
+      }
+      if(*cpUnFormatedPtr=='\\' || *cpUnFormatedPtr=='\'') {
+       *cpFormatedPtr='\\';
+       cpFormatedPtr++;
+       iUsedDataBlock++;
+      }
+      *cpFormatedPtr=*cpUnFormatedPtr;
+      cpFormatedPtr++;
+      cpUnFormatedPtr++;
+      iUsedDataBlock++;
+    }
+    SPI_pfree(cpFieldData);
+    while(iDataBlockSize - iUsedDataBlock < 3) {
+      cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE);
+      iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
+      cpFormatedPtr = cpDataBlock + iUsedDataBlock;
+    }
+    sprintf(cpFormatedPtr,"' ");
+    iUsedDataBlock = iUsedDataBlock +2;
+#if defined DEBUG_OUTPUT
+    elog(NOTICE,cpDataBlock);
+  }                                     /*  for iColumnCounter  */
+  if(tpPKeys!=NULL) {
+    SPI_pfree(tpPKeys);    
+  }
+#if defined DEBUG_OUTPUT
+  elog(NOTICE,"Returning");
+  memset(cpDataBlock + iUsedDataBlock,0,iDataBlockSize - iUsedDataBlock);
+  return cpDataBlock;
+# Config file for DBMirror.pl
+# This file contains a sample configuration file for DBMirror.pl
+# It contains configuration information to mirror data from 
+# the master database to a single slave system.
+# $Id: slaveDatabase.conf,v 1.1 2002/06/23 21:58:08 momjian Exp $
+$masterHost = "masterMachine.mydomain.com";
+$masterDb = "myDatabase";
+$masterUser = "postgres";
+$masterPassword = "postgrespassword";
+# Where to email Error messages to
+# $errorEmailAddr = "me@mydomain.com";
+$slaveInfo->{"slaveHost"} = "backupMachine.mydomain.com";
+$slaveInfo->{"slaveDb"} = "myDatabase";
+$slaveInfo->{"slaveUser"} = "postgres";
+$slaveInfo->{"slavePassword"} = "postgrespassword";