AnyEvent-Task
view release on metacpan
or search on metacpan
Changes
view on Meta::CPAN
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | 0.804 2017-02-12
* Bugfix for hung worker timeout not destroying object quickly
0.803 2017-02-12
* Bugfix for timeout errors thrown by checkout with Log::Defer
objects installed.
0.802 2015-10-23
* Stop using AnyEvent::Strict in the tests. It creates an
AE timer at compile time , therefore initializing the event
loop before we've had a chance to fork our server.
* Before forking in fork_task_server, assert that the AE
loop hasn't been initialized.
0.801 2014-02-15
* Bugfix: Fix memory leak of client objects.
* Change: Make hung worker timeout actually terminate the
worker to free up resources immediately.
0.800 2014-02-15
|
Changes
view on Meta::CPAN
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | * Major documentation updates
0.750 2013-04-08
* Backwards-incompatible change: The behaviour enabled by the
undocumented client option added in the previous release,
refork_after_error, is now the default behaviour. Instead
there is a new option called dont_refork_after_error to get
back the original behaviour.
* If log_defer_object is passed in when creating a checkout,
operations on this checkout are recorded as Log::Defer
timer events
* Major documentation updates, including working SYNOPSIS
0.720 2013-03-14
* Integration with Log::Defer so workers can log stuff and
it shows up in the client logs
* Complete the implementation of worker error recovery
* Remove undocumented "sk" protocol feature
* Big test-suite refactor/improvements
* Lots more/better docs, including for error handling/logging
|
README
view on Meta::CPAN
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 | Each client maintains a "pool" of connections to worker processes. Every
time a checkout is requested, the request is placed into a first-come,
first-serve queue. Once a worker process becomes available, it is
associated with that checkout until that checkout is garbage collected
which in perl means as soon as it is no longer needed. Each checkout
also maintains a queue of requested method-calls so that as soon as a
worker process is allocated to a checkout, any queued method calls are
filled in order.
"timeout" can be passed as a keyword argument to "checkout" . Once a
request is queued up on that checkout, a timer of "timout" seconds
( default is 30, undef means infinity) is started. If the request
completes during this timeframe, the timer is cancelled. If the timer
expires, the worker connection is terminated and an exception is thrown
in the dynamic context of the callback (see the "ERROR HANDLING"
section).
Note that since timeouts are associated with a checkout, checkouts can
be created before the server is started. As long as the server is
running within "timeout" seconds, no error will be thrown and no
requests will be lost. The client will continually try to acquire worker
processes until a server is available, and once one is available it will
attempt to allocate all queued checkouts.
|
README
view on Meta::CPAN
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 | AnyEvent::Task::Server->new(
name => 'sleeper' ,
listen => [ 'unix/' , '/tmp/anyevent-task.socket' ],
interface => sub {
logger->info( 'about to compute some operation' );
{
my $timer = logger->timer( 'computing some operation' );
select undef , undef , undef , 1;
}
},
)->run;
Note: Portable server code should never call "sleep" because on some
systems it will interfere with the recoverable worker timeout feature
In your client code, pass a Log::Defer object in when you create a
|
README
view on Meta::CPAN
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 | '0.023881061050415' ,
30,
'about to compute some operation'
],
[
'1.025965' ,
30,
'finished some operation'
]
],
'timers' => {
'computing some operation' => [
'0.024089061050415' ,
'1.02470206105041'
]
}
};
ERROR HANDLING
In a synchronous program, if you expected some operation to throw an
exception you might wrap it in "eval" like this:
|
lib/AnyEvent/Task.pm
view on Meta::CPAN
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 | |
lib/AnyEvent/Task.pm
view on Meta::CPAN
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 | AnyEvent::Task::Server->new(
name => 'sleeper' ,
listen => [ 'unix/' , '/tmp/anyevent-task.socket' ],
interface => sub {
logger->info( 'about to compute some operation' );
{
my $timer = logger->timer( 'computing some operation' );
select undef , undef , undef , 1;
}
},
)->run;
Note: Portable server code should never call C< sleep > because on some systems it will interfere with the recoverable worker timeout feature implemented with C<SIGALRM>.
In your client code, pass a L<Log::Defer> object in when you create a checkout:
|
lib/AnyEvent/Task.pm
view on Meta::CPAN
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 | '0.023881061050415' ,
30,
'about to compute some operation'
],
[
'1.025965' ,
30,
'finished some operation'
]
],
'timers' => {
'computing some operation' => [
'0.024089061050415' ,
'1.02470206105041'
]
}
};
|
lib/AnyEvent/Task.pm
view on Meta::CPAN
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 | TODO
! max checkout queue size
- start delivering fatal errors to some (at front of queue or back of queue though?)
- write test for this
! docs: write good error handling examples
Make names more consistent between callback::frame backtraces and auto-generated log ::defer timers
make server not use AnyEvent so don't have to worry about workers unlinking unix socket in dtors In a graceful shutdown scenario, servers wait () on all their children before terminating.
- Support relinquishing accept () socket during this period?
Document hung_worker_timeout and SIGALRM stuff
Wire protocol:
- Support something other than JSON? Sereal?
|
lib/AnyEvent/Task/Client.pm
view on Meta::CPAN
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | my $service = $self ->{ connect }->[1];
my $worker_guard ;
$self ->{connecting_workers}->{0 + $worker_guard } = $worker_guard = tcp_connect $host , $service , sub {
my $fh = shift ;
delete $self ->{connecting_workers}->{0 + $worker_guard };
if (! $fh ) {
$self ->{total_workers}--;
$self ->install_populate_workers_timer;
return ;
}
delete $self ->{populate_workers_timer};
my $worker ; $worker = new AnyEvent::Handle
fh => $fh ,
on_read => sub { },
on_error => sub {
my ( $worker , $fatal , $message ) = @_ ;
my $checkout = $self ->{workers_to_checkouts}->{0 + $worker };
$checkout ->{timeout_timer} = undef ;
$checkout ->throw_fatal_error( 'worker connection suddenly died' ) if $checkout ;
$self ->destroy_worker( $worker );
$self ->populate_workers;
};
$self ->{worker_checkout_counts}->{0 + $worker } = 0;
$self ->make_worker_available( $worker );
$self ->try_to_fill_pending_checkouts;
};
}
}
sub install_populate_workers_timer {
my ( $self ) = @_ ;
return if exists $self ->{populate_workers_timer};
$self ->{populate_workers_timer} = AE::timer 0.2, 1, sub {
$self ->populate_workers;
};
}
sub try_to_fill_pending_checkouts {
my ( $self ) = @_ ;
return unless @{ $self ->{pending_checkouts}};
|
lib/AnyEvent/Task/Client/Checkout.pm
view on Meta::CPAN
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | my %args = ( code => $request ->[-1]);
$args {name} = $name if defined $name ;
$request ->[-1] = frame( %args )
unless Callback::Frame::is_frame( $request ->[-1]);
}
push @{ $self ->{pending_requests}}, $request ;
$self ->_install_timeout_timer;
$self ->_try_to_fill_requests;
return ;
}
sub _install_timeout_timer {
my ( $self ) = @_ ;
return if ! defined $self ->{timeout};
return if exists $self ->{timeout_timer};
$self ->{timeout_timer} = AE::timer $self ->{timeout}, 0, sub {
delete $self ->{timeout_timer};
$self ->{client}->remove_pending_checkout( $self );
if ( exists $self ->{worker}) {
$self ->{client}->destroy_worker( $self ->{worker});
delete $self ->{worker};
}
$self ->throw_fatal_error( "timed out after $self->{timeout} seconds" );
};
|
lib/AnyEvent/Task/Client/Checkout.pm
view on Meta::CPAN
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | return ;
}
my $method_name = $request ->[0];
if (! defined $method_name ) {
$method_name = '->()' ;
shift @$request ;
}
$self ->_install_timeout_timer;
$self ->{worker}->push_write( json => [ 'do' , {}, @$request , ], );
my $timer ;
if ( $self ->{log_defer_object}) {
$timer = $self ->{log_defer_object}->timer( $method_name );
}
$self ->{cmd_handler} = sub {
my ( $handle , $response ) = @_ ;
undef $timer ;
my ( $response_code , $meta , $response_value ) = @$response ;
if ( $self ->{log_defer_object} && $meta ->{ld}) {
$self ->{log_defer_object}->merge( $meta ->{ld});
}
if ( $response_code eq 'ok' ) {
local $@ = undef ;
$cb ->( $self , $response_value );
} elsif ( $response_code eq 'er' ) {
$self ->_throw_error( $response_value );
} else {
die "Unrecognized response_code: $response_code" ;
}
delete $self ->{timeout_timer};
delete $self ->{cmd_handler};
$self ->_try_to_fill_requests;
};
$self ->{worker}->push_read( json => $self ->{cmd_handler} );
}
sub DESTROY {
my ( $self ) = @_ ;
|
lib/AnyEvent/Task/Client/Checkout.pm
view on Meta::CPAN
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 | if ( $self ->{fatal_error} || ( $self ->{error_occurred} && $self ->{client} && ! $self ->{client}->{dont_refork_after_error})) {
$self ->{client}->destroy_worker( $worker ) if $self ->{client};
$self ->{client}->populate_workers if $self ->{client};
} else {
$worker ->push_write( json => [ 'dn' , {} ] );
$self ->{client}->make_worker_available( $worker ) if $self ->{client};
$self ->{client}->try_to_fill_pending_checkouts if $self ->{client};
}
}
$self ->{pending_requests} = $self ->{current_cb} = $self ->{timeout_timer} = $self ->{cmd_handler} = undef ;
}
1;
|
t/error-clears-checkout-queue.t
view on Meta::CPAN
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | );
my $client = AnyEvent::Task::Client->new(
connect => [ 'unix/' , '/tmp/anyevent-task-test.socket' ],
);
my $cv = AE::cv;
my $timeout_watcher = AE::timer 0.5, 0, sub {
$cv -> send ;
};
my $num_exceptions_caught = 0;
frame( code => sub {
my $checkout = $client ->checkout;
$checkout ->success( sub { ok(1, "first in checkout queue" ) });
$checkout ->success( sub { ok(1, "second in checkout queue" ) });
|
t/logger.t
view on Meta::CPAN
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | AnyEvent::Task::Server::fork_task_server(
listen => [ 'unix/' , '/tmp/anyevent-task-test.socket' ],
interface => { normal =>
sub {
logger->info( "hello from" , $$);
logger->timer( "junk" );
1;
},
error =>
sub {
logger-> warn ( "something weird happened" );
die "uh oh" ;
},
sleep =>
sub { select undef , undef , undef , shift ; },
},
|
t/logger.t
view on Meta::CPAN
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | my $log_defer_object = Log::Defer->new( sub {
my $msg = shift ;
is( $msg ->{logs}->[0]->[2], 'hello from' , 'message from client' );
is( $msg ->{logs}->[1]->[2], 'hello from' , 'message from worker' );
isnt( $msg ->{logs}->[0]->[3], $msg ->{logs}->[1]->[3], 'pids are different' );
is( $msg ->{logs}->[2]->[2], 'after' , 'order of msgs ok' );
is( $msg ->{logs}->[3]->[2], 'something weird happened' , 'log messages transfered even on error' );
is(@{ $msg ->{timers}}, 5, 'right number of timers' );
is( $msg ->{timers}->[0]->[0], 'normal' , 'normal is timer 1' );
is( $msg ->{timers}->[1]->[0], 'junk' , 'junk is timer 2' );
is( $msg ->{timers}->[2]->[0], 'sleep' , 'sleep is timer 3' );
is( $msg ->{timers}->[3]->[0], 'sleep' , 'sleep is timer 4' );
is( $msg ->{timers}->[4]->[0], 'error' , 'error is timer 5' );
});
$log_defer_object ->info( "hello from" , $$);
$client ->checkout( log_defer_object => $log_defer_object )->normal( sub {
my ( $checkout , $ret ) = @_ ;
$log_defer_object ->info( "after" );
$checkout -> sleep (0.1, sub {});
|
t/logger.t
view on Meta::CPAN
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | $cv -> recv ;
$cv = AE::cv;
$log_defer_object = Log::Defer->new( sub {
my $msg = shift ;
is( $msg ->{timers}->[0]->[0], '->()' , "didn't leak first arg when called as code ref" );
});
$client ->checkout( log_defer_object => $log_defer_object )->( 'first arg' , frame( code => sub {
die "error not thrown by calling interface as a sub?" ;
}, catch => sub {
ok(1, 'error caught' );
$cv -> send ;
}));
$cv -> recv ;
|
t/max_checkouts.t
view on Meta::CPAN
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | connect => [ 'unix/' , '/tmp/anyevent-task-test.socket' ],
max_workers => 1,
max_checkouts => 2,
);
my $cv = AE::cv;
my $pid ;
my $timeout_watcher = AE::timer 1.0, 0, sub {
print STDERR "hanged, probably because socket was unlinked" ;
exit ;
};
{
$client ->checkout->( sub {
my ( $checkout , $ret ) = @_ ;
$pid = $ret ;
});
|
t/timeout-log-defer.t
view on Meta::CPAN
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | });
} frame_catch {
$ld -> warn ( "keep alive 2" );
my $err = $@;
ok(1, "timeout hit" );
ok( $err =~ /timed out after /, 'correct err msg' );
$error_thrown = 1;
};
}
my $timer = AE::timer 1, 0, sub {
fail( "log defer object destroyed" );
$cv -> send ;
};
$cv -> recv ;
|