Gerrit-Client

 view release on metacpan or  search on metacpan

lib/Gerrit/Client/ForEach.pm  view on Meta::CPAN

#############################################################################
##
## Copyright (C) 2012-2014 Rohan McGovern <rohan@mcgovern.id.au>
##
## This library is free software; you can redistribute it and/or
## modify it under the terms of the GNU Lesser General Public
## License as published by the Free Software Foundation; either
## version 2.1 of the License, or (at your option) any later version.
##
## This library is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
## Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public
## License along with this library; if not, write to the Free Software
## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
##
##
#############################################################################

package Gerrit::Client::ForEach;
use strict;
use warnings;

use AnyEvent;
use AnyEvent::Util;
use Capture::Tiny qw(capture_merged);
use Data::Alias;
use Data::Dumper;
use English qw(-no_match_vars);
use File::chdir;
use Gerrit::Client;
use Scalar::Util qw(weaken);

# counter of how many connections we have per server
my %CONNECTION_COUNTER;

# counter of how many worker processes we have
my $WORKER_COUNTER;

# 1 when fetching into a gitdir
my %GITDIR_FETCHING;

sub _giturl_counter {
  my ($giturl) = @_;
  my $gerriturl = Gerrit::Client::_gerrit_parse_url($giturl)->{gerrit};
  return \$CONNECTION_COUNTER{$gerriturl};
}

sub _debug_print {
  return Gerrit::Client::_debug_print(@_);
}

sub _handle_for_each_event {
  my ( $self, $event ) = @_;

  return unless $event->{type} eq 'patchset-created';

  if (my $wanted = $self->{args}{wanted}) {
    if (!$wanted->( $event->{change}, $event->{patchSet})) {
      return;
    }
  }

  $self->_enqueue_event($event);

  return $self->_dequeue_soon();
}

# Git command generators; these are methods so that they can be
# overridden for testing
sub _git_bare_clone_cmd
{
  my (undef, $giturl, $gitdir) = @_;
  return (@Gerrit::Client::GIT, 'clone', '--bare', $giturl, $gitdir);
}

sub _git_clone_cmd
{
  my (undef, $giturl, $gitdir) = @_;
  return (@Gerrit::Client::GIT, 'clone', $giturl, $gitdir);
}

sub _git_fetch_cmd
{
  my (undef, $giturl, $gitdir, @refs) = @_;
  return (@Gerrit::Client::GIT, '--git-dir', $gitdir, 'fetch', '-v', $giturl,
          (map { "+$_:$_" } @refs));
}

sub _git_reset_cmd
{
  my (undef, $ref, $mode) = @_;

lib/Gerrit/Client/ForEach.pm  view on Meta::CPAN


  $workdir ||=
    File::Temp->newdir("$self->{args}{workdir}/$project/work.XXXXXX");

  my $bare = !$self->{args}{git_work_tree};

  return
    unless $self->_ensure_cmd(
    event => $event,
    queue => $out,
    name  => 'git clone for workdir',
    cmd => [ $bare
               ? $self->_git_bare_clone_cmd( $gitdir, $workdir )
               : $self->_git_clone_cmd( $gitdir, $workdir ) ],
    onlyif => sub { !-d( $bare ? "$workdir/objects" : "$workdir/.git") },
    counter => [ \$WORKER_COUNTER, $Gerrit::Client::MAX_FORKS ],
    );

  return
    unless $self->_ensure_cmd(
    event => $event,
    queue => $out,
    name  => 'git fetch for workdir',
    cmd => [ $self->_git_fetch_cmd( 'origin', $bare ? $workdir : "$workdir/.git", $ref ) ],
    wd    => $workdir,
    counter => [ \$WORKER_COUNTER, $Gerrit::Client::MAX_FORKS ],
  );

  return $self->_ensure_cmd(
    event => $event,
    queue => $out,
    name  => 'git reset for workdir',
    cmd => [ $self->_git_reset_cmd( $ref, $bare ? '--soft' : '--hard' ) ],
    wd    => $workdir,
    counter => [ \$WORKER_COUNTER, $Gerrit::Client::MAX_FORKS ],
  );
}

sub _ensure_cmd {
  my ( $self, %args ) = @_;

  my $event = $args{event};
  my $name  = $args{name};

  # capture output by default so that we can include it in error messages
  if ( !exists( $args{saveoutput} ) ) {
    $args{saveoutput} = 1;
  }

  my $donekey   = "_cmd_${name}_done";
  my $cvkey     = "_cmd_${name}_cv";
  my $statuskey = "_cmd_${name}_status";
  my $outputkey = "_cmd_${name}_output";

  return 1 if ( $event->{$donekey} );

  my $onlyif = $args{onlyif} || sub { 1 };
  my $queue = $args{queue};

  my $weakself = $self;
  weaken($weakself);

  alias my $cmdcv = $event->{$cvkey};
  my $cmd = $args{cmd};
  my $cmdstr;
  {
    local $LIST_SEPARATOR = '] [';
    $cmdstr = "[@{$cmd}]";
  }

  if ( !$cmdcv ) {

    # not done and not started; only needs doing if 'onlyif' returns false
    if ( !$onlyif->() ) {
      $event->{$donekey} = 1;
      return 1;
    }

    # Don't run the command if it counts as a connection and we'd have
    # too many
    my ($counter, $count_max) = @{ $args{counter} || [] };
    my $uncounter;
    if ($counter) {
      if ( ($$counter||0) >= $count_max ) {
        _debug_print(
          "$cmdstr: delaying execution, would surpass limit\n");
        push @{$queue}, $event;
        return;
      }
      $$counter++;
      $uncounter = guard { $$counter-- };
    }

    my $lock = $args{lock};
    my $unlock;
    if ($lock) {
      if ($$lock) {
        _debug_print(
          "$cmdstr: delaying execution, lock held elsewhere\n");
        push @{$queue}, $event;
        return;
      }
      $$lock++;
      $unlock = guard { $$lock-- };
    }

    my $printoutput = sub { _debug_print( "$cmdstr: ", @_ ) };
    my $handleoutput = $printoutput;

    if ( $args{saveoutput} ) {
      $handleoutput = sub {
        $printoutput->(@_);
        $event->{$outputkey} .= $_[0] if $_[0];
      };
    }

    my %run_cmd_args = (
      '>'  => $handleoutput,
      '2>' => $handleoutput,
    );

lib/Gerrit/Client/ForEach.pm  view on Meta::CPAN


    $cmdcv = AnyEvent::Util::run_cmd( $cmd, %run_cmd_args, );
    $cmdcv->cb(
      sub {
        my ($cv) = @_;
        undef $uncounter;
        undef $unlock;
        return unless $weakself;

        my $status = $cv->recv();
        if ( $status && !$args{allownonzero} ) {
          $self->{args}{on_error}->( "$name exited with status $status\n"
              . ( $event->{$outputkey} ? $event->{$outputkey} : q{} ) );
        }
        else {
          $event->{$donekey} = 1;
        }
        $event->{$statuskey} = $status;
        $weakself->_dequeue_soon();
      }
    );
    push @{$queue}, $event;
    return;
  }

  if ( !$cmdcv->ready ) {
    push @{$queue}, $event;
    return;
  }

  $self->{args}{on_error}->("dropped event due to failed command: $cmdstr\n");
  return;
}

sub _do_cb_sub {
  my ( $self, $sub, $event ) = @_;

  my $returned;
  my $run = sub {
    local $CWD = $event->{_workdir};
    $returned = $sub->( $event->{change}, $event->{patchSet} );
  };

  my $output;
  if ($self->{args}{review}) {
    $output = &capture_merged( $run );
  } else {
    $run->();
  }

  return {
    returned => $returned,
    output => $output
  };
}

sub _do_cb_forksub {
  my ( $self, $sub, $event, $queue ) = @_;

  my $weakself = $self;
  weaken($weakself);

  if ($event->{_forksub_result}) {
    return $event->{_forksub_result};
  }

  if ( $event->{_forked} ) {
    push @{$queue}, $event;
    return;
  }

  $event->{_forked} = 1;
  &fork_call(
    \&_do_cb_sub,
    $self,
    $sub,
    $event,
    sub {
      return unless $weakself;

      my ($result) = $_[0];
      if (!$result) {
        if ($@) {
          $result = {output => $@};
        } else {
          $result = {output => $!};
        }
      }
      $event->{_forksub_result} = $result;
      $weakself->_dequeue_soon();
    }
  );
  push @{$queue}, $event;
  return;
}

sub _do_cb_cmd {
  my ( $self, $cmd, $event, $out ) = @_;

  return if ( $event->{_done} );

  my $project = $event->{change}{project};
  my $ref     = $event->{patchSet}{ref};

  if ( !$event->{_cmd} ) {
    if ( $cmd && ref($cmd) eq 'CODE' ) {
      $cmd = [ $cmd->( $event->{change}, $event->{patchSet} ) ];
    }
    $event->{_cmd} = $cmd;
    local $LIST_SEPARATOR = '] [';
    _debug_print "on_patchset_cmd for $project $ref: [@{$cmd}]\n";
  }

  return unless $self->_ensure_cmd(
    event => $event,
    queue => $out,
    name  => 'on_patchset_cmd',
    cmd   => $event->{_cmd},
    wd    => $event->{_workdir},
    saveoutput => $self->{args}{review},
    allownonzero => 1,

lib/Gerrit/Client/ForEach.pm  view on Meta::CPAN

  }

  return unless $result;

  if ($Gerrit::Client::DEBUG) {
    _debug_print 'callback result: ' . Dumper($result);
  }

  # Ensure we shan't review it again
  $self->_mark_commit_reviewed($event);

  my $review = $self->{args}{review};
  return unless $review;

  if ( $review =~ m{\A\d+\z} ) {
    $review = 'code_review';
  }

  if ( my $cb = $self->{args}{on_review} ) {
    return
      unless $cb->(
      $event->{change},  $event->{patchSet},
      $result->{output}, $result->{score},
      $result->{returned}
      );
  }

  if ( !$result->{output} && !$result->{score} && !$result->{returned}) {
    # no review to be done
    return;
  }

  my (%review_args) = (
    message => $result->{output},
    project => $event->{change}{project},
    branch  => $event->{change}{branch},
    change  => $event->{change}{id},
  );

  for my $arg (qw(ssh_url http_url http_auth_cb)) {
    $review_args{$arg} = $self->{args}{$arg};
  }

  if ($result->{returned}) {
    if (ref($result->{returned}) eq 'HASH') {
      $review_args{reviewInput} = $result->{returned};
    } else {
      $review_args{$review} = $result->{returned};
    }
  }

  Gerrit::Client::review(
    $event->{patchSet}{revision},
    %review_args
  );
}

sub _dequeue_soon {
  my ($self) = @_;
  my $weakself = $self;
  weaken($weakself);
  $self->{_dequeue_timer} ||= AE::timer( .1, 0,
                                         sub {
                                           return unless $weakself;
                                           delete $weakself->{_dequeue_timer};
                                           $weakself->_dequeue();
                                         }
                                       );
}

sub _dequeue {
  my ($self) = @_;

  if ($Gerrit::Client::DEBUG) {
    _debug_print 'queue before processing: ', Dumper( $self->{queue} );
  }

  my $weakself = $self;
  weaken($weakself);

  my @queue = @{ $self->{queue} || [] };
  my @newqueue;
  while (my $event = shift @queue) {
    next unless $self->_ensure_git_cloned( $event, \@newqueue );
    next unless $self->_ensure_git_fetched( $event, \@newqueue, \@queue );
    next unless $self->_ensure_git_workdir_uptodate( $event, \@newqueue );
    $self->_do_callback( $event, \@newqueue );
  }

  $self->{queue} = \@newqueue;

  if ($Gerrit::Client::DEBUG) {
    _debug_print 'queue after processing: ', Dumper( $self->{queue} );
  }

  return;
}

1;



( run in 0.971 second using v1.01-cache-2.11-cpan-39bf76dae61 )