DBIx-BulkUtil
view release on metacpan or search on metacpan
lib/DBIx/BulkUtil.pm view on Meta::CPAN
if ( $dir eq 'in' ) {
my $mode = $opts{Action} || "A";
if ( $mode eq 'T' ) {
my $sql = "TRUNCATE TABLE $bcp_table";
print "Executing: $sql\n";
$dbh->do($sql);
} elsif ($mode eq 'R') {
$self->delete($bcp_table, '', $commit_size);
}
confess "BCP file $file does not exist" unless -f $file;
# Save some work
# checking underscore ok, we just did -f above
unless ( -s _ ) {
print "$file is empty. Skipping bcp\n";
# Make any log file parsers happy
print "0 rows copied\n";
return 0;
}
# All this to decide whether or not to use '-E'
# Only use '-E' if there is an identity column
# And GenerateId is false
unless ( $opts{GenerateId} ) {
my $col_info = $self->column_info($table);
my $col_map = $col_info->{MAP};
if ($col_map) {
for my $c ( values %$col_map ) {
++$id_cnt and last if $c->{TYPE_NAME} =~ /identity/;
}
}
}
}
my ($action,$to_from) = ($dir eq 'in') ? ('Loading', 'from') : ('Exporting', 'to');
print "$action $server/$bcp_table $to_from $file\n";
my (@max_err_opt, @commit_opt, @header_opt, @id_opt);
my $max_err_cnt = $opts{MaxErrors} || 0;
if ( $dir eq 'in' ) {
@max_err_opt = (-m => $max_err_cnt);
@commit_opt = (-b => $commit_size);
@header_opt = (-F => $opts{Header}+1) if $opts{Header};
@id_opt = "-E" if $id_cnt;
}
my $keep_temp = $opts{KeepTempFiles} || $opts{Debug};
my $in_temp_dir = $opts{TempDir} || $opts{Debug};
my $temp_dir;
$temp_dir = $opts{TempDir} || "." if $in_temp_dir;
require File::Temp;
my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
my @unlink = $keep_temp ? (UNLINK => 0) : ();
my $error_file = File::Temp->new(
TEMPLATE => "${table}_XXXXX",
SUFFIX => ".err",
@temp_dir, @unlink,
);
chmod(0664, $error_file->filename());
$error_file->close();
my @packet_size = $opts{PacketSize} ? ( -A => $opts{PacketSize} ) : ();
my @passthru = $opts{PassThru} ? @{$opts{PassThru}} : ();
my ( $fmt_file, $tmp_fmt_file );
if ( $opts{FormatFile} ) {
$fmt_file = $opts{FormatFile};
} elsif ( ( $opts{ColumnList} && $opts{ColumnList} ) || ( $opts{Filler} && @{$opts{Filler}} ) ) {
($tmp_fmt_file,$fmt_file) = $self->mk_fmt_file(
Table => $table,
Delimiter => $delimiter,
RowDelimiter => $row_delimiter,
ColumnList => $opts{ColumnList},
Filler => $opts{Filler},
TempDir => $opts{TempDir},
FormatFileName => $opts{FormatFileName},
KeepTempFiles => $keep_temp,
);
}
my @fmt_file_opt = $fmt_file ? ( -f => $fmt_file ) : '-c';
# UTF-8 doesn't work on HP - default is roman8 on HP
# Should probably make '-J' some kind of option, with maybe
# a map of OS types and default values. But leave that for
# a later date.
my @cmd = ( bcp => $bcp_table, $dir, $file,
-U => $user,
#-J => "utf8",
-S => $server,
-t => $delimiter,
-r => $row_delimiter,
-e => $error_file->filename(),
@header_opt,
@id_opt,
@commit_opt,
@max_err_opt,
@packet_size,
@passthru,
@fmt_file_opt,
);
print "Executing: @cmd\n";
push @cmd, -P => $self->{PASSWORD};
open(my $fh, "-|", @cmd) or confess "Can't exec bcp: $!";
my ($rows, $failed, $partially_failed);
local ($_, $.);
my $err_cnt = my $c_lib_err_cnt = my $srvr_err_cnt = 0;
while (<$fh>) {
print;
if ( /^(Server|C[TS]LIB) Message/ ) {
my $msg_type = $1;
if ( $msg_type eq 'CSLIB' ) {
if ( m|/N(\d+)| ) {
# Sybase says truncation is not an error, so we will too
# Or else we might get > 1 error on the same row
unless ( $1 == 36 ) {
$err_cnt++;
$c_lib_err_cnt++;
lib/DBIx/BulkUtil.pm view on Meta::CPAN
confess "BCP error - bcp failed [$exit_stat]: $!"
if $failed and !$partially_failed;
} else {
confess "BCP error - bcp returned status $exit_stat: $!";
}
}
confess "BCP error - bcp recieved signal $exit_sig" if $exit_sig > 0;
confess "BCP error - bcp coredumped" if $exit_core;
}
# Will miss error count exceeded error on 10.x
# But will catch other errors if load is aborted
# Or no rows are loaded.
confess "BCP error - no rows copied" if !defined($rows);
# CTLIB errors do not cause non-zero exit - so catch them here
confess "BCP error - max error count ($max_err_cnt) exceeded" if $err_cnt > $max_err_cnt;
$rows ||= 0;
return $rows;
}
{
no warnings 'once';
*bcp = \&bcp_in;
}
sub mk_fmt_file {
my $self = shift;
my %opts = @_;
my $table = $opts{Table} || die "Table required for mk_fmt_file";
my $col_info = $self->column_info($table);
my $db_col_list = $col_info->{LIST};
my %is_db_column;
$is_db_column{$_}++ for @$db_col_list;
my %is_filler;
if ( $opts{Filler} ) {
$is_filler{lc($_)}++ for @{$opts{Filler}};
}
my ($tmp_fmt_file,$fmt_file);
if ( $opts{FormatFileName} ) {
$fmt_file = $opts{FormatFileName};
} else {
require File::Temp;
my $keep_temp = $opts{KeepTempFiles} || $opts{Debug};
my $in_temp_dir = $opts{TempDir} || $opts{Debug};
my $temp_dir;
$temp_dir = $opts{TempDir} || "." if $in_temp_dir;
my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
my @unlink = ( $keep_temp || !defined(wantarray) ) ? (UNLINK => 0) : ();
$tmp_fmt_file = File::Temp->new(
TEMPLATE => "${table}_XXXXX",
SUFFIX => ".fmt",
@temp_dir, @unlink,
);
$fmt_file = $tmp_fmt_file->filename();
chmod(0664, $tmp_fmt_file);
$tmp_fmt_file->close();
}
my $delim = $opts{Delimiter} || "|";
my $row_delim = $opts{RowDelimiter} || "\n";
# Need escaped text in fmt file
# for CR/LF
for ($delim,$row_delim) {
s/\n/\\n/g;
s/\r/\\r/g;
}
my @col_list = ( $opts{ColumnList} && @{$opts{ColumnList}} )
? @{$opts{ColumnList}}
: @{$col_info->{LIST}};
my $ncols = @col_list;
open( my $fh, ">", $fmt_file ) or confess "Failed to open $fmt_file: $!";
print $fh "10.0\n";
print $fh "$ncols\n";
my $col_map = $col_info->{MAP};
for my $i (1..$ncols) {
my $name = $col_list[$i-1];
my $d = ( $i == $ncols ) ? $row_delim : $delim;
my @row = ($i, 'SYBCHAR', 0);
if ($is_filler{lc($name)}) {
push @row, 0, qq["$d"], 0;
} elsif ($is_db_column{lc($name)}) {
my $info = $col_map->{lc($name)};
# Native Sybase date format size is 26 though metadata says 23
# For numbers, add extra for decimal
my $size =
( $info->{TYPE_NAME} =~ /date/ ) ? 26
: ( $info->{TYPE_NAME} =~ /char|text/ ) ? $info->{COLUMN_SIZE}
: $info->{COLUMN_SIZE} + 1;
push @row, $size, qq["$d"], $info->{ORDINAL_POSITION}, $name;
} else { confess "$name is neither a db nor filler column" }
print $fh join("\t", @row), "\n";
}
close $fh;
# Also return temp object so it will not be cleaned up yet
return
wantarray ? ($tmp_fmt_file, $fmt_file)
: $tmp_fmt_file ? $tmp_fmt_file
: $fmt_file;
}
sub bcp_out {
my $self = shift;
my @opts;
if (ref $_[-1]) {
@opts = pop @_;
}
lib/DBIx/BulkUtil.pm view on Meta::CPAN
my $partition = ( $table =~ s/:(\w+)$// ) ? $1 : '';
my $dbh = $self->{DBH};
my $stdin = $opts->{Stdin};
@files = "$table.bcp" if !@files && !$stdin;
my $has_stdin;
for my $file (@files) {
if ( $file eq "-" ) {
$has_stdin++;
next;
}
confess "BCP file $file does not exist" unless -f $file;
}
if ( $has_stdin && !$stdin ) {
$stdin = \*STDIN;
} elsif ( $stdin && !$has_stdin ) {
push @files, "-";
}
# Save some work, skip load on empty file
# Let sqlldr do a heavy handed truncate or delete
# if that is the chosen action
my @bcp_files = grep { $_ eq "-" or -s } @files;
if ( !@bcp_files ) {
if ( $action_opt eq 'A') {
print "$files[0],... is empty. Skipping sqlldr\n";
# Make any log file parsers happy
print "0 Rows successfully loaded\n";
return 0;
}
# Need some files if we run sqlldr
@bcp_files = @files;
}
require File::Temp;
my $constants = $opts->{Constants} || {};
my %const = map { uc($_) => $constants->{$_} } keys %$constants;
my $sizes = $opts->{CharSizes} || {};
my %char_sizes = map { uc($_) => $sizes->{$_} } keys %$sizes;
my $keep_temp = $opts->{KeepTempFiles} || $opts->{Debug};
my $in_temp_dir = $opts->{TempDir} || $opts->{Debug};
my $temp_dir;
$temp_dir = $opts->{TempDir} || "." if $in_temp_dir;
my @temp_dir = $in_temp_dir ? (DIR => $temp_dir) : ();
my @unlink = $keep_temp ? (UNLINK => 0) : ();
my $ctl_fh = File::Temp->new(
TEMPLATE => "${table}_XXXXX",
SUFFIX => ".ctl",
@temp_dir, @unlink,
);
chmod(0664, $ctl_fh->filename());
my $bad_fh = File::Temp->new(
TEMPLATE => "${table}_XXXXX",
SUFFIX => ".bad",
@temp_dir, @unlink,
);
chmod(0664, $bad_fh->filename());
my $log_fh = File::Temp->new(
TEMPLATE => "${table}_XXXXX",
SUFFIX => ".log",
@temp_dir, @unlink,
);
chmod(0664, $log_fh->filename());
my $prm_fh = $stdin ? File::Temp->new(
TEMPLATE => "${table}_XXXXX",
SUFFIX => ".prm",
@temp_dir,
) : undef;
# NLS date format env variable does not work
# for sqlldr.
# So we must determine date fields and
# specify the format in the control file.
my $db = $self->{DBH}->{Name};
my $user = $dbh->{Username};
my ($schema, $tbl_name) = split /\./, uc($table);
if (!$tbl_name) {
$tbl_name = $schema;
$schema = $self->curr_schema();
}
my $sth = $dbh->column_info(undef, $schema, $tbl_name, undef);
my @info_names = @{$sth->{NAME_uc}};
my %row; $sth->bind_columns(\@row{@info_names});
my (@columns, %is_date, %char_sz, %is_lob);
print "ColumnName Type Size\n" if $opts->{Debug};
print "----------------\n" if $opts->{Debug};
while ($sth->fetch()) {
push @columns, $row{COLUMN_NAME};
print "$row{COLUMN_NAME}\t$row{TYPE_NAME}\t$row{COLUMN_SIZE}\n" if $opts->{Debug};
$char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : $row{COLUMN_SIZE} if $row{TYPE_NAME} =~ /CHAR/;
$char_sz{$row{COLUMN_NAME}} = exists($char_sizes{$row{COLUMN_NAME}}) ? $char_sizes{$row{COLUMN_NAME}} : 20_000_000, $is_lob{$row{COLUMN_NAME}} = 1 if $row{TYPE_NAME} =~ /TEXT|LOB|XML/;
$is_date{$row{COLUMN_NAME}} = $1 if $row{TYPE_NAME} =~ /(DATE|TIMESTAMP)/;
}
confess("Table $schema.$tbl_name not found in database $db") unless @columns;
# Find date formats in file, remove constants from column list
my %date_fmt;
my @file_columns = grep !defined($const{$_}),
( ( $opts->{ColumnList} && @{$opts->{ColumnList}} ) ? ( map uc, @{$opts->{ColumnList}} ) : @columns );
if (%is_date) {
# We don't want to sample rows from stdin
my @real_files = grep { $_ ne "-" } @files;
%date_fmt = $self->date_masks_from_file( \@real_files, \@file_columns, \%is_date, $opts)
if @real_files;
}
my $row_delim_str = $opts->{RowDelimiter} ? qq("str '$opts->{RowDelimiter}'"\n) : '';
my $delimiter = $opts->{Delimiter} || $self->{DELIMITER};
my $action = $action_map{$action_opt} || "APPEND";
my $direct_load_pre = '';
my $direct_load_post = '';
my $sqlldr_opts = '';
my $max_errors = $opts->{MaxErrors} || 0;
$sqlldr_opts .= "ERRORS=$max_errors";
$sqlldr_opts .= ", SKIP=$opts->{Header}" if $opts->{Header};
if ($opts->{DirectPath}) {
my $parallel = ( uc($opts->{DirectPath}) eq 'P' ) ? ", PARALLEL=TRUE" : '';
$direct_load_pre = "OPTIONS(DIRECT=TRUE$parallel, ROWS=1000000, $sqlldr_opts)\nUNRECOVERABLE\n";
$direct_load_post = "REENABLE DISABLED_CONSTRAINTS\n";
( run in 1.356 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )