#!/usr/bin/perl
#

use warnings;
use strict;
use POSIX ":sys_wait_h";
use Time::HiRes 'usleep';
use File::Find;
  
# sprintf format for the numeric prefix of a chunk file name; import_chunk
# globs for "<prefix>*" to locate the file of a given chunk number.
use constant CHUNK_FORMAT => "%04d";

die "usage: $0 <number of parallel restorers>" unless @ARGV;

# Autoflush STDOUT so progress lines show up as soon as they are printed.
$| = 1;

# Maximum number of import children allowed to run at the same time.
my $workers = shift;

# Must be a positive integer: the dispatch loop waits while the number of
# running children has reached $workers, so 0 (or a non-numeric value, which
# compares as 0) would make it wait forever without ever forking.
die "usage: $0 <number of parallel restorers>"
 unless $workers =~ /\A[1-9][0-9]*\z/;

# Number of children currently running: incremented by the dispatch loop for
# every fork, decremented by the SIGCHLD handler for every reaped child.
my $current_workers = 0;

# Scan the current directory for chunk files (names starting with four digits)
# and return the highest chunk number found.
#
# Only entries directly inside the current directory are considered and
# directories are skipped: import_chunk locates chunks with a glob relative to
# the current directory, so anything nested deeper could never be imported and
# would only distort the completeness check below.
#
# Returns: the largest chunk number (chunks are numbered from 0).
# Dies:    when no chunk file exists, or when the number of chunk files does
#          not equal largest+1 (a gap or a duplicated prefix).
sub find_largest_chunk {
 my $largest_chunk = -1;
 my $total_chunks = 0;
 find(sub {
  if (-d $_) {
   # Never descend below the starting directory, and never count a
   # directory as a chunk even if its name starts with four digits.
   $File::Find::prune = 1 unless $_ eq '.';
   return;
  }
  return unless /^(\d{4})/;
  my $chunk_num = int($1);
  $total_chunks++;
  $largest_chunk = $chunk_num if $chunk_num > $largest_chunk;
 }, '.');
 if ($largest_chunk == -1) {
  die "There is no largest chunk!"
 }
 # Chunks are numbered 0..largest, so a complete set has largest+1 files.
 if ($total_chunks != $largest_chunk+1) {
  die "Something has gone wrong. Some chunks are missing in the middle."
 }
 return $largest_chunk;
}

# SIGCHLD handler: reap every child that has already exited, without blocking,
# and decrement $current_workers once per reaped child.
sub reaper {
 # waitpid() overwrites $! and $?. The handler can run between any two
 # statements of the interrupted code, so keep the caller's values intact
 # (otherwise e.g. an error message built from $! after a wait loop would
 # report this handler's ECHILD instead of the real error).
 local ($!, $?);
 # A single signal may stand for several exited children, hence the loop.
 while (waitpid(-1, WNOHANG) > 0) {
  $current_workers--;
 }
 # Re-install the handler on every invocation.
 $SIG{CHLD} = \&reaper;
}
$SIG{CHLD} = \&reaper;

# Dispatch loop: import chunks 0..$n_chunks, one forked child per chunk, with
# at most $workers children running at any time.
my $n_chunks = find_largest_chunk();

my $n = 0;
my $fork_attempts = 0; # consecutive failed fork() calls
while ($n <= $n_chunks) {
 # All worker slots are busy: wait for the SIGCHLD handler to free one.
 if ($current_workers >= $workers) {
  usleep 100_000; # sleep 0.1 secs
  next;
 }

 my $pid = fork;
 unless (defined $pid) {
  # Remember the reason right away; $! may be overwritten while waiting.
  my $fork_error = "$!";
  $fork_attempts++;
  if ($fork_attempts >= 5) {
   # Give up, but let the children that are still running finish first.
   while ($current_workers != 0) {
    usleep 100_000 # .1 sec
   }
   die "Failed forking ($fork_attempts attempts): $fork_error";
  }
  # Retry the same chunk after a short pause. Falling through here would
  # treat the undefined $pid as 0 and run the child branch in the parent.
  usleep 100_000; # .1 sec
  next;
 }
 $fork_attempts = 0;

 if ($pid == 0) { # child
  # The child waits for its own mysql process when it closes the pipe;
  # the reaper inherited from the parent must not collect it first.
  $SIG{CHLD} = 'DEFAULT';
  # Turn a write to a dead mysql into a print error instead of a signal.
  $SIG{PIPE} = 'IGNORE';
  import_chunk($n);
  exit;
 }

 # Parent: one more child is running; move on to the next chunk.
 $current_workers++;
 $n++;
}

# Every chunk has been handed out; wait for the remaining children to exit.
while ($current_workers != 0) {
 usleep 100_000 # .1 sec
}

# Feed one chunk file to the mysql command-line client.
#
# Parameters:
#   $chunk_num - number of the chunk; its file is the first name in the
#                current directory matching sprintf(CHUNK_FORMAT, $chunk_num)
#                followed by anything.
#
# Dies when the chunk file cannot be found or opened, when mysql cannot be
# started, when writing to mysql fails, or when closing the pipe to mysql
# reports a failure.
sub import_chunk {
 my $chunk_num = shift;

 my $pattern = sprintf(CHUNK_FORMAT, $chunk_num) . '*';
 # List context: glob in scalar context is an iterator that keeps state
 # between calls; here we simply want the first match (or none at all).
 my ($chunk_file) = glob($pattern);
 die "could not find a chunk file matching $pattern" unless defined $chunk_file;

 print "New child doing chunk $chunk_file\n";
 open my $chunk_fh, '<', $chunk_file or die "could not open $chunk_file: $!";
 # Three-argument pipe open: mysql reads the chunk from its standard input.
 open my $mysql, '|-', 'mysql' or die "chunk $chunk_file failed: $!";
 while (defined(my $line = <$chunk_fh>)) {
  print $mysql $line or die "failed writing chunk $chunk_file to mysql: $!"
 }
 # Closing a pipe waits for mysql to exit and leaves its wait status in $?.
 close $mysql or die "mysql failed importing chunk $chunk_file: $!, status: $?";
 close $chunk_fh;
 print "Child $chunk_file finished working\n";
}


