MojoX-HTTP-Async

 view release on metacpan or  search on metacpan

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

    $slot->{'connected_ts'} = time();
    $slot->{'reader'} = $slot->{'writer'} = $slot->{'socket'} = $socket;
    $slot->{'sock_no'} = fileno($socket);
    if ($self->{'ssl'}) {
        my $ssl_socket = IO::Socket::SSL->new_from_fd($socket, ($self->{'ssl_opts'} // {})->%*);
        croak("error=$!, ssl_error=" . $IO::Socket::SSL::SSL_ERROR) if (!$ssl_socket);
        $ssl_socket->blocking(0); # just to be sure
        $slot->{'reader'} = $slot->{'writer'} = $ssl_socket;
    }
}

# Connects $slot to the configured peer ('proto' / 'peer_addr').
# When 'connect_timeout' is positive the attempt is guarded by SIGALRM:
# an expired alarm marks the slot's request as timed out ('Connect timeout'),
# any other failure is re-thrown via croak.
# NOTE(review): CORE::alarm works in whole seconds, so a fractional timeout is
# truncated unless Time::HiRes::alarm is imported by this file - confirm.
sub _connect_slot ($self, $slot) {
    my @peer = @{$self}{qw/ proto peer_addr /};
    my $limit = $self->{'connect_timeout'};

    # No positive timeout: nothing to guard, connect straight away.
    unless ($limit > 0) {
        $self->_connect($slot, @peer);
        return;
    }

    eval {
        local $SIG{'ALRM'} = sub { die "alarm\n" };
        alarm($limit);
        $self->_connect($slot, @peer);
        alarm(0);
    };
    my $failure = $@;

    # Whatever happened inside the eval, no alarm may outlive it.
    alarm(0);

    return if !$failure;

    croak($failure) unless $failure eq "alarm\n";
    $self->_mark_request_as_timeouted($slot, 'Connect timeout');
}

# Opens $amount new connections and appends them to the pool as fresh slots.
# The peer address and the TCP protocol number are computed once and cached in
# $self ('peer_addr' / 'proto'). The host is resolved only when 'peer_addr' is
# not cached yet, so repeated calls don't repeat a (blocking) name lookup.
# Croaks if the host cannot be resolved.
sub _make_connections ($self, $amount) {

    $self->{'peer_addr'} //= do {
        my $host_addr = inet_aton($self->{'host'});
        croak("can't call inet_aton") if (! $host_addr);
        pack_sockaddr_in($self->{'port'}, $host_addr);
    };
    $self->{'proto'} //= getprotobyname("tcp");

    for my $n (1 .. $amount) {
        my $slot = $self->_make_slot();
        $self->_connect_slot($slot);
        $self->_add_slot($slot);
    }
}

# Appends $slot to the connection pool; false values are silently ignored.
sub _add_slot ($self, $slot) {
    return unless $slot;
    push $self->{'_conns'}->@*, $slot;
}

# Builds an empty, unconnected slot record.
sub _make_slot ($self) {
    # Handles and request/response state start out as undef ...
    my %slot = map { $_ => undef } qw(reader writer socket request tx tmp_response);

    # ... while the descriptor number, flags and timestamps start at zero.
    $slot{$_} = 0 for qw(sock_no is_busy exp_ts reconnect_is_required last_response_ts connected_ts);

    return \%slot;
}

# Marks as broken (status 520) every slot of %$socks2slots whose descriptor bit
# is set in the select() bit vector $error_handles.
# The error message is $reason when it is true; otherwise it is derived from
# the current errno ('Premature connection close' for EPIPE / ECONNRESET /
# ECONNREFUSED / ECONNABORTED, 'Unknown error' for anything else).
sub _check_for_errors ($self, $socks2slots = {}, $error_handles = '', $reason = '') {

    my $message = $reason || do {
        my $closed_by_peer = grep { $!{$_} } qw(EPIPE ECONNRESET ECONNREFUSED ECONNABORTED);
        $closed_by_peer ? 'Premature connection close' : 'Unknown error';
    };

    my @failed_fds = grep { vec($error_handles, $_, 1) != 0 } keys %$socks2slots;

    $self->_mark_response_as_broken($socks2slots->{$_}, 520, $message) for @failed_fds;
}

# Returns a slot that is idle, connected, not waiting for a reconnect and whose
# socket is writable right now (select() with a zero timeout), or undef.
# Sockets reported in the select() error set are passed to _check_for_errors(),
# which marks their slots as broken (and therefore busy). Such slots are never
# handed out, so the caller can always send a request into the returned slot.
sub _get_free_slot ($self) {

    my $slot;
    my %socks2slots = map { $_->{'sock_no'} => $_ }
                      grep { !$_->{'is_busy'} && $_->{'socket'} && !$_->{'reconnect_is_required'} }
                      $self->{'_conns'}->@*;

    if (%socks2slots) {

        local $!;
        my $write_handles = '';

        vec($write_handles, $_, 1) = 1 for keys %socks2slots;

        my $error_handles = $write_handles;
        my $nfound = select(undef, $write_handles, $error_handles, 0);

        $self->_check_for_errors(\%socks2slots, $error_handles, $!);

        # select() returns -1 on failure. Slots may also have just been marked
        # broken above, so 'is_busy' has to be re-checked before handing one out.
        if ($nfound > 0) {
            my $slot_no = first { vec($write_handles, $_, 1) == 1 && !$socks2slots{$_}{'is_busy'} }
                          keys %socks2slots;
            $slot = $socks2slots{ $slot_no } if defined $slot_no;
        }
    }

    return $slot;
}

=head2 add ($self, $request_or_uri, $timeout = undef)

Adds an HTTP request to a free slot.

If the request was successfully added, then it will return 1.
Otherwise it will return 0.

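The request is not added when there is no free slot and no new one can be provided: either the limit on the
number of slots has been reached (see C<new> and its C<slots> option), or the newly opened connection cannot be
used right away (for example, it timed out or is not writable yet).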

It is recommended to always check the return value of this method.

Example:

    my $ua = MojoX::HTTP::Async->new('host' => 'my-host.com', 'slots' => 1);

    # let's occupy the only slot
    $ua->add('/page1.html');

    # let's wait until it becomes free again
    while ( ! $ua->add('/page2.html') ) {
        while (my $tx = $ua->wait_for_next_response() ) {
            # do something here
        }
    }

=over

=item $request_or_uri

It can be either an instance of the C<Mojo::Message::Request> class or an instance of C<Mojo::URL>.
It can also be a plain URI string.

If the resource contains a host, it must be the same as the one passed to the constructor C<new>.

When a URI string or a C<Mojo::URL> instance is passed, the GET HTTP method is used.

=item $timeout

Time in seconds. Can be fractional with microseconds tolerance.

The C<request_timeout> value passed to the constructor is used by default.

=back

=cut

sub add ($self, $request_or_uri, $timeout = undef) {
    my $slot = $self->_get_free_slot();

    # No idle connection available: open one more, unless the pool is already full.
    if (!$slot && scalar($self->{'_conns'}->@*) < $self->{'slots'}) {
        $self->_make_connections(1);
        $slot = $self->_get_free_slot();
    }

    return 0 if !$slot;

    # Plain URI strings and Mojo::URL objects are wrapped into a fresh request object.
    my $is_url = !ref($request_or_uri)
              || (blessed($request_or_uri) && $request_or_uri->isa('Mojo::URL'));

    my $request = $request_or_uri;
    if ($is_url) {
        $request = Mojo::Message::Request->new();
        $request->url()->parse($request_or_uri);
    }

    return 0 if !$request;

    $self->_send_request($slot, $request, $timeout);
    return 1;
}

# Resets the request/response state of $slot so it can take a new request.
# With a true $force the connection is torn down as well: the socket is closed
# and every connection-related field is reset.
sub _clear_slot ($self, $slot, $force = 0) {
    @{$slot}{qw(is_busy exp_ts)} = (0, 0);
    @{$slot}{qw(tx request tmp_response)} = (undef) x 3;

    return unless $force;

    if (my $sock = $slot->{'socket'}) {
        close($sock);
    }
    @{$slot}{qw(socket reader writer)} = (undef) x 3;
    @{$slot}{qw(sock_no reconnect_is_required last_response_ts connected_ts)} = (0) x 4;
}

# Flags $slot as busy and requiring a reconnect, and makes sure it holds a
# request and a transaction (with an empty response) that an error can be
# attached to. Existing request/transaction objects are kept.
sub _mark_slot_as_broken($self, $slot) {
    @{$slot}{qw(reconnect_is_required is_busy)} = (1, 1);

    my $request = $slot->{'request'} //= Mojo::Message::Request->new();

    $slot->{'tx'} //= Mojo::Transaction::HTTP->new(
        'req' => $request,
        'res' => Mojo::Message::Response->new(),
    );
}

# Marks $slot as broken (busy, reconnect required, with a transaction) and
# records the failure as an error on the slot's request object.
# $code defaults to 520, $msg to 'Unknown Error'.
sub _mark_request_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
    $self->_mark_slot_as_broken($slot);
    $slot->{'request'}->error({'message' => $msg, 'code' => $code});
}

# Marks $slot as broken and turns its transaction's response into an error
# response: error set, empty body (Content-Length: 0), status $code / $msg.
sub _mark_response_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
    $self->_mark_slot_as_broken($slot);

    my $response = $slot->{'tx'}->res();
    $response->error({'message' => $msg, 'code' => $code});
    $response->headers()->content_length(0);

    # Expose the failure through the regular status-line accessors too.
    my %status_line = ('code' => $code, 'message' => $msg);
    $response->$_($status_line{$_}) for qw(code message);
}

# Marks the slot's request as failed with status 524 and the given message
# (default 'Request timeout').
sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
    my @failure = ($slot, 524, $message);
    return $self->_mark_request_as_broken(@failure);
}

# Turns the slot's response into an error response with status 524 and the
# given message (default 'Request timeout').
sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
    my @failure = ($slot, 524, $message);
    return $self->_mark_response_as_broken(@failure);
}

# Validates $request against the connection settings and fills in default
# Host / User-Agent headers. It then binds the request to $slot (marking it
# busy and setting its deadline 'exp_ts') and writes the serialized request to
# the slot's writer/socket.
#
# $timeout defaults to the 'request_timeout' option. An undefined or
# non-positive value disables the deadline.
#
# On a write failure the request is marked as broken (520). If the deadline
# has already passed after writing, it is marked as timed out (524).
# Croaks when the slot is busy, the request is missing or not a
# Mojo::Message::Request, or its scheme/host contradicts the constructor options.
sub _send_request ($self, $slot, $request, $timeout = undef) {

    croak("slot is busy") if ($slot->{'is_busy'});
    croak("request object is obligatory") if (!$request);
    croak('request must be a descendant of Mojo::Message::Request package') if (!$request->isa('Mojo::Message::Request'));

    my $required_scheme = $self->{'ssl'} ? 'https' : 'http';
    my $url = $request->url();
    my $uri = URI->new( $url );
    my $scheme = $url->scheme();

    if ($scheme && $required_scheme ne $scheme) {
        croak(sprintf("Wrong scheme in URI '%s'. It must correspond to the 'ssl' option", $uri->as_string()));
    }

    if (! $uri->scheme()) {
        # URI::_generic doesn't have C<host> method, that's why we set the scheme by ourselves to change C<$uri> type
        $uri->scheme($required_scheme);
    }

    if (my $host = $uri->host()) {
        if ($host ne $self->{'host'}) {
            croak(sprintf("Wrong host in URI '%s'. It must be the same as it was specified in constructor: %s", $uri->as_string(), $self->{'host'}));
        }
    }

    $timeout //= $self->{'request_timeout'};

    state $default_ua_hdr = 'perl/' . __PACKAGE__;

    my $h = $request->headers();
    $h->host($self->{'host'}) if (! $h->host() );
    $h->user_agent($default_ua_hdr) if (! $h->user_agent() );

    $slot->{'request'} = $request;
    $slot->{'is_busy'} = 1;
    $slot->{'exp_ts'} = ($timeout && $timeout > 0) ? ( time() + $timeout ) : 0;

    my $plain_request = $request->to_string();

    if ($self->{'ssl'}) {
        $slot->{'writer'}->print($plain_request);
    } else {
        my $socket = $slot->{'socket'};
        my $msg_len = bytes::length($plain_request);
        my $sent_bytes = 0;
        my $attempts = 10;

        local $!;

        while ($sent_bytes < $msg_len && $attempts--) {
            # The buffer is never shortened: OFFSET skips what has already been
            # written and LENGTH is limited to what is still left to send.
            # NOTE(review): on a non-blocking socket EAGAIN (full send buffer) is
            # treated as a failure here, not retried - confirm this is intended.
            my $bytes = syswrite($socket, $plain_request, $msg_len - $sent_bytes, $sent_bytes);

            if ($! || ! defined($bytes)) {
                # $! is always defined, so fall back with || (not //) when errno is 0
                my $error = $! || 'Unknown error';
                $self->_mark_request_as_broken($slot, 520, $error);
                return;
            }

            $sent_bytes += $bytes;
        }

        if ($sent_bytes < $msg_len) {
            my $error = $! || 'sent message is shorter than original';
            $self->_mark_request_as_broken($slot, 520, $error);
            return;
        }
    }

    if ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
        $self->_mark_request_as_timeouted($slot);
    }

    return;
}

# Reads whatever is currently available from the slot's reader and feeds it to
# the slot's partially parsed response, or to a new Mojo::Message::Response.
#  - A read error other than EAGAIN/EWOULDBLOCK marks the response as broken (520).
#  - Once the response content is finished, the slot gets its 'tx' and
#    'last_response_ts'. Until then the partial response is kept in 'tmp_response'.
#  - If the content reports relaxed(), the slot is flagged for reconnect.
#  - If there is still no 'tx' and the slot's deadline has passed, the response
#    is marked as timed out (524).
# Does nothing for slots that are idle or already hold a 'tx'.
sub _try_to_read ($self, $slot) {

    return if $slot->{'tx'} || ! $slot->{'is_busy'};

    my $reader = $slot->{'reader'};
    my $response = $slot->{'tmp_response'} // Mojo::Message::Response->new();

    # Clear errno first, so a stale value left by an earlier call is not taken
    # for a read error below. The lexical $line keeps the caller's $_ intact;
    # a bare "while (<$reader>)" would overwrite the global $_.
    local $! = 0;
    while (defined(my $line = <$reader>)) {
        $response->parse($line);
    }

    if ($! && !$!{'EAGAIN'} && !$!{'EWOULDBLOCK'}) { # not a "Resource temporarily unavailable" (no data)
        $self->_mark_response_as_broken($slot, 520, $!);
    } elsif ($response->code()) {

        my $content = $response->content();

        if ($content->is_finished()) { # this is required to support "Transfer-Encoding: chunked"
            $slot->{'tx'} = Mojo::Transaction::HTTP->new(
                'req' => $slot->{'request'},
                'res' => $response
            );
            $slot->{'tmp_response'} = undef;
            $slot->{'last_response_ts'} = time();
        } else {
            $slot->{'tmp_response'} = $response;
        }

        $slot->{'reconnect_is_required'} = 1 if ($content->relaxed()); # responses that are terminated with a connection close
    } else {
        # The status line has not been fully received yet: keep what was parsed
        # so far, so the next call continues from where this one stopped.
        $slot->{'tmp_response'} = $response;
    }

    if (! $slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
        $self->_mark_response_as_timeouted($slot);
    }
}

=head2 not_empty($self)

Returns 1 if at least one slot is busy or holds a response that has not been retrieved yet.
Otherwise the method returns 0.

=cut

sub not_empty ($self) {
    # A slot counts as "in use" while it is busy or still holds an unretrieved transaction.
    my $in_use = grep { $_->{'is_busy'} || $_->{'tx'} } $self->{'_conns'}->@*;

    return $in_use ? 1 : 0;
}


=head2 wait_for_next_response($self, $timeout = 0)

Waits for the first received response or timed-out request in any slot.
Returns the C<Mojo::Transaction::HTTP> instance with result.

=over

=item $timeout

Period of time in seconds. Can be fractional with microsecond tolerance.
If no response is available when this period expires, the method stops waiting and returns C<undef>.

The default value is 0, which means that the call blocks until a response is received.

If all slots are empty, then C<undef> is returned.

=back

=cut

sub wait_for_next_response ($self, $timeout = 0) {
    my $deadline = $timeout ? time() + $timeout : 0;
    my $tx;

    # Poll until a transaction shows up, every slot is idle, or the deadline passes.
    until ($deadline && time() >= $deadline) {
        $tx = $self->next_response();
        last if $tx || !$self->not_empty();
        select(undef, undef, undef, 1E-6); # sleep 1 microsecond
    }

    return $tx;
}

=head2 next_response ($self)

Returns an instance of C<Mojo::Transaction::HTTP> class.
If there is no response, it will return C<undef>.

=cut

sub next_response ($self) {
    # Prefer freshly readable sockets; fall back to transactions already stored in slots.
    my $tx = $self->_get_response_from_ready_slot();
    $tx //= $self->_get_response_from_slot();
    return $tx;
}

# Takes the transaction out of the first slot that holds one and clears that
# slot. The connection is closed too if the slot requires a reconnect.
# Returns the transaction, or undef if no slot holds one.
sub _get_response_from_slot ($self) {
    my ($done_slot) = grep { $_->{'tx'} } $self->{'_conns'}->@*;
    my $tx = $done_slot ? $done_slot->{'tx'} : undef;

    $self->_clear_slot($done_slot, $done_slot->{'reconnect_is_required'}) if $done_slot;

    return $tx;
}

# Non-blocking poll (select() with a zero timeout) of the busy slots that are
# still waiting for their response (no 'tx' yet, no pending reconnect).
# Returns the first transaction that became available during this call:
#  - for a readable socket, the slot's 'tx' after _try_to_read(). This is a
#    complete response, or an error transaction if _check_for_errors() has
#    just marked the slot as broken;
#  - for a non-readable socket, a timed-out transaction if the slot's
#    deadline ('exp_ts') has passed.
# The slot that produced the transaction is cleared with _clear_slot($slot, 0),
# i.e. without closing its connection. Returns undef if nothing is ready.
sub _get_response_from_ready_slot ($self) {

    my $tx;
    my %socks2slots = map { $_->{'sock_no'} => $_ }
                      grep { ! $_->{'tx'} && ! $_->{'reconnect_is_required'} && $_->{'is_busy'} }
                      $self->{'_conns'}->@*;

    if (%socks2slots) {

        local $!;
        my $read_handles = '';

        vec($read_handles, $_, 1) = 1 for keys %socks2slots;

        my $error_handles = $read_handles;
        # zero timeout: only look at what is ready right now
        my ($nfound, $timeleft) = select($read_handles, undef, $error_handles, 0);

        # slots whose descriptors are set in the error vector get an error 'tx' here
        $self->_check_for_errors(\%socks2slots, $error_handles, $!);

        for my $sock_no (keys %socks2slots) {
            my $slot = $socks2slots{ $sock_no };
            if ( $nfound && vec($read_handles, $sock_no, 1) == 1 ) {
                # _try_to_read() returns immediately for slots that already have a 'tx'
                $self->_try_to_read($slot);
                next if ! $slot->{'tx'};     # response not complete yet
                next if ! $slot->{'is_busy'};
                $tx = $slot->{'tx'};
            } else {
                # nothing to read on this socket: only an expired deadline yields a result
                if (!$slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
                    $self->_mark_response_as_timeouted($slot);
                    $tx = $slot->{'tx'};
                }
            }

            if ($tx) {
                # hand the transaction out and free the slot; the connection stays open
                $self->_clear_slot($slot, 0);
                last;
            }
        }
    }

    return $tx;
}

=head2 refresh_connections ($self)

Closes connections in slots in the following cases:

    1. The slot was marked as timed out or as requiring a reconnect
    2. The "inactivity_conn_ts" option was set and the connection has expired
    3. There are errors on the socket (for example: Connection reset by peer, Broken pipe, etc.)

Returns the number of reconnections made.

=cut

sub refresh_connections ($self) {

    my $n = 0;
    my $now = time();
    my $keep_ts = $self->{'inactivity_conn_ts'} // 0;

    # Poll only slots that actually own a socket. A slot whose connect attempt
    # failed keeps 'sock_no' == 0. Putting it into the select() set would poll
    # fd 0 (STDIN), and several such slots would collide on the same hash key.
    my %socks2slots = map { $_->{'sock_no'} => $_ }
                      grep { $_->{'socket'} }
                      $self->{'_conns'}->@*;

    if (%socks2slots) {

        local $!;
        my $error_handles = '';

        vec($error_handles, $_, 1) = 1 for keys %socks2slots;
        select(undef, undef, $error_handles, 0);

        $self->_check_for_errors(\%socks2slots, $error_handles, $!); # broken connections will be marked as required to reconnect
    }

    # Walk the pool and reconnect every slot that is flagged or has been idle too long.
    for my $i (reverse( 0 .. $#{ $self->{'_conns'} })) {
        my $slot = $self->{'_conns'}->[$i];
        my $slot_exp_ts = ($slot->{'last_response_ts'} || $slot->{'connected_ts'}) + $keep_ts;
        my $is_outdated = $keep_ts && $slot_exp_ts <= $now;

        warn("Outdated\n") if $self->{'debug'} && $is_outdated;

        if ($slot->{'reconnect_is_required'} || $is_outdated) {
            warn("Going to reconnect\n") if $self->{'debug'};
            $self->_clear_slot($slot, 1);
            $self->_connect_slot($slot);
            $n++;
        }
    }

    return $n;
}

=head2 close_all ($self)

Closes all open connections and removes all slots, including those with pending requests.

=cut

sub close_all ($self) {
    # Force-clear (and thereby close) every slot, then drop the whole pool.
    for my $slot ($self->{'_conns'}->@*) {
        $self->_clear_slot($slot, 1);
    }
    $self->{'_conns'} = [];

    return;
}

=head2 DESTROY($self)

The class destructor.

Closes all open sockets and warns if some slots were still busy.

=cut

sub DESTROY ($self) {
    my $busy_slots = 0;

    # Drain the pool, closing every socket that is still open.
    while (my $slot = shift($self->{'_conns'}->@*)) {
        $busy_slots += $slot->{'is_busy'} ? 1 : 0;
        my $sock = $slot->{'socket'};
        $sock->close() if $sock;
    }

    return unless $busy_slots;
    warn ref($self) ." object destroyed but still in use";
}

1;
__END__



( run in 0.696 second using v1.01-cache-2.11-cpan-39bf76dae61 )