6 use sigtrap qw(handler closeout normal-signals);
9 use DBD::Pg qw(:pg_types);
12 Getopt::Long::Configure ("bundling");
13 use Time::HiRes qw( gettimeofday tv_interval );
14 use POSIX qw(strftime);
15 use Data::Dumper qw(Dumper);
22 my $sourceservice = 'source';  # libpq service name for the source DB (override with -s)
23 my $targetservice = 'target';  # libpq service name for the target DB (override with -t)
24 my $batchdelay = $ENV{DELAY} || 0;  # taken from $ENV{DELAY}; how it is applied is not visible in this excerpt
28 my $batchstart = [gettimeofday];  # start time of the current batch, used for per-batch RPS
29 my $copystart = [gettimeofday];  # start time of the whole run, used for total RPS / elapsed
33 my $logstatuschange = 0;  # flipped by SIGUSR2 (togglelsc); status() branches on it
34 my $sequences_only = 0; # -S to set true, will set sequences on target
36 my $status = 'initializing';  # most recent status() text, printed by the SIGUSR1 handler
39 my $posting; # posting table row for table we're working on
41 my $quoted_pk; # quoted version of pk (via quote_identifier), used in the COPY range query
51 my ($seconds, $microseconds) = Time::HiRes::gettimeofday();  # log helper (presumably timelog()): timestamp each line in UTC
52 my $s = sprintf("%06i", $microseconds);  # zero-padded microseconds
53 $time = strftime("%FT%T.$s ", gmtime($seconds));  # e.g. 2024-01-01T00:00:00.000123 (UTC); note the trailing space in the format
61 $lastlog = "$time $line";  # remembered so the SIGUSR1 handler can show the last log line; NOTE(review): $time already ends in a space, so this has two spaces unless $time is trimmed in lines not shown
62 print $nologtime ? $line : "$time $line";  # $nologtime (set elsewhere) suppresses the timestamp prefix
66 my ($seconds, $microseconds) = Time::HiRes::gettimeofday();  # presumably status(): record what the worker is doing ($microseconds unused here)
67 my $at = strftime("%FT%T", gmtime($seconds));  # UTC, whole-second resolution
68 my ($pkg, $fn, $line) = caller;  # caller's source line is embedded in the status text
69 $status = "@_ ($at line $line)";  # args joined with spaces, e.g. "finding work (2024-01-01T00:00:00 line 166)"
70 if ($logstatuschange) {  # true after SIGUSR2 toggles it on
77 my ($seconds, $microseconds) = Time::HiRes::gettimeofday();  # presumably logstatus() (SIGUSR1): dump current state to STDERR
78 my $s = sprintf("%06i", $microseconds);
80 print STDERR strftime("%FT%T.$s ", gmtime($seconds));  # UTC timestamp prefix
82 print STDERR sprintf('pid %d, sourcepid %d, %s: status: %s', $$, $sourcepid, $locked, $status);  # NOTE(review): no "\n" in this format; confirm a newline is printed between these two lines. $sourcepid/$locked are still undef if SIGUSR1 arrives before they are set
83 print STDERR sprintf('pid %d, sourcepid %d, %s: last : %s', $$, $sourcepid, $locked, $lastlog);  # last line written by the log helper
88 timelog("$locked: pid $$ terminated by signal: status $status");  # presumably the SIGTERM handler (terminated()): log the table, pid and last status
93 $logstatuschange = !$logstatuschange;  # presumably togglelsc() (SIGUSR2): flip status-change logging on/off
96 status('initializing');
97 $SIG{'USR1'} = \&logstatus;  # kill -USR1 <pid>: print status to STDERR
98 $SIG{'USR2'} = \&togglelsc;  # kill -USR2 <pid>: toggle status-change logging
99 $SIG{'TERM'} = \&terminated;  # replaces the sigtrap closeout handler for TERM
104 's=s', \$sourceservice,  # -s SERVICE: source libpq service
105 't=s', \$targetservice,  # -t SERVICE: target libpq service
110 'S', \$sequences_only,  # -S: sequences-only mode
113 my $sourcedb = DBI->connect("dbi:Pg:service=$sourceservice");  # NOTE(review): connect() returns undef on failure and RaiseError is not set yet, so the next assignments die with an unhelpful message; consider passing { RaiseError => 1, AutoCommit => 1 } as the 4th argument
114 my $targetdb = DBI->connect("dbi:Pg:service=$targetservice");
116 $sourcedb->{RaiseError} = 1;  # die on any DBI error from here on
117 $sourcedb->{AutoCommit} = 1;  # explicit transactions use begin_work
118 $targetdb->{RaiseError} = 1;
119 $targetdb->{AutoCommit} = 1;
121 my $queuedetail = $sourcedb->prepare(q{
122 select id, coalesce(max(repmax), id) as through
124 where table_oid = ?::regclass and pgroup = ?
128 my $sourcemark = $sourcedb->prepare(q{select migration.mark(?,?,?,?)});  # records progress; executed as (@locked, id, epoch)
129 my $findqueries = $sourcedb->prepare(q{select * from migration.posting where table_oid = ?::regclass });  # per-table posting row (generated SQL, key column, bytea columns, ...)
130 my $nextrow = $sourcedb->prepare(q{select * from migration.nextrow(?,?)});  # next queue item for the two @locked values
131 my $countcopies = $sourcedb->prepare(q{select count(*) from pg_stat_activity where application_name = 'copyrows'});  # sessions that identify as copyrows workers
133 # Small race condition here: if all processes log in at once,
134 # they all set their name, then they all check the count, and they'll
135 # all exit. This could be avoided by taking a lock before setting the
136 # name and releasing it after the max-copy-count check, effectively
137 # serializing the max copyrows check:
138 # $sourcedb->begin_work;
139 # $sourcedb->do(q{lock table migration.queue});
140 # then after the check
141 # $sourcedb->rollback; # or commit, doesn't matter
144 # $sourcedb->begin_work;
145 # $sourcedb->do(q{lock table migration.posting});
148 $sourcedb->do(q{set application_name to 'copyrows'});  # makes this session visible to $countcopies
149 $targetdb->do(q{set application_name to 'copyrows'});  # NOTE(review): if source and target are the same cluster, this session is counted as well
150 my $pidarr = $sourcedb->selectcol_arrayref('select pg_backend_pid()');
151 ($sourcepid) = @$pidarr;  # source backend pid, shown in log/status output
153 # TODO grab a table lock to mutex this count
155 $countcopies->execute;
156 my ($count) = $countcopies->fetchrow_array;  # includes this session, since application_name was set above
157 if ($count > $maxcopies) {  # exit when more than $maxcopies sessions (including this one) are named copyrows
158 timelog "$count copyrows already running, exiting($sourcepid)";  # note: $count includes this process
161 # $sourcedb->rollback;
166 status 'finding work';  # pick and lock a table to work on via migration.find_work
169 $findwork = $sourcedb->prepare(q{
170 select * from migration.find_work(?,?)
173 $findwork = $sourcedb->prepare(q{
174 select * from migration.find_work()
177 $findwork->execute(@tables);  # presumably @tables supplies the two find_work(?,?) args, or is empty for find_work() — TODO confirm
178 @locked = $findwork->fetchrow_array();  # the locked work unit; later passed as the two args of nextrow(?,?)
182 $locked = join('/', @locked);  # display form used as the log-line prefix
183 $0 = "copyrows $locked";  # show the locked table in ps output
184 timelog "$locked: locked";
186 timelog "unable to lock any tables. exiting.";
190 timelog "checking queue $locked";
194 timelog sprintf('%s/%d: %s queue %s-%s', @locked, $qd->{action}, $qd->{id}, $qd->{repmax});  # assumes @locked has exactly 2 elements; NOTE(review): queuedetail selects only id/through, so confirm $qd comes from a query that provides action/repmax/epoch
196 timelog 'does not seem to be any work, mark and exit';
197 $sourcemark->execute(@locked, $qd->{repmax}, $qd->{epoch});
209 # per-table statement handles, SQL text and bytea placeholder lists, keyed by action (select/update/insert/delete)
210 my %queries; # statement handles: select runs on the source; update/insert/delete run on the target
211 my ($select, $update, $delete, $insert, $usecopy);
212 my %sql; # raw SQL text for each action
213 my %binary; # per action: which placeholders should be bound as PG_BYTEA
217 # get table oid and migration id of next row to copy
218 # is this needed? Since we're locking the row (in effect),
219 # can't we just look at the min and max and loop it that way?
220 # would save a query to get the next row
221 # one thing at a time.
223 status "nextrow @locked";
224 $nextrow->execute(@locked);
225 my $queueitem = $nextrow->fetchrow_hashref;  # undef if the query returned no row
228 return unless defined $queueitem->{id};  # no row, or a row with a null id: queue exhausted
229 $queueitem->{action} = lc($queueitem->{action});  # normalise so comparisons such as eq 'insert' match
237 $findqueries->execute($oid);  # load generated SQL for this table from migration.posting
238 my $q = $findqueries->fetchrow_hashref;  # NOTE(review): no check that a posting row exists; if $q is undef, the prepares below fail
239 $findqueries->finish;
242 $quoted_pk = $sourcedb->quote_identifier($q->{keycol});  # key column, quoted for use in the COPY range query
244 $sql{'select'} = $q->{'selectq'};
245 $sql{'update'} = $q->{'updateq'};
246 $sql{'insert'} = $q->{'insertq'};
247 $sql{'delete'} = $q->{'deleteq'};
249 $select = $queries{'select'} = $sourcedb->prepare($q->{'selectq'});  # reads come from the source
250 $update = $queries{'update'} = $targetdb->prepare($q->{'updateq'});  # writes go to the target
251 $insert = $queries{'insert'} = $targetdb->prepare($q->{'insertq'});
252 $delete = $queries{'delete'} = $targetdb->prepare($q->{'deleteq'});
254 $binary{'update'} = $q->{update};  # presumably a list of bytea placeholder numbers — confirm type
255 $binary{'insert'} = $q->{selins};  # insert and select share the same list
256 $binary{'select'} = $q->{selins};
258 for (@{$binary{'update'}{$oid}}) {  # NOTE(review): $binary{'update'} holds $q->{update} (set above), not a hash keyed by oid; under strict refs this dies if it is an arrayref, or silently iterates nothing if it is undef. Likely intended @{ $binary{'update'} || [] }
259 $queries{'update'}{$oid}->bind_param($_, undef, PG_BYTEA);  # NOTE(review): $queries{'update'} is a statement handle, so ->{$oid} reads a DBI handle attribute rather than a per-oid handle; likely intended $update->bind_param($_, undef, { pg_type => PG_BYTEA }) as documented by DBD::Pg
262 for (@{$binary{'insert'}{$oid}}) {  # NOTE(review): same issue as the update loop above
263 $queries{'insert'}{$oid}->bind_param($_, undef, PG_BYTEA);  # NOTE(review): same issue; likely $insert->bind_param($_, undef, { pg_type => PG_BYTEA })
266 for (@{$binary{'select'}{$oid}}) {  # NOTE(review): same issue as the update loop above
267 $queries{'select'}{$oid}->bind_param($_, undef, PG_BYTEA);  # NOTE(review): same issue; likely $select->bind_param($_, undef, { pg_type => PG_BYTEA })
275 return unless @batch;  # nothing pending in this batch
276 my $elapsed = tv_interval($batchstart);  # seconds since the current batch started
277 my $batchsize = scalar @batch;
278 my $rps = $batchsize / $elapsed;  # rows per second for this batch
279 timelog sprintf("%s: $batchvia batch of %d: RPS = %.2f",  # $batchvia is interpolated into the format, so it must not contain '%'
280 $locked, $batchsize, $rps
288 my $epoch = $qi->{epoch};  # copy one queued row: apply its action on the target, then mark progress
289 my $action = $qi->{action};
293 status $action, 'row' , $id;
295 $action = lc($action);  # queue items from nextrow() are already lower-cased; harmless
296 my $q = $queries{$action};
297 die "unknown action $action" unless $q;  # only actions with a prepared handle are accepted
299 if ($action ne 'delete') {  # deletes need no source data; everything else re-reads the row
300 #warn "select = $sql{select}\n";
301 status 'selecting row' , $id;
302 $select->execute($id);  # current version of the row from the source
303 @row = $select->fetchrow_array;
310 # TODO if we delete a row with the max migration_id
311 # and then later we promote, we may re-use an id and
312 # the sync back will be bad? no, because that id would
313 # have been deleted on the old source?
319 status $action, 'row' , $id, 'complete';
320 #warn "copied $oid $id\n";
323 warn "$action: @row\n";
330 status "marking $id, ", scalar @batch, ' rows in batch';
331 $sourcemark->execute(@locked, $id, $epoch);  # record progress on the source
335 $batchstart = [ gettimeofday ];  # reset the per-batch RPS timer
336 $sourcedb->begin_work;  # open a transaction on both sides for the next batch
337 $targetdb->begin_work;
355 my $trps = $rows / tv_interval($copystart);  # overall rows per second since $copystart
356 timelog sprintf('%s: %d total rows copied (%.2f rps)', $locked, $rows, $trps);
358 timelog sprintf('%s: pid %d exiting (%.2f seconds)', $locked, $$, tv_interval($copystart));
364 my $qstart = $qi->{id};  # presumably batchcopy(): bulk-copy a key range with COPY instead of row-by-row inserts
365 my $qend = $qi->{repmax};  # presumably the last id covered by this queue item
367 my $tabname = $posting->{tablename};  # NOTE(review): interpolated unquoted into the COPY statements below; assumes posting.tablename is already a safe (quoted) identifier
369 my $y = $qstart + $batch - 1;  # NOTE(review): confirm $y is clamped to $qend in the lines not shown
376 timelog sprintf('%s: copy %d-%d', $locked, $x, $y);
379 $sourcedb->do("copy (select * from $tabname where $quoted_pk >= $x and $quoted_pk <= $y) to stdout (format text)");  # start COPY OUT of the range on the source
380 $targetdb->do("copy $tabname from stdin (format text)");  # start COPY IN on the target
381 while ($rv = $sourcedb->pg_getcopydata($row) >= 0) {  # BUG(review): '>=' binds tighter than '=', so $rv gets the comparison result, not the byte count (pg_getcopydata returns -1 when done). The loop test still works, but write (($rv = $sourcedb->pg_getcopydata($row)) >= 0) if $rv is used afterwards
382 $targetdb->pg_putcopydata($row);  # stream each text-format line straight through
386 $targetdb->pg_putcopyend;  # finish COPY IN on the target
387 $sourcemark->execute(@locked, $y, 0);  # mark progress through $y with epoch 0
392 timelog sprintf('%s: error: %s', $locked, $@);
398 return 0 unless $posting->{copyable};  # presumably canusecopy(): COPY only for tables flagged copyable in posting
399 return $qi->{action} eq 'insert' && $qi->{epoch} == 0;  # ... and only for epoch-0 inserts
402 status 'batching rows';
405 while ($queueitem = nextrow()) {  # ends when nextrow() returns false (queue exhausted)
406 status 'have next row', $queueitem->{id};
407 if (canusecopy($queueitem)) {  # bulk COPY path
408 #timelog 'using copy';
410 batchcopy($queueitem);
413 $batchvia = 'insert';  # row-by-row path; label used in the batch RPS log line
415 nextbatch if @batch >= $batch;  # start a new batch once $batch rows are pending
417 last if defined $maxrows && ($rows+@batch) >= $maxrows;  # optional row limit counts copied + pending rows
421 timelog "$locked: queue empty" unless defined $maxrows && $rows >= $maxrows;
425 timelog sprintf('%s: error: %s', $locked, $@);