IO-Stream

 view release on metacpan or  search on metacpan

lib/IO/Stream.pm  view on Meta::CPAN

    no strict 'refs';
    for my $const (@p) {
        next if !$known{$const};
        *{"${pkg}::$const"} = \&{$const};
    }
    return;
}


# Registry of live stream objects, indexed by file descriptor number ({_id}).
# new() stores every created object here, so the object stays alive even if
# the user keeps no reference to it; close() clears the object's slot.
my @Active;


#
# Create new stream object, register it in @Active and build plugin chain.
#
# Takes single HASHREF with options:
#   cb          required (must be true)
#   fh          already opened handle; mutually exclusive with host/port
#   host, port  must be given together; socket is created here in this case
#   plugin      optional ARRAYREF with pairs: name => plugin object
#               (the array is not modified)
#   ...         any other key overrides default value of same-named field
#
# Croaks on wrong params, on socket/fcntl/ioctl error, if fh has no usable
# file descriptor, or if there is already active object for same descriptor.
# Returns new object.
#
sub new {
    my (undef, $opt) = @_;
    croak 'usage: IO::Stream->new({ cb=>, wait_for=>, [fh=>, | host=>, port=>,] ... })'
        if ref $opt ne 'HASH'
        || !$opt->{cb}
        || !($opt->{fh} xor $opt->{host})
        || ($opt->{host} xor $opt->{port});

    my $self = bless {
        # no default values for these:
        cb          => undef,
        wait_for    => undef,
        fh          => undef,
        host        => undef,
        port        => undef,
        # default values:
        method      => 'IO',
        in_buf_limit=> undef,
        out_buf     => q{},                 # modified on: OUT
        out_pos     => undef,               # modified on: OUT
        # user isn't expected to provide values for these, but it's ok if
        # he wants to:
        out_bytes   => 0,                   # modified on: OUT
        in_buf      => q{},                 # modified on: IN
        in_bytes    => 0,                   # modified on: IN
        ip          => undef,               # modified on: RESOLVED
        is_eof      => undef,               # modified on: EOF
        # load user values:
        %{$opt},
        # we'll setup these below (so user values for them are ignored):
        plugin      => {},
        _master     => undef,
        _slave      => undef,
        _id         => undef,
    }, __PACKAGE__;

    # Create socket if needed.
    if (!$self->{fh}) {
        # Maybe it makes sense to send an event to user instead of croak?
        # (Most probable reason: error in socket because there are no more fd.)
        socket $self->{fh}, AF_INET, SOCK_STREAM, PROTO_TCP
                                                        or croak "socket: $!";
        if (!WIN32) {
            fcntl $self->{fh}, F_SETFL, O_NONBLOCK      or croak "fcntl: $!";
        } else {
            my $nb=1; ioctl $self->{fh}, FIONBIO, \$nb  or croak "ioctl: $!";
        }
    }

    # Keep this object alive, even if user doesn't keep it himself.
    # fileno() returns undef on failure and -1 for handles without real
    # descriptor, while 0 is a valid descriptor (STDIN) - so test for
    # defined/non-negative value instead of testing for true value.
    my $id = fileno $self->{fh};
    if (!defined $id || $id < 0) {
        croak q{can't get file descriptor};
    } elsif ($Active[$id]) {
        croak q{can't create second object for same fh};
    }
    $self->{_id} = $id;
    $Active[$id] = $self;

    # Connect plugins into chain and setup {plugin}.
    # Work on a copy of the list, to avoid emptying user's array.
    my $master = $self;
    if ($opt->{plugin}) {
        my @chain = @{ $opt->{plugin} };
        while (my ($name, $plugin) = splice @chain, 0, 2) {
            $self->{plugin}{$name}  = $plugin;
            $master->{_slave}       = $plugin;
            $plugin->{_master}      = $master;
            # Back-references are weak, to avoid reference loops in chain.
            weaken($plugin->{_master});
            $master                 = $plugin;
        }
    }
    # Last plugin in the chain is always IO::Stream::EV.
    my $plugin          = IO::Stream::EV->new();
    $master->{_slave}   = $plugin;
    $plugin->{_master}  = $master;
    weaken($plugin->{_master});

    # Ask plugin chain to continue with initialization:
    $self->{_slave}->PREPARE($self->{fh}, $self->{host}, $self->{port});

    # Shortcuts for typical operations after creating new I/O object:
    if (length $self->{out_buf}) {
        $self->write();
    }

    return $self;
}

#
# Send pending output: if called with an argument, first append it to
# {out_buf}; then ask next plugin in the chain (via WRITE) to process it.
# Returns nothing.
#
sub write {     ## no critic (ProhibitBuiltinHomonyms)
    my ($self, @extra) = @_;
    # Append whenever an argument was passed at all (even an empty string);
    # only the first one is used.
    if (@extra) {
        $self->{out_buf} .= $extra[0];
    }
    $self->{_slave}->WRITE();
    return;
}

#
# Unregister this object from @Active (so it can be freed) and close its fh.
# Returns result of closing fh.
#
sub close {     ## no critic (ProhibitBuiltinHomonyms ProhibitAmbiguousNames)
    my $self = shift;
    # Drop the reference which was keeping this object alive.
    $Active[ $self->{_id} ] = undef;
    return CORE::close($self->{fh});
}

#
# Filter events received from the top plugin in the chain and deliver them to the user.
#



( run in 1.251 second using v1.01-cache-2.11-cpan-39bf76dae61 )