From 4fe8490b5dc62005943336ff2f65614eae01b8d2 Mon Sep 17 00:00:00 2001 From: Bruce Momjian Date: Fri, 29 Jun 2001 20:10:12 +0000 Subject: [PATCH] Add replication email. --- doc/TODO.detail/replication | 1734 ++++++++++++++++++++++++++++++++++- 1 file changed, 1727 insertions(+), 7 deletions(-) diff --git a/doc/TODO.detail/replication b/doc/TODO.detail/replication index 0c27a4f79d..564851b8f8 100644 --- a/doc/TODO.detail/replication +++ b/doc/TODO.detail/replication @@ -43,7 +43,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 10:01:18 1999 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA11295 for ; Fri, 24 Dec 1999 11:01:17 -0500 (EST) -Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id KAA20310 for ; Fri, 24 Dec 1999 10:39:18 -0500 (EST) +Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id KAA20310 for ; Fri, 24 Dec 1999 10:39:18 -0500 (EST) Received: from localhost (majordom@localhost) by hub.org (8.9.3/8.9.3) with SMTP id KAA61760; Fri, 24 Dec 1999 10:31:13 -0500 (EST) @@ -129,7 +129,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 18:31:03 1999 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id TAA26244 for ; Fri, 24 Dec 1999 19:31:02 -0500 (EST) -Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id TAA12730 for ; Fri, 24 Dec 1999 19:30:05 -0500 (EST) +Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id TAA12730 for ; Fri, 24 Dec 1999 19:30:05 -0500 (EST) Received: from localhost (majordom@localhost) by hub.org (8.9.3/8.9.3) with SMTP id TAA57851; Fri, 24 Dec 1999 19:23:31 -0500 (EST) @@ -212,7 +212,7 @@ From owner-pgsql-hackers@hub.org Fri Dec 24 21:31:10 1999 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id WAA02578 for ; Fri, 24 Dec 1999 22:31:09 -0500 (EST) -Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id WAA16641 for ; Fri, 24 Dec 1999 22:18:56 -0500 (EST) +Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id WAA16641 for ; Fri, 24 Dec 1999 22:18:56 -0500 (EST) Received: from localhost (majordom@localhost) by hub.org (8.9.3/8.9.3) with SMTP id WAA89135; Fri, 24 Dec 1999 22:11:12 -0500 (EST) @@ -486,7 +486,7 @@ From owner-pgsql-hackers@hub.org Sun Dec 26 08:31:09 1999 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA17976 for ; Sun, 26 Dec 1999 09:31:07 -0500 (EST) -Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id JAA23337 for ; Sun, 26 Dec 1999 09:28:36 -0500 (EST) +Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id JAA23337 for ; Sun, 26 Dec 1999 09:28:36 -0500 (EST) Received: from localhost (majordom@localhost) by hub.org (8.9.3/8.9.3) with SMTP id JAA90738; Sun, 26 Dec 1999 09:21:58 -0500 (EST) @@ -909,7 +909,7 @@ From owner-pgsql-hackers@hub.org Thu Dec 30 08:01:09 1999 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id JAA10317 for ; Thu, 30 Dec 1999 09:01:08 -0500 (EST) -Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id IAA02365 for ; Thu, 30 Dec 1999 08:37:10 -0500 (EST) +Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id IAA02365 for ; Thu, 30 Dec 1999 08:37:10 -0500 (EST) Received: from localhost (majordom@localhost) by hub.org (8.9.3/8.9.3) with SMTP id IAA87902; Thu, 30 Dec 1999 08:34:22 -0500 (EST) @@ -1006,7 +1006,7 @@ From owner-pgsql-patches@hub.org Sun Jan 2 23:01:38 2000 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id AAA16274 for ; Mon, 3 Jan 2000 00:01:28 -0500 (EST) -Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id XAA02655 for ; Sun, 2 Jan 2000 23:45:55 -0500 (EST) +Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id XAA02655 for ; Sun, 2 Jan 2000 23:45:55 -0500 (EST) Received: from hub.org (hub.org [216.126.84.1]) by hub.org (8.9.3/8.9.3) with ESMTP id XAA13828; Sun, 2 Jan 2000 23:40:47 -0500 (EST) @@ -1424,7 +1424,7 @@ From owner-pgsql-hackers@hub.org Tue Jan 4 10:31:01 2000 Received: from renoir.op.net (root@renoir.op.net [207.29.195.4]) by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id LAA17522 for ; Tue, 4 Jan 2000 11:31:00 -0500 (EST) -Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.6 $) with ESMTP id LAA01541 for ; Tue, 4 Jan 2000 11:27:30 -0500 (EST) +Received: from hub.org (hub.org [216.126.84.1]) by renoir.op.net (o1/$Revision: 1.7 $) with ESMTP id LAA01541 for ; Tue, 4 Jan 2000 11:27:30 -0500 (EST) Received: from localhost (majordom@localhost) by hub.org (8.9.3/8.9.3) with SMTP id LAA09992; Tue, 4 Jan 2000 11:18:07 -0500 (EST) @@ -1905,3 +1905,1723 @@ Great. + If your life is a hard drive, | 830 Blythe Avenue + Christ can be your backup. | Drexel Hill, Pennsylvania 19026 +From pgsql-general-owner+M805@postgresql.org Tue Nov 21 23:53:04 2000 +Received: from mail.postgresql.org (webmail.postgresql.org [216.126.85.28]) + by candle.pha.pa.us (8.9.0/8.9.0) with ESMTP id AAA19262 + for ; Wed, 22 Nov 2000 00:53:03 -0500 (EST) +Received: from mail.postgresql.org (webmail.postgresql.org [216.126.85.28]) + by mail.postgresql.org (8.11.1/8.11.1) with SMTP id eAM5qYs47249; + Wed, 22 Nov 2000 00:52:34 -0500 (EST) + (envelope-from pgsql-general-owner+M805@postgresql.org) +Received: from racerx.cabrion.com (racerx.cabrion.com [166.82.231.4]) + by mail.postgresql.org (8.11.1/8.11.1) with ESMTP id eAM5lJs46653 + for ; Wed, 22 Nov 2000 00:47:19 -0500 (EST) + (envelope-from rob@cabrion.com) +Received: from cabrionhome (gso163-25-211.triad.rr.com [24.163.25.211]) + by racerx.cabrion.com (8.8.7/8.8.7) with SMTP id AAA13731 + for ; Wed, 22 Nov 2000 00:45:20 -0500 +Message-ID: <006501c05447$fb9aa0c0$4100fd0a@cabrion.org> +From: "rob" +To: +Subject: [GENERAL] Synchronization Toolkit +Date: Wed, 22 Nov 2000 00:49:29 -0500 +MIME-Version: 1.0 +Content-Type: multipart/mixed; + boundary="----=_NextPart_000_0062_01C0541E.125CAF30" +X-Priority: 3 +X-MSMail-Priority: Normal +X-Mailer: Microsoft Outlook Express 5.50.4133.2400 +X-MimeOLE: Produced By Microsoft MimeOLE V5.50.4133.2400 +Precedence: bulk +Sender: pgsql-general-owner@postgresql.org +Status: OR + +This is a multi-part message in MIME format. + +------=_NextPart_000_0062_01C0541E.125CAF30 +Content-Type: text/plain; charset="iso-8859-1" +Content-Transfer-Encoding: 7bit + +Not to be confused with replication, my concept of synchronization is to +manage changes between a server table (or tables) and one or more mobile, +disconnected databases (i.e. PalmPilot, laptop, etc.). + +I read through the notes in the TODO for this topic and devised a tool kit +for doing synchronization. I hope that the Postgresql development community +will find this useful and will help me refine this concept by offering +insight, experience and some good old fashion hacking if you are so +inclined. + +The bottom of this message describes how to use the attached files. + +I look forward to your feedback. + +--rob + + +Methodology: + +I devised a concept that I call "session versioning". This means that every +time a row changes it does NOT get a new version. Rather it gets stamped +with the current session version common to all published tables. Clients, +when they connect for synchronization, will immediately increment this +common version number reserve the result as a "post version" and then +increment the session version again. This version number, implemented as a +sequence, is common to all synchronized tables and rows. + +Any time the server makes changes to the row gets stamped with the current +session version, when the client posts its changes it uses the reserved +"post version". The client then makes all it's changes stamping the changed +rows with it's reserved "post version" rather than the current version. The +reason why is explained later. It is important that the client post all its +own changes first so that it does not end up receiving records which changed +since it's last session that it is about to update anyway. + +Reserving the post version is a two step process. First, the number is +simply stored in a variable for later use. Second, the value is added to a +lock table (last_stable) to indicate to any concurrent sessions that rows +with higher version numbers are to be considered "unstable" at the moment +and they should not attempt to retrieve them at this time. Each client, +upon connection, will use the lowest value in this lock table (max_version) +to determine the upper boundary for versions it should retrieve. The lower +boundary is simply the previous session's "max_version" plus one. Thus +when the client retrieves changes is uses the following SQL "where" +expression: + +WHERE row_version >= max_version and row_version <= last_stable_version and +version <> this_post_version + +The point of reserving and locking a post version is important in that it +allows concurrent synchronization by multiple clients. The first, of many, +clients to connect basically dictates to all future clients that they must +not take any rows equal to or greater than the one which it just reserved +and locked. The reason the session version is incremented a second time is +so that the server may continue to post changes concurrent with any client +changes and be certain that these concurrent server changes will not taint +rows the client is about to retrieve. Once the client is finished with it's +session it removes the lock on it's post version. + +Partitioning data for use by each node is the next challenge we face. How +can we control which "slice" of data each client receives? A slice can be +horizontal or vertical within a table. Horizontal slices are easy, it's +just the where clause of an SQL statement that says "give me the rows that +match X criteria". We handle this by storing and appending a where clause +to each client's retrieval statement in addition to where clause described +above. Actually, two where clauses are stored and appended. One is per +client and one is per publication (table). + +We defined horizontal slices by filtering rows. Vertical slices are limits +by column. The tool kit does provide a mechanism for pseudo vertical +partitioning. When a client is "subscribed" to a publication, the toolkit +stores what columns that node is to receive during a session. These are +stored in the subscribed_cols table. While this does limit the number +columns transmitted, the insert/update/delete triggers do not recognize +changes based on columns. The "pseudo" nature of our vertical partitioning +is evident by example: + +Say you have a table with name, address and phone number as columns. You +restrict a client to see only name and address. This means that phone +number information will not be sent to the client during synchronization, +and the client can't attempt to alter the phone number of a given entry. +Great, but . . . if, on the server, the phone number (but not the name or +address) is changed, the entire row gets marked with a new version. This +means that the name and address will get sent to the client even though they +didn't change. + +Well, there's the flaw in vertical partitioning. Other than wasting +bandwidth, the extra row does no harm to the process. The workaround for +this is to highly normalize your schema when possible. + +Collisions are the next crux one encounters with synchronization. When two +clients retrieve the same row and both make (different)changes, which one is +correct? So far the system operates totally independent of time. This is +good because it doesn't rely on the server or client to keep accurate time. +We can just ignore time all together, but then we force our clients to +synchronize on a strict schedule in order to avoid (or reduce) collisions. +If every node synchronized immediately after making changes we could just +stop here. Unfortunately this isn't reality. Reality dictates that of two +clients: Client A & B will each pick up the same record on Monday. A will +make changes on Monday, then leave for vacation. B will make changes on +Wednesday because new information was gathered in A's absence. Client B +posts those changes Wednesday. Meanwhile, client A returns from vacation on +Friday and synchronizes his changes. A over writes B's changes even though +A made changes before the most recent information was posted by B. + +It is clear that we need some form of time stamp to cope with the above +example. While clocks aren't the most reliable, they are the only common +version control available to solve this problem. The system is set up to +accept (but not require) timestamps from clients and changes on the server +are time stamped. The system, when presented a time stamp with a row, will +compare them to figure out who wins in a tie. The system makes certain +"sanity" checks with regard to these time stamps. A client may not attempt +to post a change with a timestamp that is more than one hour in the future +(according to what the server thinks "now" is) nor one hour before it's last +synchronization date/time. The client row will be immediately placed into +the collision table if the timestamp is that far out of whack. +Implementations of the tool kit should take care to ensure that client & +server agree on what "now" is before attempting to submit changes with +timestamps. + +Time stamps are not required. Should a client be incapable of tracking +timestamps, etc. The system will assume that any server row which has been +changed since the client's last session will win a tie. This is quite error +prone, so timestamps are encouraged where possible. + +Inserts pose an interesting challenge. Since multiple clients cannot share +a sequence (often used as a primary key) while disconnected. They will be +responsible for their own unique "row_id" when inserting records. Inserts +accept any arbitrary key, and write back to the client a special kind of +update that gives the server's row_id. The client is responsible for making +sure that this update takes place locally. + +Deletes are the last portion of the process. When deletes occur, the +row_id, version, etc. are stored in a "deleted" table. These entries are +retrieved by the client using the same version filter as described above. +The table is pruned at the end of each session by deleting all records with +versions that are less than the lowest 'last_version' stored for each +client. + +Having wrapped up the synchronization process, I'll move on to describe some +points about managing clients, publications and the like. + +The tool kit is split into two objects: SyncManagement and Synchronization. +The Synchronization object exposes an API that client implementations use to +communicate and receive changes. The management functions handle system +install and uninstall in addition to publication of tables and client +subscriptions. + +Installation and uninstallation are handled by their corresponding functions +in the API. All system tables are prefixed and suffixed with four +underscores, in hopes that this avoids conflict with an existing tables. +Calling the install function more than once will generate an error message. +Uninstall will remove all related tables, sequences, functions and triggers +from the system. + +The first step, after installing the system, is to publish a table. A table +can be published more than once under different names. Simply provide a +unique name as the second argument to the publish function. Since object +names are restricted to 32 characters in Postgres, each table is given a +unique id and this id is used to create the trigger and sequence names. +Since one table can be published multiple times, but only needs one set of +triggers and one sequence for change management a reference count is kept so +that we know when to add/drop triggers and functions. By default, all +columns are published, but the third argument to the publish function +accepts an array reference of column names that allows you to specify a +limited set. Information about the table is stored in the "tables" table, +info about the publication is in the "publications" table and column names +are stored in "subscribed_cols" table. + +The next step is to subscribe a client to a table. A client is identified +by a user name and a node name. The subscribe function takes three +arguments: user, node & publication. The subscription process writes an +entry into the "subscribed" table with default values. Of note, the +"RefreshOnce" attribute is set to true whenever a table is published. This +indicates to the system that a full table refresh should be sent the next +time the client connects even if the client requests synchronization rather +than refresh. + +The toolkit does not, yet, provide a way to manage the whereclause stored at +either the publication or client level. To use or test this feature, you +will need to set the whereclause attributes manually. + +Tables and users can be unpublished and unsubscribed using the corresponding +functions within the tool kit's management interface. Because postgres +lacks an "ALTER TABLE DROP COLUMN" function, the unpublish function only +removes default values and indexes for those columns. + +The API isn't the most robust thing in the world right now. All functions +return undef on success and an error string otherwise (like DBD). I hope to +clean up the API considerably over the next month. The code has not been +field tested at this time. + + +The files attached are: + +1) SynKit.pm (A perl module that contains install/uninstall functions and a +simple api for synchronization & management) + +2) sync_install.pl (Sample code to demonstrate the installation, publishing +and subscribe process) + +3) sync_uninstall.pl (Sample code to demonstrate the uninstallation, +unpublishing and unsubscribe process) + + +To use them on Linux (don't know about Win32 but should work fine): + + - set up a test database and make SURE plpgsql is installed + + - install perl 5.05 along with Date::Parse(TimeDate-1.1) , DBI and DBD::Pg +modules [www.cpan.org] + + - copy all three attached files to a test directory + + - cd to your test directory + + - edit all three files and change the three DBI variables to suit your +system (they are clearly marked) + + - % perl sync_install.pl + + - check out the tables, functions & triggers installed + + - % perl sync.pl + + - check out the 'sync_test' table, do some updates/inserts/deletes and run +sync.pl again + NOTE: Sanity checks default to allow no more than 50% of the table +to be changed by the client in a single session. + If you delete all (or most of) the rows you will get errors when +you run sync.pl again! (by design) + + - % perl sync_uninstall.pl (when you are done) + + - check out the sample scripts and the perl module code (commented, but +not documented) + + + + + + +------=_NextPart_000_0062_01C0541E.125CAF30 +Content-Type: application/octet-stream; name="sync.pl" +Content-Transfer-Encoding: quoted-printable +Content-Disposition: attachment; filename="sync.pl" + + + +# This script depicts the syncronization process for two users. + + +## CHANGE THESE THREE VARIABLE TO MATCH YOUR SYSTEM ########### +my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy'; # +my $db_user =3D 'test'; # +my $db_pass =3D 'test'; # +################################################################# + +my $ret; #holds return value + +use SynKit; + +#create a synchronization object (pass dbi connection info) +my $s =3D Synchronize->new($dbi_connect_string,$db_user,$db_pass); + +#start a session by passing a user name, "node" identifier and a collision = +queue name (client or server) +$ret =3D $s->start_session('JOE','REMOTE_NODE_NAME','server'); +print "Handle this error: $ret\n\n" if $ret; + +#call this once before attempting to apply individual changes +$ret =3D $s->start_changes('sync_test',['name']); +print "Handle this error: $ret\n\n" if $ret; + +#call this for each change the client wants to make to the database +$ret =3D $s->apply_change(CLIENTROWID,'insert',undef,['ted']); +print "Handle this error: $ret\n\n" if $ret; + +#call this for each change the client wants to make to the database +$ret =3D $s->apply_change(CLIENTROWID,'insert','1973-11-10 11:25:00 AM -05= +',['tim']); +print "Handle this error: $ret\n\n" if $ret; + +#call this for each change the client wants to make to the database +$ret =3D $s->apply_change(999,'update',undef,['tom']); +print "Handle this error: $ret\n\n" if $ret; + +#call this for each change the client wants to make to the database +$ret =3D $s->apply_change(1,'update',undef,['tom']); +print "Handle this error: $ret\n\n" if $ret; + +#call this once after all changes have been submitted +$ret =3D $s->end_changes(); +print "Handle this error: $ret\n\n" if $ret; + +#call this to get updates from all subscribed tables +$ret =3D $s->get_all_updates(); +print "Handle this error: $ret\n\n" if $ret; + +print "\n\nSyncronization session is complete. (JOE) \n\n"; + + +# make some changes to the database (server perspective) + +print "\n\nMaking changes to the the database. (server side) \n\n"; + +use DBI; +my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass); + +$dbh->do("insert into sync_test values ('roger')"); +$dbh->do("insert into sync_test values ('john')"); +$dbh->do("insert into sync_test values ('harry')"); +$dbh->do("delete from sync_test where name =3D 'roger'"); +$dbh->do("update sync_test set name =3D 'tom' where name =3D 'harry'"); + +$dbh->disconnect; + + +#now do another session for a different user + +#start a session by passing a user name, "node" identifier and a collision = +queue name (client or server) +$ret =3D $s->start_session('KEN','ANOTHER_REMOTE_NODE_NAME','server'); +print "Handle this error: $ret\n\n" if $ret; + +#call this to get updates from all subscribed tables +$ret =3D $s->get_all_updates(); +print "Handle this error: $ret\n\n" if $ret; + +print "\n\nSynchronization session is complete. (KEN)\n\n"; + +print "Now look at your database and see what happend, make changes to the = +test table, etc. and run this again.\n\n"; + +------=_NextPart_000_0062_01C0541E.125CAF30 +Content-Type: application/octet-stream; name="sync_uninstall.pl" +Content-Transfer-Encoding: quoted-printable +Content-Disposition: attachment; filename="sync_uninstall.pl" + + +# this script uninstalls the synchronization system using the SyncManager o= +bject; + +use SynKit; + +### CHANGE THESE TO MATCH YOUR SYSTEM ######################## +my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy'; # +my $db_user =3D 'test'; # +my $db_pass =3D 'test'; # +################################################################# + + +my $ret; #holds return value + +#create an instance of the SyncManager object +my $m =3D SyncManager->new($dbi_connect_string,$db_user,$db_pass); + +# call this to unsubscribe a user/node (not necessary if you are uninstalli= +ng) +print $m->unsubscribe('KEN','ANOTHER_REMOTE_NODE_NAME','sync_test'); + +#call this to unpublish a table (not necessary if you are uninstalling) +print $m->unpublish('sync_test'); + +#call this to uninstall the syncronization system +# NOTE: this will automatically unpublish & unsubscribe all users +print $m->UNINSTALL; + +# now let's drop our little test table +use DBI; +my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass); +$dbh->do("drop table sync_test"); +$dbh->disconnect; + +print "\n\nI hope you enjoyed this little demonstration\n\n"; + + + +------=_NextPart_000_0062_01C0541E.125CAF30 +Content-Type: application/octet-stream; name="sync_install.pl" +Content-Transfer-Encoding: quoted-printable +Content-Disposition: attachment; filename="sync_install.pl" + + +# This script shows how to install the synchronization system=20 +# using the SyncManager object + +use SynKit; + +### CHANGE THESE TO MATCH YOUR SYSTEM ########################## +my $dbi_connect_string =3D 'dbi:Pg:dbname=3Dtest;host=3Dsnoopy'; # +my $db_user =3D 'test'; # +my $db_pass =3D 'test'; # +################################################################# +my $ret; #holds return value + + +#create an instance of the sync manager object +my $m =3D SyncManager->new($dbi_connect_string,$db_user,$db_pass); + +#Call this to install the syncronization management tables, etc. +$ret =3D $m->INSTALL; +die "Handle this error: $ret\n\n" if $ret; + + + +#create a test table for us to demonstrate with +use DBI; +my $dbh =3D DBI->connect($dbi_connect_string,$db_user,$db_pass); +$dbh->do("create table sync_test (name text)"); +$dbh->do("insert into sync_test values ('rob')"); +$dbh->do("insert into sync_test values ('rob')"); +$dbh->do("insert into sync_test values ('rob')"); +$dbh->do("insert into sync_test values ('ted')"); +$dbh->do("insert into sync_test values ('ted')"); +$dbh->do("insert into sync_test values ('ted')"); +$dbh->disconnect; + + + + +#call this to "publish" a table +$ret =3D $m->publish('sync_test'); +print "Handle this error: $ret\n\n" if $ret; + +#call this to "subscribe" a user/node to a publication (table) +$ret =3D $m->subscribe('JOE','REMOTE_NODE_NAME','sync_test'); +print "Handle this error: $ret\n\n" if $ret; + +#call this to "subscribe" a user/node to a publication (table) +$ret =3D $m->subscribe('KEN','ANOTHER_REMOTE_NODE_NAME','sync_test'); +print "Handle this error: $ret\n\n" if $ret; + + +print "Now you can do: 'perl sync.pl' a few times to play\n\n"; +print "Do 'perl sync_uninstall.pl' to uninstall the system\n"; + + +------=_NextPart_000_0062_01C0541E.125CAF30 +Content-Type: application/octet-stream; name="SynKit.pm" +Content-Transfer-Encoding: quoted-printable +Content-Disposition: attachment; filename="SynKit.pm" + +# Perl DB synchronization toolkit + +#created for postgres 7.0.2 + +use strict; + +BEGIN { + use vars qw($VERSION); + # set the version for version checking + $VERSION =3D 1.00; +} + + +package Synchronize; + +use DBI; + +use Date::Parse; + +# new requires 3 arguments: dbi connection string, plus the corresponding u= +sername and password to get connected to the database +sub new { + my $proto =3D shift; + my $class =3D ref($proto) || $proto; + my $self =3D {}; + + my $dbi =3D shift; + my $user =3D shift; + my $pass =3D shift; + + $self->{DBH} =3D DBI->connect($dbi,$user,$pass) || die "Failed to connect = +to database: ".DBI->errstr(); + + $self->{user} =3D undef; + $self->{node} =3D undef; + $self->{status} =3D undef; # holds status of table update portion of sessi= +on + $self->{pubs} =3D {}; #holds hash of pubs available to sessiom with val = +=3D 1 if ok to request sync + $self->{orderpubs} =3D undef; #holds array ref of subscribed pubs ordered = +by sync_order + $self->{this_post_ver} =3D undef; #holds the version number under which th= +is session will post changes + $self->{max_ver} =3D undef; #holds the maximum safe version for getting up= +dates + $self->{current} =3D {}; #holds the current publication info to which chan= +ges are being applied + $self->{queue} =3D 'server'; # tells collide function what to do with coll= +isions. (default is to hold on server) + + $self->{DBLOG}=3D DBI->connect($dbi,$user,$pass) || die "cannot log to DB:= + ".DBI->errstr();=20 + + + return bless ($self, $class); +} + +sub dblog {=20 + my $self =3D shift; + my $msg =3D $self->{DBLOG}->quote($_[0]); + my $quser =3D $self->{DBH}->quote($self->{user}); + my $qnode =3D $self->{DBH}->quote($self->{node}); + $self->{DBLOG}->do("insert into ____sync_log____ (username, nodename,stamp= +, message) values($quser, $qnode, now(), $msg)"); +} + + +#start_session establishes session wide information and other housekeeping = +chores + # Accepts username, nodename and queue (client or server) as arguments; + +sub start_session { + my $self =3D shift; + $self->{user} =3D shift || die 'Username is required'; + $self->{node} =3D shift || die 'Nodename is required'; + $self->{queue} =3D shift; + + + if ($self->{queue} ne 'server' && $self->{queue} ne 'client') { + die "You must provide a queue argument of either 'server' or 'client'"; + } + + my $quser =3D $self->{DBH}->quote($self->{user}); + my $qnode =3D $self->{DBH}->quote($self->{node}); + + my $sql =3D "select pubname from ____subscribed____ where username =3D $qu= +ser and nodename =3D $qnode"; + my @pubs =3D $self->GetColList($sql); + + return 'User/Node has no subscriptions!' if !defined(@pubs); + + # go though the list and check permissions and rules for each + foreach my $pub (@pubs) { + my $qpub =3D $self->{DBH}->quote($pub); + my $sql =3D "select disabled, pubname, fullrefreshonly, refreshonce,post_= +ver from ____subscribed____ where username =3D $quser and pubname =3D $qpub= + and nodename =3D $qnode"; + my $sth =3D $self->{DBH}->prepare($sql) || die $self->{DBH}->errstr; + $sth->execute || die $self->{DBH}->errstr; + my @row; + while (@row =3D $sth->fetchrow_array) { + next if $row[0]; #publication is disabled + next if !defined($row[1]); #publication does not exist (should never occ= +ur) + if ($row[2] || $row[3]) { #refresh of refresh once flag is set + $self->{pubs}->{$pub} =3D 0; #refresh only + next; + } + if (!defined($row[4])) { #no previous session exists, must refresh + $self->{pubs}->{$pub} =3D 0; #refresh only + next; + } + $self->{pubs}->{$pub} =3D 1; #OK for sync + } + $sth->finish; + } + + + $sql =3D "select pubname from ____publications____ order by sync_order"; + my @op =3D $self->GetColList($sql); + my @orderpubs; + + #loop through ordered pubs and remove non subscribed publications + foreach my $pub (@op) { + push @orderpubs, $pub if defined($self->{pubs}->{$pub}); + } +=09 + $self->{orderpubs} =3D \@orderpubs; + +# Now we obtain a session version number, etc. + + $self->{DBH}->{AutoCommit} =3D 0; #allows "transactions" + $self->{DBH}->{RaiseError} =3D 1; #script [or eval] will automatically die= + on errors + + eval { #start DB transaction + + #lock the version sequence until we determin that we have gotten + #a good value. Lock will be released on commit. + $self->{DBH}->do('lock ____version_seq____ in access exclusive mode'); + + # remove stale locks if they exist + my $sql =3D "delete from ____last_stable____ where username =3D $quser an= +d nodename =3D $qnode"; + $self->{DBH}->do($sql); + + # increment version sequence & grab the next val as post_ver + my $sql =3D "select nextval('____version_seq____')"; + my $sth =3D $self->{DBH}->prepare($sql); + $sth->execute; + ($self->{this_post_ver}) =3D $sth->fetchrow_array(); + $sth->finish; + # grab max_ver from last_stable + + $sql =3D "select min(version) from ____last_stable____";=20 + $sth =3D $self->{DBH}->prepare($sql); + $sth->execute; + ($self->{max_ver}) =3D $sth->fetchrow_array(); + $sth->finish; + + # if there was no version in lock table, then take the ID that was in use + # when we started the session ($max_ver -1) + + $self->{max_ver} =3D $self->{this_post_ver} -1 if (!defined($self->{max_v= +er})); + + # lock post_ver by placing it in last_stable + $self->{DBH}->do("insert into ____last_stable____ (version, username, nod= +ename) values ($self->{this_post_ver}, $quser,$qnode)"); + + # increment version sequence again (discard result) + $sql =3D "select nextval('____version_seq____')"; + $sth =3D $self->{DBH}->prepare($sql); + $sth->execute; + $sth->fetchrow_array(); + $sth->finish; + + }; #end eval/transaction + + if ($@) { # part of transaction failed + return 'Start session failed'; + $self->{DBH}->rollback; + } else { # all's well commit block + $self->{DBH}->commit; + } + $self->{DBH}->{AutoCommit} =3D 1; + $self->{DBH}->{RaiseError} =3D 0; + + return undef; + +} + +#start changes should be called once before applying individual change requ= +ests + # Requires publication and ref to columns that will be updated as arguments +sub start_changes { + my $self =3D shift; + my $pub =3D shift || die 'Publication is required'; + my $colref =3D shift || die 'Reference to column array is required'; + + $self->{status} =3D 'starting'; + + my $qpub =3D $self->{DBH}->quote($pub); + my $quser =3D $self->{DBH}->quote($self->{user}); + my $qnode =3D $self->{DBH}->quote($self->{node}); + + my @cols =3D @{$colref}; + my @subcols =3D $self->GetColList("select col_name from ____subscribed_col= +s____ where username =3D $quser and nodename =3D $qnode and pubname =3D $qp= +ub"); + my %subcols; + foreach my $col (@subcols) { + $subcols{$col} =3D 1; + } + foreach my $col (@cols) {=09 + return "User/node is not subscribed to column '$col'" if !$subcols{$col}; + } + + my $sql =3D "select pubname, readonly, last_session, post_ver, last_ver, w= +hereclause, sanity_limit,=20 +sanity_delete, sanity_update, sanity_insert from ____subscribed____ where u= +sername =3D $quser and pubname =3D $qpub and nodename =3D $qnode"; + my ($junk, $readonly, $last_session, $post_ver, $last_ver, $whereclause, $= +sanity_limit,=20 +$sanity_delete, $sanity_update, $sanity_insert) =3D $self->GetOneRow($sql); +=09 + return 'Publication is read only' if $readonly; + + $sql =3D "select whereclause from ____publications____ where pubname =3D $= +qpub"; + my ($wc) =3D $self->GetOneRow($sql); + $whereclause =3D '('.$whereclause.')' if $whereclause; + $whereclause =3D $whereclause.' and ('.$wc.')' if $wc; + + my ($table) =3D $self->GetOneRow("select tablename from ____publications__= +__ where pubname =3D $qpub"); + + return 'Publication is not registered correctly' if !defined($table); + + my %info; + $info{pub} =3D $pub; + $info{whereclause} =3D $whereclause; + $info{post_ver} =3D $post_ver; + $last_session =3D~ s/([+|-]\d\d?)$/ $1/; #put a space before timezone=09 + $last_session =3D str2time ($last_session); #convert to perltime (seconds = +since 1970) + $info{last_session} =3D $last_session; + $info{last_ver} =3D $last_ver; + $info{table} =3D $table; + $info{cols} =3D \@cols; + + my $sql =3D "select count(oid) from $table"; + $sql =3D $sql .' '.$whereclause if $whereclause; + my ($rowcount) =3D $self->GetOneRow($sql); + + #calculate sanity levels (convert from % to number of rows) + # limits defined as less than 1 mean no limit + $info{sanitylimit} =3D $rowcount * ($sanity_limit / 100) if $sanity_limit = +> 0; + $info{insertlimit} =3D $rowcount * ($sanity_insert / 100) if $sanity_inser= +t > 0; + $info{updatelimit} =3D $rowcount * ($sanity_update / 100) if $sanity_updat= +e > 0; + $info{deletelimit} =3D $rowcount * ($sanity_delete / 100) if $sanity_delet= +e > 0; + + $self->{sanitycount} =3D 0; + $self->{updatecount} =3D 0; + $self->{insertcount} =3D 0; + $self->{deletecount} =3D 0; + + $self->{current} =3D \%info; + + $self->{DBH}->{AutoCommit} =3D 0; #turn on transaction behavior so we can = +roll back on sanity limits, etc. + + $self->{status} =3D 'ready'; + + return undef; +} + +#call this once all changes are submitted to commit them; +sub end_changes { + my $self =3D shift; + return undef if $self->{status} ne 'ready'; + $self->{DBH}->commit; + $self->{DBH}->{AutoCommit} =3D 1; + $self->{status} =3D 'success'; + return undef; +} + +#call apply_change once for each row level client update + # Accepts 4 params: rowid, action, timestamp and reference to data array + # Note: timestamp can be undef, data can be undef + # timestamp MUST be in perl time (secs since 1970) + +#this routine checks basic timestamp info and sanity limits, then passes th= +e info along to do_action() for processing +sub apply_change { + my $self =3D shift; + my $rowid =3D shift || return 'Row ID is required'; #don't die just for on= +e bad row + my $action =3D shift || return 'Action is required'; #don't die just for o= +ne bad row + my $timestamp =3D shift; + my $dataref =3D shift; + $action =3D lc($action); + + $timestamp =3D str2time($timestamp) if $timestamp; + + return 'Status failure, cannot accept changes: '.$self->{status} if $self-= +>{status} ne 'ready'; + + my %info =3D %{$self->{current}}; + + $self->{sanitycount}++; + if ($info{sanitylimit} && $self->{sanitycount} > $info{sanitylimit}) { + # too many changes from client + my $ret =3D $self->sanity('limit'); + return $ret if $ret; + } + +=09 + if ($timestamp && $timestamp > time() + 3600) { # current time + one hour + #client's clock is way off, cannot submit changes in future + my $ret =3D $self->collide('future', $info{table}, $rowid, $action, undef= +, $timestamp, $dataref, $self->{queue}); + return $ret if $ret; + } + + if ($timestamp && $timestamp < $info{last_session} - 3600) { # last sessio= +n time less one hour + #client's clock is way off, cannot submit changes that occured before las= +t sync date + my $ret =3D $self->collide('past', $info{table}, $rowid, $action, undef, = +$timestamp, $dataref , $self->{queue}); + return $ret if $ret; + } + + my ($crow, $cver, $ctime); #current row,ver,time + if ($action ne 'insert') { + my $sql =3D "select ____rowid____, ____rowver____, ____stamp____ from $in= +fo{table} where ____rowid____ =3D $rowid"; + ($crow, $cver, $ctime) =3D $self->GetOneRow($sql); + if (!defined($crow)) { + my $ret =3D $self->collide('norow', $info{table}, $rowid, $action, undef= +, $timestamp, $dataref , $self->{queue}); + return $ret if $ret;=09=09 + } + + $ctime =3D~ s/([+|-]\d\d?)$/ $1/; #put space between timezone + $ctime =3D str2time($ctime) if $ctime; #convert to perl time + + if ($timestamp) { + if ($ctime < $timestamp) { + my $ret =3D $self->collide('time', $info{table}, $rowid, $action, undef= +, $timestamp, $dataref, $self->{queue} );=09=09 + return $ret if $ret; + } + + } else { + if ($cver > $self->{this_post_ver}) { + my $ret =3D $self->collide('version', $info{table}, $rowid, $action, un= +def, $timestamp, $dataref, $self->{queue} ); + return $ret if $ret; + } + } +=09 + } + + if ($action eq 'insert') { + $self->{insertcount}++; + if ($info{insertlimit} && $self->{insertcount} > $info{insertlimit}) { + # too many changes from client + my $ret =3D $self->sanity('insert'); + return $ret if $ret; + } + + my $qtable =3D $self->{DBH}->quote($info{table}); + my ($rowidsequence) =3D '_'.$self->GetOneRow("select table_id from ____ta= +bles____ where tablename =3D $qtable").'__rowid_seq'; + return 'Table incorrectly registered, cannot get rowid sequence name: '.$= +self->{DBH}->errstr() if not defined $rowidsequence; + + my @data; + foreach my $val (@{$dataref}) { + push @data, $self->{DBH}->quote($val); + } + my $sql =3D "insert into $info{table} ("; + if ($timestamp) { + $sql =3D $sql . join(',',@{$info{cols}}) . ',____rowver____, ____stamp__= +__) values ('; + $sql =3D $sql . join (',',@data) .','.$self->{this_post_ver}.',\''.local= +time($timestamp).'\')'; + } else { + $sql =3D $sql . join(',',@{$info{cols}}) . ',____rowver____) values ('; + $sql =3D $sql . join (',',@data) .','.$self->{this_post_ver}.')'; + } + my $ret =3D $self->{DBH}->do($sql); + if (!$ret) { + my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,= + $action, undef, $timestamp, $dataref , $self->{queue}); + return $ret if $ret;=09=09 + } + my ($newrowid) =3D $self->GetOneRow("select currval('$rowidsequence')"); + return 'Failed to get current rowid on inserted row'.$self->{DBH}->errstr= + if not defined $newrowid; + $self->changerowid($rowid, $newrowid); + } + + if ($action eq 'update') { + $self->{updatecount}++; + if ($info{updatelimit} && $self->{updatecount} > $info{updatelimit}) { + # too many changes from client + my $ret =3D $self->sanity('update'); + return $ret if $ret; + } + my @data; + foreach my $val (@{$dataref}) { + push @data, $self->{DBH}->quote($val); + }=09 + + my $sql =3D "update $info{table} set "; + my @cols =3D @{$info{cols}}; + foreach my $col (@cols) { + my $val =3D shift @data; + $sql =3D $sql . "$col =3D $val,"; + } + $sql =3D $sql." ____rowver____ =3D $self->{this_post_ver}"; + $sql =3D $sql.", ____stamp____ =3D '".localtime($timestamp)."'" if $times= +tamp; + $sql =3D $sql." where ____rowid____ =3D $rowid"; + $sql =3D $sql." and $info{whereclause}" if $info{whereclause}; + my $ret =3D $self->{DBH}->do($sql); + if (!$ret) { + my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,= + $action, undef, $timestamp, $dataref , $self->{queue}); + return $ret if $ret;=09=09 + } + + } + + if ($action eq 'delete') { + $self->{deletecount}++; + if ($info{deletelimit} && $self->{deletecount} > $info{deletelimit}) { + # too many changes from client + my $ret =3D $self->sanity('delete'); + return $ret if $ret; + } + if ($timestamp) { + my $sql =3D "update $info{table} set ____rowver____ =3D $self->{this_pos= +t_ver}, ____stamp____ =3D '".localtime($timestamp)."' where ____rowid____ = +=3D $rowid"; + $sql =3D $sql . " where $info{whereclause}" if $info{whereclause}; + $self->{DBH}->do($sql) || return 'Predelete update failed: '.$self->{DBH= +}->errstr; + } else { + my $sql =3D "update $info{table} set ____rowver____ =3D $self->{this_pos= +t_ver} where ____rowid____ =3D $rowid"; + $sql =3D $sql . " where $info{whereclause}" if $info{whereclause}; + $self->{DBH}->do($sql) || return 'Predelete update failed: '.$self->{DBH= +}->errstr; + } + my $sql =3D "delete from $info{table} where ____rowid____ =3D $rowid"; + $sql =3D $sql . " where $info{whereclause}" if $info{whereclause}; + my $ret =3D $self->{DBH}->do($sql); + if (!$ret) { + my $ret =3D $self->collide($self->{DBH}->errstr(), $info{table}, $rowid,= + $action, undef, $timestamp, $dataref , $self->{queue}); + return $ret if $ret;=09=09 + } +} +=09 +=09 + return undef; +} + +sub changerowid { + my $self =3D shift; + my $oldid =3D shift; + my $newid =3D shift; + $self->writeclient('changeid',"$oldid\t$newid"); +} + +#writes info to client +sub writeclient { + my $self =3D shift; + my $type =3D shift; + my @info =3D @_; + print "$type: ",join("\t",@info),"\n"; + return undef; +} + +# Override this for custom behavior. Default is to echo back the sanity fa= +ilure reason.=20=20 +# If you want to override a collision, you can do so by returning undef. +sub sanity { + my $self =3D shift; + my $reason =3D shift; + $self->{status} =3D 'sanity exceeded'; + $self->{DBH}->rollback; + return $reason; +} + +# Override this for custom behavior. Default is to echo back the failure r= +eason.=20=20 +# If you want to override a collision, you can do so by returning undef. +sub collide { + my $self =3D shift; + my ($reason,$table,$rowid,$action,$rowver,$timestamp,$data, $queue) =3D @_; + + my @data; + foreach my $val (@{$data}) { + push @data, $self->{DBH}->quote($val); + }=09 + + if ($reason =3D~ /integrity/i || $reason =3D~ /constraint/i) { + $self->{status} =3D 'intergrity violation'; + $self->{DBH}->rollback; + } + + my $datastring; + my @cols =3D @{$self->{current}->{cols}}; + foreach my $col (@cols) { + my $val =3D shift @data; + $datastring =3D $datastring . "$col =3D $val,"; + } + chop $datastring; #remove trailing comma + + if ($queue eq 'server') { + $timestamp =3D localtime($timestamp) if defined($timestamp); + $rowid =3D $self->{DBH}->quote($rowid); + $rowid =3D 'null' if !defined($rowid); + $rowver =3D 'null' if !defined($rowver); + $timestamp =3D $self->{DBH}->quote($timestamp); + $data =3D $self->{DBH}->quote($data); + my $qtable =3D $self->{DBH}->quote($table); + my $qreason =3D $self->{DBH}->quote($reason); + my $qaction =3D $self->{DBH}->quote($action); + my $quser =3D $self->{DBH}->quote($self->{user}); + my $qnode =3D $self->{DBH}->quote($self->{node}); + $datastring =3D $self->{DBH}->quote($datastring); + + + my $sql =3D "insert into ____collision____ (rowid, +tablename, rowver, stamp, data, reason, action, username, +nodename, queue) values($rowid,$qtable, $rowver, $timestamp,$datastring, +$qreason, $qaction,$quser, $qnode)"; + $self->{DBH}->do($sql) || die 'Failed to write to collision table: '.$sel= +f->{DBH}->errstr; + + } else { + + $self->writeclient('collision',$rowid,$table, $rowver, $timestamp,$reason= +, $action,$self->{user}, $self->{node}, $data); + + } + return $reason; +} + +#calls get_updates once for each publication the user/node is subscribed to= + in correct sync_order +sub get_all_updates { + my $self =3D shift; + my $quser =3D $self->{DBH}->quote($self->{user}); + my $qnode =3D $self->{DBH}->quote($self->{node}); + + foreach my $pub (@{$self->{orderpubs}}) { + $self->get_updates($pub, 1); #request update as sync unless overrridden b= +y flags + } + +} + +# Call this once for each table the client needs refreshed or sync'ed AFTER= + all inbound client changes have been posted +# Accepts publication and sync flag as arguments +sub get_updates { + my $self =3D shift; + my $pub =3D shift || die 'Publication is required'; + my $sync =3D shift; + + my $qpub =3D $self->{DBH}->quote($pub); + my $quser =3D $self->{DBH}->quote($self->{user}); + my $qnode =3D $self->{DBH}->quote($self->{node}); + + #enforce refresh and refreshonce flags + undef $sync if !$self->{pubs}->{$pub};=20 + + + my %info =3D $self->{current}; + + my @cols =3D $self->GetColList("select col_name from ____subscribed_cols__= +__ where username =3D $quser and nodename =3D $qnode and pubname =3D $qpub"= +);; + + my ($table) =3D $self->GetOneRow("select tablename from ____publications__= +__ where pubname =3D $qpub"); + return 'Table incorrectly registered for read' if !defined($table); + my $qtable =3D $self->{DBH}->quote($table);=09 + + + my $sql =3D "select pubname, last_session, post_ver, last_ver, whereclause= + from ____subscribed____ where username =3D $quser and pubname =3D $qpub an= +d nodename =3D $qnode"; + my ($junk, $last_session, $post_ver, $last_ver, $whereclause) =3D $self->G= +etOneRow($sql); + + my ($wc) =3D $self->GetOneRow("select whereclause from ____publications___= +_ where pubname =3D $qpub"); + + $whereclause =3D '('.$whereclause.')' if $whereclause; + + $whereclause =3D $whereclause.' and ('.$wc.')' if $wc; + + + if ($sync) { + $self->writeclient('start synchronize', $pub); + } else { + $self->writeclient('start refresh', $pub); + $self->{DBH}->do("update ____subscribed____ set refreshonce =3D false whe= +re pubname =3D $qpub and username =3D $quser and nodename =3D $qnode") || r= +eturn 'Failed to clear RefreshOnce flag: '.$self->{DBH}->errstr; + } + + $self->writeclient('columns',@cols); + + + + my $sql =3D "select ____rowid____, ".join(',', @cols)." from $table"; + if ($sync) { + $sql =3D $sql." where (____rowver____ <=3D $self->{max_ver} and ____rowve= +r____ > $last_ver)"; + if (defined($self->{this_post_ver})) { + $sql =3D $sql . " and (____rowver____ <> $post_ver)"; + } + } else { + $sql =3D $sql." where (____rowver____ <=3D $self->{max_ver})"; + } + $sql =3D $sql." and $whereclause" if $whereclause; +=09 + my $sth =3D $self->{DBH}->prepare($sql) || return 'Failed to get prepare S= +QL for updates: '.$self->{DBH}->errstr; + $sth->execute || return 'Failed to execute SQL for updates: '.$self->{DBH}= +->errstr; + my @row; + while (@row =3D $sth->fetchrow_array) { + $self->writeclient('update/insert',@row); + } + + $sth->finish; + + # now get deleted rows + if ($sync) { + $sql =3D "select rowid from ____deleted____ where (tablename =3D $qtable)= +"; + $sql =3D $sql." and (rowver <=3D $self->{max_ver} and rowver > $last_ver)= +"; + if (defined($self->{this_post_ver})) { + $sql =3D $sql . " and (rowver <> $self->{this_post_ver})"; + } + $sql =3D $sql." and $whereclause" if $whereclause; + + $sth =3D $self->{DBH}->prepare($sql) || return 'Failed to get prepare SQL= + for deletes: '.$self->{DBH}->errstr; + $sth->execute || return 'Failed to execute SQL for deletes: '.$self->{DBH= +}->errstr; + my @row; + while (@row =3D $sth->fetchrow_array) { + $self->writeclient('delete',@row); + } + + $sth->finish; + } + + if ($sync) { + $self->writeclient('end synchronize', $pub); + } else { + $self->writeclient('end refresh', $pub); + } + + my $qpub =3D $self->{DBH}->quote($pub); + my $quser =3D $self->{DBH}->quote($self->{user}); + my $qnode =3D $self->{DBH}->quote($self->{node}); + + $self->{DBH}->do("update ____subscribed____ set last_ver =3D $self->{max_v= +er}, last_session =3D now(), post_ver =3D $self->{this_post_ver} where user= +name =3D $quser and nodename =3D $qnode and pubname =3D $qpub"); + return undef; +} + + +# Call this once when everything else is done. Does housekeeping.=20 +# (MAKE THIS AN OBJECT DESTRUCTOR?) +sub DESTROY { + my $self =3D shift; + +#release version from lock table (including old ones) + my $quser =3D $self->{DBH}->quote($self->{user}); + my $qnode =3D $self->{DBH}->quote($self->{node}); + my $sql =3D "delete from ____last_stable____ where username =3D $quser and= + nodename =3D $qnode"; + $self->{DBH}->do($sql); + +#clean up deleted table + my ($version) =3D $self->GetOneRow("select min(last_ver) from ____subscrib= +ed____"); + return undef if not defined $version; + $self->{DBH}->do("delete from ____deleted____ where rowver < $version") ||= + return 'Failed to prune deleted table'.$self->{DBH}->errstr;; + + +#disconnect from DBD sessions + $self->{DBH}->disconnect; + $self->{DBLOG}->disconnect; + return undef; +} + +############# Helper Subs ############ +sub GetColList { + my $self =3D shift; + my $sql =3D shift || die 'Must provide sql select statement'; + my $sth =3D $self->{DBH}->prepare($sql) || return undef; + $sth->execute || return undef; + my $val; + my @col; + while (($val) =3D $sth->fetchrow_array) { + push @col, $val; + } + $sth->finish; + return @col; +} + +sub GetOneRow { + my $self =3D shift; + my $sql =3D shift || die 'Must provide sql select statement'; + my $sth =3D $self->{DBH}->prepare($sql) || return undef; + $sth->execute || return undef; + my @row =3D $sth->fetchrow_array; + $sth->finish; + return @row; +} + +=20 + + + +package SyncManager; + +use DBI; +# new requires 3 arguments: dbi connection string, plus the corresponding u= +sername and password + +sub new { + my $proto =3D shift; + my $class =3D ref($proto) || $proto; + my $self =3D {}; + + my $dbi =3D shift; + my $user =3D shift; + my $pass =3D shift; + + $self->{DBH} =3D DBI->connect($dbi,$user,$pass) || die "Failed to connect = +to database: ".DBI->errstr(); + + $self->{DBLOG}=3D DBI->connect($dbi,$user,$pass) || die "cannot log to DB:= + ".DBI->errstr(); +=09 + return bless ($self, $class); +} + +sub dblog {=20 + my $self =3D shift; + my $msg =3D $self->{DBLOG}->quote($_[0]); + my $quser =3D $self->{DBH}->quote($self->{user}); + my $qnode =3D $self->{DBH}->quote($self->{node}); + $self->{DBLOG}->do("insert into ____sync_log____ (username, nodename,stamp= +, message) values($quser, $qnode, now(), $msg)"); +} + +#this should never need to be called, but it might if a node bails without = +releasing their locks +sub ReleaseAllLocks { + my $self =3D shift; + $self->{DBH}->do("delete from ____last_stable____)"); +} +# Adds a publication to the system. Also adds triggers, sequences, etc ass= +ociated with the table if approproate. + # accepts two argument: the name of a physical table and the name under wh= +ich to publish it=20 + # NOTE: the publication name is optional and will default to the table na= +me if not supplied + # returns undef if ok, else error string; +sub publish { + my $self =3D shift; + my $table =3D shift || die 'You must provide a table name (and optionally = +a unique publication name)'; + my $pub =3D shift; + $pub =3D $table if not defined($pub); + + my $qpub =3D $self->{DBH}->quote($pub); + my $sql =3D "select tablename from ____publications____ where pubname =3D = +$qpub"; + my ($junk) =3D $self->GetOneRow($sql); + return 'Publication already exists' if defined($junk); + + my $qtable =3D $self->{DBH}->quote($table); + + $sql =3D "select table_id, refcount from ____tables____ where tablename = +=3D $qtable"; + my ($id, $refcount) =3D $self->GetOneRow($sql); + + if(!defined($id)) { + $self->{DBH}->do("insert into ____tables____ (tablename, refcount) values= + ($qtable,1)") || return 'Failed to register table: ' . $self->{DBH}->errst= +r; + my $sql =3D "select table_id from ____tables____ where tablename =3D $qta= +ble"; + ($id) =3D $self->GetOneRow($sql); + } + + if (defined($refcount)) { + $self->{DBH}->do("update ____tables____ set refcount =3D refcount+1 where= + table_id =3D $id") || return 'Failed to update refrence count: ' . $self->= +{DBH}->errstr; + } else { +=09=09 + $id =3D '_'.$id.'_';=20 + + my @cols =3D $self->GetTableCols($table, 1); # 1 =3D get hidden cols too + my %skip; + foreach my $col (@cols) { + $skip{$col} =3D 1; + } +=09=09 + if (!$skip{____rowver____}) { + $self->{DBH}->do("alter table $table add column ____rowver____ int4"); #= +don't fail here in case table is being republished, just accept the error s= +ilently + } + $self->{DBH}->do("update $table set ____rowver____ =3D ____version_seq___= +_.last_value - 1") || return 'Failed to initialize rowver: ' . $self->{DBH}= +->errstr; + + if (!$skip{____rowid____}) { + $self->{DBH}->do("alter table $table add column ____rowid____ int4"); #d= +on't fail here in case table is being republished, just accept the error si= +lently + } + + my $index =3D $id.'____rowid____idx'; + $self->{DBH}->do("create index $index on $table(____rowid____)") || retur= +n 'Failed to create rowid index: ' . $self->{DBH}->errstr; + + my $sequence =3D $id.'_rowid_seq'; + $self->{DBH}->do("create sequence $sequence") || return 'Failed to create= + rowver sequence: ' . $self->{DBH}->errstr; + + $self->{DBH}->do("alter table $table alter column ____rowid____ set defau= +lt nextval('$sequence')"); #don't fail here in case table is being republis= +hed, just accept the error silently + + $self->{DBH}->do("update $table set ____rowid____ =3D nextval('$sequence= +')") || return 'Failed to initialize rowid: ' . $self->{DBH}->errstr; + + if (!$skip{____stamp____}) { + $self->{DBH}->do("alter table $table add column ____stamp____ timestamp"= +); #don't fail here in case table is being republished, just accept the err= +or silently + } + + $self->{DBH}->do("update $table set ____stamp____ =3D now()") || return = +'Failed to initialize stamp: ' . $self->{DBH}->errstr; + + my $trigger =3D $id.'_ver_ins'; + $self->{DBH}->do("create trigger $trigger before insert on $table for eac= +h row execute procedure sync_insert_ver()") || return 'Failed to create tri= +gger: ' . $self->{DBH}->errstr; + + my $trigger =3D $id.'_ver_upd'; + $self->{DBH}->do("create trigger $trigger before update on $table for eac= +h row execute procedure sync_update_ver()") || return 'Failed to create tri= +gger: ' . $self->{DBH}->errstr; + + my $trigger =3D $id.'_del_row'; + $self->{DBH}->do("create trigger $trigger after delete on $table for each= + row execute procedure sync_delete_row()") || return 'Failed to create trig= +ger: ' . $self->{DBH}->errstr; + } + + $self->{DBH}->do("insert into ____publications____ (pubname, tablename) va= +lues ('$pub','$table')") || return 'Failed to create publication entry: '.$= +self->{DBH}->errstr; + + return undef; +} + + +# Removes a publication from the system. Also drops triggers, sequences, e= +tc associated with the table if approproate. + # accepts one argument: the name of a publication + # returns undef if ok, else error string; +sub unpublish { + my $self =3D shift; + my $pub =3D shift || return 'You must provide a publication name'; + my $qpub =3D $self->{DBH}->quote($pub); + my $sql =3D "select tablename from ____publications____ where pubname =3D = +$qpub"; + my ($table) =3D $self->GetOneRow($sql); + return 'Publication does not exist' if !defined($table); + + my $qtable =3D $self->{DBH}->quote($table); + + $sql =3D "select table_id, refcount from ____tables____ where tablename = +=3D $qtable"; + my ($id, $refcount) =3D $self->GetOneRow($sql); + return 'Table: $table is not correctly registered!' if not defined($id); + + $self->{DBH}->do("update ____tables____ set refcount =3D refcount -1 where= + tablename =3D $qtable") || return 'Failed to decrement reference count: ' = +. $self->{DBH}->errstr; + + $self->{DBH}->do("delete from ____subscribed____ where pubname =3D $qpub")= + || return 'Failed to delete user subscriptions: ' . $self->{DBH}->errstr; + $self->{DBH}->do("delete from ____subscribed_cols____ where pubname =3D $q= +pub") || return 'Failed to delete subscribed columns: ' . $self->{DBH}->err= +str; + $self->{DBH}->do("delete from ____publications____ where tablename =3D $qt= +able and pubname =3D $qpub") || return 'Failed to delete from publications:= + ' . $self->{DBH}->errstr; + + #if this is the last reference, we want to drop triggers, etc; + if ($refcount <=3D 1) { + $id =3D "_".$id."_"; + + $self->{DBH}->do("alter table $table alter column ____rowver____ drop def= +ault") || return 'Failed to alter column default: ' . $self->{DBH}->errstr; + $self->{DBH}->do("alter table $table alter column ____rowid____ drop defa= +ult") || return 'Failed to alter column default: ' . $self->{DBH}->errstr; + $self->{DBH}->do("alter table $table alter column ____stamp____ drop defa= +ult") || return 'Failed to alter column default: ' . $self->{DBH}->errstr; + + my $trigger =3D $id.'_ver_upd'; + $self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to = +drop trigger: ' . $self->{DBH}->errstr; + + my $trigger =3D $id.'_ver_ins'; + $self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to = +drop trigger: ' . $self->{DBH}->errstr; + + my $trigger =3D $id.'_del_row'; + $self->{DBH}->do("drop trigger $trigger on $table") || return 'Failed to = +drop trigger: ' . $self->{DBH}->errstr; + + my $sequence =3D $id.'_rowid_seq'; + $self->{DBH}->do("drop sequence $sequence") || return 'Failed to drop seq= +uence: ' . $self->{DBH}->errstr; + + my $index =3D $id.'____rowid____idx'; + $self->{DBH}->do("drop index $index") || return 'Failed to drop index: ' = +. $self->{DBH}->errstr; + $self->{DBH}->do("delete from ____tables____ where tablename =3D $qtable"= +) || return 'remove entry from tables: ' . $self->{DBH}->errstr; + } +return undef; +} + + + + + +#Subscribe user/node to a publication + # Accepts 3 arguements: Username, Nodename, Publication + # NOTE: the remaining arguments can be supplied as column names to which = +the user/node should be subscribed + # Return undef if ok, else returns an error string + +sub subscribe { + my $self =3D shift; + my $user =3D shift || die 'You must provide user, node and publication as = +arguments'; + my $node =3D shift || die 'You must provide user, node and publication as = +arguments'; + my $pub =3D shift || die 'You must provide user, node and publication as a= +rguments'; + my @cols =3D @_; + + my $quser =3D $self->{DBH}->quote($user); + my $qnode =3D $self->{DBH}->quote($node); + my $qpub =3D $self->{DBH}->quote($pub); + + my $sql =3D "select tablename from ____publications____ where pubname =3D = +$qpub"; + my ($table) =3D $self->GetOneRow($sql); + return "Publication $pub does not exist." if not defined $table; + my $qtable =3D $self->{DBH}->quote($table); + + @cols =3D $self->GetTableCols($table) if !@cols; # get defaults if cols we= +re not spefified by caller + + $self->{DBH}->do("insert into ____subscribed____ (username, nodename,pubna= +me,last_ver,refreshonce) values('$user', '$node','$pub',0, true)") || retur= +n 'Failes to create subscription: ' . $self->{DBH}->errstr;=09 + + foreach my $col (@cols) { + $self->{DBH}->do("insert into ____subscribed_cols____ (username, nodename= +, pubname, col_name) values ('$user','$node','$pub','$col')") || return 'Fa= +iles to subscribe column: ' . $self->{DBH}->errstr;=09 + } + + return undef; +} + + +#Unsubscribe user/node to a publication + # Accepts 3 arguements: Username, Nodename, Publication + # Return undef if ok, else returns an error string + +sub unsubscribe { + my $self =3D shift; + my $user =3D shift || die 'You must provide user, node and publication as = +arguments'; + my $node =3D shift || die 'You must provide user, node and publication as = +arguments'; + my $pub =3D shift || die 'You must provide user, node and publication as a= +rguments'; + my @cols =3D @_; + + my $quser =3D $self->{DBH}->quote($user); + my $qnode =3D $self->{DBH}->quote($node); + my $qpub =3D $self->{DBH}->quote($pub); + + my $sql =3D "select tablename from ____publications____ where pubname =3D = +$qpub"; + my $table =3D $self->GetOneRow($sql); + return "Publication $pub does not exist." if not defined $table; + + $self->{DBH}->do("delete from ____subscribed_cols____ where pubname =3D $q= +pub and username =3D $quser and nodename =3D $qnode") || return 'Failed to = +remove column subscription: '. $self->{DBH}->errstr; + $self->{DBH}->do("delete from ____subscribed____ where pubname =3D $qpub a= +nd username =3D $quser and nodename =3D $qnode") || return 'Failed to remov= +e subscription: '. $self->{DBH}->errstr; + + + return undef; +} + + + +#INSTALL creates the necessary management tables.=20=20 + #returns undef if everything is ok, else returns a string describing the e= +rror; +sub INSTALL { +my $self =3D shift; + +#check to see if management tables are already installed + +my ($test) =3D $self->GetOneRow("select * from pg_class where relname =3D '= +____publications____'"); +if (defined($test)) { + return 'It appears that synchronization manangement tables are already ins= +talled here. Please uninstall before reinstalling.'; +}; + + + +#install the management tables, etc. + +$self->{DBH}->do("create table ____publications____ (pubname text primary k= +ey,description text, tablename text, sync_order int4, whereclause text)") |= +| return $self->{DBH}->errstr(); + +$self->{DBH}->do("create table ____subscribed_cols____ (nodename text, user= +name text, pubname text, col_name text, description text, primary key(noden= +ame, username, pubname,col_name))") || return $self->{DBH}->errstr(); + +$self->{DBH}->do("create table ____subscribed____ (nodename text, username = +text, pubname text, last_session timestamp, post_ver int4, last_ver int4, w= +hereclause text, sanity_limit int4 default 0, sanity_delete int4 default 0,= + sanity_update int4 default 0, sanity_insert int4 default 50, readonly bool= +ean, disabled boolean, fullrefreshonly boolean, refreshonce boolean, primar= +y key(nodename, username, pubname))") || return $self->{DBH}->errstr(); + +$self->{DBH}->do("create table ____last_stable____ (version int4, username = +text, nodename text, primary key(version, username, nodename))") || return = +$self->{DBH}->errstr(); + +$self->{DBH}->do("create table ____tables____ (tablename text, table_id int= +4, refcount int4, primary key(tablename, table_id))") || return $self->{DBH= +}->errstr(); + +$self->{DBH}->do("create sequence ____table_id_seq____") || return $self->{= +DBH}->errstr(); + +$self->{DBH}->do("alter table ____tables____ alter column table_id set defa= +ult nextval('____table_id_seq____')") || return $self->{DBH}->errstr(); + +$self->{DBH}->do("create table ____deleted____ (rowid int4, tablename text,= + rowver int4, stamp timestamp, primary key (rowid, tablename))") || return = +$self->{DBH}->errstr(); + +$self->{DBH}->do("create table ____collision____ (rowid text, tablename tex= +t, rowver int4, stamp timestamp, faildate timestamp default now(),data text= +,reason text, action text, username text, nodename text,queue text)") || re= +turn $self->{DBH}->errstr(); + +$self->{DBH}->do("create sequence ____version_seq____") || return $self->{D= +BH}->errstr(); + +$self->{DBH}->do("create table ____sync_log____ (username text, nodename te= +xt, stamp timestamp, message text)") || return $self->{DBH}->errstr(); + +$self->{DBH}->do("create function sync_insert_ver() returns opaque as +'begin +if new.____rowver____ isnull then +new.____rowver____ :=3D ____version_seq____.last_value; +end if; +if new.____stamp____ isnull then +new.____stamp____ :=3D now(); +end if; +return NEW; +end;' language 'plpgsql'") || return $self->{DBH}->errstr(); + +$self->{DBH}->do("create function sync_update_ver() returns opaque as +'begin +if new.____rowver____ =3D old.____rowver____ then +new.____rowver____ :=3D ____version_seq____.last_value; +end if; +if new.____stamp____ =3D old.____stamp____ then +new.____stamp____ :=3D now(); +end if; +return NEW; +end;' language 'plpgsql'") || return $self->{DBH}->errstr(); + + +$self->{DBH}->do("create function sync_delete_row() returns opaque as=20 +'begin=20 +insert into ____deleted____ (rowid,tablename,rowver,stamp) values +(old.____rowid____, TG_RELNAME, old.____rowver____,old.____stamp____);=20 +return old;=20 +end;' language 'plpgsql'") || return $self->{DBH}->errstr(); + +return undef; +} + +#removes all management tables & related stuff + #returns undef if ok, else returns an error message as a string +sub UNINSTALL { +my $self =3D shift; + +#Make sure all tables are unpublished first +my $sth =3D $self->{DBH}->prepare("select pubname from ____publications____= +"); +$sth->execute; +my $pub; +while (($pub) =3D $sth->fetchrow_array) { + $self->unpublish($pub);=09 +} +$sth->finish; + +$self->{DBH}->do("drop table ____publications____") || return $self->{DBH}-= +>errstr(); +$self->{DBH}->do("drop table ____subscribed_cols____") || return $self->{DB= +H}->errstr(); +$self->{DBH}->do("drop table ____subscribed____") || return $self->{DBH}->e= +rrstr(); +$self->{DBH}->do("drop table ____last_stable____") || return $self->{DBH}->= +errstr(); +$self->{DBH}->do("drop table ____deleted____") || return $self->{DBH}->errs= +tr(); +$self->{DBH}->do("drop table ____collision____") || return $self->{DBH}->er= +rstr(); +$self->{DBH}->do("drop table ____tables____") || return $self->{DBH}->errst= +r(); +$self->{DBH}->do("drop table ____sync_log____") || return $self->{DBH}->err= +str(); + +$self->{DBH}->do("drop sequence ____table_id_seq____") || return $self->{DB= +H}->errstr(); +$self->{DBH}->do("drop sequence ____version_seq____") || return $self->{DBH= +}->errstr(); + +$self->{DBH}->do("drop function sync_insert_ver()") || return $self->{DBH}-= +>errstr(); +$self->{DBH}->do("drop function sync_update_ver()") || return $self->{DBH}-= +>errstr(); +$self->{DBH}->do("drop function sync_delete_row()") || return $self->{DBH}-= +>errstr(); + +return undef; + +} + +sub DESTROY { + my $self =3D shift; + + $self->{DBH}->disconnect; + $self->{DBLOG}->disconnect; + return undef; +} + +############# Helper Subs ############ + +sub GetOneRow { + my $self =3D shift; + my $sql =3D shift || die 'Must provide sql select statement'; + my $sth =3D $self->{DBH}->prepare($sql) || return undef; + $sth->execute || return undef; + my @row =3D $sth->fetchrow_array; + $sth->finish; + return @row; +} + +#call this with second non-zero value to get hidden columns +sub GetTableCols { + my $self =3D shift; + my $table =3D shift || die 'Must provide table name'; + my $wanthidden =3D shift; + my $sql =3D "select * from $table where 0 =3D 1"; + my $sth =3D $self->{DBH}->prepare($sql) || return undef; + $sth->execute || return undef; + my @row =3D @{$sth->{NAME}}; + $sth->finish; + return @row if $wanthidden; + my @cols; + foreach my $col (@row) { + next if $col eq '____rowver____'; + next if $col eq '____stamp____'; + next if $col eq '____rowid____'; + push @cols, $col;=09 + } + return @cols; +} + + +1; #happy require + +------=_NextPart_000_0062_01C0541E.125CAF30-- + + -- 2.40.0