Gearman
view release on metacpan or search on metacpan
lib/Gearman/Util.pm view on Meta::CPAN
my $num = shift;
my $c = $cmd{$num};
return $c ? $c->[1] : undef;
}
=head2 pack_req_command($key, $arg)
B<return> request string
=cut
# Thin wrapper for building a request packet: every argument supplied by
# the caller is handed on, unchanged and in order, to _pack_command()
# after the "REQ" packet-kind tag. The return value (and calling context)
# is exactly that of _pack_command().
sub pack_req_command {
    my $packet_kind = 'REQ';

    # @_ is forwarded as-is so the argument count seen by _pack_command()
    # matches what the caller passed.
    return _pack_command($packet_kind, @_);
}
=head2 pack_res_command($cmd, $arg)
B<return> response string
=cut
# Thin wrapper for building a response packet: every argument supplied by
# the caller is handed on, unchanged and in order, to _pack_command()
# after the "RES" packet-kind tag. The return value (and calling context)
# is exactly that of _pack_command().
sub pack_res_command {
    my $packet_kind = 'RES';

    # @_ is forwarded as-is so the argument count seen by _pack_command()
    # matches what the caller passed.
    return _pack_command($packet_kind, @_);
}
=head2 read_res_packet($sock, $err_ref, $timeout)
B<return> undef on closed socket or malformed packet
=cut
sub read_res_packet {
warn " Entering read_res_packet" if DEBUG;
my $sock = shift;
my $err_ref = shift;
my $timeout = shift;
my $time_start = Time::HiRes::time();
unless (Scalar::Util::blessed($sock)) {
# for the sake of Gearman::Client::Async
# see https://github.com/p-alik/perl-Gearman/issues/37
(ref($sock) eq "GLOB") || die "provided value is not a blessed object";
($$sock && $$sock eq '*Gearman::Worker::$sock')
|| die
"provided value is not a GLOB of type Gearman::Worker::\$sock";
} ## end unless (Scalar::Util::blessed...)
my $err = sub {
my $code = shift;
Scalar::Util::blessed($sock) && $sock->close() if $sock->connected;
$$err_ref = $code if ref $err_ref;
return undef;
};
$sock->blocking(0);
my $is = IO::Select->new($sock);
my $readlen = 12;
my $offset = 0;
my $buf = '';
my $using_ssl = $sock->isa("IO::Socket::SSL");
my ($magic, $type, $len);
warn " Starting up event loop\n" if DEBUG;
while (1) {
if ($using_ssl && $sock->pending()) {
warn " We have @{[ $sock->pending() ]} bytes...\n" if DEBUG;
}
else {
my $time_remaining = undef;
if (defined $timeout) {
warn " We have a timeout of $timeout\n" if DEBUG;
$time_remaining = $time_start + $timeout - Time::HiRes::time();
return $err->("timeout") if $time_remaining < 0;
}
$is->can_read($time_remaining) || next;
} ## end else [ if ($using_ssl && $sock...)]
warn " Entering read loop\n" if DEBUG;
my ($ok, $err_code) = _read_sock($sock, \$buf, \$readlen, \$offset);
if (!defined($ok)) {
next;
}
elsif ($ok == 0) {
return $err->($err_code);
}
if (!defined $type) {
next unless length($buf) >= 12;
my $header = substr($buf, 0, 12, '');
($magic, $type, $len) = unpack("a4NN", $header);
return $err->("malformed_magic: '$magic'") unless $magic eq "\0RES";
my $starting = length($buf);
$readlen = $len - $starting;
$offset = $starting;
if ($readlen) {
my ($ok, $err_code)
= _read_sock($sock, \$buf, \$readlen, \$offset);
if (!defined($ok)) {
next;
}
elsif ($ok == 0) {
return $err->($err_code);
}
} ## end if ($readlen)
} ## end if (!defined $type)
$type = $cmd{$type};
return $err->("bogus_command") unless $type;
return $err->("bogus_command_type") unless index($type->[0], "O") != -1;
warn " Fully formed res packet, returning; type=$type->[1] len=$len\n"
if DEBUG;
$sock->blocking(1);
return {
type => $type->[1],
( run in 1.024 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )