Gearman

 view release on metacpan or  search on metacpan

lib/Gearman/Util.pm  view on Meta::CPAN

    my $num = shift;
    my $c   = $cmd{$num};
    return $c ? $c->[1] : undef;
}

=head2 pack_req_command($key, $arg)

B<return> request string

=cut

sub pack_req_command {
    # Thin wrapper: delegates to _pack_command, passing "REQ" ahead of
    # the caller's own arguments, which are forwarded untouched.
    return _pack_command(REQ => @_);
}

=head2 pack_res_command($cmd, $arg)

B<return> response string

=cut

sub pack_res_command {
    # Thin wrapper: delegates to _pack_command, passing "RES" ahead of
    # the caller's own arguments, which are forwarded untouched.
    return _pack_command(RES => @_);
}

=head2 read_res_packet($sock, $err_ref, $timeout)

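B<return> hash reference describing the packet on success; undef on timeout, closed socket or malformed packet (the error code is stored in C<$$err_ref> when C<$err_ref> is a reference)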

=cut

sub read_res_packet {
    warn " Entering read_res_packet" if DEBUG;
    my $sock       = shift;
    my $err_ref    = shift;
    my $timeout    = shift;
    my $time_start = Time::HiRes::time();
    unless (Scalar::Util::blessed($sock)) {
        # for the sake of Gearman::Client::Async
        # see https://github.com/p-alik/perl-Gearman/issues/37
        (ref($sock) eq "GLOB") || die "provided value is not a blessed object";
        ($$sock && $$sock eq '*Gearman::Worker::$sock')
            || die
            "provided value is not a GLOB of type Gearman::Worker::\$sock";
    } ## end unless (Scalar::Util::blessed...)

    my $err = sub {
        my $code = shift;
        Scalar::Util::blessed($sock) && $sock->close() if $sock->connected;
        $$err_ref = $code if ref $err_ref;
        return undef;
    };

    $sock->blocking(0);

    my $is = IO::Select->new($sock);

    my $readlen   = 12;
    my $offset    = 0;
    my $buf       = '';
    my $using_ssl = $sock->isa("IO::Socket::SSL");

    my ($magic, $type, $len);

    warn " Starting up event loop\n" if DEBUG;
    while (1) {
        if ($using_ssl && $sock->pending()) {
            warn "  We have @{[ $sock->pending() ]}  bytes...\n" if DEBUG;
        }
        else {
            my $time_remaining = undef;
            if (defined $timeout) {
                warn "  We have a timeout of $timeout\n" if DEBUG;
                $time_remaining = $time_start + $timeout - Time::HiRes::time();
                return $err->("timeout") if $time_remaining < 0;
            }

            $is->can_read($time_remaining) || next;
        } ## end else [ if ($using_ssl && $sock...)]
        warn "   Entering read loop\n" if DEBUG;

        my ($ok, $err_code) = _read_sock($sock, \$buf, \$readlen, \$offset);
        if (!defined($ok)) {
            next;
        }
        elsif ($ok == 0) {
            return $err->($err_code);
        }

        if (!defined $type) {
            next unless length($buf) >= 12;
            my $header = substr($buf, 0, 12, '');
            ($magic, $type, $len) = unpack("a4NN", $header);
            return $err->("malformed_magic: '$magic'") unless $magic eq "\0RES";
            my $starting = length($buf);
            $readlen = $len - $starting;
            $offset  = $starting;

            if ($readlen) {
                my ($ok, $err_code)
                    = _read_sock($sock, \$buf, \$readlen, \$offset);
                if (!defined($ok)) {
                    next;
                }
                elsif ($ok == 0) {
                    return $err->($err_code);
                }
            } ## end if ($readlen)
        } ## end if (!defined $type)

        $type = $cmd{$type};
        return $err->("bogus_command") unless $type;
        return $err->("bogus_command_type") unless index($type->[0], "O") != -1;

        warn " Fully formed res packet, returning; type=$type->[1] len=$len\n"
            if DEBUG;

        $sock->blocking(1);

        return {
            type    => $type->[1],



( run in 1.024 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )