Alvis-Saa
view release on metacpan or search on metacpan
lib/Alvis/Saa.pm view on Meta::CPAN
package Alvis::Saa;
$Alvis::Saa::VERSION = '0.2';
use strict;
use Alvis::Tana;
# use Data::Dumper;
use Sys::Hostname;
use IO::Socket;
use IO::Select;
use Fcntl;
my $LOCALADDR_PREFIX = "/var/tmp/searchrpc_localsoc_";
my $debug = 0;
######################################################################
#
# Public methods
#
###################################################################
sub new
{
my ($this) = @_;
my $class = ref($this) || $this;
my $my_addr = gethostbyname(hostname());
if(!defined($my_addr))
{
return undef;
}
$this = {
'servs' => {},
'serv_sel' => IO::Select->new(),
'conns' => {},
'conn_sel' => IO::Select->new(),
'ip_clis' => {},
'my_addr' => $my_addr,
'err' => '',
'queue' => [],
};
bless $this, $class;
$SIG{'PIPE'} = 'IGNORE';
return $this;
}
sub err
{
my $this = shift;
return $this->{'err'};
}
# 'auto_arb' => bool # Autoread arb messages?
# 'callback' => [func, params]
sub listen
{
my $this = shift;
my $port = shift;
my %par = @_;
if(exists($this->{'servs'}->{$port}))
{
$this->{'err'} = "Already listening";
return 0;
}
my $serv =
{
lib/Alvis/Saa.pm view on Meta::CPAN
close($conn);
return 1;
}
sub unlisten
{
my $this = shift;
my $port = shift;
if(!exists($this->{'servs'}->{$port}))
{
$this->{'err'} = "Not connected";
return 0;
}
my $serv = $this->{'servs'}->{$port};
$this->{'serv_sel'}->remove($serv->{'unix_sock'});
$this->{'serv_sel'}->remove($serv->{'inet_sock'});
shutdown($serv->{'unix_sock'}, 2);
shutdown($serv->{'inet_sock'}, 2);
close($serv->{'unix_sock'});
close($serv->{'inet_sock'});
unlink("$LOCALADDR_PREFIX$port");
delete($this->{'servs'}->{$port});
return 1;
}
sub connect
{
my $this = shift;
my $host = shift;
my $port = shift;
if(exists($this->{'conns'}->{"${host}_$port"}))
{
$this->{'err'} = "Already connected";
return 0;
}
my $cn =
{
'host' => $host,
'port' => $port,
'auto_arb' => 1,
};
my $addr = gethostbyname($host);
my $conn = undef;
# local socket handling is fundamentally broken, a saa-redesign is needed
# if($this->{'my_addr'} eq $addr) # try domain socket first
# {
# $conn = IO::Socket::UNIX->new(Peer => "$LOCALADDR_PREFIX$port",
# Type => SOCK_STREAM,
# Timeout => 10);
# }
if(!defined($conn))
{
# $debug && print STDERR "Saa::connect(): domain socket $LOCALADDR_PREFIX$port failed with $!, trying inet\n";
if(!($conn = IO::Socket::INET->new(PeerAddr => $host,
PeerPort => $port,
Proto => "tcp",
Type => SOCK_STREAM)))
{
$debug && print STDERR "Saa::connect(): tcp connect failed with $@\n";
$this->{'err'} = "$@";
return 0;
}
}
else
{
$debug && print STDERR "Saa::connect(): Successfully opened localsoc!\n";
}
binmode($conn, ":raw");
$cn->{'conn'} = $conn;
$this->{'conn_sel'}->add($conn);
$this->{'conns'}->{"${host}_$port"} = $cn;
return 1;
}
# 'auto_arb' => bool
sub conn_set
{
my $this = shift;
my $host = shift;
my $port = shift;
my %par = @_;
my $c = "${host}_$port";
if(!exists($this->{'conns'}->{$c}))
{
$this->{'err'} = "No such connection.";
return 0;
}
for(keys(%par))
{
$this->{'conns'}->{$c}->{$_} = $par{$_};
}
return 1;
}
# 'tag' => client name for the msg
# 'arb' => scalar data or func(tag) that returs scalar or undef on end-of-data
# 'arb_name' => scalar
sub queue
{
my $this = shift;
my $host = shift;
my $port = shift;
my $msg = shift;
my %par = @_;
my $q_elem = {
'host' => $host,
'port' => $port,
'msg' => $msg
};
if(exists($par{'arb'}))
{
$q_elem->{'arb'} = $par{'arb'};
$q_elem->{'arb_name'} = $par{'arb_name'};
}
lib/Alvis/Saa.pm view on Meta::CPAN
}
# print STDERR "scheduled req: " . Dumper($q_elem);
push(@{$this->{'queue'}}, $q_elem);
return 1;
}
sub process_accept
{
my $this = shift;
my $timeout = shift;
$timeout=10;
my @servs = keys(%{$this->{'servs'}});
my @reads = $this->{'serv_sel'}->can_read($timeout);
# print "Riidit: " . Dumper(\@reads) . "\n";
my $conn;
foreach $conn (@reads)
{
my $serv;
my $found = 0;
for(@servs)
{
if($this->{'servs'}->{$_}->{'inet_sock'} == $conn ||
$this->{'servs'}->{$_}->{'unix_sock'} == $conn)
{
$serv = $this->{'servs'}->{$_};
$found = 1;
last;
}
}
my $client = $conn->accept();
# print "Conn " . Dumper($conn);
if(!defined($client))
{
# print STDERR "PRKL: $!\n";
next;
}
my $str_ip;
my $port;
#for some reason sockdomain returns undef
# if(AF_INET == $client->sockdomain)
# {
my $sockaddr = $client->peername();
my $iaddr;
($port, $iaddr) = sockaddr_in($sockaddr);
$str_ip = inet_ntoa($iaddr);
# print STDERR "Saa: accept found port $port and ip $str_ip\n";
# }
# else # AF_UNIX
# {
# my $sn = $client->sockname();
# $sn =~ /$LOCALADDR_PREFIX([0-9]+)/;
# $port = $1;
# $str_ip = inet_ntoa($this->{'my_addr'});
# $debug && print STDERR "Saa::process_accept(): AF_UNIX connection with ip $str_ip port $port\n";
# }
my $cn =
{
'host' => $str_ip,
'port' => $port,
'conn' => $client,
'lport' => $serv->{'port'},
};
$serv->{'auto_arb'} && ($cn->{'auto_arb'} = $serv->{'auto_arb'});
$serv->{'callback'} && ($cn->{'callback'} = $serv->{'callback'});
$this->{'conns'}->{"${str_ip}_$port"} = $cn;
$this->{'conn_sel'}->add($client);
}
return (1, 0);
}
sub process_write
{
my $this = shift;
my $sent = shift;
my $q = $this->{'queue'};
my %banned = (); # makes sure the order of messages for the same connection is kept
my $offset = 0;
while($offset < scalar(@$q))
{
my $qe = $q->[$offset];
# ensure connection
if(! $this->connected($qe->{'host'}, $qe->{'port'}))
{
# print STDERR "Write connects to $qe->{'host'} $qe->{'port'}\n";
if(!$this->connect($qe->{'host'}, $qe->{'port'}))
{
# print STDERR "Write is not connected to $qe->{'host'} $qe->{'port'}: $@\n";
$this->{'err'} = $@;
$qe->{'status'} = "failed";
shift(@$q);
push(@$sent, $qe);
return (0, scalar(@$q));
}
}
my $connstr = $qe->{'host'} . "_" . $qe->{'port'};
my $conn = $this->{'conns'}->{$connstr}->{'conn'};
#see writability if not known
if($banned{$connstr})
{
$offset++;
next; # earlier message in q already unsent to this connstr
}
if(scalar(IO::Select->new($conn)->can_write(0)))
lib/Alvis/Saa.pm view on Meta::CPAN
my $arb_type = 0;
# print STDERR "saa: reading msg from " . $conns[$i] . " / " . fileno($conns[$i]) . "\n";
my $msg = Alvis::Tana::read($conn, \$arb_type);
# warn "Saa process_read(): Alvis::Tana::read() gave msg",Dumper($msg);
if(!defined($msg))
{
$this->{'err'} = Alvis::Tana::error($conn);
if(scalar(@conns) > scalar(@$received))
{
$pending = 1;
}
$this->disconnect($cnkey);
next;
}
my $entry =
{
'msg' => $msg,
'type' => 'fix',
'host' => $cn->{'host'},
'port' => $cn->{'port'},
'conn' => $conn,
};
if(defined($arb_type))
{
$entry->{'type'} = 'arb';
$entry->{'arb_name'} = $arb_type;
if(exists($cn->{'auto_arb'}) && ($cn->{'auto_arb'}))
{
my $eom = 0;
my $arb = '';
while(!$eom)
{
my $ext = Alvis::Tana::read_arb($entry->{'conn'}, 1024000, \$eom);
if(!defined($ext))
{
$this->{'err'} = "Error auto-reading arb: " . Alvis::Tana::error($entry->{'conn'});
if(scalar(@conns) > scalar(@$received))
{
$pending = 1;
}
return (0, $pending);
}
$arb .= $ext;
}
$entry->{'arb'} = $arb;
}
}
if($cn->{'callback'})
{
my $cb = $cn->{'callback'};
my $func = undef;
my @param = ();
$debug && print STDERR "Callback = ", ref($cb), "\n";
if(ref($cb) eq 'CODE')
{
$func = $cb;
}
else
{
@param = @$cb;
$func = shift(@param);
}
$debug && print STDERR "Func cb ref = ", ref($func), "\n";
$func->($this, $entry, @param);
}
else
{
push(@$received, $entry);
}
}
return (1, 0);
}
sub process
{
my $this = shift;
my $timeout = shift;
$timeout=10;
my $received = [];
my $sent = [];
#cleanup
for (keys(%{$this->{'conns'}}))
{
my $c = $this->{'conns'}->{$_}->{'conn'};
if((!$c->connected()))
{
print STDERR "Reaping $_\n";
my($host, $port) = split("_", $_);
$this->disconnect($host, $port);
}
}
# read from conns
# print STDERR "*saa read\n";
my ($ok, $pending) = $this->process_read($received, $timeout);
if(!$ok)
{
print STDERR "read sanoi nok\n";
return (0, $sent, $received, $pending);
}
# print STDERR "*saa write\n";
# write queue
if($pending)
{
($ok, undef) = $this->process_write($sent, $timeout);
}
else
{
($ok, $pending) = $this->process_write($sent, $timeout);
}
if(!$ok)
{
# print STDERR "write sanoi nok\n";
return (0, $sent, $received, $pending);
}
# print STDERR "*saa accept\n";
( run in 2.022 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )