MojoX-HTTP-Async

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
     that GET HTTP method will be used.
 
   $timeout
 
     Time in seconds. Can be fractional with microseconds tolerance.
 
     The request_timeout from conmtrucor will be used by default.
 
not_empty($self)
 
   Returns 1 if there even one slot is busy or slot contains a not
   processed response. Otherwise the method returns 0.
 
wait_for_next_response($self, $timeout = 0)
 
   Waits for first received response or time-outed request in any slot.
   Returns the Mojo::Transaction::HTTP instance with result.
 
   $timeout
 
     Period of time in seconds. Can be fractial with microsecond

README.md  view on Meta::CPAN

176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
        Using of string with URI or an instance of "Mojo::URL" class assumes
        that GET HTTP method will be used.
 
###### $timeout
        Time in seconds. Can be fractional with microseconds tolerance.
 
        The "request_timeout" from construcor will be used by default.
 
##### not_empty($self)
    Returns 1 if there even one slot is busy or slot contains a not
    processed response. Otherwise the method returns 0.
 
##### wait_for_next_response($self, $timeout = 0)
    Waits for first received response or time-outed request in any slot.
    Returns the "Mojo::Transaction::HTTP" instance with result.
 
###### $timeout
        Period of time in seconds. Can be fractional with microsecond
        tolerance. The response will be marked as time-outed after this time is out.

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
sub _add_slot ($self, $slot) {
    push($self->{'_conns'}->@*, $slot) if ($slot);
}
 
sub _make_slot ($self) {
    return {
        'reader' => undef,
        'writer' => undef,
        'socket' => undef,
        'sock_no' => 0,
        'is_busy' => 0,
        'request' => undef,
        'tx' => undef,
        'exp_ts' => 0,
        'tmp_response' => undef,
        'reconnect_is_required' => 0,
        'last_response_ts' => 0,
        'connected_ts' => 0,
    };
}

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
            my $slot = $socks2slots->{ $slot_no };
            $self->_mark_response_as_broken($slot, 520, $message);
        }
    }
}
 
sub _get_free_slot ($self) {
 
    my $slot;
    my %socks2slots = map { $_->{'sock_no'} => $_ }
                      grep { !$_->{'is_busy'} && $_->{'socket'} && !$_->{'reconnect_is_required'} }
                      $self->{'_conns'}->@*;
 
    if (%socks2slots) {
 
        local $!;
        my $write_handles = '';
 
        vec($write_handles, $_, 1) = 1 for keys %socks2slots;
 
        my $error_handles = $write_handles;

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
        if ($request) {
            $self->_send_request($slot, $request, $timeout);
            $status = 1;
        }
    }
 
    return $status;
}
 
sub _clear_slot ($self, $slot, $force = 0) {
    $slot->{'is_busy'} = 0;
    $slot->{'exp_ts'} = 0;
    $slot->{'tx'} = undef;
    $slot->{'request'} = undef;
    $slot->{'tmp_response'} = undef;
    if ($force) {
        close($slot->{'socket'}) if $slot->{'socket'};
        $slot->{'socket'} = undef;
        $slot->{'reader'} = undef;
        $slot->{'writer'} = undef;
        $slot->{'sock_no'} = 0;
        $slot->{'reconnect_is_required'} = 0;
        $slot->{'last_response_ts'} = 0;
        $slot->{'connected_ts'} = 0;
    }
}
 
sub _mark_slot_as_broken($self, $slot) {
    $slot->{'reconnect_is_required'} = 1;
    $slot->{'is_busy'} = 1;
    $slot->{'request'} //= Mojo::Message::Request->new();
    $slot->{'tx'} //= Mojo::Transaction::HTTP->new(
        'req' => $slot->{'request'},
        'res' => Mojo::Message::Response->new()
    );
}
 
sub _mark_request_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
    $self->_mark_slot_as_broken($slot);li
    $slot->{'request'}->error({'message' => $msg, 'code' => $code});

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
    $self->_mark_request_as_broken($slot, 524, $message);
}
 
sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
    $self->_mark_response_as_broken($slot, 524, $message);
}
 
sub _send_request ($self, $slot, $request, $timeout = undef) {
 
    croak("slot is busy") if ($slot->{'is_busy'});
    croak("request object is obligatory") if (!$request);
    croak('request must be a descendant of Mojo::Message::Request package') if (!$request->isa('Mojo::Message::Request'));
 
    my $required_scheme = $self->{'ssl'} ? 'https' : 'http';
    my $url = $request->url();
    my $uri = URI->new( $url );
    my $scheme = $url->scheme();
 
    if ($scheme && $required_scheme ne $scheme) {
        croak(sprintf("Wrong scheme in URI '%s'. It must correspond to the 'ssl' option", $uri->as_string()));

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
$timeout //= $self->{'request_timeout'};
 
my $response = '';
state $default_ua_hdr = 'perl/' . __PACKAGE__;
 
my $h = $request->headers();
$h->host($self->{'host'}) if (! $h->host() );
$h->user_agent($default_ua_hdr) if (! $h->user_agent() );
 
$slot->{'request'} = $request;
$slot->{'is_busy'} = 1;
$slot->{'exp_ts'} = ($timeout > 0) ? ( time() + $timeout ) : 0;
 
my $plain_request = $request->to_string();
 
if ($self->{'ssl'}) {
    $slot->{'writer'}->print($plain_request);
} else {
    my $socket = $slot->{'socket'};
    my $msg_len = bytes::length($plain_request);
    my $sent_bytes = 0;

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
    if ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
        $self->_mark_request_as_timeouted($slot);
    }
 
    return;
}
 
sub _try_to_read ($self, $slot) {
 
    return if $slot->{'tx'} || ! $slot->{'is_busy'};
 
    my $reader = $slot->{'reader'};
    my $response = $slot->{'tmp_response'} // Mojo::Message::Response->new();
 
    $response->parse($_) while (<$reader>);
 
    if ($! && !$!{'EAGAIN'} && !$!{'EWOULDBLOCK'}) { # not a "Resourse temporary unavailable" (no data)
        $self->_mark_response_as_broken($slot, 520, $!);
    } elsif ($response && $response->code()) {

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
        $slot->{'reconnect_is_required'} = 1 if ($content->relaxed()); # responses that are terminated with a connection close
    }
 
    if (! $slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
        $self->_mark_response_as_timeouted($slot);
    }
}
 
=head2 not_empty($self)
 
Returns 1 if there even one slot is busy or slot contains a not processed response.
Otherwise the method returns 0.
 
=cut
 
sub not_empty ($self) {
 
    my $not_empty = scalar $self->{'_conns'}->@*;
 
    for my $slot ($self->{'_conns'}->@*) {
        $not_empty-- if !$slot->{'is_busy'} && !$slot->{'tx'};
    }
 
    return $not_empty ? 1 : 0;
}
 
 
=head2 wait_for_next_response($self, $timeout = 0)
 
Waits for first received response or time-outed request in any slot.
Returns the C<Mojo::Transaction::HTTP> instance with result.

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
        $self->_clear_slot($slot, $slot->{'reconnect_is_required'});
    }
 
    return $tx;
}
 
sub _get_response_from_ready_slot ($self) {
 
    my $tx;
    my %socks2slots = map { $_->{'sock_no'} => $_ }
                      grep { ! $_->{'tx'} && ! $_->{'reconnect_is_required'} && $_->{'is_busy'} }
                      $self->{'_conns'}->@*;
 
    if (%socks2slots) {
 
        local $!;
        my $read_handles = '';
 
        vec($read_handles, $_, 1) = 1 for keys %socks2slots;
 
        my $error_handles = $read_handles;
        my ($nfound, $timeleft) = select($read_handles, undef, $error_handles, 0);
 
        $self->_check_for_errors(\%socks2slots, $error_handles, $!);
 
        for my $sock_no (keys %socks2slots) {
            my $slot = $socks2slots{ $sock_no };
            if ( $nfound && vec($read_handles, $sock_no, 1) == 1 ) {
                $self->_try_to_read($slot);
                next if ! $slot->{'tx'};
                next if ! $slot->{'is_busy'};
                $tx = $slot->{'tx'};
            } else {
                if (!$slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
                    $self->_mark_response_as_timeouted($slot);
                    $tx = $slot->{'tx'};
                }
            }
 
            if ($tx) {
                $self->_clear_slot($slot, 0);

lib/MojoX/HTTP/Async.pm  view on Meta::CPAN

778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
The class destructor.
 
Closes all opened sockets.
 
=cut
 
sub DESTROY ($self) {
    my $in_use = 0;
    while ( my $slot = shift($self->{'_conns'}->@*) ) {
        $in_use++ if ($slot->{'is_busy'});
        $slot->{'socket'}->close() if ($slot->{'socket'});
    }
    warn ref($self) ." object destroyed but still in use" if $in_use;
}
 
1;
__END__

t/inactivity_timeout.t  view on Meta::CPAN

50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
my $server = start_server(\&on_start_cb, $host);
my $ua = MojoX::HTTP::Async->new(
    'host' => $host,
    'port' => $server->port(),
    'slots' => $slots,
    'connect_timeout' => $connect_timeout,
    'request_timeout' => $request_timeout,
    'inactivity_conn_ts' => $inactivity_timeout,
);
 
# one slot is free and one slot is busy
ok($ua->add("/page/01.html"), "Adding the first request");
 
sleep($inactivity_timeout + 0.1);
 
my $n = $ua->refresh_connections();
 
is($n, 1, "Checking the amount of renewed slots");
 
# both slots are busy
ok($ua->add("/page/02.html"), "Adding the second request");
ok($ua->add("/page/03.html"), "Adding the third request");
 
sleep($inactivity_timeout + 0.1);
 
$n = $ua->refresh_connections();
 
is($n, 2, "Checking the amount of renewed slots");
 
# all connections are fresh



( run in 0.264 second using v1.01-cache-2.11-cpan-d6f9594c0a5 )