]> granicus.if.org Git - postgresql/commitdiff
Add replication email.
authorBruce Momjian <bruce@momjian.us>
Fri, 29 Jun 2001 20:10:12 +0000 (20:10 +0000)
committerBruce Momjian <bruce@momjian.us>
Fri, 29 Jun 2001 20:10:12 +0000 (20:10 +0000)
doc/TODO.detail/replication

index 0c27a4f79dd1ace9756ab06bb8e4f02c26321b36..564851b8f8deab1969b2e98d88c1ba6ca8caa030 100644 (file)
@@ -43,7 +43,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 10:01:18 1999
 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
        by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA11295
        for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 11:01:17 -0500 (EST)
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id KAA20310 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 10:39:18 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id KAA20310 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 10:39:18 -0500 (EST)
 Received: from localhost (majordom@localhost)
        by hub.org (8.9.3/8.9.3) with SMTP id KAA61760;
        Fri, 24 Dec 1999 10:31:13 -0500 (EST)
@@ -129,7 +129,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 18:31:03 1999
 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
        by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id TAA26244
        for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 19:31:02 -0500 (EST)
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id TAA12730 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 19:30:05 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id TAA12730 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 19:30:05 -0500 (EST)
 Received: from localhost (majordom@localhost)
        by hub.org (8.9.3/8.9.3) with SMTP id TAA57851;
        Fri, 24 Dec 1999 19:23:31 -0500 (EST)
@@ -212,7 +212,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 21:31:10 1999
 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
        by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id WAA02578
        for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 22:31:09 -0500 (EST)
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id WAA16641 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 22:18:56 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id WAA16641 for <pgman@candle.pha.pa.us>; Fri, 24 Dec 1999 22:18:56 -0500 (EST)
 Received: from localhost (majordom@localhost)
        by hub.org (8.9.3/8.9.3) with SMTP id WAA89135;
        Fri, 24 Dec 1999 22:11:12 -0500 (EST)
@@ -486,7 +486,7 @@ From owner-pgsql-hackers@hub.org Sun Dec 26 08:31:09 1999
 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
        by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA17976
        for <pgman@candle.pha.pa.us>; Sun, 26 Dec 1999 09:31:07 -0500 (EST)
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id JAA23337 for <pgman@candle.pha.pa.us>; Sun, 26 Dec 1999 09:28:36 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id JAA23337 for <pgman@candle.pha.pa.us>; Sun, 26 Dec 1999 09:28:36 -0500 (EST)
 Received: from localhost (majordom@localhost)
        by hub.org (8.9.3/8.9.3) with SMTP id JAA90738;
        Sun, 26 Dec 1999 09:21:58 -0500 (EST)
@@ -909,7 +909,7 @@ From owner-pgsql-hackers@hub.org Thu Dec 30 08:01:09 1999
 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
        by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA10317
        for <pgman@candle.pha.pa.us>; Thu, 30 Dec 1999 09:01:08 -0500 (EST)
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id IAA02365 for <pgman@candle.pha.pa.us>; Thu, 30 Dec 1999 08:37:10 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id IAA02365 for <pgman@candle.pha.pa.us>; Thu, 30 Dec 1999 08:37:10 -0500 (EST)
 Received: from localhost (majordom@localhost)
        by hub.org (8.9.3/8.9.3) with SMTP id IAA87902;
        Thu, 30 Dec 1999 08:34:22 -0500 (EST)
@@ -1006,7 +1006,7 @@ From owner-pgsql-patches@hub.org Sun Jan  2 23:01:38 2000
 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
        by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id AAA16274
        for <pgman@candle.pha.pa.us>; Mon, 3 Jan 2000 00:01:28 -0500 (EST)
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id XAA02655 for <pgman@candle.pha.pa.us>; Sun, 2 Jan 2000 23:45:55 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id XAA02655 for <pgman@candle.pha.pa.us>; Sun, 2 Jan 2000 23:45:55 -0500 (EST)
 Received: from hub.org (hub.org [216.126.84.1])
        by hub.org (8.9.3/8.9.3) with ESMTP id XAA13828;
        Sun, 2 Jan 2000 23:40:47 -0500 (EST)
@@ -1424,7 +1424,7 @@ From owner-pgsql-hackers@hub.org Tue Jan  4 10:31:01 2000
 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4])
        by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA17522
        for <pgman@candle.pha.pa.us>; Tue, 4 Jan 2000 11:31:00 -0500 (EST)
-Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id LAA01541 for <pgman@candle.pha.pa.us>; Tue, 4 Jan 2000 11:27:30 -0500 (EST)
+Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id LAA01541 for <pgman@candle.pha.pa.us>; Tue, 4 Jan 2000 11:27:30 -0500 (EST)
 Received: from localhost (majordom@localhost)
        by hub.org (8.9.3/8.9.3) with SMTP id LAA09992;
        Tue, 4 Jan 2000 11:18:07 -0500 (EST)
@@ -1905,3 +1905,1723 @@ Great.
   +  If your life is a hard drive,     |  830 Blythe Avenue
   +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026
 
