]> granicus.if.org Git - pgm/blob - copyrows
add table docs
[pgm] / copyrows
1 #!/usr/bin/env perl
2
3 use strict;
4 use warnings;
5
6 use sigtrap qw(handler closeout normal-signals);
7
8 use DBI;
9 use DBD::Pg qw(:pg_types);
10
11 use Getopt::Long;
12 Getopt::Long::Configure ("bundling");
13 use Time::HiRes qw( gettimeofday tv_interval );
14 use POSIX qw(strftime);
15 use Data::Dumper qw(Dumper);
16
17 $\ = "\n";
18 $| = 1;
19
20 my $batch = 1000;
21 my @exclude = ();
22 my $sourceservice = 'source';
23 my $targetservice = 'target';
24 my $batchdelay = $ENV{DELAY} || 0;
25 my $rows = 0;
26 my $totaltime;
27 my $maxrows;
28 my $batchstart = [gettimeofday];
29 my $copystart = [gettimeofday];
30 my $elapsed;
31 my $maxcopies = 6;
32 my $nologtime = 0;
33 my $logstatuschange = 0;
34 my $sequences_only = 0; # -S to set true, will set sequences on target
35
36 my $status = 'initializing';
37 my $lastlog;
38
39 my $posting; # posting table row for table we're working on
40 my $queueitem;
41 my $quoted_pk; # quoted version of pk
42 my $sourcepid = 0;
43
44 my @locked;
45 my $locked;
46
47 sub timelog {
48         my $time;
49         my $line;
50
51         my ($seconds, $microseconds) = Time::HiRes::gettimeofday();
52         my $s = sprintf("%06i", $microseconds);
53         $time = strftime("%FT%T.$s ", gmtime($seconds));
54
55         if (@_ > 1) {
56                 $line = sprintf(@_);
57         } else {
58                 $line = $_[0];
59         }
60
61         $lastlog = "$time $line";
62         print $nologtime ? $line : "$time $line";
63 }
64
65 sub status {
66         my ($seconds, $microseconds) = Time::HiRes::gettimeofday();
67         my $at = strftime("%FT%T", gmtime($seconds));
68         my ($pkg, $fn, $line) = caller;
69         $status = "@_ ($at line $line)";
70         if ($logstatuschange) {
71                 timelog($status);
72         }
73 }
74
75 sub logstatus {
76         unless ($nologtime) {
77                 my ($seconds, $microseconds) = Time::HiRes::gettimeofday();
78                 my $s = sprintf("%06i", $microseconds);
79                 local $\ = '';
80                 print STDERR strftime("%FT%T.$s ", gmtime($seconds));
81         }
82         print STDERR sprintf('pid %d, sourcepid %d, %s: status: %s', $$, $sourcepid, $locked, $status);
83         print STDERR sprintf('pid %d, sourcepid %d, %s: last  : %s', $$, $sourcepid, $locked, $lastlog);
84 }
85
86 sub terminated {
87         logstatus();
88         timelog("$locked: pid $$ terminated by signal: status $status");
89         closeout();
90 }
91
92 sub togglelsc {
93         $logstatuschange = !$logstatuschange;
94 }
95
96 status('initializing');
97 $SIG{'USR1'} = \&logstatus;
98 $SIG{'USR2'} = \&togglelsc;
99 $SIG{'TERM'} = \&terminated;
100
101 GetOptions(
102         'b=i', \$batch,
103         'x=s@', \@exclude,
104         's=s', \$sourceservice,
105         't=s', \$targetservice,
106         'd=i', \$batchdelay,
107         'M=i', \$maxrows,
108         'c=i', \$maxcopies,
109         'T', \$nologtime,
110         'S', \$sequences_only,
111 );
112
113 my $sourcedb = DBI->connect("dbi:Pg:service=$sourceservice");
114 my $targetdb = DBI->connect("dbi:Pg:service=$targetservice");
115
116 $sourcedb->{RaiseError} = 1;
117 $sourcedb->{AutoCommit} = 1;
118 $targetdb->{RaiseError} = 1;
119 $targetdb->{AutoCommit} = 1;
120
121 my $queuedetail = $sourcedb->prepare(q{
122         select id, coalesce(max(repmax), id) as through
123         from migration.queue
124         where table_oid = ?::regclass and pgroup = ?
125         group by id
126 });
127
128 my $sourcemark = $sourcedb->prepare(q{select migration.mark(?,?,?,?)});
129 my $findqueries = $sourcedb->prepare(q{select * from migration.posting where table_oid = ?::regclass });
130 my $nextrow = $sourcedb->prepare(q{select * from migration.nextrow(?,?)});
131 my $countcopies = $sourcedb->prepare(q{select count(*) from pg_stat_activity where application_name = 'copyrows'});
132
133 # small race type condition here, if all processes login at once,
134 # the set their name, then they all check the count, they'll all
135 # exit.  Could be avoided by taking a lock before the set name
136 # and releasing it after the max copy count, effectively serializing
137 # the max copyrows check.
138 # $sourcedb->begin_work;
139 # $sourcedb->do(q{lock table migration.queue});
140 # then after the check
141 # #sourcedb->rollback; # or commit, doesn't matter
142
143 #if ($maxcopies) {
144 #       $sourcedb->begin_work;
145 #       $sourcedb->do(q{lock table migration.posting});
146 #}
147
148 $sourcedb->do(q{set application_name to 'copyrows'});
149 $targetdb->do(q{set application_name to 'copyrows'});
150 my $pidarr = $sourcedb->selectcol_arrayref('select pg_backend_pid()');
151 ($sourcepid) = @$pidarr;
152
153 # TODO grab a table lock to mutex this count
154 if ($maxcopies) {
155         $countcopies->execute;
156         my ($count) = $countcopies->fetchrow_array;
157         if ($count > $maxcopies) {
158                 timelog "$count copyrows already running, exiting($sourcepid)";
159                 exit 0;
160         }
161 #       $sourcedb->rollback;
162 }
163
164 my @tables = @ARGV;
165
166 status 'finding work';
167 my $findwork;
168 if (@tables) {
169         $findwork = $sourcedb->prepare(q{
170                 select * from migration.find_work(?,?)
171         });
172 } else {
173         $findwork = $sourcedb->prepare(q{
174                 select * from migration.find_work()
175         });
176 }
177 $findwork->execute(@tables);
178 @locked = $findwork->fetchrow_array();
179 $findwork->finish;
180
181 if (@locked) {
182         $locked = join('/', @locked);
183         $0 = "copyrows $locked";
184         timelog "$locked: locked";
185 } else {
186         timelog "unable to lock any tables.  exiting.";
187         exit 0;
188 }
189
190 timelog "checking queue $locked";
191 eval {
192         my $qd = nextrow();
193         if ($qd) {
194                 timelog sprintf('%s/%d: %s queue %s-%s', @locked, $qd->{action}, $qd->{id}, $qd->{repmax});
195         } else {
196                 timelog 'does not seem to be any work, mark and exit';
197                 $sourcemark->execute(@locked, $qd->{repmax}, $qd->{epoch});
198                 $sourcemark->finish;
199                 closeout();
200         }
201 };
202 if ($@) {
203         timelog "$@";
204         exit 1;
205 }
206
207 my $table;
208
209 # read, write, update queries, keyed by action/oid
210 my %queries; # statement handles
211 my ($select, $update, $delete, $insert, $usecopy);
212 my %sql; # sql for queries
213 my %binary; # which columns should be bound to PG_BYTEA
214
215 getqueries(@locked);
216
217 # get table oid and migration id of next row to copy
218 # is this needed?  Since we're locking the row (in effect),
219 # can't we just look at the min and max and loop it that way?
220 # would save a query to get the next row
221 # one thing at a time.
222 sub nextrow {
223         status "nextrow @locked";
224         $nextrow->execute(@locked);
225         my $queueitem = $nextrow->fetchrow_hashref;
226         $nextrow->finish;
227
228         return unless defined $queueitem->{id};
229         $queueitem->{action} = lc($queueitem->{action});
230
231         return $queueitem;
232 }
233
234 sub getqueries {
235         my $oid = shift;
236
237         $findqueries->execute($oid);
238         my $q = $findqueries->fetchrow_hashref;
239         $findqueries->finish;
240         $posting = $q;
241
242         $quoted_pk = $sourcedb->quote_identifier($q->{keycol});
243
244         $sql{'select'} = $q->{'selectq'};
245         $sql{'update'} = $q->{'updateq'};
246         $sql{'insert'} = $q->{'insertq'};
247         $sql{'delete'} = $q->{'deleteq'};
248
249         $select = $queries{'select'} = $sourcedb->prepare($q->{'selectq'});
250         $update = $queries{'update'} = $targetdb->prepare($q->{'updateq'});
251         $insert = $queries{'insert'} = $targetdb->prepare($q->{'insertq'});
252         $delete = $queries{'delete'} = $targetdb->prepare($q->{'deleteq'});
253
254         $binary{'update'} = $q->{update};
255         $binary{'insert'} = $q->{selins};
256         $binary{'select'} = $q->{selins};
257
258         for (@{$binary{'update'}{$oid}}) {
259                 $queries{'update'}{$oid}->bind_param($_, undef, PG_BYTEA);
260         }
261
262         for (@{$binary{'insert'}{$oid}}) {
263                 $queries{'insert'}{$oid}->bind_param($_, undef, PG_BYTEA);
264         }
265
266         for (@{$binary{'select'}{$oid}}) {
267                 $queries{'select'}{$oid}->bind_param($_, undef, PG_BYTEA);
268         }
269 }
270
271 my @batch;
272
273 my $batchvia = '';
274 sub logbatch {
275         return unless @batch;
276         my $elapsed = tv_interval($batchstart);
277         my $batchsize = scalar @batch;
278         my $rps = $batchsize / $elapsed;
279         timelog sprintf("%s: $batchvia batch of %d: RPS = %.2f",
280                 $locked, $batchsize, $rps
281         );
282         STDOUT->flush;
283 }
284
285 sub cprow {
286         my $qi = shift;
287         my $id = $qi->{id};
288         my $epoch = $qi->{epoch};
289         my $action = $qi->{action};
290
291         my @row;
292
293         status $action, 'row' , $id;
294
295         $action = lc($action);
296         my $q = $queries{$action};
297         die "unknown action $action" unless $q;
298
299         if ($action ne 'delete') {
300                 #warn "select = $sql{select}\n";
301                 status 'selecting row' , $id;
302                 $select->execute($id);
303                 @row = $select->fetchrow_array;
304                 $select->finish;
305                 #return unless @row;
306         } else {
307                 @row = ($id);
308         }
309
310         # TODO if we delete a row with the max migration_id
311         # and then later we promote, we may re-use an id and
312         # the sync back will be bad?  no, because that id would
313         # have been deleted on the old source?
314         if (@row) {
315                 eval {
316                         $q->execute(@row);
317                         $q->finish;
318                         push @batch, $id;
319                         status $action, 'row' , $id, 'complete';
320                         #warn "copied $oid $id\n";
321                 };
322                 if ($@) {
323                         warn "$action: @row\n"; 
324                         die $@;
325                 }
326         } else {
327                 push @batch, $id;
328         }
329         
330         status "marking $id, ", scalar @batch, ' rows in batch';
331         $sourcemark->execute(@locked, $id, $epoch);
332 }
333
334 sub startbatch {
335         $batchstart = [ gettimeofday ];
336         $sourcedb->begin_work;
337         $targetdb->begin_work;
338 }
339
340 sub finishbatch {
341         $targetdb->commit;
342         $sourcedb->commit;
343         logbatch();
344         $rows += @batch;
345         @batch = ();
346 }
347
348 sub nextbatch {
349         finishbatch;
350         startbatch;
351 }
352
353 sub closeout {
354         if ($rows) {
355                 my $trps = $rows / tv_interval($copystart);
356                 timelog sprintf('%s: %d total rows copied (%.2f rps)', $locked, $rows, $trps);
357         }
358         timelog sprintf('%s: pid %d exiting (%.2f seconds)', $locked, $$, tv_interval($copystart));
359         exit;
360 }
361
362 sub batchcopy {
363         my $qi = shift;
364         my $qstart = $qi->{id};
365         my $qend = $qi->{repmax};
366
367         my $tabname = $posting->{tablename};
368         my $x = $qstart;
369         my $y = $qstart + $batch - 1;
370         if ($y > $qend) {
371                 $y = $qend;
372         }
373         my $rv;
374         my $row;
375
376         timelog sprintf('%s: copy %d-%d', $locked, $x, $y);
377         eval {
378                 #startbatch;
379                 $sourcedb->do("copy (select * from $tabname where $quoted_pk >= $x and $quoted_pk <= $y) to stdout (format text)");
380                 $targetdb->do("copy $tabname from stdin (format text)");
381                 while ($rv = $sourcedb->pg_getcopydata($row) >= 0) {
382                         $targetdb->pg_putcopydata($row);
383                         push @batch, 1;
384                 }
385                 die if $rv < 0;
386                 $targetdb->pg_putcopyend;
387                 $sourcemark->execute(@locked, $y, 0);
388                 $sourcemark->finish;
389         };
390         if ($@) {
391                 chomp $@;
392                 timelog sprintf('%s: error: %s', $locked, $@);
393         }
394 };
395
396 sub canusecopy {
397         my ($qi) = @_;
398         return 0 unless $posting->{copyable};
399         return $qi->{action} eq 'insert' && $qi->{epoch} == 0;
400 }
401
402 status 'batching rows';
403 eval {
404         startbatch;
405         while ($queueitem = nextrow()) {
406                 status 'have next row', $queueitem->{id};
407                 if (canusecopy($queueitem)) {
408                         #timelog 'using copy';
409                         $batchvia = 'copy';
410                         batchcopy($queueitem);
411                         nextbatch;
412                 } else {
413                         $batchvia = 'insert';
414                         cprow($queueitem);
415                         nextbatch if @batch >= $batch;
416                 }
417                 last if defined $maxrows && ($rows+@batch) >= $maxrows;
418         }
419         $sourcemark->finish;
420         finishbatch;
421         timelog "$locked: queue empty" unless defined $maxrows && $rows >= $maxrows;
422 };
423 if ($@) {
424         chomp $@;
425         timelog sprintf('%s: error: %s', $locked, $@);
426 }
427
428 closeout;