IO-Async

 view release on metacpan or  search on metacpan

t/42function.t  view on Meta::CPAN

#!/usr/bin/perl

use v5.14;
use warnings;

use IO::Async::Test;

use Test2::V0 0.000149;
use constant HAVE_TEST_MEMORYGROWTH => eval { require Test::MemoryGrowth; };

use File::Temp qw( tempdir );
use Time::HiRes qw( sleep );

use IO::Async::Function;

use IO::Async::OS;

use IO::Async::Loop;

# AUT evaluates to 0.1 when $ENV{TEST_QUICK_TIMERS} is true, and to 1
# otherwise. Presumably a time-scale factor for timing-sensitive tests --
# TODO confirm against its uses.
use constant AUT => $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1;

# The single loop instance that the test blocks below add their Function
# objects to and remove them from.
my $loop = IO::Async::Loop->new_builtin;

# Hand the loop to IO::Async::Test; presumably this is the loop that
# wait_for and wait_for_future run -- confirm against IO::Async::Test.
testing_loop( $loop );

# by future
# Exercises the Future-returning form of ->call, asserting the reference
# count of the Function object and its worker accounting at every step.
# The assertions are order-sensitive: do not introduce extra variables that
# hold $function, or the refcount checks will change.
{
   # min_workers and max_workers are both 1, and the assertions below expect
   # exactly one worker to exist throughout
   my $function = IO::Async::Function->new(
      min_workers => 1,
      max_workers => 1,
      code => sub { return $_[0] + $_[1] },
   );

   ok( defined $function, '$function defined' );
   isa_ok( $function, [ "IO::Async::Function" ], '$function isa IO::Async::Function' );

   # Before being added to the loop, only the lexical above refers to it
   is_oneref( $function, '$function has refcount 1' );

   $loop->add( $function );

   # Expect one additional reference after ->add (presumably held by the loop)
   is_refcount( $function, 2, '$function has refcount 2 after $loop->add' );

   # The single worker is expected to exist, idle, before any call is made
   is( $function->workers, 1, '$function has 1 worker' );
   is( $function->workers_busy, 0, '$function has 0 workers busy' );
   is( $function->workers_idle, 1, '$function has 1 workers idle' );

   # No on_return/on_error given: the result is delivered via the returned
   # Future
   my $future = $function->call(
      args => [ 10, 20 ],
   );

   isa_ok( $future, [ "Future" ], '$future isa Future' );

   # An outstanding call must not leave an extra reference to $function
   is_refcount( $function, 2, '$function has refcount 2 after ->call' );

   # The call is expected to be dispatched to the worker straight away,
   # before the loop has been run
   is( $function->workers_busy, 1, '$function has 1 worker busy after ->call' );
   is( $function->workers_idle, 0, '$function has 0 worker idle after ->call' );

   # Run until the worker's reply has completed the future
   wait_for { $future->is_ready };

   my ( $result ) = $future->get;

   is( $result, 30, '$result after call returns by future' );

   # Once the result is in, the worker should be idle again
   is( $function->workers_busy, 0, '$function has 0 workers busy after call returns' );
   is( $function->workers_idle, 1, '$function has 1 workers idle after call returns' );

   # ->stop future
   # ->stop returns a future; wait for it and check it did not fail
   wait_for_future my $stop_f = $function->stop;
   ok( !$stop_f->failure, '$stop_f succeeds' );

   $loop->remove( $function );
}

# by callback
# The result is delivered through the on_return continuation rather than
# through a returned Future.
{
   my $function = IO::Async::Function->new(
      code => sub {
         my ( $lhs, $rhs ) = @_;
         return $lhs + $rhs;
      },
      min_workers => 1,
      max_workers => 1,
   );
   $loop->add( $function );

   # $result stays undef until on_return fires; any error aborts the test
   my $result;
   my %continuations = (
      on_return => sub { $result = $_[0] },
      on_error  => sub { die "Test failed early - @_" },
   );

   # Called in void context, as the continuations carry the outcome
   $function->call( args => [ 10, 20 ], %continuations );

   wait_for { defined $result };

   is( $result, 30, '$result after call returns by callback' );

   $loop->remove( $function );
}

