DBIx-BatchChunker

 view release on metacpan or  search on metacpan

lib/DBIx/BatchChunker.pm  view on Meta::CPAN

        }
        else {
            # Bulk work (or DML)
            if ($self->dbic_storage) {
                $self->_dbic_block_runner( run => sub {
                    $self->loop_state->_mark_chunk_timer;  # reset timer on retries

                    my $sth = $self->dbic_storage->dbh->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    $self->coderef->($self, $sth) if $self->coderef;
                });
            }
            else {
                $conn->run(sub {
                    $self->loop_state->_mark_chunk_timer;  # reset timer on retries

                    my $sth = $_->prepare(@prepare_args);
                    $sth->execute(@execute_args);

                    $self->coderef->($self, $sth) if $self->coderef;
                });
            }
        }
    }
    elsif (defined $rs && $coderef) {
        ### ResultSet with coderef

        if ($self->single_rows) {
            # Transactional work
            $self->_dbic_block_runner( txn => sub {
                # reset timer/$rs on retries
                $self->loop_state->_mark_chunk_timer;
                $chunk_rs->reset;

                while (my $row = $chunk_rs->next) { $self->coderef->($self, $row) }
            });
        }
        else {
            # Bulk work
            $self->_dbic_block_runner( run => sub {
                # reset timer/$rs on retries
                $self->loop_state->_mark_chunk_timer;
                $chunk_rs->reset;

                $self->coderef->($self, $chunk_rs);
            });
        }
    }
    else {
        ### Something a bit more free-form

        $self->$coderef($ls->start, $ls->end);
    }

    return 1;
}

#pod =head2 _process_past_max_checker
#pod
#pod Checks to make sure the current endpoint is actually the end, by checking the database.
#pod Its return value determines whether the block should be processed or not.
#pod
#pod See L</process_past_max>.
#pod
#pod =cut

sub _process_past_max_checker {
    my ($self) = @_;
    my $ls = $self->loop_state;
    my $progress = $ls->progress_bar;

    return 1 unless $self->process_past_max;
    return 1 unless $ls->end >= $self->max_id;

    # No checks for DIY, if they didn't include a max_stmt
    unless (defined $self->rsc || $self->max_stmt) {
        # There's no way to size this, so add one more chunk
        $ls->end($self->max_id + $ls->chunk_size);
        return 1;
    }

    # Run another MAX check
    $progress->message('Reached end; re-checking max ID') if $self->verbose;
    my $new_max_id;
    if (defined( my $rsc = $self->rsc )) {
        $self->_dbic_block_runner( run => sub {
            $new_max_id = $rsc->max;
        });
    }
    elsif ($self->dbic_storage) {
        $self->_dbic_block_runner( run => sub {
            ($new_max_id) = $self->dbic_storage->dbh->selectrow_array(@{ $self->max_stmt });
        });
    }
    else {
        ($new_max_id) = $self->dbi_connector->run(sub {
            $_->selectrow_array(@{ $self->max_stmt });
        });
    }
    $ls->_mark_chunk_timer;  # the above query shouldn't impact runtimes

    # Convert $new_max_id if necessary
    $new_max_id = Math::BigInt->new($new_max_id) if $self->_check_bignums($new_max_id);

    if (!$new_max_id || $new_max_id eq '0E0') {
        # No max: No affected rows to change
        $progress->message('No max ID found; nothing left to process...') if $self->verbose;
        $ls->end($self->max_id);

        $ls->prev_check('no max');
        return 0;
    }
    elsif ($new_max_id > $self->max_id) {
        # New max ID
        $progress->message( sprintf 'New max ID set from %s to %s', $self->max_id, $new_max_id ) if $self->verbose;
        $self->max_id($new_max_id);
        $progress->target( $new_max_id - $self->min_id + 1 );
        $progress->update( $progress->last_update );
    }
    elsif ($new_max_id == $self->max_id) {

lib/DBIx/BatchChunker.pm  view on Meta::CPAN

        $ls->prev_check('skipped rows');
        return 0;
    }
    elsif ($ls->end - $ls->start <= 0) {
        # Down to a single ID: We _have_ to process it
        $ls->prev_check('at a single ID');

        # Complain, because this can be dangerous with a wild enough Row:ID ratio
        if ($ls->chunk_count > 1) {
            $progress->message('WARNING: Processing a single ID with many rows attached because resizing cannot proceed any further.');
            $progress->message('Consider flipping the relationship so that IDs and row counts are 1:1.');
        }

        return 1;
    }
    elsif ($chunk_percent > 1 + $self->min_chunk_percent) {
        # Too many rows: Backtrack to the previous range and try to bisect
        $self->_print_chunk_status('shrunk');
        $ls->_mark_chunk_timer;
        $ls->_decrease_multiplier;
        $ls->prev_check('too many rows');
        return 0;
    }
    elsif ($self->target_time && $count_check_time > $self->target_time * 1.05) {
        # COUNT statement too slow: Backtrack to the previous range and try to bisect

        # This is a rare failure, so print a warning
        my $integer = $self->cldr->decimal_formatter;
        my $decimal = $self->cldr->decimal_formatter(
            minimum_fraction_digits => 2,
            maximum_fraction_digits => 2,
        );
        $progress->message( sprintf(
            'WARNING: COUNT statement was too slow; took %5s sec to return %s rows.',
            $decimal->format($count_check_time),
            $integer->format( $ls->chunk_count )
        ) );

        $self->_print_chunk_status('shrunk');
        $ls->_mark_chunk_timer;
        $ls->_decrease_multiplier;
        $ls->prev_check('COUNT too slow');
        return 0;
    }

    # The above four are more important than skipping the count checks.  Better to
    # have too few rows than too many.  The single ID check prevents infinite loops
    # from bisecting, though.

    elsif ($ls->checked_count > 10) {
        # Checked too many times: Just process it
        $ls->prev_check('too many checks');
        return 1;
    }
    elsif ($ls->end >= $self->max_id) {
        # At the end: Just process it
        $ls->prev_check('at max_id');
        return 1;
    }
    elsif ($chunk_percent < $self->min_chunk_percent) {
        # Too few rows: Keep the start ID and accelerate towards a better endpoint
        $self->_print_chunk_status('expanded');
        $ls->_mark_chunk_timer;
        $ls->_increase_multiplier;
        $ls->prev_check('too few rows');
        return 0;
    }

    $ls->prev_check('nothing wrong');
    return 1;
}

#pod =head2 _runtime_checker
#pod
#pod Stores the previously processed chunk's runtime, and then adjusts C<chunk_size> as
#pod necessary.
#pod
#pod See L</target_time>.
#pod
#pod =cut

sub _runtime_checker {
    my ($self) = @_;
    my $ls = $self->loop_state;
    return unless $self->target_time;
    return unless $ls->chunk_size && $ls->prev_runtime;  # prevent DIV/0

    my $timings = $ls->last_timings;

    my $new_timing = {
        runtime     => $ls->prev_runtime,
        chunk_count => $ls->chunk_count || $ls->chunk_size,
    };
    $new_timing->{chunk_per} = $new_timing->{chunk_count} / $ls->chunk_size;

    # Rowtime: a measure of how much of the chunk_size actually impacted the runtime
    $new_timing->{rowtime} = $new_timing->{runtime} / $new_timing->{chunk_per};

    # Store the last five processing times
    push @$timings, $new_timing;
    shift @$timings if @$timings > 5;

    # Figure out the averages and adjustment factor
    my $ttl = scalar @$timings;
    my $avg_rowtime   = sum(map { $_->{rowtime} } @$timings) / $ttl;
    my $adjust_factor = $self->target_time / $avg_rowtime;

    my $new_target_chunk_size = $ls->chunk_size;
    my $adjective;
    if    ($adjust_factor > 1.05) {
        # Too fast: Raise the chunk size

        return unless $ttl >= 5;                                          # must have a full set of timings
        return if any { $_->{runtime} >= $self->target_time } @$timings;  # must ALL have low runtimes

        $new_target_chunk_size *= min(2, $adjust_factor);  # never more than double
        $adjective = 'fast';
    }
    elsif ($adjust_factor < 0.95) {
        # Too slow: Lower the chunk size

lib/DBIx/BatchChunker.pm  view on Meta::CPAN

argument set, this method calculates the min/max IDs of those objects.  It fills in the
L</min_id> and L</max_id> attributes, based on the ID data, and then returns 1.

If either of the min/max statements doesn't return any ID data, this method will return 0.

=head2 execute

    my $batch_chunker = DBIx::BatchChunker->new(
        # ...other attributes for calculate_ranges...

        dbi_connector => $conn,          # DBIx::Connector::Retry object
        stmt          => $do_stmt,       # INSERT/UPDATE/DELETE $stmt with BETWEEN placeholders
        ### OR ###
        dbi_connector => $conn,          # DBIx::Connector::Retry object
        stmt          => $select_stmt,   # SELECT $stmt with BETWEEN placeholders
        count_stmt    => $count_stmt,    # SELECT COUNT $stmt to be used for min_chunk_percent; optional
        coderef       => $coderef,       # called code that does the actual work
        ### OR ###
        rs      => $account_rs,          # base ResultSet, which gets filtered with -between later on
        id_name => 'account_id',         # can be looked up if not provided
        coderef => $coderef,             # called code that does the actual work
        ### OR ###
        coderef => $coderef,             # DIY database work; just pass the $start/$end IDs

        ### Optional but recommended ###
        sleep             => 0.25, # number of seconds to sleep each chunk; defaults to 0
        process_past_max  => 1,    # use this if processing the whole table
        single_rows       => 1,    # does $coderef get a single $row or the whole $chunk_rs / $stmt
        min_chunk_percent => 0.25, # minimum row count, as a percentage of chunk size; defaults to 0.5 (or 50%)
        target_time       => 5,    # target runtime for dynamic chunk size scaling; default is 5 seconds
        max_runtime       => 12 * 60 * 60, # stop processing after 12 hours

        progress_name => 'Updating Accounts',  # easier than creating your own progress_bar

        ### Optional ###
        progress_bar     => $progress,  # defaults to "Processing $source_name" bar
        verbose          => 1,          # displays timing stats on each chunk
    );

    $batch_chunker->execute if $batch_chunker->calculate_ranges;

Applies the configured DB changes in chunks.  Runs through the loop, processing a
statement handle, ResultSet, and/or coderef as it goes.  Each loop iteration processes a
chunk of work, determined by L</chunk_size>.

The L</calculate_ranges> method should be run first to fill in L</min_id> and L</max_id>.
If either of these is missing, the method will assume L</calculate_ranges> couldn't
find them and warn about it.

More details can be found in the L</Processing Modes> and L</ATTRIBUTES> sections.

=head1 PRIVATE METHODS

=head2 _process_block

Runs the DB work and passes it to the coderef.  Its return value determines whether the
block should be processed or not.

=head2 _process_past_max_checker

Checks to make sure the current endpoint is actually the end, by checking the database.
Its return value determines whether the block should be processed or not.

See L</process_past_max>.

=head2 _chunk_count_checker

Checks the chunk count to make sure it's properly sized.  If not, it will try to shrink
or expand the current chunk (in C<chunk_size> increments) as necessary.  Its return value
determines whether the block should be processed or not.

See L</min_chunk_percent>.

This is not to be confused with the L</_runtime_checker>, which adjusts C<chunk_size>
after processing, based on previous run times.

=head2 _runtime_checker

Stores the previously processed chunk's runtime, and then adjusts C<chunk_size> as
necessary.

See L</target_time>.

=head2 _increment_progress

Increments the progress bar.

=head2 _print_chunk_status

Prints out a standard chunk status line, if L</verbose> is enabled.  What it prints is
generally uniform, but it depends on the processing action.  Most of the data is
pulled from L</loop_state>.

=head1 CAVEATS

=head2 Big Number Support

If the module detects that the ID numbers are no longer safe for standard Perl NV
storage, it will automatically switch to using L<Math::BigInt> and L<Math::BigFloat> for
big number support.  If any blessed numbers are already being used to define the
attributes, this will also switch on the support.

=head2 String-based IDs

If you're working with C<VARCHAR> types or other string-based IDs to represent integers,
these may be subject to whatever string-based comparison rules your RDBMS uses when
calculating with C<MIN>/C<MAX> or using C<BETWEEN>.  Row counting and chunk size scaling
will try to compensate, but will be mixing string-based comparisons from the RDBMS and
Perl-based integer math.

Using the C<CAST> function may help, but it may also cause critical indexes to be
ignored, especially if the function is used on the left-hand side against the column.
Strings with the exact same length may be safe from comparison weirdness, but YMMV.

Non-integer inputs from ID columns, such as GUIDs or other alphanumeric strings, are not
currently supported.  They would have to be converted to integers via SQL, and doing so
may run into a similar risk of having your RDBMS ignore indexes.

=head1 SEE ALSO

L<DBIx::BulkLoader::Mysql>, L<DBIx::Class::BatchUpdate>, L<DBIx::BulkUtil>



( run in 0.652 second using v1.01-cache-2.11-cpan-13bb782fe5a )