App-Pod

 view release on metacpan or  search on metacpan

t/cpan/Mojo2/UserAgent.pm  view on Meta::CPAN

package Mojo2::UserAgent;
use Mojo::Base 'Mojo::EventEmitter';

# "Fry: Since when is the Internet about robbing people of their privacy?
#  Bender: August 6, 1991."
use Mojo::IOLoop;
use Mojo::Promise;
use Mojo::Util qw(monkey_patch term_escape);
use Mojo::UserAgent::CookieJar;
use Mojo::UserAgent::Proxy;
use Mojo::UserAgent::Server;
use Mojo::UserAgent::Transactor;
use Scalar::Util qw(weaken);

# Compile-time debug flag: true when MOJO_CLIENT_DEBUG is set to a true value,
# otherwise 0. It guards the diagnostic "warn" statements in this file.
use constant DEBUG => $ENV{MOJO_CLIENT_DEBUG} || 0;

# Attributes and their defaults. Code reference defaults are handed to "has"
# from Mojo::Base (presumably evaluated lazily per object -- see Mojo::Base).
#
# Note the two fallback operators used for environment variables below:
#   ||  falls back whenever the variable is false (unset, empty or 0)
#   //  falls back only when the variable is undefined, so an explicit 0 is
#       honoured

# File settings taken straight from the environment (undef when unset)
has ca                 => sub { $ENV{MOJO_CA_FILE} };
has cert               => sub { $ENV{MOJO_CERT_FILE} };

# A value of 0 in MOJO_CONNECT_TIMEOUT is treated as "unset" and becomes 10
has connect_timeout    => sub { $ENV{MOJO_CONNECT_TIMEOUT} || 10 };
has cookie_jar         => sub { Mojo::UserAgent::CookieJar->new };

# Applied to the connection stream in _process; an explicit 0 is preserved
has inactivity_timeout => sub { $ENV{MOJO_INACTIVITY_TIMEOUT} // 40 };
has insecure           => sub { $ENV{MOJO_INSECURE} };

# No default; _start only applies it to the response when it is defined
has 'max_response_size';

# Private event loop, used by start for blocking requests
has ioloop          => sub { Mojo::IOLoop->new };
has key             => sub { $ENV{MOJO_KEY_FILE} };

# Size limit of the keep-alive queue in _reuse; a false value makes _reuse
# remove every connection instead of keeping it
has max_connections => 5;

# With the default of 0, _redirect never follows a redirect
has max_redirects   => sub { $ENV{MOJO_MAX_REDIRECTS} || 0 };
has proxy           => sub { Mojo::UserAgent::Proxy->new };

# A false value (the default 0) means _start schedules no request timer
has request_timeout => sub { $ENV{MOJO_REQUEST_TIMEOUT} // 0 };

# Server object constructed with this user agent's own ioloop; _start uses it
# to complete relative URLs
has server => sub { Mojo::UserAgent::Server->new( ioloop => shift->ioloop ) };
has socket_options => sub { {} };

# Builds transactions (tx, websocket) and decides on upgrades and redirects
has transactor     => sub { Mojo::UserAgent::Transactor->new };

# Common HTTP methods: for every verb install a lower-case method (for
# example "get") that goes through start, plus a "_p" variant (for example
# "get_p") that goes through start_p
for my $verb ( qw(DELETE GET HEAD OPTIONS PATCH POST PUT) ) {
    my $method = lc $verb;

    # $ua->get(@args) or $ua->get(@args, sub {...})
    monkey_patch __PACKAGE__, $method => sub {
        my $self = shift;

        # A trailing code reference is taken off the arguments and passed to
        # start as the callback; everything else describes the transaction
        my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
        return $self->start( $self->build_tx( $verb, @_ ), $cb );
    };

    # $ua->get_p(@args)
    monkey_patch __PACKAGE__, "${method}_p" => sub {
        my $self = shift;
        return $self->start_p( $self->build_tx( $verb, @_ ) );
    };
}

# Destructor: delegate to _cleanup, but do nothing while the interpreter is in
# its global destruction phase (presumably because objects this one refers to
# may already have been destroyed by then).
sub DESTROY {
    my $self = shift;
    return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
    return $self->_cleanup;
}

# Build a transaction by passing all arguments through, unchanged, to the
# "tx" method of the transactor. Returns whatever that method returns.
sub build_tx {
    my $self = shift;
    return $self->transactor->tx( @_ );
}
# Build a WebSocket transaction by passing all arguments through, unchanged,
# to the "websocket" method of the transactor. Returns whatever that method
# returns.
sub build_websocket_tx {
    my $self = shift;
    return $self->transactor->websocket( @_ );
}

sub start {
    my ( $self, $tx, $cb ) = @_;

    # Fork-safety
    $self->_cleanup->server->restart if $self->{pid} && $self->{pid} ne $$;
    $self->{pid} //= $$;

    # Non-blocking
    if ( $cb ) {
        warn "-- Non-blocking request (@{[_url($tx)]})\n" if DEBUG;
        return $self->_start( Mojo::IOLoop->singleton, $tx, $cb );
    }

    # Blocking
    warn "-- Blocking request (@{[_url($tx)]})\n" if DEBUG;
    $self->_start( $self->ioloop,
        $tx => sub { shift->ioloop->stop; $tx = shift } );
    $self->ioloop->start;

t/cpan/Mojo2/UserAgent.pm  view on Meta::CPAN

    $self->_finish( $id, 1 );
}

# Finish the transaction attached to connection $id and decide what happens
# to the connection afterwards (remove it, keep it alive, upgrade it to a
# WebSocket, or follow a redirect).
#
#   $id    - connection id, the key into $self->{connections}
#   $close - true when the connection is being closed
#
# Returns undef if no connection with that id is known.
sub _finish {
    my ( $self, $id, $close ) = @_;

    # Remove request timeout and finish transaction
    return undef unless my $c = $self->{connections}{$id};
    $c->{ioloop}->remove( delete $c->{timeout} ) if $c->{timeout};

    # No transaction on this connection, so only the connection itself needs
    # to be handled
    return $self->_reuse( $id, $close ) unless my $old = $c->{tx};

    # Premature connection close: the connection went away and the response
    # has neither a status code nor an error yet
    my $res = $old->closed->res->finish;
    $res->error( { message => 'Premature connection close' } )
      if $close && !$res->code && !$res->error;

    # Always remove connection for WebSockets
    return $self->_remove( $id ) if $old->is_websocket;
    $self->cookie_jar->collect( $old );

    # Upgrade connection to WebSocket
    if ( my $new = $self->transactor->upgrade( $old ) ) {

        # Weaken so the resume closure does not hold a strong reference to
        # the user agent
        weaken $self;
        $new->on( resume => sub { $self->_write( $id ) } );

        # The new transaction replaces the old one on this connection and is
        # what the callback receives
        $c->{cb}( $self, $c->{tx} = $new );

        # Hand leftover content of the old response to the new transaction
        return $new->client_read( $old->res->content->leftovers );
    }

    # CONNECT requests always have a follow-up request
    $self->_reuse( $id, $close ) unless uc $old->req->method eq 'CONNECT';

    # Turn a response that reports an error status into a transaction error
    $res->error( { message => $res->message, code => $res->code } )
      if $res->is_error;

    # A started redirect takes the callback with it (_redirect deletes it
    # from $c); otherwise the callback is invoked with the finished
    # transaction
    $c->{cb}( $self, $old ) unless $self->_redirect( $c, $old );
}

# Start processing the transaction assigned to connection $id: configure the
# stream, record the socket addresses on the transaction and begin writing.
#
#   $id - connection id, the key into $self->{connections}
sub _process {
    my ( $self, $id ) = @_;

    my $conn = $self->{connections}{$id};
    my $loop = $conn->{ioloop};

    # Stream for this connection, with the inactivity timeout applied
    my $stream = $loop->stream( $id )->timeout( $self->inactivity_timeout );

    # Tell the transaction which connection it belongs to
    my $tx     = $conn->{tx}->connection( $id );
    my $handle = $stream->handle;

    # UNIX domain sockets are skipped, only other handles are asked for
    # host and port
    if ( !$handle->isa( 'IO::Socket::UNIX' ) ) {
        $tx->local_address( $handle->sockhost )
          ->local_port( $handle->sockport );
        $tx->remote_address( $handle->peerhost )
          ->remote_port( $handle->peerport );
    }

    # Keep writing whenever the transaction is resumed; the weakened
    # reference keeps the closure from holding on to the user agent
    weaken $self;
    $tx->on( resume => sub { $self->_write( $id ) } );
    $self->_write( $id );
}

# Feed a chunk of data received on connection $id into its transaction.
#
#   $id    - connection id, the key into $self->{connections}
#   $chunk - the data that was read
sub _read {
    my ( $self, $id, $chunk ) = @_;

    # Corrupted connection: data arrived although no transaction is attached,
    # so the connection is removed
    my $tx = $self->{connections}{$id}{tx};
    return $self->_remove( $id ) unless $tx;

    if ( DEBUG ) {
        warn term_escape "-- Client <<< Server (@{[_url($tx)]})\n$chunk\n";
    }

    # Let the transaction parse the data and wrap up once it is complete
    $tx->client_read( $chunk );
    $self->_finish( $id ) if $tx->is_finished;
}

# Follow a redirect for the finished transaction $old, if there is one and
# the redirect limit allows it.
#
#   $c   - connection state hash; its "cb" entry is removed and handed to the
#          follow-up request when a redirect is started
#   $old - the finished transaction
#
# Returns undef when nothing was started, otherwise the result of _start.
sub _redirect {
    my ( $self, $c, $old ) = @_;

    # The transactor decides whether the response calls for a follow-up
    my $next = $self->transactor->redirect( $old );
    return undef unless $next;

    # Stop once the number of redirects already followed reaches the limit
    my $followed = @{ $old->redirects };
    return undef unless $followed < $self->max_redirects;

    # The follow-up request takes over the callback and runs on the same loop
    my $cb = delete $c->{cb};
    return $self->_start( $c->{ioloop}, $next, $cb );
}

# Forget connection $id: drop its state, take it out of the keep-alive queue
# via _dequeue and remove it from its event loop.
#
#   $id - connection id, the key into $self->{connections}
sub _remove {
    my ( $self, $id ) = @_;

    my $conn = delete $self->{connections}{$id};
    my $loop = $conn->{ioloop};

    $self->_dequeue( $loop, $id );
    $loop->remove( $id );
}

# Detach the transaction from connection $id and either remove the connection
# or park it in the keep-alive queue of its event loop.
#
#   $id    - connection id, the key into $self->{connections}
#   $close - true to force removal of the connection
sub _reuse {
    my ( $self, $id, $close ) = @_;

    my $conn  = $self->{connections}{$id};
    my $tx    = delete $conn->{tx};
    my $limit = $self->max_connections;

    # Connection close: only a connection that was not asked to close, had a
    # transaction, is allowed by the limit, supports keep-alive and finished
    # without error is kept
    my $keep = !$close && $tx && $limit && $tx->keep_alive && !$tx->error;
    return $self->_remove( $id ) unless $keep;

    # Keep connection alive: make room by removing the oldest queued
    # connections, then queue this one under its endpoint
    my $queue = $self->{queue}{ $conn->{ioloop} } //= [];
    while ( @$queue && @$queue >= $limit ) {
        my $oldest = shift @$queue;
        $self->_remove( $oldest->[1] );
    }
    push @$queue, [ join( ':', $self->transactor->endpoint( $tx ) ), $id ];
}

# Prepare transaction $tx and start it on event loop $loop.
#
#   $loop - event loop the request runs on
#   $tx   - transaction to perform
#   $cb   - callback, passed on to _connection
#
# Returns the connection id, or undef if _connection did not provide one.
sub _start {
    my ( $self, $loop, $tx, $cb ) = @_;

    # Application server: a URL that is not absolute is completed with
    # scheme, host and port of the server object -- its "url" when the
    # request runs on this user agent's own ioloop, its "nb_url" otherwise
    $self->emit( prepare => $tx );
    my $url = $tx->req->url;
    if ( !$url->is_abs && ( my $server = $self->server ) ) {
        my $base = $loop == $self->ioloop ? $server->url : $server->nb_url;
        $url->scheme( $base->scheme )->host( $base->host )->port( $base->port );
    }

    # Let proxy and cookie jar prepare the transaction, in that order
    $_->prepare( $tx ) for $self->proxy, $self->cookie_jar;

    # Only limit the response size when max_response_size has been set
    my $max = $self->max_response_size;
    $tx->res->max_message_size( $max ) if defined $max;
    $self->emit( start => $tx );

    # Allow test servers sharing the same event loop to clean up connections
    # (when the loop is not running: queue a no-op callback, then run a single
    # iteration -- this relies on next_tick returning a false value)
    !$loop->next_tick( sub { } ) and $loop->one_tick unless $loop->is_running;
    return undef unless my $id = $self->_connection( $loop, $tx, $cb );

    # Schedule the request timeout unless the connection already has a timer;
    # $self is weakened so the timer closure does not hold a strong reference
    # to the user agent
    if ( my $t = $self->request_timeout ) {
        weaken $self;
        $self->{connections}{$id}{timeout} ||=
          $loop->timer( $t => sub { $self->_error( $id, 'Request timeout' ) } );
    }

    return $id;
}

# Helper function (not a method): absolute form of the request URL of a
# transaction, used for the debug messages in this file.
sub _url {
    my $tx = shift;
    return $tx->req->url->to_abs;
}

# Write the next chunk of the transaction on connection $id to its stream,
# and continue with the following chunk once that one has been written.
#
#   $id - connection id, the key into $self->{connections}
sub _write {
    my ( $self, $id ) = @_;

    # Protect from resume event recursion: nothing to do without a
    # transaction, or while a write for this connection is already running
    my $conn = $self->{connections}{$id};
    my $tx   = $conn->{tx};
    return if !$tx || $conn->{writing};

    # The flag is reset automatically when this sub is left
    local $conn->{writing} = 1;
    my $chunk = $tx->client_write;
    if ( DEBUG ) {
        warn term_escape "-- Client >>> Server (@{[_url($tx)]})\n$chunk\n";
    }

    # An empty chunk means there is nothing to send right now
    return unless length $chunk;

    # The user agent is only weakly referenced by the write callback, so it
    # may be gone by the time the callback runs
    weaken $self;
    my $stream = $conn->{ioloop}->stream( $id );
    $stream->write( $chunk => sub { $self && $self->_write( $id ) } );
}

1;

=encoding utf8

=head1 NAME

Mojo::UserAgent - Non-blocking I/O HTTP and WebSocket user agent

=head1 SYNOPSIS

  use Mojo::UserAgent;

  # Fine grained response handling (dies on connection errors)
  my $ua  = Mojo::UserAgent->new;
  my $res = $ua->get('docs.mojolicious.org')->result;
  if    ($res->is_success)  { say $res->body }
  elsif ($res->is_error)    { say $res->message }
  elsif ($res->code == 301) { say $res->headers->location }
  else                      { say 'Whatever...' }

  # Say hello to the Unicode snowman and include an Accept header
  say $ua->get('www.☃.net?hello=there' => {Accept => '*/*'})->result->body;

  # Extract data from HTML and XML resources with CSS selectors
  say $ua->get('www.perl.org')->result->dom->at('title')->text;

  # Scrape the latest headlines from a news site
  say $ua->get('blogs.perl.org')->result->dom->find('h2 > a')->map('text')->join("\n");

  # IPv6 PUT request with Content-Type header and content
  my $tx = $ua->put('[::1]:3000' => {'Content-Type' => 'text/plain'} => 'Hi!');

  # Quick JSON API request with Basic authentication
  my $url = Mojo::URL->new('https://example.com/test.json')->userinfo('sri:☃');
  my $value = $ua->get($url)->result->json;

  # JSON POST (application/json) with TLS certificate authentication
  my $tx = $ua->cert('tls.crt')->key('tls.key')->post('https://example.com' => json => {top => 'secret'});

  # Form POST (application/x-www-form-urlencoded)
  my $tx = $ua->post('https://metacpan.org/search' => form => {q => 'mojo'});

  # Search DuckDuckGo anonymously through Tor
  $ua->proxy->http('socks://127.0.0.1:9050');
  say $ua->get('api.3g2upl4pq6kufc4m.onion/?q=mojolicious&format=json')->result->json('/Abstract');

  # GET request via UNIX domain socket "/tmp/myapp.sock" (percent encoded slash)
  say $ua->get('http+unix://%2Ftmp%2Fmyapp.sock/test')->result->body;

  # Follow redirects to download Mojolicious from GitHub
  $ua->max_redirects(5)
    ->get('https://www.github.com/mojolicious/mojo/tarball/main')
    ->result->save_to('/home/sri/mojo.tar.gz');



( run in 2.393 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )