Alvis-Saa
view release on metacpan or search on metacpan
lib/Alvis/Saa.pm view on Meta::CPAN
$this->{'conn_sel'}->remove($conn);
delete($this->{'conns'}->{"${host}_$port"});
shutdown($conn, 2);
close($conn);
return 1;
}
# Stop serving on $port and forget the server record kept for it.
#
# Parameters:
#   $port - key into $this->{'servs'} identifying the server to tear down
#
# Returns 1 after the teardown.  Returns 0 and sets $this->{'err'} when no
# server record exists for $port.
#
# Teardown order: both sockets (unix first, then inet) are removed from
# $this->{'serv_sel'}, then shut down in both directions, then closed; after
# that the path "$LOCALADDR_PREFIX$port" is unlinked and the record deleted.
sub unlisten
{
    my ($this, $port) = @_;

    unless(exists($this->{'servs'}->{$port}))
    {
        $this->{'err'} = "Not connected";
        return 0;
    }

    my $serv  = $this->{'servs'}->{$port};
    my @socks = ($serv->{'unix_sock'}, $serv->{'inet_sock'});

    # Each phase is completed for both sockets before the next one starts.
    for my $sock (@socks)
    {
        $this->{'serv_sel'}->remove($sock);
    }
    for my $sock (@socks)
    {
        shutdown($sock, 2);    # 2 = stop both reading and writing
    }
    for my $sock (@socks)
    {
        close($sock);
    }

    unlink("$LOCALADDR_PREFIX$port");
    delete($this->{'servs'}->{$port});

    return 1;
}
# Open a TCP connection to $host:$port and register it with this object.
#
# Parameters:
#   $host - peer host, handed to IO::Socket::INET as PeerAddr
#   $port - peer port, handed to IO::Socket::INET as PeerPort
#
# Returns 1 on success.  Returns 0 and sets $this->{'err'} when a connection
# keyed "${host}_$port" is already registered ("Already connected") or when
# the TCP connect fails (err is then the text of $@).
#
# On success the socket is put into :raw mode, added to $this->{'conn_sel'},
# and a record { host, port, auto_arb => 1, conn } is stored in
# $this->{'conns'} under "${host}_$port".
sub connect
{
    my $this = shift;
    my $host = shift;
    my $port = shift;

    my $key = "${host}_$port";

    if(exists($this->{'conns'}->{$key}))
    {
        $this->{'err'} = "Already connected";
        return 0;
    }

    # A UNIX-domain fast path (Peer => "$LOCALADDR_PREFIX$port", tried when
    # the host resolved to this machine's own address) used to be attempted
    # here.  It is disabled: local socket handling is fundamentally broken
    # and a saa-redesign is needed.  Only TCP is used, so the host name is
    # no longer resolved separately just to make that comparison.
    my $conn = IO::Socket::INET->new(PeerAddr => $host,
                                     PeerPort => $port,
                                     Proto    => "tcp",
                                     Type     => SOCK_STREAM);
    if(!$conn)
    {
        $debug && print STDERR "Saa::connect(): tcp connect failed with $@\n";
        $this->{'err'} = "$@";
        return 0;
    }

    binmode($conn, ":raw");

    $this->{'conn_sel'}->add($conn);
    $this->{'conns'}->{$key} =
    {
        'host'     => $host,
        'port'     => $port,
        'auto_arb' => 1,
        'conn'     => $conn,
    };

    return 1;
}
# Set per-connection options on an already registered connection.
#
# Parameters:
#   $host, $port - identify the connection (record key "${host}_$port")
#   %par         - option name/value pairs copied into the connection record,
#                  e.g. 'auto_arb' => bool
#
# Returns 1 once the pairs are stored.  Returns 0 and sets $this->{'err'}
# when no such connection is registered.
sub conn_set
{
    my ($this, $host, $port, %par) = @_;

    my $key = "${host}_$port";

    unless(exists($this->{'conns'}->{$key}))
    {
        $this->{'err'} = "No such connection.";
        return 0;
    }

    # Existing entries with the same name are overwritten; others are kept.
    while(my ($name, $value) = each(%par))
    {
        $this->{'conns'}->{$key}->{$name} = $value;
    }

    return 1;
}
# 'tag' => client name for the msg
# 'arb' => scalar data or func(tag) that returns scalar or undef on end-of-data
# 'arb_name' => scalar
sub queue
{
my $this = shift;
my $host = shift;
lib/Alvis/Saa.pm view on Meta::CPAN
($ok, undef) = $this->process_write($sent, $timeout);
}
else
{
($ok, $pending) = $this->process_write($sent, $timeout);
}
if(!$ok)
{
# print STDERR "write sanoi nok\n";
return (0, $sent, $received, $pending);
}
# print STDERR "*saa accept\n";
# accept
if($pending)
{
($ok, undef) = $this->process_accept($timeout);
}
else
{
($ok, $pending) = $this->process_accept($timeout);
}
return ($ok, $sent, $received, $pending);
}
# Queue one message on $saa and poll until a message is received or the
# deadline passes.
#
# Parameters:
#   $saa   - object providing queue($host, $port, $msg), process($arg) and
#            an 'err' hash entry
#   $msg   - message handed to $saa->queue() unchanged
#   $host  - destination host
#   $port  - destination port
#   $wait  - number of seconds to keep polling; time() counts whole seconds,
#            so the deadline has one-second granularity
#
# Returns a two-element list:
#   ($ok, $reply)  - $ok is the true status from the last process() call and
#                    $reply is the 'msg' entry of the first received item
#   (0, $errtext)  - "Saa::queue() failed: ...", "Saa::process() failed: ..."
#                    or "Timeout."
#
# The message is queued exactly once.  The earlier nested-loop form could
# queue it a second time if the wall clock stepped backwards between its two
# deadline checks.
sub tana_msg_reply
{
    my ($saa, $msg, $host, $port, $wait) = @_;

    my $giveup_time = time() + $wait;

    if(!$saa->queue($host, $port, $msg))
    {
        return (0, "Saa::queue() failed: " . $saa->{'err'});
    }

    while(time() < $giveup_time)
    {
        # 0.1 is presumably a per-call timeout in seconds - confirm against process().
        my ($ok, undef, $received, undef) = $saa->process(0.1);
        if(!$ok)
        {
            return (0, "Saa::process() failed: " . $saa->{'err'});
        }

        if(scalar(@$received) > 0)
        {
            # Only the first received item is reported to the caller.
            return ($ok, $received->[0]->{'msg'});
        }
    }

    return (0, "Timeout.");
}
# Queue one message on $saa and poll until it is reported as sent or the
# deadline passes.
#
# Parameters:
#   $saa   - object providing queue($host, $port, $msg), process($arg) and
#            an 'err' hash entry
#   $msg   - message handed to $saa->queue() unchanged
#   $host  - destination host
#   $port  - destination port
#   $wait  - number of seconds to keep polling; time() counts whole seconds,
#            so the deadline has one-second granularity
#
# Returns a two-element list:
#   ($ok, $status) - $ok is the true status from the last process() call and
#                    $status is the 'status' entry of the first sent item
#   (0, $errtext)  - "Saa::queue() failed: ...", "Saa::process() failed: ..."
#                    or "Timeout."
#
# The message is queued exactly once.  The earlier nested-loop form could
# queue it a second time if the wall clock stepped backwards between its two
# deadline checks.
sub tana_msg_send
{
    my ($saa, $msg, $host, $port, $wait) = @_;

    my $giveup_time = time() + $wait;

    if(!$saa->queue($host, $port, $msg))
    {
        return (0, "Saa::queue() failed: " . $saa->{'err'});
    }

    while(time() < $giveup_time)
    {
        # 0.1 is presumably a per-call timeout in seconds - confirm against process().
        my ($ok, $sent) = $saa->process(0.1);
        if(!$ok)
        {
            return (0, "Saa::process() failed: " . $saa->{'err'});
        }

        if(scalar(@$sent) > 0)
        {
            # Only the first sent item's status is reported to the caller.
            return ($ok, $sent->[0]->{'status'});
        }
    }

    return (0, "Timeout.");
}
1;
__END__
=head1 NAME
Alvis::Saa - Perl extension for communicating over the Tana protocol
=head1 SYNOPSIS
use Alvis::Saa;
my $saa=Alvis::Saa->new();
# Build a Tana message
my %MSG=('command'=>'call',
'object'=>'tfidf',
'function'=>'query',
'max-results'=>10,
'snippets'=>'',
'accuracy'=>0.9,
'query-string'=>"some query");
$saa->queue($host, $port, \%MSG,
arb_name => undef, arb => undef) || die($saa->{'err'} . "\n");
my($ok, $sent, $received, $pending);
$received = []; $sent = [];
while(scalar(@$sent) < 1)
{
($ok, $sent, $received, $pending) = $saa->process(10);
$ok || die($saa->{'err'} . "\n");
}
if($wait)
{
while(scalar(@$received) < 1)
{
($ok, $sent, $received, $pending) = $saa->process(10);
$ok || die($saa->{'err'} . "\n");
}
# do something with the results
}
=head1 DESCRIPTION
Provides a set of methods for sending and receiving Tana messages.
=head1 METHODS
=head2 new()
( run in 0.339 second using v1.01-cache-2.11-cpan-ec4f86ec37b )