Alvis-Saa
view release on metacpan or search on metacpan
lib/Alvis/Saa.pm view on Meta::CPAN
'queue' => [],
};
bless $this, $class;
$SIG{'PIPE'} = 'IGNORE';
return $this;
}
# Accessor for the last recorded error: returns whatever is currently
# stored in this object's 'err' slot.
sub err
{
    my ($self) = @_;
    return $self->{'err'};
}
# 'auto_arb' => bool # Autoread arb messages?
# 'callback' => [func, params]
# Starts listening on $port, on a TCP socket and on a UNIX domain socket
# whose path is "$LOCALADDR_PREFIX$port".
#
# Optional named parameters (passed after $port):
#   'auto_arb' => bool             # Autoread arb messages? (default 0)
#   'callback' => [func, params]
#
# Returns 1 on success.  Returns 0 and sets $this->{'err'} when the port is
# already registered ("Already listening") or when either socket cannot be
# created (err then holds "$@").
sub listen
{
    my ($this, $port, %par) = @_;

    if(exists($this->{'servs'}->{$port}))
    {
        $this->{'err'} = "Already listening";
        return 0;
    }

    # Listener record; caller-supplied options override the defaults.
    my $serv = { 'port' => $port, 'auto_arb' => 0 };
    foreach my $opt ('callback', 'auto_arb')
    {
        $serv->{$opt} = $par{$opt} if(exists($par{$opt}));
    }

    my $inet_sock = IO::Socket::INET->new(LocalPort => $port,
                                          Type => SOCK_STREAM,
                                          Reuse => 1,
                                          Listen => 10);
    unless(defined($inet_sock))
    {
        $this->{'err'} = "$@";
        return 0;
    }

    # Remove any socket file left at the path before binding to it.
    my $local_path = "$LOCALADDR_PREFIX$port";
    unlink $local_path;
    my $unix_sock = IO::Socket::UNIX->new(Local => $local_path,
                                          Type => SOCK_STREAM,
                                          Listen => 10);
    unless(defined($unix_sock))
    {
        # Do not leave the TCP listener open when the local one failed.
        $this->{'err'} = "$@";
        close($inet_sock);
        return 0;
    }

    foreach my $sock ($inet_sock, $unix_sock)
    {
        binmode($sock, ":raw");
    }

    $serv->{'inet_sock'} = $inet_sock;
    $serv->{'unix_sock'} = $unix_sock;
    $this->{'servs'}->{$port} = $serv;

    foreach my $sock ($inet_sock, $unix_sock)
    {
        $this->{'serv_sel'}->add($sock);
    }

    return 1;
}
# Tells whether a connection is registered for the given host/port pair,
# i.e. whether $this->{'conns'} has an entry keyed "${host}_$port".
sub connected
{
    my ($this, $host, $port) = @_;
    my $key = "${host}_$port";
    return exists($this->{'conns'}->{$key});
}
# Drops every registered connection: each socket is removed from the
# connection selector, its entry is deleted from $this->{'conns'}, and the
# socket is shut down in both directions and closed.  Always returns 1.
sub disconnect_all
{
    my ($this) = @_;

    # keys() yields a snapshot, so deleting entries inside the loop is safe.
    for my $key (keys(%{$this->{'conns'}}))
    {
        my $sock = $this->{'conns'}->{$key}->{'conn'};
        $this->{'conn_sel'}->remove($sock);
        delete($this->{'conns'}->{$key});
        shutdown($sock, 2);
        close($sock);
    }

    return 1;
}
# Closes the connection registered for $host/$port.
#
# Returns 0 and sets $this->{'err'} to "Not connected" when no such
# connection is registered.  Otherwise the socket is removed from the
# connection selector, its entry is deleted from $this->{'conns'}, the
# socket is shut down in both directions and closed, and 1 is returned.
sub disconnect
{
    my ($this, $host, $port) = @_;
    my $key = "${host}_$port";

    unless(exists($this->{'conns'}->{$key}))
    {
        $this->{'err'} = "Not connected";
        return 0;
    }

    my $sock = $this->{'conns'}->{$key}->{'conn'};
    $this->{'conn_sel'}->remove($sock);
    delete($this->{'conns'}->{$key});
    shutdown($sock, 2);
    close($sock);

    return 1;
}
# Stops listening on $port.
#
# Returns 0 and sets $this->{'err'} to "Not listening" when no listener is
# registered for $port.  Otherwise both listening sockets (UNIX and TCP) are
# removed from the server selector, shut down and closed, the UNIX socket
# file "$LOCALADDR_PREFIX$port" is unlinked, the listener record is deleted
# from $this->{'servs'}, and 1 is returned.
sub unlisten
{
    my $this = shift;
    my $port = shift;

    if(!exists($this->{'servs'}->{$port}))
    {
        # This is about a listener, not a connection, so say so
        # (mirrors "Already listening" reported by listen()).
        $this->{'err'} = "Not listening";
        return 0;
    }

    my $serv = $this->{'servs'}->{$port};
    foreach my $sock ($serv->{'unix_sock'}, $serv->{'inet_sock'})
    {
        $this->{'serv_sel'}->remove($sock);
        shutdown($sock, 2);
        close($sock);
    }

    # The UNIX socket file does not disappear on close; remove it explicitly.
    unlink("$LOCALADDR_PREFIX$port");
    delete($this->{'servs'}->{$port});

    return 1;
}
# Opens an outgoing TCP connection to $host:$port and registers it.
#
# Parameters:
#   $host - peer host, handed to IO::Socket::INET as PeerAddr
#   $port - peer port, handed to IO::Socket::INET as PeerPort
#
# Returns 0 with $this->{'err'} set when a connection for this host/port
# pair is already registered ("Already connected") or when the TCP connect
# fails (err then holds "$@").  Returns 1 once the socket has been stored in
# $this->{'conns'} under the key "${host}_$port" and added to
# $this->{'conn_sel'}.
sub connect
{
my $this = shift;
my $host = shift;
my $port = shift;
# Refuse to open a second connection to the same host/port pair.
if(exists($this->{'conns'}->{"${host}_$port"}))
{
$this->{'err'} = "Already connected";
return 0;
}
# Connection record; outgoing connections start with auto_arb switched on.
my $cn =
{
'host' => $host,
'port' => $port,
'auto_arb' => 1,
};
# NOTE(review): in the code visible here $addr is referenced only by the
# commented-out domain-socket check below, so this lookup is currently unused.
my $addr = gethostbyname($host);
my $conn = undef;
# local socket handling is fundamentally broken, a saa-redesign is needed
# if($this->{'my_addr'} eq $addr) # try domain socket first
# {
# $conn = IO::Socket::UNIX->new(Peer => "$LOCALADDR_PREFIX$port",
# Type => SOCK_STREAM,
# Timeout => 10);
# }
# With the domain-socket attempt disabled, $conn is always undef at this
# point, so the TCP branch is always taken.
if(!defined($conn))
{
# $debug && print STDERR "Saa::connect(): domain socket $LOCALADDR_PREFIX$port failed with $!, trying inet\n";
if(!($conn = IO::Socket::INET->new(PeerAddr => $host,
PeerPort => $port,
Proto => "tcp",
Type => SOCK_STREAM)))
{
$debug && print STDERR "Saa::connect(): tcp connect failed with $@\n";
$this->{'err'} = "$@";
return 0;
}
}
else
{
# NOTE(review): unreachable while the domain-socket attempt above stays
# commented out.
$debug && print STDERR "Saa::connect(): Successfully opened localsoc!\n";
}
# Switch the socket to raw (binary) mode before registering it.
binmode($conn, ":raw");
$cn->{'conn'} = $conn;
$this->{'conn_sel'}->add($conn);
$this->{'conns'}->{"${host}_$port"} = $cn;
return 1;
lib/Alvis/Saa.pm view on Meta::CPAN
}
return 1;
}
# 'tag' => client name for the msg
# 'arb' => scalar data or func(tag) that returns scalar or undef on end-of-data
# 'arb_name' => scalar
# Appends an outgoing message for $host/$port to $this->{'queue'}.
#
# Positional parameters: $host, $port, $msg.
# Optional named parameters:
#   'tag'      => stored on the queue element when given
#   'arb'      => stored on the queue element when given
#   'arb_name' => stored together with 'arb' (only when 'arb' is given)
#
# Always returns 1.
sub queue
{
    my ($this, $host, $port, $msg, %par) = @_;

    my %entry = (
        'host' => $host,
        'port' => $port,
        'msg' => $msg,
    );

    # 'arb_name' travels with 'arb': it is copied only when 'arb' is present.
    if(exists($par{'arb'}))
    {
        @entry{'arb', 'arb_name'} = @par{'arb', 'arb_name'};
    }
    $entry{'tag'} = $par{'tag'} if(exists($par{'tag'}));

    push(@{$this->{'queue'}}, \%entry);
    return 1;
}
# Accepts one pending connection on every listening socket that becomes
# readable and registers each accepted client as a connection.
#
# Parameters:
#   $timeout - seconds to wait for a listening socket to become readable;
#              10 is used when no timeout is given.
#
# Each accepted client is stored in $this->{'conns'} under "<ip>_<port>"
# (the peer's address) and added to $this->{'conn_sel'}.  The record carries
# the listener's port as 'lport' and inherits the listener's 'auto_arb' and
# 'callback' settings when those are true.
#
# A client whose peer address cannot be decoded as an internet address
# (e.g. one that connected through the UNIX domain listening socket) is
# closed and skipped, with the reason stored in $this->{'err'}.
#
# Returns the list (1, 0).
sub process_accept
{
    my $this = shift;
    my $timeout = shift;

    # Honour the caller's timeout; 10 seconds is only the fallback.
    $timeout = 10 unless defined($timeout);

    my @reads = $this->{'serv_sel'}->can_read($timeout);
    foreach my $conn (@reads)
    {
        # Find the listener record that owns this readable socket.
        my $serv;
        foreach my $cand (values(%{$this->{'servs'}}))
        {
            if($cand->{'inet_sock'} == $conn ||
               $cand->{'unix_sock'} == $conn)
            {
                $serv = $cand;
                last;
            }
        }
        # A socket without a listener record is still accepted, just
        # without any listener settings.
        $serv = {} unless defined($serv);

        my $client = $conn->accept();
        next if(!defined($client));

        # sockaddr_in() croaks on anything that is not an internet socket
        # address; trap that so one odd peer cannot take the process down.
        my ($port, $str_ip);
        my $decoded = eval
        {
            my $sockaddr = $client->peername();
            my $iaddr;
            ($port, $iaddr) = sockaddr_in($sockaddr);
            $str_ip = inet_ntoa($iaddr);
            1;
        };
        if(!$decoded)
        {
            $this->{'err'} = "Cannot determine peer address: $@";
            close($client);
            next;
        }

        my $cn =
        {
            'host' => $str_ip,
            'port' => $port,
            'conn' => $client,
            'lport' => $serv->{'port'},
        };
        $serv->{'auto_arb'} && ($cn->{'auto_arb'} = $serv->{'auto_arb'});
        $serv->{'callback'} && ($cn->{'callback'} = $serv->{'callback'});

        $this->{'conns'}->{"${str_ip}_$port"} = $cn;
        $this->{'conn_sel'}->add($client);
    }

    return (1, 0);
}
sub process_write
{
my $this = shift;
my $sent = shift;
my $q = $this->{'queue'};
my %banned = (); # makes sure the order of messages for the same connection is kept
( run in 2.383 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )