DBIx-BulkUtil

 view release on metacpan or  search on metacpan

lib/DBIx/BulkUtil.pm  view on Meta::CPAN

  if ( $dir eq 'in' ) {
    my $mode = $opts{Action} || "A";
    if ( $mode eq 'T' ) {
      my $sql = "TRUNCATE TABLE $bcp_table";
      print "Executing: $sql\n";
      $dbh->do($sql);
    } elsif ($mode eq 'R') {
      $self->delete($bcp_table, '', $commit_size);
    }
    confess "BCP file $file does not exist" unless -f $file;

    # Save some work
    # checking underscore ok, we just did -f above
    unless ( -s _ ) {
      print "$file is empty. Skipping bcp\n";

      # Make any log file parsers happy
      print "0 rows copied\n";
      return 0;
    }

    # All this to decide whether or not to use '-E'
    # Only use '-E' if there is an identity column
    # And GenerateId is false
    unless ( $opts{GenerateId} ) {
      my $col_info = $self->column_info($table);
      my $col_map = $col_info->{MAP};
      if ($col_map) {
        for my $c ( values %$col_map ) {
          ++$id_cnt and last if $c->{TYPE_NAME} =~ /identity/;
        }
      }
    }
  }

  my ($action,$to_from) = ($dir eq 'in') ? ('Loading', 'from') : ('Exporting', 'to');
  print "$action $server/$bcp_table $to_from $file\n";

  my (@max_err_opt, @commit_opt, @header_opt, @id_opt);
  my $max_err_cnt = $opts{MaxErrors} || 0;
  if ( $dir eq 'in' ) {
    @max_err_opt = (-m => $max_err_cnt);
    @commit_opt  = (-b => $commit_size);
    @header_opt  = (-F => $opts{Header}+1) if $opts{Header};
    @id_opt = "-E" if $id_cnt;
  }

  my $keep_temp = $opts{KeepTempFiles} || $opts{Debug};
  my $in_temp_dir = $opts{TempDir}     || $opts{Debug};
  my $temp_dir;
  $temp_dir = $opts{TempDir} || "." if $in_temp_dir;

  require File::Temp;
  my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
  my @unlink  = $keep_temp ? (UNLINK => 0) : ();
  my $error_file = File::Temp->new(
    TEMPLATE => "${table}_XXXXX",
    SUFFIX   => ".err",
    @temp_dir, @unlink,
  );
  chmod(0664, $error_file->filename());
  $error_file->close();

  my @packet_size = $opts{PacketSize} ? ( -A => $opts{PacketSize} ) : ();
  my @passthru    = $opts{PassThru}   ? @{$opts{PassThru}} : ();

  my ( $fmt_file, $tmp_fmt_file );
  if ( $opts{FormatFile} ) {
    $fmt_file = $opts{FormatFile};
  } elsif ( ( $opts{ColumnList} && $opts{ColumnList} ) || ( $opts{Filler} && @{$opts{Filler}} ) ) {
    ($tmp_fmt_file,$fmt_file) = $self->mk_fmt_file(
      Table          => $table,
      Delimiter      => $delimiter,
      RowDelimiter   => $row_delimiter,
      ColumnList     => $opts{ColumnList},
      Filler         => $opts{Filler},
      TempDir        => $opts{TempDir},
      FormatFileName => $opts{FormatFileName},
      KeepTempFiles  => $keep_temp,
    );
  }
  my @fmt_file_opt = $fmt_file ? ( -f => $fmt_file ) : '-c';

  # UTF-8 doesn't work on HP - default is roman8 on HP
  # Should probably make '-J' some kind of option, with maybe
  # a map of OS types and default values. But leave that for
  # a later date.
  my @cmd = ( bcp => $bcp_table, $dir, $file,
    -U => $user,
    #-J => "utf8",
    -S => $server,
    -t => $delimiter,
    -r => $row_delimiter,
    -e => $error_file->filename(),
    @header_opt,
    @id_opt,
    @commit_opt,
    @max_err_opt,
    @packet_size,
    @passthru,
    @fmt_file_opt,
  );
  print "Executing: @cmd\n";
  push @cmd, -P => $self->{PASSWORD};
  open(my $fh, "-|", @cmd) or confess "Can't exec bcp: $!";

  my ($rows, $failed, $partially_failed);
  local ($_, $.);

  my $err_cnt = my $c_lib_err_cnt = my $srvr_err_cnt = 0;
  while (<$fh>) {
    print;
    if ( /^(Server|C[TS]LIB) Message/ ) {
      my $msg_type = $1;
      if ( $msg_type eq 'CSLIB' ) {
        if ( m|/N(\d+)| ) {
          # Sybase says truncation is not an error, so we will too
          # Or else we might get > 1 error on the same row
          unless ( $1 == 36 ) {
            $err_cnt++;
            $c_lib_err_cnt++;

lib/DBIx/BulkUtil.pm  view on Meta::CPAN

        confess "BCP error - bcp failed [$exit_stat]: $!"
          if $failed and !$partially_failed;

      } else {
        confess "BCP error - bcp returned status $exit_stat: $!";
      }
    }

    confess "BCP error - bcp recieved signal $exit_sig"      if $exit_sig > 0;
    confess "BCP error - bcp coredumped"                     if $exit_core;
  }

  # Will miss error count exceeded error on 10.x
  # But will catch other errors if load is aborted
  # Or no rows are loaded.
  confess "BCP error - no rows copied" if !defined($rows);

  # CTLIB errors do not cause non-zero exit - so catch them here
  confess "BCP error - max error count ($max_err_cnt) exceeded" if $err_cnt > $max_err_cnt;
  $rows ||= 0;
  return $rows;
}

{
no warnings 'once';
*bcp = \&bcp_in;
}

sub mk_fmt_file {
  my $self = shift;
  my %opts = @_;

  my $table = $opts{Table} || die "Table required for mk_fmt_file";
  my $col_info = $self->column_info($table);
  my $db_col_list = $col_info->{LIST};
  my %is_db_column;
  $is_db_column{$_}++ for @$db_col_list;
  my %is_filler;
  if ( $opts{Filler} ) {
    $is_filler{lc($_)}++ for @{$opts{Filler}};
  }

  my ($tmp_fmt_file,$fmt_file);
  if ( $opts{FormatFileName} ) {
    $fmt_file = $opts{FormatFileName};
  } else {
    require File::Temp;
    my $keep_temp = $opts{KeepTempFiles} || $opts{Debug};
    my $in_temp_dir = $opts{TempDir}     || $opts{Debug};
    my $temp_dir;

    $temp_dir = $opts{TempDir} || "." if $in_temp_dir;
    my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
    my @unlink  = ( $keep_temp || !defined(wantarray) ) ? (UNLINK => 0) : ();
    $tmp_fmt_file = File::Temp->new(
      TEMPLATE => "${table}_XXXXX",
      SUFFIX   => ".fmt",
      @temp_dir, @unlink,
    );
    $fmt_file = $tmp_fmt_file->filename();
    chmod(0664, $tmp_fmt_file);
    $tmp_fmt_file->close();
  }

  my $delim = $opts{Delimiter} || "|";
  my $row_delim = $opts{RowDelimiter} || "\n";

  # Need escaped text in fmt file
  # for CR/LF
  for ($delim,$row_delim) {
    s/\n/\\n/g;
    s/\r/\\r/g;
  }

  my @col_list = ( $opts{ColumnList} && @{$opts{ColumnList}} )
    ? @{$opts{ColumnList}}
    : @{$col_info->{LIST}};

  my $ncols = @col_list;
  open( my $fh, ">", $fmt_file ) or confess "Failed to open $fmt_file: $!";
  print $fh "10.0\n";
  print $fh "$ncols\n";

  my $col_map = $col_info->{MAP};
  for my $i (1..$ncols) {
    my $name = $col_list[$i-1];
    my $d = ( $i == $ncols ) ? $row_delim : $delim;
    my @row = ($i, 'SYBCHAR', 0);
    if ($is_filler{lc($name)}) {
      push @row, 0, qq["$d"], 0;
    } elsif ($is_db_column{lc($name)}) {
      my $info = $col_map->{lc($name)};

      # Native Sybase date format size is 26 though metadata says 23
      # For numbers, add extra for decimal
      my $size =
        ( $info->{TYPE_NAME} =~ /date/ ) ? 26
      : ( $info->{TYPE_NAME} =~ /char|text/ ) ? $info->{COLUMN_SIZE}
      : $info->{COLUMN_SIZE} + 1;
      push @row, $size, qq["$d"], $info->{ORDINAL_POSITION}, $name;
    } else { confess "$name is neither a db nor filler column" }
    print $fh join("\t", @row), "\n";
  }

  close $fh;


  # Also return temp object so it will not be cleaned up yet
  return
    wantarray ?  ($tmp_fmt_file, $fmt_file)
  : $tmp_fmt_file ? $tmp_fmt_file
  : $fmt_file;

}

sub bcp_out {
  my $self = shift;
  my @opts;
  if (ref $_[-1]) {
    @opts = pop @_;
  }

lib/DBIx/BulkUtil.pm  view on Meta::CPAN


  my $partition = ( $table =~ s/:(\w+)$// ) ? $1 : '';

  my $dbh = $self->{DBH};

  my $stdin = $opts->{Stdin};
  @files = "$table.bcp" if !@files && !$stdin;

  my $has_stdin;
  for my $file (@files) {
    if ( $file eq "-" ) {
      $has_stdin++;
      next;
    }
    confess "BCP file $file does not exist" unless -f $file;
  }

  if ( $has_stdin && !$stdin ) {
    $stdin = \*STDIN;
  } elsif ( $stdin && !$has_stdin ) {
    push @files, "-";
  }

  # Save some work, skip load on empty file
  # Let sqlldr do a heavy handed truncate or delete
  # if that is the chosen action
  my @bcp_files = grep { $_ eq "-" or -s } @files;

  if ( !@bcp_files ) {
    if ( $action_opt eq 'A') {
      print "$files[0],... is empty. Skipping sqlldr\n";

      # Make any log file parsers happy
      print "0 Rows successfully loaded\n";
      return 0;
    }

    # Need some files if we run sqlldr
    @bcp_files = @files;
  }
  require File::Temp;

  my $constants = $opts->{Constants} || {};
  my %const = map { uc($_) => $constants->{$_} } keys %$constants;

  my $sizes = $opts->{CharSizes} || {};
  my %char_sizes = map { uc($_) => $sizes->{$_} } keys %$sizes;

  my $keep_temp = $opts->{KeepTempFiles} || $opts->{Debug};
  my $in_temp_dir = $opts->{TempDir}     || $opts->{Debug};
  my $temp_dir;
  $temp_dir = $opts->{TempDir} || "." if $in_temp_dir;

  my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
  my @unlink  = $keep_temp ? (UNLINK => 0) : ();
  my $ctl_fh = File::Temp->new(
    TEMPLATE => "${table}_XXXXX",
    SUFFIX   => ".ctl",
    @temp_dir, @unlink,
  );
  chmod(0664, $ctl_fh->filename());
  my $bad_fh = File::Temp->new(
    TEMPLATE => "${table}_XXXXX",
    SUFFIX   => ".bad",
    @temp_dir, @unlink,
  );
  chmod(0664, $bad_fh->filename());
  my $log_fh = File::Temp->new(
    TEMPLATE => "${table}_XXXXX",
    SUFFIX   => ".log",
    @temp_dir, @unlink,
  );
  chmod(0664, $log_fh->filename());
  my $prm_fh = $stdin ? File::Temp->new(
    TEMPLATE => "${table}_XXXXX",
    SUFFIX   => ".prm",
    @temp_dir,
  ) : undef;

  # NLS date format env variable does not work
  # for sqlldr.
  # So we must determine date fields and
  # specify the format in the control file.
  my $db = $self->{DBH}->{Name};
  my $user = $dbh->{Username};
  my ($schema, $tbl_name) = split /\./, uc($table);
  if (!$tbl_name) {
    $tbl_name = $schema;
    $schema = $self->curr_schema();
  }

  my $sth = $dbh->column_info(undef, $schema, $tbl_name, undef);
  my @info_names = @{$sth->{NAME_uc}};
  my %row; $sth->bind_columns(\@row{@info_names});
  my (@columns, %is_date, %char_sz, %is_lob);
  print "ColumnName  Type Size\n" if $opts->{Debug};
  print "----------------\n" if $opts->{Debug};
  while ($sth->fetch()) {
    push @columns, $row{COLUMN_NAME};
    print "$row{COLUMN_NAME}\t$row{TYPE_NAME}\t$row{COLUMN_SIZE}\n" if $opts->{Debug};
    $char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : $row{COLUMN_SIZE} if $row{TYPE_NAME} =~ /CHAR/;
    $char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : 20_000_000, $is_lob{$row{COLUMN_NAME}} = 1 if $row{TYPE_NAME} =~ /TEXT|LOB|XML/;
    $is_date{$row{COLUMN_NAME}} = $1 if $row{TYPE_NAME} =~ /(DATE|TIMESTAMP)/;
  }
  confess("Table $schema.$tbl_name not found in database $db") unless @columns;

  # Find date formats in file, remove constants from column list
  my %date_fmt;
  my @file_columns = grep !defined($const{$_}),
    ( ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? ( map uc, @{$opts->{ColumnList}} ) : @columns );
  if (%is_date) {
    # We don't want to sample rows from stdin
    my @real_files = grep { $_ ne "-" } @files;
    %date_fmt = $self->date_masks_from_file( \@real_files, \@file_columns, \%is_date, $opts)
      if @real_files;
  }

  my $row_delim_str = $opts->{RowDelimiter} ? qq("str '$opts->{RowDelimiter}'"\n) : '';

  my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
  my $action = $action_map{$action_opt} || "APPEND";
  my $direct_load_pre  = '';
  my $direct_load_post = '';

  my $sqlldr_opts = '';
  my $max_errors = $opts->{MaxErrors} || 0;
  $sqlldr_opts .= "ERRORS=$max_errors";
  $sqlldr_opts .= ", SKIP=$opts->{Header}" if $opts->{Header};

  if ($opts->{DirectPath}) {
    my $parallel = ( uc($opts->{DirectPath}) eq 'P' ) ?  ", PARALLEL=TRUE" : '';
    $direct_load_pre = "OPTIONS(DIRECT=TRUE$parallel, ROWS=1000000, $sqlldr_opts)\nUNRECOVERABLE\n";
    $direct_load_post = "REENABLE DISABLED_CONSTRAINTS\n";



( run in 1.356 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )