Alvis-Saa

 view release on metacpan or  search on metacpan

lib/Alvis/Saa.pm  view on Meta::CPAN

    $this->{'conn_sel'}->remove($conn);
    delete($this->{'conns'}->{"${host}_$port"});

    shutdown($conn, 2);
    close($conn);

    return 1;
}


sub unlisten
{
    my $this = shift;
    my $port = shift;

    if(!exists($this->{'servs'}->{$port}))
    {
	$this->{'err'} = "Not connected";
	return 0;
    }

    my $serv = $this->{'servs'}->{$port};
    $this->{'serv_sel'}->remove($serv->{'unix_sock'});
    $this->{'serv_sel'}->remove($serv->{'inet_sock'});
    shutdown($serv->{'unix_sock'}, 2);
    shutdown($serv->{'inet_sock'}, 2);
    close($serv->{'unix_sock'});
    close($serv->{'inet_sock'});
    unlink("$LOCALADDR_PREFIX$port");
    delete($this->{'servs'}->{$port});

    return 1;
}

sub connect
{
    my $this = shift;
    my $host = shift;
    my $port = shift;

    if(exists($this->{'conns'}->{"${host}_$port"}))
    {
	$this->{'err'} = "Already connected";
	return 0;
    }

    my $cn = 
    {
	'host' => $host,
	'port' => $port,
	'auto_arb' => 1,
    };

    my $addr = gethostbyname($host);
    my $conn = undef;
# local socket handling is fundamentally broken, a saa-redesign is needed
#    if($this->{'my_addr'} eq $addr) # try domain socket first
#    {
#	$conn = IO::Socket::UNIX->new(Peer => "$LOCALADDR_PREFIX$port",
#				      Type => SOCK_STREAM,
#				      Timeout => 10);
#    }
    if(!defined($conn))
    {
#	$debug && print STDERR "Saa::connect(): domain socket $LOCALADDR_PREFIX$port failed with $!, trying inet\n";
	if(!($conn = IO::Socket::INET->new(PeerAddr => $host,
					   PeerPort => $port,
					   Proto => "tcp",
					   Type => SOCK_STREAM)))
	{
	    $debug && print STDERR "Saa::connect(): tcp connect failed with $@\n";
	    $this->{'err'} = "$@";
	    return 0;
	}
    }
    else
    {
	$debug && print STDERR "Saa::connect(): Successfully opened localsoc!\n";
    }

    binmode($conn, ":raw");

    $cn->{'conn'} = $conn;
    $this->{'conn_sel'}->add($conn);
    $this->{'conns'}->{"${host}_$port"} = $cn;

    return 1;
}

# 'auto_arb' => bool
sub conn_set
{
    my $this = shift;
    my $host = shift;
    my $port = shift;

    my %par = @_;

    my $c = "${host}_$port";
    if(!exists($this->{'conns'}->{$c}))
    {
	$this->{'err'} = "No such connection.";
	return 0;
    }

    for(keys(%par))
    {
	$this->{'conns'}->{$c}->{$_} = $par{$_};
    }
    
    return 1;
}


# 'tag' => client name for the msg
# 'arb' => scalar data or func(tag) that returs scalar or undef on end-of-data
# 'arb_name' => scalar
sub queue
{
    my $this = shift;
    my $host = shift;

lib/Alvis/Saa.pm  view on Meta::CPAN

	($ok, undef) = $this->process_write($sent, $timeout);
    }
    else
    {
	($ok, $pending) = $this->process_write($sent, $timeout);
    }
    if(!$ok)
    {
#	print STDERR "write sanoi nok\n";
	return (0, $sent, $received, $pending);
    }

#    print STDERR "*saa accept\n";
    # accept
    if($pending)
    {
	($ok, undef) = $this->process_accept($timeout);
    }
    else
    {
	($ok, $pending) = $this->process_accept($timeout);
    }

    return ($ok, $sent, $received, $pending);
}

sub tana_msg_reply
{
    my ($saa, $msg, $host, $port, $wait) = @_;

    my $giveup_time = time() + $wait;
    my $done = 0;
    my $reply = undef;
    my $ok = 1;
    do
    {
	if(!$saa->queue($host, $port, $msg))
	{
	    return (0, "Saa::queue() failed: " . $saa->{'err'}); 
	}

	my $received = [];
	$ok = 1;
	while(scalar(@$received) < 1 && (time() < $giveup_time) && $ok)
	{
	    ($ok, undef, $received, undef) = $saa->process(0.1);
	    if(!$ok)
	    {
		return (0, "Saa::process() failed: " . $saa->{'err'});
	    }
	}
	if(scalar(@$received) > 0)
	{
	    $reply = $received->[0]->{'msg'};
	    $done = 1;
	}
    } while((!$done) && (time() < $giveup_time));

    if(!$done)
    {
	return (0, "Timeout.");
    }

    return ($ok, $reply);
}

sub tana_msg_send
{
    my ($saa, $msg, $host, $port, $wait) = @_;

    my $giveup_time = time() + $wait;
    my $done = 0;
    my $stat = undef;
    my $sent = [];
    my $ok = 1;
    do
    {
	if(!$saa->queue($host, $port, $msg))
	{
	    return (0, "Saa::queue() failed: " . $saa->{'err'}); 
	}
	$ok = 1;
	$sent = [];
	while(scalar(@$sent) < 1 && (time() < $giveup_time) && $ok)
	{
	    ($ok, $sent, undef, undef) = $saa->process(0.1);
	    if(!$ok)
	    {
		return (0, "Saa::process() failed: " . $saa->{'err'});
	    }
	}
	if(scalar(@$sent) > 0)
	{
	    $done = 1;
	}
    } while((!$done) && (time() < $giveup_time));

    if(!$done)
    {
	return (0, "Timeout.");
    }

    return ($ok, $sent->[0]->{'status'});
}



1;

__END__

=head1 NAME

Alvis::Saa - Perl extension for communicating over the Tana protocol

=head1 SYNOPSIS

 use Alvis::Saa;

 my $saa=Alvis::Saa->new();
 
 # Build a Tana message
 my %MSG=('command'=>'call',
          'object'=>'tfidf',
          'function'=>'query',
          'max-results'=>10,
          'snippets'=>'',
          'accuracy'=>0.9,
          'query-string'=>"some query");

  $saa->queue($host, $port, $msg,
              arb_name => undef, arb => undef) || die($saa->{'err'} . "\n");

  my($ok, $sent, $received, $pending);
  $received = []; $sent = [];
  while(scalar(@$sent) < 1)
  {
     ($ok, $sent, $received, $pending) = $saa->process(10);
     $ok || die($saa->{'err'} . "\n");
  }
  if($wait)
  {
     while(scalar(@$received) < 1)
     {
        ($ok, $sent, $received, $pending) = $saa->process(10);
        $ok || die($saa->{'err'} . "\n");
     }

     # do something with the results
  }


=head1 DESCRIPTION

Provides a set of methods for sending and receiving Tana messages.

=head1 METHODS

=head2 new()



( run in 0.339 second using v1.01-cache-2.11-cpan-ec4f86ec37b )