IO-Stream
view release on metacpan or search on metacpan
lib/IO/Stream.pm view on Meta::CPAN
no strict 'refs';
for my $const (@p) {
next if !$known{$const};
*{"${pkg}::$const"} = \&{$const};
}
return;
}
my @Active;
sub new {
my (undef, $opt) = @_;
croak 'usage: IO::Stream->new({ cb=>, wait_for=>, [fh=>, | host=>, port=>,] ... })'
if ref $opt ne 'HASH'
|| !$opt->{cb}
|| !($opt->{fh} xor $opt->{host})
|| ($opt->{host} xor $opt->{port});
my $self = bless {
# no default values for these:
cb => undef,
wait_for => undef,
fh => undef,
host => undef,
port => undef,
# default values:
method => 'IO',
in_buf_limit=> undef,
out_buf => q{}, # modified on: OUT
out_pos => undef, # modified on: OUT
# user shouldn't provide values for these, but it's ok if he want:
out_bytes => 0, # modified on: OUT
in_buf => q{}, # modified on: IN
in_bytes => 0, # modified on: IN
ip => undef, # modified on: RESOLVED
is_eof => undef, # modified on: EOF
# load user values:
%{$opt},
# we'll setup these below:
plugin => {},
_master => undef,
_slave => undef,
_id => undef,
}, __PACKAGE__;
# Create socket if needed.
if (!$self->{fh}) {
# Maybe it have sense instead or croak just send event to user?
# (Most probable reason: error in socket because there no more fd.)
socket $self->{fh}, AF_INET, SOCK_STREAM, PROTO_TCP
or croak "socket: $!";
if (!WIN32) {
fcntl $self->{fh}, F_SETFL, O_NONBLOCK or croak "fcntl: $!";
} else {
my $nb=1; ioctl $self->{fh}, FIONBIO, \$nb or croak "ioctl: $!";
}
}
# Keep this object alive, even if user doesn't keep it himself.
$self->{_id} = fileno $self->{fh};
if (!$self->{_id}) {
croak q{can't get file descriptor};
} elsif ($Active[ $self->{_id} ]) {
croak q{can't create second object for same fh};
} else {
$Active[ $self->{_id} ] = $self;
}
# Connect plugins into chain and setup {plugin}.
my $master = $self;
if ($opt->{plugin}) {
while (my ($name, $plugin) = splice @{ $opt->{plugin} }, 0, 2) {
$self->{plugin}{$name} = $plugin;
$master->{_slave} = $plugin;
$plugin->{_master} = $master;
weaken($plugin->{_master});
$master = $plugin;
}
}
my $plugin = IO::Stream::EV->new();
$master->{_slave} = $plugin;
$plugin->{_master} = $master;
weaken($plugin->{_master});
# Ask plugin chain to continue with initialization:
$self->{_slave}->PREPARE($self->{fh}, $self->{host}, $self->{port});
# Shortcuts for typical operations after creating new I/O object:
if (length $self->{out_buf}) {
$self->write();
}
return $self;
}
#
# Push user data down the stream, optionally adding new data to {out_buf}.
#
sub write { ## no critic (ProhibitBuiltinHomonyms)
my ($self, $data) = @_;
if ($#_ > 0) {
$self->{out_buf} .= $data;
}
$self->{_slave}->WRITE();
return;
}
#
# Free fh and Stream object.
#
sub close { ## no critic (ProhibitBuiltinHomonyms ProhibitAmbiguousNames)
my ($self) = @_;
undef $Active[ $self->{_id} ];
return close $self->{fh};
}
#
# Filter and deliver to user events (received from top plugin in the chain).
#
( run in 1.251 second using v1.01-cache-2.11-cpan-39bf76dae61 )