Gerrit-Client
view release on metacpan or search on metacpan
lib/Gerrit/Client/ForEach.pm view on Meta::CPAN
#############################################################################
##
## Copyright (C) 2012-2014 Rohan McGovern <rohan@mcgovern.id.au>
##
## This library is free software; you can redistribute it and/or
## modify it under the terms of the GNU Lesser General Public
## License as published by the Free Software Foundation; either
## version 2.1 of the License, or (at your option) any later version.
##
## This library is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public
## License along with this library; if not, write to the Free Software
## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
##
##
#############################################################################
package Gerrit::Client::ForEach;
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Util;
use Capture::Tiny qw(capture_merged);
use Data::Alias;
use Data::Dumper;
use English qw(-no_match_vars);
use File::chdir;
use Gerrit::Client;
use Scalar::Util qw(weaken);
# counter of how many connections we have per server
my %CONNECTION_COUNTER;
# counter of how many worker processes we have
my $WORKER_COUNTER;
# 1 when fetching into a gitdir
my %GITDIR_FETCHING;
sub _giturl_counter {
my ($giturl) = @_;
my $gerriturl = Gerrit::Client::_gerrit_parse_url($giturl)->{gerrit};
return \$CONNECTION_COUNTER{$gerriturl};
}
sub _debug_print {
return Gerrit::Client::_debug_print(@_);
}
sub _handle_for_each_event {
my ( $self, $event ) = @_;
return unless $event->{type} eq 'patchset-created';
if (my $wanted = $self->{args}{wanted}) {
if (!$wanted->( $event->{change}, $event->{patchSet})) {
return;
}
}
$self->_enqueue_event($event);
return $self->_dequeue_soon();
}
# Git command generators; these are methods so that they can be
# overridden for testing
sub _git_bare_clone_cmd
{
my (undef, $giturl, $gitdir) = @_;
return (@Gerrit::Client::GIT, 'clone', '--bare', $giturl, $gitdir);
}
sub _git_clone_cmd
{
my (undef, $giturl, $gitdir) = @_;
return (@Gerrit::Client::GIT, 'clone', $giturl, $gitdir);
}
sub _git_fetch_cmd
{
my (undef, $giturl, $gitdir, @refs) = @_;
return (@Gerrit::Client::GIT, '--git-dir', $gitdir, 'fetch', '-v', $giturl,
(map { "+$_:$_" } @refs));
}
sub _git_reset_cmd
{
my (undef, $ref, $mode) = @_;
lib/Gerrit/Client/ForEach.pm view on Meta::CPAN
$workdir ||=
File::Temp->newdir("$self->{args}{workdir}/$project/work.XXXXXX");
my $bare = !$self->{args}{git_work_tree};
return
unless $self->_ensure_cmd(
event => $event,
queue => $out,
name => 'git clone for workdir',
cmd => [ $bare
? $self->_git_bare_clone_cmd( $gitdir, $workdir )
: $self->_git_clone_cmd( $gitdir, $workdir ) ],
onlyif => sub { !-d( $bare ? "$workdir/objects" : "$workdir/.git") },
counter => [ \$WORKER_COUNTER, $Gerrit::Client::MAX_FORKS ],
);
return
unless $self->_ensure_cmd(
event => $event,
queue => $out,
name => 'git fetch for workdir',
cmd => [ $self->_git_fetch_cmd( 'origin', $bare ? $workdir : "$workdir/.git", $ref ) ],
wd => $workdir,
counter => [ \$WORKER_COUNTER, $Gerrit::Client::MAX_FORKS ],
);
return $self->_ensure_cmd(
event => $event,
queue => $out,
name => 'git reset for workdir',
cmd => [ $self->_git_reset_cmd( $ref, $bare ? '--soft' : '--hard' ) ],
wd => $workdir,
counter => [ \$WORKER_COUNTER, $Gerrit::Client::MAX_FORKS ],
);
}
sub _ensure_cmd {
my ( $self, %args ) = @_;
my $event = $args{event};
my $name = $args{name};
# capture output by default so that we can include it in error messages
if ( !exists( $args{saveoutput} ) ) {
$args{saveoutput} = 1;
}
my $donekey = "_cmd_${name}_done";
my $cvkey = "_cmd_${name}_cv";
my $statuskey = "_cmd_${name}_status";
my $outputkey = "_cmd_${name}_output";
return 1 if ( $event->{$donekey} );
my $onlyif = $args{onlyif} || sub { 1 };
my $queue = $args{queue};
my $weakself = $self;
weaken($weakself);
alias my $cmdcv = $event->{$cvkey};
my $cmd = $args{cmd};
my $cmdstr;
{
local $LIST_SEPARATOR = '] [';
$cmdstr = "[@{$cmd}]";
}
if ( !$cmdcv ) {
# not done and not started; only needs doing if 'onlyif' returns false
if ( !$onlyif->() ) {
$event->{$donekey} = 1;
return 1;
}
# Don't run the command if it counts as a connection and we'd have
# too many
my ($counter, $count_max) = @{ $args{counter} || [] };
my $uncounter;
if ($counter) {
if ( ($$counter||0) >= $count_max ) {
_debug_print(
"$cmdstr: delaying execution, would surpass limit\n");
push @{$queue}, $event;
return;
}
$$counter++;
$uncounter = guard { $$counter-- };
}
my $lock = $args{lock};
my $unlock;
if ($lock) {
if ($$lock) {
_debug_print(
"$cmdstr: delaying execution, lock held elsewhere\n");
push @{$queue}, $event;
return;
}
$$lock++;
$unlock = guard { $$lock-- };
}
my $printoutput = sub { _debug_print( "$cmdstr: ", @_ ) };
my $handleoutput = $printoutput;
if ( $args{saveoutput} ) {
$handleoutput = sub {
$printoutput->(@_);
$event->{$outputkey} .= $_[0] if $_[0];
};
}
my %run_cmd_args = (
'>' => $handleoutput,
'2>' => $handleoutput,
);
lib/Gerrit/Client/ForEach.pm view on Meta::CPAN
$cmdcv = AnyEvent::Util::run_cmd( $cmd, %run_cmd_args, );
$cmdcv->cb(
sub {
my ($cv) = @_;
undef $uncounter;
undef $unlock;
return unless $weakself;
my $status = $cv->recv();
if ( $status && !$args{allownonzero} ) {
$self->{args}{on_error}->( "$name exited with status $status\n"
. ( $event->{$outputkey} ? $event->{$outputkey} : q{} ) );
}
else {
$event->{$donekey} = 1;
}
$event->{$statuskey} = $status;
$weakself->_dequeue_soon();
}
);
push @{$queue}, $event;
return;
}
if ( !$cmdcv->ready ) {
push @{$queue}, $event;
return;
}
$self->{args}{on_error}->("dropped event due to failed command: $cmdstr\n");
return;
}
sub _do_cb_sub {
my ( $self, $sub, $event ) = @_;
my $returned;
my $run = sub {
local $CWD = $event->{_workdir};
$returned = $sub->( $event->{change}, $event->{patchSet} );
};
my $output;
if ($self->{args}{review}) {
$output = &capture_merged( $run );
} else {
$run->();
}
return {
returned => $returned,
output => $output
};
}
sub _do_cb_forksub {
my ( $self, $sub, $event, $queue ) = @_;
my $weakself = $self;
weaken($weakself);
if ($event->{_forksub_result}) {
return $event->{_forksub_result};
}
if ( $event->{_forked} ) {
push @{$queue}, $event;
return;
}
$event->{_forked} = 1;
&fork_call(
\&_do_cb_sub,
$self,
$sub,
$event,
sub {
return unless $weakself;
my ($result) = $_[0];
if (!$result) {
if ($@) {
$result = {output => $@};
} else {
$result = {output => $!};
}
}
$event->{_forksub_result} = $result;
$weakself->_dequeue_soon();
}
);
push @{$queue}, $event;
return;
}
sub _do_cb_cmd {
my ( $self, $cmd, $event, $out ) = @_;
return if ( $event->{_done} );
my $project = $event->{change}{project};
my $ref = $event->{patchSet}{ref};
if ( !$event->{_cmd} ) {
if ( $cmd && ref($cmd) eq 'CODE' ) {
$cmd = [ $cmd->( $event->{change}, $event->{patchSet} ) ];
}
$event->{_cmd} = $cmd;
local $LIST_SEPARATOR = '] [';
_debug_print "on_patchset_cmd for $project $ref: [@{$cmd}]\n";
}
return unless $self->_ensure_cmd(
event => $event,
queue => $out,
name => 'on_patchset_cmd',
cmd => $event->{_cmd},
wd => $event->{_workdir},
saveoutput => $self->{args}{review},
allownonzero => 1,
lib/Gerrit/Client/ForEach.pm view on Meta::CPAN
}
return unless $result;
if ($Gerrit::Client::DEBUG) {
_debug_print 'callback result: ' . Dumper($result);
}
# Ensure we shan't review it again
$self->_mark_commit_reviewed($event);
my $review = $self->{args}{review};
return unless $review;
if ( $review =~ m{\A\d+\z} ) {
$review = 'code_review';
}
if ( my $cb = $self->{args}{on_review} ) {
return
unless $cb->(
$event->{change}, $event->{patchSet},
$result->{output}, $result->{score},
$result->{returned}
);
}
if ( !$result->{output} && !$result->{score} && !$result->{returned}) {
# no review to be done
return;
}
my (%review_args) = (
message => $result->{output},
project => $event->{change}{project},
branch => $event->{change}{branch},
change => $event->{change}{id},
);
for my $arg (qw(ssh_url http_url http_auth_cb)) {
$review_args{$arg} = $self->{args}{$arg};
}
if ($result->{returned}) {
if (ref($result->{returned}) eq 'HASH') {
$review_args{reviewInput} = $result->{returned};
} else {
$review_args{$review} = $result->{returned};
}
}
Gerrit::Client::review(
$event->{patchSet}{revision},
%review_args
);
}
sub _dequeue_soon {
my ($self) = @_;
my $weakself = $self;
weaken($weakself);
$self->{_dequeue_timer} ||= AE::timer( .1, 0,
sub {
return unless $weakself;
delete $weakself->{_dequeue_timer};
$weakself->_dequeue();
}
);
}
sub _dequeue {
my ($self) = @_;
if ($Gerrit::Client::DEBUG) {
_debug_print 'queue before processing: ', Dumper( $self->{queue} );
}
my $weakself = $self;
weaken($weakself);
my @queue = @{ $self->{queue} || [] };
my @newqueue;
while (my $event = shift @queue) {
next unless $self->_ensure_git_cloned( $event, \@newqueue );
next unless $self->_ensure_git_fetched( $event, \@newqueue, \@queue );
next unless $self->_ensure_git_workdir_uptodate( $event, \@newqueue );
$self->_do_callback( $event, \@newqueue );
}
$self->{queue} = \@newqueue;
if ($Gerrit::Client::DEBUG) {
_debug_print 'queue after processing: ', Dumper( $self->{queue} );
}
return;
}
1;
( run in 0.971 second using v1.01-cache-2.11-cpan-39bf76dae61 )