+From pgsql-general-owner+M805@postgresql.org Tue Nov 21 23:53:04 2000
+Received: from mail.postgresql.org (webmail.postgresql.org [216.126.85.28])
+       by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id AAA19262
+       for <pgman@candle.pha.pa.us>; Wed, 22 Nov 2000 00:53:03 -0500 (EST)
+Received: from mail.postgresql.org (webmail.postgresql.org [216.126.85.28])
+       by mail.postgresql.org (8.11.1/8.11.1) with SMTP id eAM5qYs47249;
+       Wed, 22 Nov 2000 00:52:34 -0500 (EST)
+       (envelope-from pgsql-general-owner+M805@postgresql.org)
+Received: from racerx.cabrion.com (racerx.cabrion.com [166.82.231.4])
+       by mail.postgresql.org (8.11.1/8.11.1) with ESMTP id eAM5lJs46653
+       for <pgsql-general@postgresql.org>; Wed, 22 Nov 2000 00:47:19 -0500 (EST)
+       (envelope-from rob@cabrion.com)
+Received: from cabrionhome (gso163-25-211.triad.rr.com [24.163.25.211])
+       by racerx.cabrion.com (8.8.7/8.8.7) with SMTP id AAA13731
+       for <pgsql-general@postgresql.org>; Wed, 22 Nov 2000 00:45:20 -0500
+Message-ID: <006501c05447$fb9aa0c0$4100fd0a@cabrion.org>
+From: "rob" <rob@cabrion.com>
+To: <pgsql-general@postgresql.org>
+Subject: [GENERAL] Synchronization Toolkit
+Date: Wed, 22 Nov 2000 00:49:29 -0500
+MIME-Version: 1.0
+Content-Type: multipart/mixed;
+       boundary="----=_NextPart_000_0062_01C0541E.125CAF30"
+X-Priority: 3
+X-MSMail-Priority: Normal
+X-Mailer: Microsoft Outlook Express 5.50.4133.2400
+X-MimeOLE: Produced By Microsoft MimeOLE V5.50.4133.2400
+Precedence: bulk
+Sender: pgsql-general-owner@postgresql.org
+Status: OR
+
+This is a multi-part message in MIME format.
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: text/plain; charset="iso-8859-1"
+Content-Transfer-Encoding: 7bit
+
+Not to be confused with replication, my concept of synchronization is to
+manage changes between a server table (or tables) and one or more mobile,
+disconnected databases (i.e. PalmPilot, laptop, etc.).
+
+I read through the notes in the TODO for this topic and devised a tool kit
+for doing synchronization.  I hope that the Postgresql development community
+will find this useful and will help me refine this concept by offering
+insight, experience and some good old fashion hacking if you are so
+inclined.
+
+The bottom of this message describes how to use the attached files.
+
+I look forward to your feedback.
+
+--rob
+
+
+Methodology:
+
+I devised a concept that I call "session versioning".  This means that every
+time a row changes it does NOT get a new version.  Rather it gets stamped
+with the current session version common to all published tables.  Clients,
+when they connect for synchronization, will immediately increment this
+common version number reserve the result as a "post version" and then
+increment the session version again.  This version number, implemented as a
+sequence, is common to all synchronized tables and rows.
+
+Any time the server makes changes to the row gets stamped with the current
+session version, when the client posts its changes it uses the reserved
+"post version".  The client then makes all it's changes stamping the changed
+rows with it's reserved "post version" rather than the current version.  The
+reason why is explained later.  It is important that the client post all its
+own changes first so that it does not end up receiving records which changed
+since it's last session that it is about to update anyway.
+
+Reserving the post version is a two step process.  First, the number is
+simply stored in a variable for later use.  Second, the value is added to a
+lock table (last_stable) to indicate to any concurrent sessions that rows
+with higher version numbers are to be considered "unstable" at the moment
+and they should not attempt to retrieve them at this time.  Each client,
+upon connection, will use the lowest value in this lock table (max_version)
+to determine the upper boundary for versions it should retrieve.  The lower
+boundary is simply the  previous session's "max_version" plus one.  Thus
+when the client retrieves changes is uses the following SQL "where"
+expression:
+
+WHERE row_version >= max_version and row_version <= last_stable_version and
+version <> this_post_version
+
+The point of reserving and locking a post version is important in that it
+allows concurrent synchronization by multiple clients.  The first, of many,
+clients to connect basically dictates to all future clients that they must
+not take any rows equal to or greater than the one which it just reserved
+and locked.  The reason the session version is incremented a second time is
+so that the server may continue to post changes concurrent with any client
+changes and be certain that these concurrent server changes will not taint
+rows the client is about to retrieve. Once the client is finished with it's
+session it removes the lock on it's post version.
+
+Partitioning data for use by each node is the next challenge we face.  How
+can we control which "slice" of data each client receives?  A slice can be
+horizontal or vertical within a table.  Horizontal slices are easy,  it's
+just the where clause of an SQL statement that says "give me the rows that
+match X criteria".  We handle this by storing and appending a where clause
+to each client's retrieval statement  in addition to where clause described
+above.  Actually, two where clauses are stored and appended.  One is per
+client and one is per publication (table).
+
+We defined horizontal slices by filtering rows.  Vertical slices are limits
+by column.  The tool kit does provide a mechanism for pseudo vertical
+partitioning.  When a client is "subscribed" to a publication, the toolkit
+stores what columns that node is to receive during a session.  These are
+stored in the subscribed_cols table.  While this does limit the number
+columns transmitted, the insert/update/delete triggers do not recognize
+changes based on columns.   The "pseudo" nature of our vertical partitioning
+is evident by example:
+
+Say you have a table with name, address and phone number as columns.  You
+restrict a client to see only name and address.  This means that phone
+number information will not be sent to the client during synchronization,
+and the client can't attempt to alter the phone number of a given entry.
+Great, but . . . if, on the server, the phone number (but not the name or
+address) is changed, the entire row gets marked with a new version.  This
+means that the name and address will get sent to the client even though they
+didn't change.
+
+Well, there's the flaw in vertical partitioning.  Other than wasting
+bandwidth, the extra row does no harm to the process.  The workaround for
+this is to highly normalize your schema when possible.
+
+Collisions are the next crux one encounters with synchronization.  When two
+clients retrieve the same row and both make (different)changes, which one is
+correct?  So far the system operates totally independent of time.  This is
+good because it doesn't rely on the server or client to keep accurate time.
+We can just ignore time all together, but then we force our clients to
+synchronize on a strict schedule in order to avoid (or reduce) collisions.
+If every node synchronized immediately after making changes we could just
+stop here.  Unfortunately this isn't reality.  Reality dictates that of two
+clients: Client A & B will each pick up the same record on Monday.  A will
+make changes on Monday, then leave for vacation.  B will make changes on
+Wednesday because new information was gathered in A's absence.  Client B
+posts those changes Wednesday.  Meanwhile, client A returns from vacation on
+Friday and synchronizes his changes.  A over writes B's changes even though
+A made changes before the most recent information was posted by B.
+
+It is clear that we need some form of time stamp to cope with the above
+example.  While clocks aren't the most reliable, they are the only common
+version control available to solve this problem.  The system is set up to
+accept (but not require) timestamps from clients and changes on the server
+are time stamped.  The system, when presented a time stamp with a row, will
+compare them to figure out who wins in a tie.   The system makes certain
+"sanity" checks with regard to these time stamps.  A client may not attempt
+to post a change with a timestamp that is more than one hour in the future
+(according to what the server thinks "now" is) nor one hour before it's last
+synchronization date/time.  The client row will be immediately placed into
+the collision table if the timestamp is that far out of whack.
+Implementations of the tool kit should take care to ensure that client &
+server agree on what "now" is before attempting to submit changes with
+timestamps.
+
+Time stamps are not required.  Should a client be incapable of tracking
+timestamps, etc.  The system will assume that any server row which has been
+changed since the client's last session will win a tie.  This is quite error
+prone, so timestamps are encouraged where possible.
+
+Inserts pose an interesting challenge.  Since multiple clients cannot share
+a sequence (often used as a primary key) while disconnected.  They will be
+responsible for their own unique "row_id" when inserting records.   Inserts
+accept any arbitrary key, and write back to the client a special kind of
+update that gives the server's row_id.  The client is responsible for making
+sure that this update takes place locally.
+
+Deletes are the last portion of the process.  When deletes occur, the
+row_id, version, etc. are stored in a "deleted" table.  These entries are
+retrieved by the client using the same version filter as described above.
+The table is pruned at the end of each session by deleting all records with
+versions that are less than the lowest 'last_version' stored for each
+client.
+
+Having wrapped up the synchronization process, I'll move on to describe some
+points about managing clients, publications and the like.
+
+The tool kit is split into two objects: SyncManagement and Synchronization.
+The Synchronization object exposes an API that client implementations use to
+communicate and receive changes.  The management functions handle system
+install and uninstall in addition to publication of tables and client
+subscriptions.
+
+Installation and uninstallation are handled by their corresponding functions
+in the API.  All system tables are prefixed and suffixed with four
+underscores, in hopes that this avoids conflict with an existing tables.
+Calling the install function more than once will generate an error message.
+Uninstall will remove all related tables, sequences,  functions and triggers
+from the system.
+
+The first step, after installing the system, is to publish a table.  A table
+can be published more than once under different names.  Simply provide a
+unique name as the second argument to the publish function.  Since object
+names are restricted to 32 characters in Postgres, each table is given a
+unique id and this id is used to create the trigger and sequence names.
+Since one table can be published multiple times, but only needs one set of
+triggers and one sequence for change management a reference count is kept so
+that we know when to add/drop triggers and functions.  By default, all
+columns are published, but the third argument to the publish function
+accepts an array reference of column names that allows you to specify a
+limited set.  Information about the table is stored in the "tables" table,
+info about the publication is in the "publications" table and column names
+are stored in "subscribed_cols" table.
+
+The next step is to subscribe a client to a table.  A client is identified
+by a user name and a node name.  The subscribe function takes three
+arguments: user, node & publication.  The subscription process writes an
+entry into the "subscribed" table with default values.  Of note, the
+"RefreshOnce" attribute is set to true whenever a table is published.  This
+indicates to the system that a full table refresh should be sent the next
+time the client connects even if the client requests synchronization rather
+than refresh.
+
+The toolkit does not, yet, provide a way to manage the whereclause stored at
+either the publication or client level.  To use or test this feature, you
+will need to set the whereclause attributes manually.
+
+Tables and users can be unpublished and unsubscribed using the corresponding
+functions within the tool kit's management interface.  Because postgres
+lacks an "ALTER TABLE DROP COLUMN" function, the unpublish function only
+removes default values and indexes for those columns.
+
+The API isn't the most robust thing in the world right now.  All functions
+return undef on success and an error string otherwise (like DBD).  I hope to
+clean up the API considerably over the next month.  The code has not been
+field tested at this time.
+
+
+The files attached are:
+
+1) SynKit.pm (A perl module that contains install/uninstall functions and a
+simple api for synchronization & management)
+
+2) sync_install.pl (Sample code to demonstrate the installation, publishing
+and subscribe process)
+
+3) sync_uninstall.pl (Sample code to demonstrate the uninstallation,
+unpublishing and unsubscribe process)
+
+
+To use them on Linux (don't know about Win32 but should work fine):
+
+ - set up a test database and make SURE plpgsql is installed
+
+ - install perl 5.05 along with Date::Parse(TimeDate-1.1) , DBI and DBD::Pg
+modules [www.cpan.org]
+
+ - copy all three attached files to a test directory
+
+ - cd to your test directory
+
+ - edit all three files and change the three DBI variables to suit your
+system (they are clearly marked)
+
+ - % perl sync_install.pl
+
+ - check out the tables, functions & triggers installed
+
+ - % perl sync.pl
+
+ - check out the 'sync_test' table, do some updates/inserts/deletes and run
+sync.pl again
+        NOTE: Sanity checks default to allow no more than 50% of the table
+to be changed by the client in a single session.
+        If you delete all (or most of) the rows  you will get errors when
+you run sync.pl again! (by design)
+
+ - % perl sync_uninstall.pl  (when you are done)
+
+ - check out  the sample scripts and the perl module code (commented, but
+not documented)
+
+
+
+
+
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: application/octet-stream; name="sync.pl"
+Content-Transfer-Encoding: quoted-printable
+Content-Disposition: attachment; filename="sync.pl"
+
+
+
+# This script depicts the syncronization process for two users.
+
+
+##  CHANGE THESE THREE VARIABLE TO MATCH YOUR SYSTEM  ###########
+my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy';       #
+my $db_user =3D 'test';                                                #
+my $db_pass =3D 'test';                                                #
+#################################################################
+
+my $ret; #holds return value
+
+use SynKit;
+
+#create a synchronization object (pass dbi connection info)
+my $s =3D Synchronize->new($dbi_connect_string,$db_user,$db_pass);
+
+#start a session by passing a user name, "node" identifier and a collision =
+queue name (client or server)
+$ret =3D $s->start_session('JOE','REMOTE_NODE_NAME','server');
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this once before attempting to apply individual changes
+$ret =3D $s->start_changes('sync_test',['name']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this for each change the client wants to make to the database
+$ret =3D  $s->apply_change(CLIENTROWID,'insert',undef,['ted']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this for each change the client wants to make to the database
+$ret =3D  $s->apply_change(CLIENTROWID,'insert','1973-11-10 11:25:00 AM -05=
+',['tim']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this for each change the client wants to make to the database
+$ret =3D  $s->apply_change(999,'update',undef,['tom']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this for each change the client wants to make to the database
+$ret =3D  $s->apply_change(1,'update',undef,['tom']);
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this once after all changes have been submitted
+$ret =3D $s->end_changes();
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this to get updates from all subscribed tables
+$ret =3D $s->get_all_updates();
+print "Handle this error: $ret\n\n" if $ret;
+
+print "\n\nSyncronization session is complete. (JOE) \n\n";
+
+
+# make some changes to the database (server perspective)
+
+print "\n\nMaking changes to the the database. (server side) \n\n";
+
+use DBI;
+my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass);
+
+$dbh->do("insert into sync_test values ('roger')");
+$dbh->do("insert into sync_test values ('john')");
+$dbh->do("insert into sync_test values ('harry')");
+$dbh->do("delete from sync_test where name =3D 'roger'");
+$dbh->do("update sync_test set name =3D 'tom' where name =3D 'harry'");
+
+$dbh->disconnect;
+
+
+#now do another session for a different user
+
+#start a session by passing a user name, "node" identifier and a collision =
+queue name (client or server)
+$ret =3D $s->start_session('KEN','ANOTHER_REMOTE_NODE_NAME','server');
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this to get updates from all subscribed tables
+$ret =3D $s->get_all_updates();
+print "Handle this error: $ret\n\n" if $ret;
+
+print "\n\nSynchronization session is complete. (KEN)\n\n";
+
+print "Now look at your database and see what happend, make changes to the =
+test table, etc. and run this again.\n\n";
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: application/octet-stream; name="sync_uninstall.pl"
+Content-Transfer-Encoding: quoted-printable
+Content-Disposition: attachment; filename="sync_uninstall.pl"
+
+
+# this script uninstalls the synchronization system using the SyncManager o=
+bject;
+
+use SynKit;
+
+###  CHANGE THESE TO MATCH YOUR SYSTEM   ########################
+my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy';       #
+my $db_user =3D 'test';                                                #
+my $db_pass =3D 'test';                                                #
+#################################################################
+
+
+my $ret; #holds return value
+
+#create an instance of the SyncManager object
+my $m =3D SyncManager->new($dbi_connect_string,$db_user,$db_pass);
+
+# call this to unsubscribe a user/node (not necessary if you are uninstalli=
+ng)
+print $m->unsubscribe('KEN','ANOTHER_REMOTE_NODE_NAME','sync_test');
+
+#call this to unpublish a table (not necessary if you are uninstalling)
+print $m->unpublish('sync_test');
+
+#call this to uninstall the syncronization system
+#  NOTE: this will automatically unpublish & unsubscribe all users
+print $m->UNINSTALL;
+
+# now let's drop our little test table
+use DBI;
+my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass);
+$dbh->do("drop table sync_test");
+$dbh->disconnect;
+
+print "\n\nI hope you enjoyed this little demonstration\n\n";
+
+
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: application/octet-stream; name="sync_install.pl"
+Content-Transfer-Encoding: quoted-printable
+Content-Disposition: attachment; filename="sync_install.pl"
+
+
+# This script shows how to install the synchronization system=20
+# using the SyncManager object
+
+use SynKit;
+
+### CHANGE THESE TO MATCH YOUR SYSTEM  ##########################
+my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy';       #
+my $db_user =3D 'test';                                                #
+my $db_pass =3D 'test';                                                #
+#################################################################
+my $ret; #holds return value
+
+
+#create an instance of the sync manager object
+my $m =3D SyncManager->new($dbi_connect_string,$db_user,$db_pass);
+
+#Call this to install the syncronization management tables, etc.
+$ret =3D $m->INSTALL;
+die "Handle this error: $ret\n\n" if $ret;
+
+
+
+#create a test table for us to demonstrate with
+use DBI;
+my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass);
+$dbh->do("create table sync_test (name text)");
+$dbh->do("insert into sync_test values ('rob')");
+$dbh->do("insert into sync_test values ('rob')");
+$dbh->do("insert into sync_test values ('rob')");
+$dbh->do("insert into sync_test values ('ted')");
+$dbh->do("insert into sync_test values ('ted')");
+$dbh->do("insert into sync_test values ('ted')");
+$dbh->disconnect;
+
+
+
+
+#call this to "publish" a table
+$ret =3D $m->publish('sync_test');
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this to "subscribe" a user/node to a publication (table)
+$ret =3D $m->subscribe('JOE','REMOTE_NODE_NAME','sync_test');
+print "Handle this error: $ret\n\n" if $ret;
+
+#call this to "subscribe" a user/node to a publication (table)
+$ret =3D $m->subscribe('KEN','ANOTHER_REMOTE_NODE_NAME','sync_test');
+print "Handle this error: $ret\n\n" if $ret;
+
+
+print "Now you can do: 'perl sync.pl' a few times to play\n\n";
+print "Do 'perl sync_uninstall.pl' to uninstall the system\n";
+
+
+------=_NextPart_000_0062_01C0541E.125CAF30
+Content-Type: application/octet-stream; name="SynKit.pm"
+Content-Transfer-Encoding: quoted-printable
+Content-Disposition: attachment; filename="SynKit.pm"
+
+# Perl DB synchronization toolkit
+
+#created for postgres 7.0.2 +
+use strict;
+
+BEGIN {
+        use vars       qw($VERSION);
+        # set the version for version checking
+        $VERSION     =3D 1.00;
+}
+
+
+package Synchronize;
+
+use DBI;
+
+use Date::Parse;
+
+# new requires 3 arguments: dbi connection string, plus the corresponding u=
+sername and password to get connected to the database
+sub new {
+       my $proto =3D shift;
+       my $class =3D ref($proto) || $proto;
+       my $self =3D {};
+
+       my $dbi =3D shift;
+       my $user =3D shift;
+       my $pass =3D shift;
+
+       $self->{DBH} =3D DBI->connect($dbi,$user,$pass) || die "Failed to connect =
+to database: ".DBI->errstr();
+
+       $self->{user} =3D undef;
+       $self->{node} =3D undef;
+       $self->{status} =3D undef; # holds status of table update portion of sessi=
+on
+       $self->{pubs} =3D {}; #holds hash of pubs available to sessiom with val =
+=3D 1 if ok to request sync
+       $self->{orderpubs} =3D undef; #holds array ref of subscribed pubs ordered =
+by sync_order
+       $self->{this_post_ver} =3D undef; #holds the version number under which th=
+is session will post changes
+       $self->{max_ver} =3D undef; #holds the maximum safe version for getting up=
+dates
+       $self->{current} =3D {}; #holds the current publication info to which chan=
+ges are being applied
+       $self->{queue} =3D 'server'; # tells collide function what to do with coll=
+isions. (default is to hold on server)
+
+       $self->{DBLOG}=3D DBI->connect($dbi,$user,$pass) || die "cannot log to DB:=
+ ".DBI->errstr();=20
+
+
+       return bless ($self, $class);
+}
+
+sub dblog {=20
+       my $self =3D shift;
+       my $msg =3D $self->{DBLOG}->quote($_[0]);
+       my $quser =3D $self->{DBH}->quote($self->{user});
+       my $qnode =3D $self->{DBH}->quote($self->{node});
+       $self->{DBLOG}->do("insert into ____sync_log____ (username, nodename,stamp=
+, message) values($quser, $qnode, now(), $msg)");
+}
+
+
+#start_session establishes session wide information and other housekeeping =
+chores
+       # Accepts username, nodename and queue (client or server) as arguments;
+
+sub start_session {
+       my $self =3D shift;
+       $self->{user} =3D shift || die 'Username is required';
+       $self->{node} =3D shift || die 'Nodename is required';
+       $self->{queue} =3D shift;
+
+
+       if ($self->{queue} ne 'server' && $self->{queue} ne 'client') {
+               die "You must provide a queue argument of either 'server' or 'client'";
+       }
+
+       my $quser =3D $self->{DBH}->quote($self->{user});
+       my $qnode =3D $self->{DBH}->quote($self->{node});
+
+       my $sql =3D "select pubname from ____subscribed____ where username =3D $qu=
+ser and nodename =3D $qnode";
+       my @pubs =3D $self->GetColList($sql);
+
+       return 'User/Node has no subscriptions!' if !defined(@pubs);
+
+       # go though the list and check permissions and rules for each
+       foreach my $pub (@pubs) {
+               my $qpub =3D $self->{DBH}->quote($pub);
+               my $sql =3D "select disabled, pubname, fullrefreshonly, refreshonce,post_=
+ver from ____subscribed____ where username =3D $quser and pubname =3D $qpub=
+ and nodename =3D $qnode";
+               my $sth =3D $self->{DBH}->prepare($sql) || die $self->{DBH}->errstr;
+               $sth->execute || die $self->{DBH}->errstr;
+               my @row;
+               while (@row =3D $sth->fetchrow_array) {
+                       next if $row[0]; #publication is disabled
+                       next if !defined($row[1]); #publication does not exist (should never occ=
+ur)
+                       if ($row[2] || $row[3]) { #refresh of refresh once flag is set
+                               $self->{pubs}->{$pub} =3D 0; #refresh only
+                               next;
+                       }
+                       if (!defined($row[4])) { #no previous session exists, must refresh
+                               $self->{pubs}->{$pub} =3D 0; #refresh only
+                               next;
+                       }
+                       $self->{pubs}->{$pub} =3D 1; #OK for sync
+               }
+               $sth->finish;
+       }
+
+
+       $sql =3D "select pubname from ____publications____ order by sync_order";
+       my @op =3D $self->GetColList($sql);
+       my @orderpubs;
+
+       #loop through ordered pubs and remove non subscribed publications
+       foreach my $pub (@op) {
+               push @orderpubs, $pub if defined($self->{pubs}->{$pub});
+       }
+=09
+       $self->{orderpubs} =3D \@orderpubs;
+
+# Now we obtain a session version number, etc.
+
+       $self->{DBH}->{AutoCommit} =3D 0; #allows "transactions"
+       $self->{DBH}->{RaiseError} =3D 1; #script [or eval] will automatically die=
+ on errors
+
+       eval { #start DB transaction
+
+       #lock the version sequence until we determin that we have gotten
+       #a good  value.  Lock will be released on commit.
+               $self->{DBH}->do('lock ____version_seq____ in access exclusive mode');
+
+       # remove stale locks if they exist
+               my $sql =3D "delete from ____last_stable____ where username =3D $quser an=
+d nodename =3D $qnode";
+               $self->{DBH}->do($sql);
+
+       # increment version sequence & grab the next val as post_ver
+               my $sql =3D "select nextval('____version_seq____')";
+               my $sth =3D $self->{DBH}->prepare($sql);
+               $sth->execute;
+               ($self->{this_post_ver}) =3D $sth->fetchrow_array();
+               $sth->finish;
+       # grab max_ver from last_stable
+
+               $sql =3D "select min(version) from ____last_stable____";=20
+               $sth =3D $self->{DBH}->prepare($sql);
+               $sth->execute;
+               ($self->{max_ver}) =3D $sth->fetchrow_array();
+               $sth->finish;
+
+       # if there was no version in lock table, then take the ID that was in use
+       # when we started the session ($max_ver -1)
+
+               $self->{max_ver} =3D $self->{this_post_ver} -1 if (!defined($self->{max_v=
+er}));
+
+       # lock post_ver by placing it in last_stable
+               $self->{DBH}->do("insert into ____last_stable____ (version, username, nod=
+ename) values ($self->{this_post_ver}, $quser,$qnode)");
+
+       # increment version sequence again (discard result)
+               $sql =3D "select nextval('____version_seq____')";
+               $sth =3D $self->{DBH}->prepare($sql);
+               $sth->execute;
+               $sth->fetchrow_array();
+               $sth->finish;
+
+       }; #end eval/transaction
+
+       if ($@) { # part of transaction failed
+               return 'Start session failed';
+               $self->{DBH}->rollback;
+       } else { # all's well commit block
+               $self->{DBH}->commit;
+       }
+       $self->{DBH}->{AutoCommit} =3D 1;
+       $self->{DBH}->{RaiseError} =3D 0;
+
+       return undef;
+
+}
+
+#start changes should be called once before applying individual change requ=
+ests
+       # Requires publication and ref to columns that will be updated as arguments
+sub start_changes {
+       my $self =3D shift;
+       my $pub =3D shift || die 'Publication is required';
+       my $colref =3D shift || die 'Reference to column array is required';
+
+       $self->{status} =3D 'starting';
+
+       my $qpub =3D $self->{DBH}->quote($pub);
+       my $quser =3D $self->{DBH}->quote($self->{user});
+       my $qnode =3D $self->{DBH}->quote($self->{node});
+
+       my @cols =3D @{$colref};
+       my @subcols =3D $self->GetColList("select col_name from ____subscribed_col=
+s____ where username =3D $quser and nodename =3D $qnode and pubname =3D $qp=
+ub");
+       my %subcols;
+       foreach my $col (@subcols) {
+               $subcols{$col} =3D 1;
+       }
+       foreach my $col (@cols) {=09
+               return "User/node is not subscribed to column '$col'" if !$subcols{$col};
+       }
+
+       my $sql =3D "select pubname, readonly, last_session, post_ver, last_ver, w=
+hereclause, sanity_limit,=20
+sanity_delete, sanity_update, sanity_insert from ____subscribed____ where u=
+sername =3D $quser and pubname =3D $qpub and nodename =3D $qnode";
+       my ($junk, $readonly, $last_session, $post_ver, $last_ver, $whereclause, $=
+sanity_limit,=20
+$sanity_delete, $sanity_update, $sanity_insert) =3D $self->GetOneRow($sql);
+=09
+       return 'Publication is read only' if $readonly;
+
+       $sql =3D "select whereclause from ____publications____ where pubname =3D $=
+qpub";
+       my ($wc) =3D $self->GetOneRow($sql);
+       $whereclause =3D '('.$whereclause.')' if $whereclause;
+       $whereclause =3D $whereclause.' and ('.$wc.')' if $wc;
+
+       my ($table) =3D $self->GetOneRow("select tablename from ____publications__=
+__ where pubname =3D $qpub");
+
+       return 'Publication is not registered correctly' if !defined($table);
+
+       my %info;
+       $info{pub} =3D $pub;
+       $info{whereclause} =3D $whereclause;
+       $info{post_ver} =3D $post_ver;
+       $last_session =3D~ s/([+|-]\d\d?)$/ $1/;        #put a space before timezone=09
+       $last_session =3D str2time ($last_session); #convert to perltime (seconds =
+since 1970)
+       $info{last_session} =3D $last_session;
+       $info{last_ver} =3D $last_ver;
+       $info{table}  =3D $table;
+       $info{cols} =3D \@cols;
+
+       my $sql =3D "select count(oid) from $table";
+       $sql =3D $sql .' '.$whereclause if $whereclause;
+       my ($rowcount) =3D $self->GetOneRow($sql);
+
+       #calculate sanity levels (convert from % to number of rows)
+       # limits defined as less than 1 mean no limit
+       $info{sanitylimit} =3D $rowcount * ($sanity_limit / 100) if $sanity_limit =
+> 0;
+       $info{insertlimit} =3D $rowcount * ($sanity_insert / 100) if $sanity_inser=
+t > 0;
+       $info{updatelimit} =3D $rowcount * ($sanity_update / 100) if $sanity_updat=
+e > 0;
+       $info{deletelimit} =3D $rowcount * ($sanity_delete / 100) if $sanity_delet=
+e > 0;
+
+       $self->{sanitycount} =3D 0;
+       $self->{updatecount} =3D 0;
+       $self->{insertcount} =3D 0;
+       $self->{deletecount} =3D 0;
+
+       $self->{current} =3D \%info;
+
+       $self->{DBH}->{AutoCommit} =3D 0; #turn on transaction behavior so we can =
+roll back on sanity limits, etc.
+
+       $self->{status} =3D 'ready';
+
+       return undef;
+}
+
+#call this once all changes are submitted to commit them;
+sub end_changes {
+       my $self =3D shift;
+       return undef if $self->{status} ne 'ready';
+       $self->{DBH}->commit;
+       $self->{DBH}->{AutoCommit} =3D 1;
+       $self->{status} =3D 'success';
+       return undef;
+}
+
+#call apply_change once for each row level client update
+       # Accepts 4 params: rowid, action, timestamp and reference to data array
+       #       Note: timestamp can be undef, data can be undef
+       #               timestamp MUST be in perl time (secs since 1970)
+
+#this routine checks basic timestamp info and sanity limits, then passes th=
+e info along to do_action() for processing
+sub apply_change {
+       my $self =3D shift;
+       my $rowid =3D shift || return 'Row ID is required'; #don't die just for on=
+e bad row
+       my $action =3D shift || return 'Action is required'; #don't die just for o=
+ne bad row
+       my $timestamp =3D shift;
+       my $dataref =3D shift;
+       $action =3D lc($action);
+
+       $timestamp =3D str2time($timestamp) if $timestamp;
+
+       return 'Status failure, cannot accept changes: '.$self->{status} if $self-=
+>{status} ne 'ready';
+
+       my %info =3D %{$self->{current}};
+
+       $self->{sanitycount}++;
+       if ($info{sanitylimit} && $self->{sanitycount} > $info{sanitylimit}) {
+               # too many changes from client
+               my $ret =3D $self->sanity('limit');
+               return $ret if $ret;
+       }
+
+=09
+       if ($timestamp && $timestamp > time() + 3600) { # current time + one hour
+               #client's clock is way off, cannot submit changes in future
+               my $ret =3D $self->collide('future', $info{table}, $rowid, $action, undef=
+, $timestamp, $dataref, $self->{queue});
+               return $ret if $ret;
+       }
+
+       if ($timestamp && $timestamp < $info{last_session} - 3600) { # last sessio=
+n time less one hour
+               #client's clock is way off, cannot submit changes that occured before las=
+t sync date
+               my $ret =3D $self->collide('past', $info{table}, $rowid, $action, undef, =
+$timestamp, $dataref , $self->{queue});
+               return $ret if $ret;
+       }
+
+       my ($crow, $cver, $ctime); #current row,ver,time
+       if ($action ne 'insert') {
+               my $sql =3D "select ____rowid____, ____rowver____, ____stamp____ from $in=
+fo{table} where ____rowid____ =3D $rowid";
+               ($crow, $cver, $ctime) =3D $self->GetOneRow($sql);
+               if (!defined($crow)) {
+                       my $ret =3D $self->collide('norow', $info{table}, $rowid, $action, undef=
+, $timestamp, $dataref , $self->{queue});
+                       return $ret if $ret;=09=09
+               }
+
+               $ctime =3D~ s/([+|-]\d\d?)$/ $1/; #put space between timezone
+               $ctime =3D str2time($ctime) if $ctime; #convert to perl time
+
+               if ($timestamp) {
+                       if ($ctime < $timestamp) {
+                               my $ret =3D $self->collide('time', $info{table}, $rowid, $action, undef=
+, $timestamp, $dataref, $self->{queue} );=09=09
+                               return $ret if $ret;
+                       }
+
+               } else {
+                       if ($cver > $self->{this_post_ver}) {
+                               my $ret =3D $self->collide('version', $info{table}, $rowid, $action, un=
+def, $timestamp, $dataref, $self->{queue} );
+                               return $ret if $ret;
+                       }
+               }
+=09
+       }
+
+       if ($action eq 'insert') {
+               $self->{insertcount}++;
+               if ($info{insertlimit} && $self->{insertcount} > $info{insertlimit}) {
+                       # too many changes from client
+                       my $ret =3D $self->sanity('insert');
+                       return $ret if $ret;
+               }
+
+               my $qtable =3D $self->{DBH}->quote($info{table});
+               my ($rowidsequence) =3D '_'.$self->GetOneRow("select table_id from ____ta=
+bles____ where tablename =3D $qtable").'__rowid_seq';
+               return 'Table incorrectly registered, cannot get rowid sequence name: '.$=
+self->{DBH}->errstr() if not defined $rowidsequence;
+
+               my @data;
+               foreach my $val (@{$dataref}) {
+                       push @data, $self->{DBH}->quote($val);
+               }
+               my $sql =3D "insert into $info{table} (";
+               if ($timestamp) {
+                       $sql =3D $sql . join(',',@{$info{cols}}) . ',____rowver____, ____stamp__=
+__) values (';
+                       $sql =3D $sql . join (',',@data) .','.$self->{this_post_ver}.',\''.local=
+time($timestamp).'\')';
+               } else {
+                       $sql =3D $sql . join(',',@{$info{cols}}) . ',____rowver____) values (';
+                       $sql =3D $sql . join (',',@data) .','.$self->{this_post_ver}.')';
+               }
+               my $ret =3D $self->{DBH}->do($sql);
+               if (!$ret) {
+                       my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,=
+ $action, undef, $timestamp, $dataref , $self->{queue});
+                       return $ret if $ret;=09=09
+               }
+               my ($newrowid) =3D $self->GetOneRow("select currval('$rowidsequence')");
+               return 'Failed to get current rowid on inserted row'.$self->{DBH}->errstr=
+ if not defined $newrowid;
+               $self->changerowid($rowid, $newrowid);
+       }
+
+       if ($action eq 'update') {
+               $self->{updatecount}++;
+               if ($info{updatelimit} && $self->{updatecount} > $info{updatelimit}) {
+                       # too many changes from client
+                       my $ret =3D $self->sanity('update');
+                       return $ret if $ret;
+               }
+               my @data;
+               foreach my $val (@{$dataref}) {
+                       push @data, $self->{DBH}->quote($val);
+               }=09
+
+               my $sql =3D "update $info{table} set ";
+               my @cols =3D @{$info{cols}};
+               foreach my $col (@cols) {
+                       my $val =3D shift @data;
+                       $sql =3D $sql . "$col =3D $val,";
+               }
+               $sql =3D $sql." ____rowver____ =3D $self->{this_post_ver}";
+               $sql =3D $sql.", ____stamp____ =3D '".localtime($timestamp)."'" if $times=
+tamp;
+               $sql =3D $sql." where ____rowid____ =3D $rowid";
+               $sql =3D $sql." and $info{whereclause}" if $info{whereclause};
+               my $ret =3D $self->{DBH}->do($sql);
+               if (!$ret) {
+                       my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,=
+ $action, undef, $timestamp, $dataref , $self->{queue});
+                       return $ret if $ret;=09=09
+               }
+
+       }
+
+       if ($action eq 'delete') {
+               $self->{deletecount}++;
+               if ($info{deletelimit} && $self->{deletecount} > $info{deletelimit}) {
+                       # too many changes from client
+                       my $ret =3D $self->sanity('delete');
+                       return $ret if $ret;
+               }
+               if ($timestamp) {
+                       my $sql =3D "update $info{table} set ____rowver____ =3D $self->{this_pos=
+t_ver}, ____stamp____ =3D '".localtime($timestamp)."'  where ____rowid____ =
+=3D $rowid";
+                       $sql =3D $sql . " where $info{whereclause}" if $info{whereclause};
+                       $self->{DBH}->do($sql) || return 'Predelete update failed: '.$self->{DBH=
+}->errstr;
+               } else {
+                       my $sql =3D "update $info{table} set ____rowver____ =3D $self->{this_pos=
+t_ver} where ____rowid____ =3D $rowid";
+                       $sql =3D $sql . " where $info{whereclause}" if $info{whereclause};
+                       $self->{DBH}->do($sql) || return 'Predelete update failed: '.$self->{DBH=
+}->errstr;
+               }
+               my $sql =3D "delete from $info{table} where ____rowid____ =3D $rowid";
+               $sql =3D $sql . " where $info{whereclause}" if $info{whereclause};
+               my $ret =3D $self->{DBH}->do($sql);
+               if (!$ret) {
+                       my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,=
+ $action, undef, $timestamp, $dataref , $self->{queue});
+                       return $ret if $ret;=09=09
+               }
+}
+=09
+=09
+       return undef;
+}
+
+sub changerowid {
+       my $self =3D shift;
+       my $oldid =3D shift;
+       my $newid =3D shift;
+       $self->writeclient('changeid',"$oldid\t$newid");
+}
+
+#writes info to client
+sub writeclient {
+       my $self =3D shift;
+       my $type =3D shift;
+       my @info =3D @_;
+       print "$type: ",join("\t",@info),"\n";
+       return undef;
+}
+
+# Override this for custom behavior.  Default is to echo back the sanity fa=
+ilure reason.=20=20
+# If you want to override a collision, you can do so by returning undef.
+sub sanity {
+       my $self =3D shift;
+       my $reason =3D shift;
+       $self->{status} =3D 'sanity exceeded';
+       $self->{DBH}->rollback;
+       return $reason;
+}
+
+# Override this for custom behavior.  Default is to echo back the failure r=
+eason.=20=20
+# If you want to override a collision, you can do so by returning undef.
+sub collide {
+       my $self =3D shift;
+       my ($reason,$table,$rowid,$action,$rowver,$timestamp,$data, $queue) =3D @_;
+
+       my @data;
+       foreach my $val (@{$data}) {
+               push @data, $self->{DBH}->quote($val);
+       }=09
+
+       if ($reason =3D~ /integrity/i || $reason =3D~ /constraint/i) {
+               $self->{status} =3D 'intergrity violation';
+               $self->{DBH}->rollback;
+       }
+
+       my $datastring;
+       my @cols =3D @{$self->{current}->{cols}};
+       foreach my $col (@cols) {
+               my $val =3D shift @data;
+               $datastring =3D $datastring . "$col =3D $val,";
+       }
+       chop $datastring; #remove trailing comma
+
+       if ($queue eq 'server') {
+               $timestamp =3D localtime($timestamp) if defined($timestamp);
+               $rowid =3D $self->{DBH}->quote($rowid);
+               $rowid =3D 'null' if !defined($rowid);
+               $rowver =3D 'null' if !defined($rowver);
+               $timestamp =3D $self->{DBH}->quote($timestamp);
+               $data =3D $self->{DBH}->quote($data);
+               my $qtable =3D $self->{DBH}->quote($table);
+               my $qreason =3D $self->{DBH}->quote($reason);
+               my $qaction =3D $self->{DBH}->quote($action);
+               my $quser =3D $self->{DBH}->quote($self->{user});
+               my $qnode =3D $self->{DBH}->quote($self->{node});
+               $datastring =3D $self->{DBH}->quote($datastring);
+
+
+               my $sql =3D "insert into ____collision____ (rowid,
+tablename, rowver, stamp, data, reason, action, username,
+nodename, queue) values($rowid,$qtable, $rowver, $timestamp,$datastring,
+$qreason, $qaction,$quser, $qnode)";
+               $self->{DBH}->do($sql) || die 'Failed to write to collision table: '.$sel=
+f->{DBH}->errstr;
+
+       } else {
+
+               $self->writeclient('collision',$rowid,$table, $rowver, $timestamp,$reason=
+, $action,$self->{user}, $self->{node}, $data);
+
+       }
+       return $reason;
+}
+
+#calls get_updates once for each publication the user/node is subscribed to=
+ in correct sync_order
+sub get_all_updates {
+       my $self =3D shift;
+       my $quser =3D $self->{DBH}->quote($self->{user});
+       my $qnode =3D $self->{DBH}->quote($self->{node});
+
+       foreach my $pub (@{$self->{orderpubs}}) {
+               $self->get_updates($pub, 1); #request update as sync unless overrridden b=
+y flags
+       }
+
+}
+
+# Call this once for each table the client needs refreshed or sync'ed AFTER=
+ all inbound client changes have been posted
+#      Accepts publication and sync flag as arguments
+sub get_updates {
+       my $self =3D shift;
+       my $pub =3D shift || die 'Publication is required';
+       my $sync =3D shift;
+
+       my $qpub =3D $self->{DBH}->quote($pub);
+       my $quser =3D $self->{DBH}->quote($self->{user});
+       my $qnode =3D $self->{DBH}->quote($self->{node});
+
+       #enforce refresh and refreshonce flags
+       undef $sync if !$self->{pubs}->{$pub};=20
+
+
+       my %info =3D $self->{current};
+
+       my @cols =3D $self->GetColList("select col_name from ____subscribed_cols__=
+__ where username =3D $quser and nodename =3D $qnode and pubname =3D $qpub"=
+);;
+
+       my ($table) =3D $self->GetOneRow("select tablename from ____publications__=
+__ where pubname =3D $qpub");
+       return 'Table incorrectly registered for read' if !defined($table);
+       my $qtable =3D $self->{DBH}->quote($table);=09
+
+
+       my $sql =3D "select pubname, last_session, post_ver, last_ver, whereclause=
+ from ____subscribed____ where username =3D $quser and pubname =3D $qpub an=
+d nodename =3D $qnode";
+       my ($junk, $last_session, $post_ver, $last_ver, $whereclause) =3D $self->G=
+etOneRow($sql);
+
+       my ($wc) =3D $self->GetOneRow("select whereclause from ____publications___=
+_ where pubname =3D $qpub");
+
+       $whereclause =3D '('.$whereclause.')' if $whereclause;
+
+       $whereclause =3D $whereclause.' and ('.$wc.')' if $wc;
+
+
+       if ($sync) {
+               $self->writeclient('start synchronize', $pub);
+       } else {
+               $self->writeclient('start refresh', $pub);
+               $self->{DBH}->do("update ____subscribed____ set refreshonce =3D false whe=
+re pubname =3D $qpub and username =3D $quser and nodename =3D $qnode") || r=
+eturn 'Failed to clear RefreshOnce flag: '.$self->{DBH}->errstr;
+       }
+
+       $self->writeclient('columns',@cols);
+
+
+
+       my $sql =3D "select ____rowid____, ".join(',', @cols)." from $table";
+       if ($sync) {
+               $sql =3D $sql." where (____rowver____ <=3D $self->{max_ver} and ____rowve=
+r____ > $last_ver)";
+               if (defined($self->{this_post_ver})) {
+                       $sql =3D $sql . " and (____rowver____ <> $post_ver)";
+               }
+       } else {
+               $sql =3D $sql." where (____rowver____ <=3D $self->{max_ver})";
+       }
+       $sql =3D $sql." and $whereclause" if $whereclause;
+=09
+       my $sth =3D $self->{DBH}->prepare($sql) || return 'Failed to get prepare S=
+QL for updates: '.$self->{DBH}->errstr;
+       $sth->execute || return 'Failed to execute SQL for updates: '.$self->{DBH}=
+->errstr;
+       my @row;
+       while (@row =3D $sth->fetchrow_array) {
+               $self->writeclient('update/insert',@row);
+       }
+
+       $sth->finish;
+
+       # now get deleted rows
+       if ($sync) {
+               $sql =3D "select rowid from ____deleted____ where (tablename =3D $qtable)=
+";
+               $sql =3D $sql." and (rowver <=3D $self->{max_ver} and rowver > $last_ver)=
+";
+               if (defined($self->{this_post_ver})) {
+                       $sql =3D $sql . " and (rowver <> $self->{this_post_ver})";
+               }
+               $sql =3D $sql." and $whereclause" if $whereclause;
+
+               $sth =3D $self->{DBH}->prepare($sql) || return 'Failed to get prepare SQL=
+ for deletes: '.$self->{DBH}->errstr;
+               $sth->execute || return 'Failed to execute SQL for deletes: '.$self->{DBH=
+}->errstr;
+               my @row;
+               while (@row =3D $sth->fetchrow_array) {
+                       $self->writeclient('delete',@row);
+               }
+
+               $sth->finish;
+       }
+
+       if ($sync) {
+               $self->writeclient('end synchronize', $pub);
+       } else {
+               $self->writeclient('end refresh', $pub);
+       }
+
+       my $qpub =3D $self->{DBH}->quote($pub);
+       my $quser =3D $self->{DBH}->quote($self->{user});
+       my $qnode =3D $self->{DBH}->quote($self->{node});
+
+       $self->{DBH}->do("update ____subscribed____ set last_ver =3D $self->{max_v=
+er}, last_session =3D now(), post_ver =3D $self->{this_post_ver} where user=
+name =3D $quser and nodename =3D $qnode and pubname =3D $qpub");
+       return undef;
+}
+
+
+# Call this once when everything else is done.  Does housekeeping.=20
+# (MAKE THIS AN OBJECT DESTRUCTOR?)
+sub DESTROY {
+       my $self =3D shift;
+
+#release version from lock table (including old ones)
+       my $quser =3D $self->{DBH}->quote($self->{user});
+       my $qnode =3D $self->{DBH}->quote($self->{node});
+       my $sql =3D "delete from ____last_stable____ where username =3D $quser and=
+ nodename =3D $qnode";
+       $self->{DBH}->do($sql);
+
+#clean up deleted table
+       my ($version) =3D $self->GetOneRow("select min(last_ver) from ____subscrib=
+ed____");
+       return undef if not defined $version;
+       $self->{DBH}->do("delete from ____deleted____ where rowver < $version") ||=
+ return 'Failed to prune deleted table'.$self->{DBH}->errstr;;
+
+
+#disconnect from DBD sessions
+       $self->{DBH}->disconnect;
+       $self->{DBLOG}->disconnect;
+       return undef;
+}
+
+############# Helper Subs ############
+sub GetColList {
+       my $self =3D shift;
+       my $sql =3D shift || die 'Must provide sql select statement';
+       my $sth =3D $self->{DBH}->prepare($sql) || return undef;
+       $sth->execute || return undef;
+       my $val;
+       my @col;
+       while (($val) =3D $sth->fetchrow_array) {
+               push @col, $val;
+       }
+       $sth->finish;
+       return @col;
+}
+
+sub GetOneRow {
+       my $self =3D shift;
+       my $sql =3D shift || die 'Must provide sql select statement';
+       my $sth =3D $self->{DBH}->prepare($sql) || return undef;
+       $sth->execute || return undef;
+       my @row =3D $sth->fetchrow_array;
+       $sth->finish;
+       return @row;
+}
+
+=20
+
+
+
+package SyncManager;
+
+use DBI;
+# new requires 3 arguments: dbi connection string, plus the corresponding u=
+sername and password
+
+sub new {
+       my $proto =3D shift;
+       my $class =3D ref($proto) || $proto;
+       my $self =3D {};
+
+       my $dbi =3D shift;
+       my $user =3D shift;
+       my $pass =3D shift;
+
+       $self->{DBH} =3D DBI->connect($dbi,$user,$pass) || die "Failed to connect =
+to database: ".DBI->errstr();
+
+       $self->{DBLOG}=3D DBI->connect($dbi,$user,$pass) || die "cannot log to DB:=
+ ".DBI->errstr();
+=09
+       return bless ($self, $class);
+}
+
+sub dblog {=20
+       my $self =3D shift;
+       my $msg =3D $self->{DBLOG}->quote($_[0]);
+       my $quser =3D $self->{DBH}->quote($self->{user});
+       my $qnode =3D $self->{DBH}->quote($self->{node});
+       $self->{DBLOG}->do("insert into ____sync_log____ (username, nodename,stamp=
+, message) values($quser, $qnode, now(), $msg)");
+}
+
+#this should never need to be called, but it might if a node bails without =
+releasing their locks
+sub ReleaseAllLocks {
+       my $self =3D shift;
+       $self->{DBH}->do("delete from ____last_stable____)");
+}
+# Adds a publication to the system.  Also adds triggers, sequences, etc ass=
+ociated with the table if approproate.
+       # accepts two argument: the name of a physical table and the name under wh=
+ich to publish it=20
+       #       NOTE: the publication name is optional and will default to the table na=
+me if not supplied
+       # returns undef if ok, else error string;
+sub publish {
+       my $self =3D shift;
+       my $table =3D shift || die 'You must provide a table name (and optionally =
+a unique publication name)';
+       my $pub =3D shift;
+       $pub =3D $table if not defined($pub);
+
+       my $qpub =3D $self->{DBH}->quote($pub);
+       my $sql =3D "select tablename from ____publications____ where pubname =3D =
+$qpub";
+       my ($junk) =3D $self->GetOneRow($sql);
+       return 'Publication already exists' if defined($junk);
+
+       my $qtable =3D $self->{DBH}->quote($table);
+
+       $sql =3D "select table_id, refcount from ____tables____ where tablename =
+=3D $qtable";
+       my ($id, $refcount) =3D $self->GetOneRow($sql);
+
+       if(!defined($id)) {
+               $self->{DBH}->do("insert into ____tables____ (tablename, refcount) values=
+ ($qtable,1)") || return 'Failed to register table: ' . $self->{DBH}->errst=
+r;
+               my $sql =3D "select table_id from ____tables____ where tablename =3D $qta=
+ble";
+               ($id) =3D $self->GetOneRow($sql);
+       }
+
+       if (defined($refcount)) {
+               $self->{DBH}->do("update ____tables____ set refcount =3D refcount+1 where=
+ table_id =3D $id") || return 'Failed to update refrence count: ' . $self->=
+{DBH}->errstr;
+       } else {
+=09=09
+               $id =3D '_'.$id.'_';=20
+
+               my @cols =3D $self->GetTableCols($table, 1); # 1 =3D get hidden cols too
+               my %skip;
+               foreach my $col (@cols) {
+                       $skip{$col} =3D 1;
+               }
+=09=09
+               if (!$skip{____rowver____}) {
+                       $self->{DBH}->do("alter table $table add column ____rowver____ int4"); #=
+don't fail here in case table is being republished, just accept the error s=
+ilently
+               }
+               $self->{DBH}->do("update $table set ____rowver____ =3D ____version_seq___=
+_.last_value - 1") || return 'Failed to initialize rowver: ' . $self->{DBH}=
+->errstr;
+
+               if (!$skip{____rowid____}) {
+                       $self->{DBH}->do("alter table $table add column ____rowid____ int4"); #d=
+on't fail here in case table is being republished, just accept the error si=
+lently
+               }
+
+               my $index =3D $id.'____rowid____idx';
+               $self->{DBH}->do("create index $index on $table(____rowid____)") || retur=
+n 'Failed to create rowid index: ' . $self->{DBH}->errstr;
+
+               my $sequence =3D $id.'_rowid_seq';
+               $self->{DBH}->do("create sequence $sequence") || return 'Failed to create=
+ rowver sequence: ' . $self->{DBH}->errstr;
+
+               $self->{DBH}->do("alter table $table alter column ____rowid____ set defau=
+lt nextval('$sequence')"); #don't fail here in case table is being republis=
+hed, just accept the error silently
+
+               $self->{DBH}->do("update $table set ____rowid____ =3D  nextval('$sequence=
+')") || return 'Failed to initialize rowid: ' . $self->{DBH}->errstr;
+
+               if (!$skip{____stamp____}) {
+                       $self->{DBH}->do("alter table $table add column ____stamp____ timestamp"=
+); #don't fail here in case table is being republished, just accept the err=
+or silently
+               }
+
+               $self->{DBH}->do("update $table set ____stamp____ =3D  now()") || return =
+'Failed to initialize stamp: ' . $self->{DBH}->errstr;
+
+               my $trigger =3D $id.'_ver_ins';
+               $self->{DBH}->do("create trigger $trigger before insert on $table for eac=
+h row execute procedure sync_insert_ver()") || return 'Failed to create tri=
+gger: ' . $self->{DBH}->errstr;
+
+               my $trigger =3D $id.'_ver_upd';
+               $self->{DBH}->do("create trigger $trigger before update on $table for eac=
+h row execute procedure sync_update_ver()") || return 'Failed to create tri=
+gger: ' . $self->{DBH}->errstr;
+
+               my $trigger =3D $id.'_del_row';
+               $self->{DBH}->do("create trigger $trigger after delete on $table for each=
+ row execute procedure sync_delete_row()") || return 'Failed to create trig=
+ger: ' . $self->{DBH}->errstr;
+       }
+
+       $self->{DBH}->do("insert into ____publications____ (pubname, tablename) va=
+lues ('$pub','$table')") || return 'Failed to create publication entry: '.$=
+self->{DBH}->errstr;
+
+       return undef;
+}
+
+
+# Removes a publication from the system.  Also drops triggers, sequences, e=
+tc associated with the table if approproate.
+       # accepts one argument: the name of a publication
+       # returns undef if ok, else error string;
+sub unpublish {
+       my $self =3D shift;
+       my $pub =3D shift || return 'You must provide a publication name';
+       my $qpub =3D $self->{DBH}->quote($pub);
+       my $sql =3D "select tablename from ____publications____ where pubname =3D =
+$qpub";
+       my ($table) =3D $self->GetOneRow($sql);
+       return 'Publication does not exist' if !defined($table);
+
+       my $qtable =3D $self->{DBH}->quote($table);
+
+       $sql =3D "select table_id, refcount from ____tables____ where tablename =
+=3D $qtable";
+       my ($id, $refcount) =3D $self->GetOneRow($sql);
+       return 'Table: $table is not correctly registered!' if not defined($id);
+
+       $self->{DBH}->do("update ____tables____ set refcount =3D refcount -1 where=
+ tablename =3D $qtable") || return 'Failed to decrement reference count: ' =
+. $self->{DBH}->errstr;
+
+       $self->{DBH}->do("delete from ____subscribed____ where pubname =3D $qpub")=
+ || return 'Failed to delete user subscriptions: ' . $self->{DBH}->errstr;
+       $self->{DBH}->do("delete from ____subscribed_cols____ where pubname =3D $q=
+pub") || return 'Failed to delete subscribed columns: ' . $self->{DBH}->err=
+str;
+       $self->{DBH}->do("delete from ____publications____ where tablename =3D $qt=
+able and pubname =3D $qpub") || return 'Failed to delete from publications:=
+ ' . $self->{DBH}->errstr;
+
+       #if this is the last reference, we want to drop triggers, etc;
+       if ($refcount <=3D 1) {
+               $id =3D "_".$id."_";
+
+               $self->{DBH}->do("alter table $table alter column ____rowver____ drop def=
+ault") || return 'Failed to alter column default: ' . $self->{DBH}->errstr;
+               $self->{DBH}->do("alter table $table alter column ____rowid____ drop defa=
+ult") || return 'Failed to alter column default: ' . $self->{DBH}->errstr;
+               $self->{DBH}->do("alter table $table alter column ____stamp____ drop defa=
+ult") || return 'Failed to alter column default: ' . $self->{DBH}->errstr;
+
+               my $trigger =3D $id.'_ver_upd';
+               $self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to =
+drop trigger: ' . $self->{DBH}->errstr;
+
+               my $trigger =3D $id.'_ver_ins';
+               $self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to =
+drop trigger: ' . $self->{DBH}->errstr;
+
+               my $trigger =3D $id.'_del_row';
+               $self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to =
+drop trigger: ' . $self->{DBH}->errstr;
+
+               my $sequence =3D $id.'_rowid_seq';
+               $self->{DBH}->do("drop sequence $sequence") || return 'Failed to drop seq=
+uence: ' . $self->{DBH}->errstr;
+
+               my $index =3D $id.'____rowid____idx';
+               $self->{DBH}->do("drop index $index") || return 'Failed to drop index: ' =
+. $self->{DBH}->errstr;
+               $self->{DBH}->do("delete from ____tables____ where tablename =3D $qtable"=
+) || return 'remove entry from tables: ' . $self->{DBH}->errstr;
+       }
+return undef;
+}
+
+
+
+
+
+#Subscribe user/node to a publication
+       # Accepts 3 arguements: Username, Nodename, Publication
+       #       NOTE: the remaining arguments can be supplied as column names to which =
+the user/node should be subscribed
+       # Return undef if ok, else returns an error string
+
+sub subscribe {
+       my $self =3D shift;
+       my $user =3D shift || die 'You must provide user, node and publication as =
+arguments';
+       my $node =3D shift || die 'You must provide user, node and publication as =
+arguments';
+       my $pub =3D shift || die 'You must provide user, node and publication as a=
+rguments';
+       my @cols =3D @_;
+
+       my $quser =3D $self->{DBH}->quote($user);
+       my $qnode =3D $self->{DBH}->quote($node);
+       my $qpub =3D $self->{DBH}->quote($pub);
+
+       my $sql =3D "select tablename from ____publications____ where pubname =3D =
+$qpub";
+       my ($table) =3D $self->GetOneRow($sql);
+       return "Publication $pub does not exist." if not defined $table;
+       my $qtable =3D $self->{DBH}->quote($table);
+
+       @cols =3D $self->GetTableCols($table) if !@cols; # get defaults if cols we=
+re not spefified by caller
+
+       $self->{DBH}->do("insert into ____subscribed____ (username, nodename,pubna=
+me,last_ver,refreshonce) values('$user', '$node','$pub',0, true)") || retur=
+n 'Failes to create subscription: ' . $self->{DBH}->errstr;=09
+
+       foreach my $col (@cols) {
+               $self->{DBH}->do("insert into ____subscribed_cols____ (username, nodename=
+, pubname, col_name) values ('$user','$node','$pub','$col')") || return 'Fa=
+iles to subscribe column: ' . $self->{DBH}->errstr;=09
+       }
+
+       return undef;
+}
+
+
+#Unsubscribe user/node to a publication
+       # Accepts 3 arguements: Username, Nodename, Publication
+       # Return undef if ok, else returns an error string
+
+sub unsubscribe {
+       my $self =3D shift;
+       my $user =3D shift || die 'You must provide user, node and publication as =
+arguments';
+       my $node =3D shift || die 'You must provide user, node and publication as =
+arguments';
+       my $pub =3D shift || die 'You must provide user, node and publication as a=
+rguments';
+       my @cols =3D @_;
+
+       my $quser =3D $self->{DBH}->quote($user);
+       my $qnode =3D $self->{DBH}->quote($node);
+       my $qpub =3D $self->{DBH}->quote($pub);
+
+       my $sql =3D "select tablename from ____publications____ where pubname =3D =
+$qpub";
+       my $table =3D $self->GetOneRow($sql);
+       return "Publication $pub does not exist." if not defined $table;
+
+       $self->{DBH}->do("delete from ____subscribed_cols____ where pubname =3D $q=
+pub and username =3D $quser and nodename =3D $qnode") || return 'Failed to =
+remove column subscription: '. $self->{DBH}->errstr;
+       $self->{DBH}->do("delete from ____subscribed____ where pubname =3D $qpub a=
+nd username =3D $quser and nodename =3D $qnode") || return 'Failed to remov=
+e subscription: '. $self->{DBH}->errstr;
+
+
+       return undef;
+}
+
+
+
+#INSTALL creates the necessary management tables.=20=20
+       #returns undef if everything is ok, else returns a string describing the e=
+rror;
+sub INSTALL {
+my $self =3D shift;
+
+#check to see if management tables are already installed
+
+my ($test) =3D $self->GetOneRow("select * from pg_class where relname =3D '=
+____publications____'");
+if (defined($test)) {
+       return 'It appears that synchronization manangement tables are already ins=
+talled here.  Please uninstall before reinstalling.';
+};
+
+
+
+#install the management tables, etc.
+
+$self->{DBH}->do("create table ____publications____ (pubname text primary k=
+ey,description text, tablename text, sync_order int4, whereclause text)") |=
+| return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____subscribed_cols____ (nodename text, user=
+name text, pubname text, col_name text, description text, primary key(noden=
+ame, username, pubname,col_name))") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____subscribed____ (nodename text, username =
+text, pubname text, last_session timestamp, post_ver int4, last_ver int4, w=
+hereclause text, sanity_limit int4 default 0, sanity_delete int4 default 0,=
+ sanity_update int4 default 0, sanity_insert int4 default 50, readonly bool=
+ean, disabled boolean, fullrefreshonly boolean, refreshonce boolean, primar=
+y key(nodename, username, pubname))") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____last_stable____ (version int4, username =
+text, nodename text, primary key(version, username, nodename))") || return =
+$self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____tables____ (tablename text, table_id int=
+4, refcount int4, primary key(tablename, table_id))") || return $self->{DBH=
+}->errstr();
+
+$self->{DBH}->do("create sequence ____table_id_seq____") || return $self->{=
+DBH}->errstr();
+
+$self->{DBH}->do("alter table ____tables____ alter column table_id set defa=
+ult nextval('____table_id_seq____')") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____deleted____ (rowid int4, tablename text,=
+ rowver int4, stamp timestamp, primary key (rowid, tablename))") || return =
+$self->{DBH}->errstr();
+
+$self->{DBH}->do("create table ____collision____ (rowid text, tablename tex=
+t, rowver int4, stamp timestamp, faildate timestamp default now(),data text=
+,reason text, action text, username text, nodename text,queue text)") || re=
+turn $self->{DBH}->errstr();
+
+$self->{DBH}->do("create sequence ____version_seq____") || return $self->{D=
+BH}->errstr();
+
+$self->{DBH}->do("create table ____sync_log____ (username text, nodename te=
+xt, stamp timestamp, message text)") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create function sync_insert_ver() returns opaque as
+'begin
+if new.____rowver____ isnull then
+new.____rowver____ :=3D ____version_seq____.last_value;
+end if;
+if new.____stamp____ isnull then
+new.____stamp____ :=3D now();
+end if;
+return NEW;
+end;' language 'plpgsql'") || return $self->{DBH}->errstr();
+
+$self->{DBH}->do("create function sync_update_ver() returns opaque as
+'begin
+if new.____rowver____ =3D old.____rowver____ then
+new.____rowver____ :=3D ____version_seq____.last_value;
+end if;
+if new.____stamp____ =3D old.____stamp____ then
+new.____stamp____ :=3D now();
+end if;
+return NEW;
+end;' language 'plpgsql'") || return $self->{DBH}->errstr();
+
+
+$self->{DBH}->do("create function sync_delete_row() returns opaque as=20
+'begin=20
+insert into ____deleted____ (rowid,tablename,rowver,stamp) values
+(old.____rowid____, TG_RELNAME, old.____rowver____,old.____stamp____);=20
+return old;=20
+end;' language 'plpgsql'") || return $self->{DBH}->errstr();
+
+return undef;
+}
+
+#removes all management tables & related stuff
+       #returns undef if ok, else returns an error message as a string
+sub UNINSTALL {
+my $self =3D shift;
+
+#Make sure all tables are unpublished first
+my $sth =3D $self->{DBH}->prepare("select pubname from ____publications____=
+");
+$sth->execute;
+my $pub;
+while (($pub) =3D $sth->fetchrow_array) {
+       $self->unpublish($pub);=09
+}
+$sth->finish;
+
+$self->{DBH}->do("drop table ____publications____") || return $self->{DBH}-=
+>errstr();
+$self->{DBH}->do("drop table ____subscribed_cols____") || return $self->{DB=
+H}->errstr();
+$self->{DBH}->do("drop table ____subscribed____") || return $self->{DBH}->e=
+rrstr();
+$self->{DBH}->do("drop table ____last_stable____") || return $self->{DBH}->=
+errstr();
+$self->{DBH}->do("drop table ____deleted____") || return $self->{DBH}->errs=
+tr();
+$self->{DBH}->do("drop table ____collision____") || return $self->{DBH}->er=
+rstr();
+$self->{DBH}->do("drop table ____tables____") || return $self->{DBH}->errst=
+r();
+$self->{DBH}->do("drop table ____sync_log____") || return $self->{DBH}->err=
+str();
+
+$self->{DBH}->do("drop sequence ____table_id_seq____") || return $self->{DB=
+H}->errstr();
+$self->{DBH}->do("drop sequence ____version_seq____") || return $self->{DBH=
+}->errstr();
+
+$self->{DBH}->do("drop function sync_insert_ver()") || return $self->{DBH}-=
+>errstr();
+$self->{DBH}->do("drop function sync_update_ver()") || return $self->{DBH}-=
+>errstr();
+$self->{DBH}->do("drop function sync_delete_row()") || return $self->{DBH}-=
+>errstr();
+
+return undef;
+
+}
+
+sub DESTROY {
+       my $self =3D shift;
+
+       $self->{DBH}->disconnect;
+       $self->{DBLOG}->disconnect;
+       return undef;
+}
+
+############# Helper Subs ############
+
+sub GetOneRow {
+       my $self =3D shift;
+       my $sql =3D shift || die 'Must provide sql select statement';
+       my $sth =3D $self->{DBH}->prepare($sql) || return undef;
+       $sth->execute || return undef;
+       my @row =3D $sth->fetchrow_array;
+       $sth->finish;
+       return @row;
+}
+
+#call this with second non-zero value to get hidden columns
+sub GetTableCols {
+       my $self =3D shift;
+       my $table =3D shift || die 'Must provide table name';
+       my $wanthidden =3D shift;
+       my $sql =3D "select * from $table where 0 =3D 1";
+       my $sth =3D $self->{DBH}->prepare($sql) || return undef;
+       $sth->execute || return undef;
+       my @row =3D @{$sth->{NAME}};
+       $sth->finish;
+       return @row if $wanthidden;
+       my @cols;
+       foreach my $col (@row) {
+               next if $col eq '____rowver____';
+               next if $col eq '____stamp____';
+               next if $col eq '____rowid____';
+               push @cols, $col;=09
+       }
+       return @cols;
+}
+
+
+1; #happy require
+
+------=_NextPart_000_0062_01C0541E.125CAF30--
+
+