#!/usr/bin/env perl

use strict;
use warnings;
use sigtrap qw(handler closeout normal-signals);
use DBI;
use DBD::Pg qw(:pg_types);
use Getopt::Long;
Getopt::Long::Configure("bundling");
use Time::HiRes qw( gettimeofday tv_interval );
use POSIX qw(strftime);
use Data::Dumper qw(Dumper);

$\ = "\n";
$| = 1;

my $batch         = 1000;
my @exclude       = ();
my $sourceservice = 'source';
my $targetservice = 'target';
my $batchdelay    = $ENV{DELAY} || 0;
my $rows          = 0;
my $totaltime;
my $maxrows;
my $batchstart = [gettimeofday];
my $copystart  = [gettimeofday];
my $elapsed;
my $maxcopies       = 6;
my $nologtime       = 0;
my $logstatuschange = 0;
my $sequences_only  = 0;    # -S to set true, will set sequences on target

my $status = 'initializing';
my $lastlog;
my $posting;      # posting table row for the table we're working on
my $queueitem;
my $quoted_pk;    # quoted version of the pk column
my $sourcepid = 0;
my @locked;
my $locked;

sub timelog {
    my ($seconds, $microseconds) = Time::HiRes::gettimeofday();
    my $s    = sprintf("%06i", $microseconds);
    my $time = strftime("%FT%T.$s ", gmtime($seconds));
    my $line;
    if (@_ > 1) {
        # sprintf(@_) would use the array's element count as the format
        # (a documented sprintf gotcha), so pull the format off explicitly
        my $fmt = shift;
        $line = sprintf($fmt, @_);
    }
    else {
        $line = $_[0];
    }
    $lastlog = "$time $line";
    print $nologtime ? $line : "$time $line";
}

sub status {
    my ($seconds, $microseconds) = Time::HiRes::gettimeofday();
    my $at = strftime("%FT%T", gmtime($seconds));
    my ($pkg, $fn, $line) = caller;
    $status = "@_ ($at line $line)";
    timelog($status) if $logstatuschange;
}

sub logstatus {
    unless ($nologtime) {
        my ($seconds, $microseconds) = Time::HiRes::gettimeofday();
        my $s = sprintf("%06i", $microseconds);
        local $\ = '';
        print STDERR strftime("%FT%T.$s ", gmtime($seconds));
    }
    print STDERR sprintf('pid %d, sourcepid %d, %s: status: %s', $$, $sourcepid, $locked, $status);
    print STDERR sprintf('pid %d, sourcepid %d, %s: last  : %s', $$, $sourcepid, $locked, $lastlog);
}

sub terminated {
    logstatus();
    timelog("$locked: pid $$ terminated by signal: status $status");
    closeout();
}

sub togglelsc {
    $logstatuschange = !$logstatuschange;
}

status('initializing');
$SIG{'USR1'} = \&logstatus;
$SIG{'USR2'} = \&togglelsc;
$SIG{'TERM'} = \&terminated;

GetOptions(
    'b=i'  => \$batch,
    'x=s@' => \@exclude,
    's=s'  => \$sourceservice,
    't=s'  => \$targetservice,
    'd=i'  => \$batchdelay,
    'M=i'  => \$maxrows,
    'c=i'  => \$maxcopies,
    'T'    => \$nologtime,
    'S'    => \$sequences_only,
);

my $sourcedb = DBI->connect("dbi:Pg:service=$sourceservice");
my $targetdb = DBI->connect("dbi:Pg:service=$targetservice");
$sourcedb->{RaiseError} = 1;
$sourcedb->{AutoCommit} = 1;
$targetdb->{RaiseError} = 1;
$targetdb->{AutoCommit} = 1;

my $queuedetail = $sourcedb->prepare(q{
    select id, coalesce(max(repmax), id) as through
    from migration.queue
    where table_oid = ?::regclass
      and pgroup = ?
    group by id
});

my $sourcemark = $sourcedb->prepare(q{select migration.mark(?,?,?,?)});

my $findqueries = $sourcedb->prepare(q{
    select * from migration.posting where table_oid = ?::regclass
});

my $nextrow = $sourcedb->prepare(q{select * from migration.nextrow(?,?)});

my $countcopies = $sourcedb->prepare(q{
    select count(*) from pg_stat_activity where application_name = 'copyrows'
});

# Small race condition here: if several processes log in at once, then
# set their name, then each checks the count, they'll all exit. Could be
# avoided by taking a lock before the set-name and releasing it after the
# max-copy-count check, effectively serializing the max copyrows check.
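#
# One hedged alternative to the table-lock sketch below (an assumption,
# not part of this script): a session-level advisory lock would serialize
# the check without blocking real writers on the migration tables, e.g.
#
#   $sourcedb->selectrow_array(q{select pg_advisory_lock(12345)});
#   ... set application_name, run $countcopies ...
#   $sourcedb->selectrow_array(q{select pg_advisory_unlock(12345)});
#
# where 12345 is an arbitrary key all copyrows processes would share.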
# $sourcedb->begin_work;
# $sourcedb->do(q{lock table migration.queue});
# then after the check
# $sourcedb->rollback;    # or commit, doesn't matter
#
# if ($maxcopies) {
#     $sourcedb->begin_work;
#     $sourcedb->do(q{lock table migration.posting});
# }

$sourcedb->do(q{set application_name to 'copyrows'});
$targetdb->do(q{set application_name to 'copyrows'});

my $pidarr = $sourcedb->selectcol_arrayref('select pg_backend_pid()');
($sourcepid) = @$pidarr;

# TODO grab a table lock to mutex this count
if ($maxcopies) {
    $countcopies->execute;
    my ($count) = $countcopies->fetchrow_array;
    if ($count > $maxcopies) {
        timelog "$count copyrows already running, exiting ($sourcepid)";
        exit 0;
    }
    # $sourcedb->rollback;
}

my @tables = @ARGV;

status 'finding work';
my $findwork;
if (@tables) {
    $findwork = $sourcedb->prepare(q{select * from migration.find_work(?,?)});
}
else {
    $findwork = $sourcedb->prepare(q{select * from migration.find_work()});
}
$findwork->execute(@tables);
@locked = $findwork->fetchrow_array();
$findwork->finish;

if (@locked) {
    $locked = join('/', @locked);
    $0 = "copyrows $locked";
    timelog "$locked: locked";
}
else {
    timelog "unable to lock any tables. exiting.";
    exit 0;
}

timelog "checking queue $locked";
eval {
    my $qd = nextrow();
    if ($qd) {
        timelog sprintf('%s/%d: %s queue %s-%s', @locked, $qd->{action}, $qd->{id}, $qd->{repmax});
    }
    else {
        timelog 'does not seem to be any work, mark and exit';
        # $qd is undef here, so repmax/epoch go over as NULLs;
        # migration.mark presumably treats that as a plain touch
        $sourcemark->execute(@locked, $qd->{repmax}, $qd->{epoch});
        $sourcemark->finish;
        closeout();
    }
};
if ($@) {
    timelog "$@";
    exit 1;
}

my $table;

# read, write, update queries, keyed by action
my %queries;    # statement handles
my ($select, $update, $delete, $insert, $usecopy);
my %sql;        # sql for queries
my %binary;     # which parameter positions should be bound as PG_BYTEA

getqueries(@locked);

# Get table oid and migration id of next row to copy.
# Is this needed? Since we're locking the row (in effect),
# can't we just look at the min and max and loop it that way?
# Would save a query to get the next row. One thing at a time.
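#
# A rough sketch of that min/max loop (an assumption, not implemented
# here; $quoted_pk and $posting->{tablename} as used in batchcopy below):
#
#   my ($min, $max) = $sourcedb->selectrow_array(
#       "select min($quoted_pk), max($quoted_pk) from $posting->{tablename}");
#   for (my $id = $min; defined $min && $id <= $max; $id += $batch) {
#       ... copy the range [$id, $id + $batch - 1] ...
#   }
#
# It trades the per-row nextrow() round trip for a straight id walk, at
# the cost of visiting id ranges that may hold no queued rows.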
sub nextrow {
    status "nextrow @locked";
    $nextrow->execute(@locked);
    my $queueitem = $nextrow->fetchrow_hashref;
    $nextrow->finish;
    return unless defined $queueitem->{id};
    $queueitem->{action} = lc($queueitem->{action});
    return $queueitem;
}

sub getqueries {
    my $oid = shift;
    $findqueries->execute($oid);
    my $q = $findqueries->fetchrow_hashref;
    $findqueries->finish;
    $posting   = $q;
    $quoted_pk = $sourcedb->quote_identifier($q->{keycol});

    $sql{'select'} = $q->{'selectq'};
    $sql{'update'} = $q->{'updateq'};
    $sql{'insert'} = $q->{'insertq'};
    $sql{'delete'} = $q->{'deleteq'};

    $select = $queries{'select'} = $sourcedb->prepare($q->{'selectq'});
    $update = $queries{'update'} = $targetdb->prepare($q->{'updateq'});
    $insert = $queries{'insert'} = $targetdb->prepare($q->{'insertq'});
    $delete = $queries{'delete'} = $targetdb->prepare($q->{'deleteq'});

    $binary{'update'} = $q->{update};
    $binary{'insert'} = $q->{selins};
    $binary{'select'} = $q->{selins};

    # %queries is keyed by action only (cprow uses $queries{$action}
    # directly), so bind the bytea parameter positions on the handles
    # themselves rather than through a stale per-oid level
    for my $action (qw(update insert select)) {
        for (@{ $binary{$action} || [] }) {
            $queries{$action}->bind_param($_, undef, PG_BYTEA);
        }
    }
}

my @batch;
my $batchvia = '';

sub logbatch {
    return unless @batch;
    my $elapsed   = tv_interval($batchstart);
    my $batchsize = scalar @batch;
    my $rps       = $batchsize / $elapsed;
    timelog sprintf("%s: $batchvia batch of %d: RPS = %.2f", $locked, $batchsize, $rps);
    STDOUT->flush;
}

sub cprow {
    my $qi     = shift;
    my $id     = $qi->{id};
    my $epoch  = $qi->{epoch};
    my $action = $qi->{action};
    my @row;
    status $action, 'row', $id;
    $action = lc($action);
    my $q = $queries{$action};
    die "unknown action $action" unless $q;
    if ($action ne 'delete') {
        #warn "select = $sql{select}\n";
        status 'selecting row', $id;
        $select->execute($id);
        @row = $select->fetchrow_array;
        $select->finish;
        #return unless @row;
    }
    else {
        @row = ($id);
    }
    # TODO if we delete a row with the max migration_id
    # and then later we promote, we may re-use an id and
    # the sync back will be bad? No, because that id would
    # have been deleted on the old source?
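    # If the select above came back empty (non-delete action but the row
    # has since vanished on the source), @row is empty: we skip the write
    # below but still count and mark the id so the queue advances.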
    if (@row) {
        eval {
            $q->execute(@row);
            $q->finish;
            push @batch, $id;
            status $action, 'row', $id, 'complete';
            #warn "copied $oid $id\n";
        };
        if ($@) {
            warn "$action: @row\n";
            die $@;
        }
    }
    else {
        push @batch, $id;
    }
    status "marking $id, ", scalar @batch, ' rows in batch';
    $sourcemark->execute(@locked, $id, $epoch);
}

sub startbatch {
    $batchstart = [gettimeofday];
    $sourcedb->begin_work;
    $targetdb->begin_work;
}

sub finishbatch {
    $targetdb->commit;
    $sourcedb->commit;
    logbatch();
    $rows += @batch;
    @batch = ();
}

sub nextbatch {
    finishbatch;
    startbatch;
}

sub closeout {
    if ($rows) {
        my $trps = $rows / tv_interval($copystart);
        timelog sprintf('%s: %d total rows copied (%.2f rps)', $locked, $rows, $trps);
    }
    timelog sprintf('%s: pid %d exiting (%.2f seconds)', $locked, $$, tv_interval($copystart));
    exit;
}

sub batchcopy {
    my $qi      = shift;
    my $qstart  = $qi->{id};
    my $qend    = $qi->{repmax};
    my $tabname = $posting->{tablename};
    my $x = $qstart;
    my $y = $qstart + $batch - 1;
    $y = $qend if $y > $qend;
    my $rv;
    my $row;
    timelog sprintf('%s: copy %d-%d', $locked, $x, $y);
    eval {
        #startbatch;
        $sourcedb->do("copy (select * from $tabname where $quoted_pk >= $x and $quoted_pk <= $y) to stdout (format text)");
        $targetdb->do("copy $tabname from stdin (format text)");
        # note the parens around the assignment: without them, assignment
        # binds looser than >= and $rv would hold the comparison result
        # instead of the byte count
        while (($rv = $sourcedb->pg_getcopydata($row)) >= 0) {
            $targetdb->pg_putcopydata($row);
            push @batch, 1;
        }
        # -1 is the normal end-of-copy marker; anything lower is an error
        die "copy failed: $rv" if $rv < -1;
        $targetdb->pg_putcopyend;
        $sourcemark->execute(@locked, $y, 0);
        $sourcemark->finish;
    };
    if ($@) {
        chomp $@;
        timelog sprintf('%s: error: %s', $locked, $@);
    }
}

sub canusecopy {
    my ($qi) = @_;
    return 0 unless $posting->{copyable};
    return $qi->{action} eq 'insert' && $qi->{epoch} == 0;
}

status 'batching rows';
eval {
    startbatch;
    while ($queueitem = nextrow()) {
        status 'have next row', $queueitem->{id};
        if (canusecopy($queueitem)) {
            #timelog 'using copy';
            $batchvia = 'copy';
            batchcopy($queueitem);
            nextbatch;
        }
        else {
            $batchvia = 'insert';
            cprow($queueitem);
            nextbatch if @batch >= $batch;
        }
        last if defined $maxrows && ($rows + @batch) >= $maxrows;
    }
    $sourcemark->finish;
    finishbatch;
    timelog "$locked: queue empty" unless defined $maxrows && $rows >= $maxrows;
};
if ($@) {
    chomp $@;
    timelog sprintf('%s: error: %s', $locked, $@);
}
closeout;
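
# Usage sketch (flags per the GetOptions above; -s/-t name pg_service.conf
# services, and any bare arguments are bound to migration.find_work(?,?)):
#
#   ./copyrows -b 5000 -s source -t target -M 100000 [find_work args]
#   kill -USR1 <pid>    # dump current and last status to stderr
#   kill -USR2 <pid>    # toggle logging of every status change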