DTA-CAB
view release on metacpan or search on metacpan
CAB/Socket.pm view on Meta::CPAN
return undef if (!defined($cfh));
return $s->newClient(fh=>$cfh);
}
return undef;
}
## $rc = $qs->handleClient($cli)
## $rc = $qs->handleClient($cli, %callbacks)
## + handle a single client request
## + each client request is a STRING message (command)
## - request arguments (if required) are sent as separate messages following the command request
## - server response (if any) depends on command sent
## + this method parses client request command $cmd and dispatches to
## - the function $callbacks{lc($cmd)}->($qs,$cli,\$cmd), if defined
## - the method $qs->can("handle_".lc($cmd))->($qs,$cli,\$cmd), if available
## - the function $callbacks{DEFAULT}->($qs,$cli,\$cmd), if defined
## - the method $qs->can("handle_DEFAULT")->($qs,$cli,\$cmd)
## + returns whatever the handler subroutine does
sub handleClient {
my ($qs,$cli,%callbacks) = @_;
my $creq = $cli->get();
$qs->vlog($qs->{logRequest}, "client request: $$creq");
if (!ref($creq) || ref($creq) ne 'SCALAR' || ref($$creq)) {
$qs->logconfess("could not parse client request");
}
my $cmd = lc($$creq);
my ($sub);
if (defined($sub=$callbacks{$cmd})) {
return $sub->($qs,$cli,$creq);
}
elsif (defined($sub=$qs->can("handle_${cmd}"))) {
return $sub->($qs,$cli,$creq);
}
elsif (defined($sub=$callbacks{DEFAULT})) {
return $sub->($qs,$cli,$creq);
}
elsif (defined($sub=$qs->can("handle_DEFAULT"))) {
return $sub->($qs,$cli,$creq);
}
##-- should never get here
$qs->logconfess("could not dispatch client request $$creq");
return undef;
}
##--------------------------------------------------------------
## Server Methods: Request Handling
## undef = $qs->handle_DEFAULT($cli,\$cmd)
## + default implementation just logcluck()s and returns undef
sub handle_DEFAULT {
$_[0]->logcluck("cannot handle client client request ${$_[2]}");
return undef;
}
##==============================================================================
## Protocol
## + all socket messages are of the form pack('NN/a*', $flags, $message_data)
## + $flags is a bitmask of DTA::CAB::Socket flags ($sf_* constants)
## + length element (second 'N' of pack format) is always 0 for serialized references
## + $message_data is one of the following:
## - if ($flags & $sf_ref) -> a reference written with nstore_fd(); will be decoded
## - elsif ($flags & $sf_u8) -> a UTF-8 encoded string; will be decoded
## - elsif ($flags & $sf_undef) -> a literal undef value
## - elsif ($flags & $sf_eoq) -> undef as end-of-queue marker
##--------------------------------------------------------------
## Protocol: Constants
our $sf_eoq = 0x1;
our $sf_undef = 0x2;
our $sf_u8 = 0x4;
our $sf_ref = 0x8;
##--------------------------------------------------------------
## Protocol: Write
## $s = $s->put_header($flags,$len)
## + write a message header to the socket
sub put_header {
$_[0]->vtrace("put_header", @_[1..$#_]);
syswrite($_[0]{fh}, pack('NN', @_[1,2]), 8)==8
or $_[0]->logconfess("put_header(): could not write message header to socket: $!");
return $_[0];
}
## $s = $s->put_data(\$data, $len)
## $s = $s->put_data( $data, $len)
## + write some raw data bytes to the socket (header should already have been sent)
sub put_data {
$_[0]->vtrace("put_data", @_[1..$#_]);
return if (!defined($_[0]));
use bytes;
my $ref = ref($_[1]) ? $_[1] : \$_[1];
my $len = defined($_[2]) ? $_[2] : length($$ref);
if ($len > 0) {
syswrite($_[0]{fh}, $$ref, $len)==$len
or $_[0]->logconfess("put_data(): could not write message data to socket: $!");
}
return $_[0];
}
## $s = $s->put_msg($flags,$len, $data)
## $s = $s->put_msg($flags,$len,\$data)
## + write a whole message to the socket
sub put_msg {
$_[0]->put_header(@_[1,2]) && $_[0]->put_data(@_[3,2]);
}
## $s = $s->put_ref($ref)
## + write a reference to the socket with Storable::nstore() (length written as 0)
sub put_ref {
$_[0]->vtrace("put_ref", @_[1..$#_]);
$_[0]->put_header( $sf_ref | (defined($_[1]) ? 0 : $sf_undef), 0 );
return $_[0] if (!defined($_[1]));
Storable::nstore_fd($_[1], $_[0]{fh})
or $_[0]->logconfess("put_ref(): nstore_fd() failed for $_[1]: $!");
return $_[0];
}
## $s = $s->put_str(\$str)
## $s = $s->put_str( $str)
## + write a raw string message to the socket
CAB/Socket.pm view on Meta::CPAN
## Protocol: Read
## $nbytes_read = $s->safe_sysread(\$bufr, $nbytes)
## + safe wrapper for CORE::sysread which avoids EINTR ("Interrupted system call") errors
sub safe_sysread {
my ($s,$bufr,$nbytes) = @_;
my ($rc);
my $nread = 0;
while ($nbytes > 0) {
if ( !($rc = CORE::sysread($s->{fh},$$bufr,$nbytes,$nread)) ) {
next if ($! == EINTR);
return undef; ##-- other error
}
$nbytes -= $rc;
$nread += $rc;
}
return $nread;
}
## ($flags,$len) = $s->get_header(); ##-- list context
## $header_packed = $s->get_header(); ##-- scalar context
## + gets header from socket
sub get_header {
$_[0]->vtrace("get_header", @_[1..$#_]);
my ($hdr);
#CORE::sysread($_[0]{fh}, $hdr, 8)==8
$_[0]->safe_sysread(\$hdr,8)==8
or $_[0]->logconfess("get_header(): could not read message header from socket: $!");
return wantarray ? unpack('NN',$hdr) : $hdr;
}
## \$buf = $s->get_data($len)
## \$buf = $s->get_data($len,\$buf)
## + reads $len bytes of data from the socket
sub get_data {
$_[0]->vtrace("get_data", @_[1..$#_]);
my ($s,$len,$bufr) = @_;
$bufr = \(my $buf) if (!defined($bufr));
$$bufr = undef;
if ($len > 0) {
#CORE::sysread($s->{fh}, $$bufr, $len)==$len
$s->safe_sysread($bufr, $len)==$len
or $s->logconfess("get_data(): could not read message of length=$len bytes from socket: $!");
}
return $bufr;
}
## $ref = $s->get_ref_data()
## + reads reference data from the socket with Storable::fd_retrieve()
## + header should already have been read
sub get_ref_data {
$_[0]->vtrace("get_ref_data", @_[1..$#_]);
return
Storable::fd_retrieve($_[0]{fh})
|| $_[0]->logconfess("get_ref_data(): fd_retrieve() failed");
}
## \$str_or_undef = $s->get_str_data($flags, $len)
## \$str_or_undef = $s->get_str_data($flags, $len, \$str)
## + reads string bytes from the socket (header should already have been read)
## + returned value is auto-magically decoded
sub get_str_data {
$_[0]->vtrace("get_str_data", @_[1..$#_]);
my $s = shift;
my $bufr = $s->get_data(@_[1,2]);
$$bufr = '' if (!defined($$bufr)); ##-- get_data() returns empty string as undef
utf8::decode($$bufr) if ($_[0] & $sf_u8);
return $bufr;
}
## $ref_or_undef = $s->get( ); ##-- SCALAR context, local buffer
## ($flags,$len,$ref) = $s->get( ); ##-- LIST context, local buffer
## $ref_or_undef = $s->get( \$buf ); ##-- SCALAR context, user buffer
## ($flags,$len,$ref) = $s->get( \$buf ); ##-- LIST context, user buffer
## + gets next message from the buffer
## + if passed, \$buf is used as a data buffer,
## - it will hold the string data actually read from the socket
## - in the case of string messages, \$buf is also the value returned
## - in the case of ref messages, \$buf is the serialized (nfreeze()) reference
## - for undef or end-of-queue messages, $$buf will be set to undef
sub get {
$_[0]->vtrace("get", @_[1..$#_]);
my ($s,$bufr) = @_;
my ($flags,$len) = $s->get_header();
if ($flags & ($sf_eoq | $sf_undef)) {
$bufr = \undef;
} elsif ($flags & $sf_ref) {
$bufr = $s->get_ref_data();
} else {
$bufr = $s->get_str_data($flags,$len,$bufr);
}
return wantarray ? ($flags,$len,$bufr) : $bufr;
}
1; ##-- be happy
__END__
( run in 0.564 second using v1.01-cache-2.11-cpan-39bf76dae61 )