# Test queueing
# Two calls are made back-to-back against a function limited to one worker;
# both must complete, in call order, without a second worker being started.
{
   my $function = IO::Async::Function->new(
      code => sub {
         my ( $lhs, $rhs ) = @_;
         return $lhs + $rhs;
      },
      min_workers => 1,
      max_workers => 1,
   );
   $loop->add( $function );

   # Both calls share one collector and one error handler
   my @result;
   my $collect = sub { push @result, $_[0] };
   my $bail    = sub { die "Test failed early - @_" };

   my $f1 = $function->call( args => [ 1, 2 ], on_return => $collect, on_error => $bail );
   my $f2 = $function->call( args => [ 3, 4 ], on_return => $collect, on_error => $bail );

   is( $function->workers, 1, '$function->workers is still 1 after 2 calls' );

   # A Future is returned even when continuations are supplied
   isa_ok( $f1, [ "Future" ], '$f1 isa Future' );
   isa_ok( $f2, [ "Future" ], '$f2 isa Future' );

   wait_for { @result == 2 };

   is( \@result, [ 3, 7 ], '@result after both calls return' );

   is( $function->workers, 1, '$function->workers is still 1 after 2 calls return' );

   $loop->remove( $function );
}

# Queue priority
# Queued calls carrying a higher priority value are expected to run first;
# calls of equal priority keep their submission order.
{
   # The worker code captures this counter. Exactly one worker process is
   # kept so that every call increments the same copy, which makes the
   # returned values reveal the order in which the calls actually ran.
   my $serial = 0;
   my $function = IO::Async::Function->new(
      code        => sub { return $serial++ },
      min_workers => 1,
      max_workers => 1,
   );
   $loop->add( $function );

   # Occupy the only worker first, so that everything below has to queue
   my $ignore = sub {};
   $function->call( args => [], on_return => $ignore, on_error => $ignore );

   # Futures listed in submission order
   my @pending = (
      $function->call( args => [] ),                # no priority
      $function->call( args => [], priority => 1 ),
      $function->call( args => [], priority => 1 ),
      $function->call( args => [], priority => 2 ),
   );
   my $f = Future->needs_all( @pending );

   # Results come back in submission order; each value is the position at
   # which that call was executed
   my @serials = ( wait_for_future $f )->get;
   is( \@serials, [ 4, 2, 3, 1 ],
      '$function->call with priority enqueues correctly' );

   $loop->remove( $function );
}

# References
# Reference values must survive the trip in both directions: a scalar ref is
# passed in as an argument, and a scalar ref is returned as a result.
{
   my $function = IO::Async::Function->new(
      code => sub {
         # Report what kind of reference arrived, and hand back a reference
         # to the second argument
         my $kind = ref $_[0];
         return ( $kind, \$_[1] );
      },
   );
   $loop->add( $function );

   my @result;
   $function->call(
      args      => [ \'a', 'b' ],
      on_error  => sub { die "Test failed early - @_" },
      on_return => sub { @result = @_ },
   );

   # Wait until on_return has stored at least one value
   wait_for { scalar @result };

   is( \@result, [ 'SCALAR', \'b' ], 'Call and result preserves references' );

   $loop->remove( $function );
}

# Exception throwing
{
   my $line = __LINE__ + 2;
   my $function = IO::Async::Function->new(
      code => sub { die shift },
   );

   $loop->add( $function );

   my $err;

   my $f = $function->call(
      args => [ "exception name" ],
      on_return => sub { },
      on_error  => sub { $err = shift },
   );

   wait_for { defined $err };

   like( $err, qr/^exception name at \Q$0\E line \d+\.$/, '$err after exception' );

   is( [ $f->failure ],



( run in 0.683 second using v1.01-cache-2.11-cpan-39bf76dae61 )