AnyEvent-Task

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
0.804   2017-02-12
  * Bugfix for hung worker timeout not destroying object quickly
 
0.803   2017-02-12
  * Bugfix for timeout errors thrown by checkout with Log::Defer
    objects installed.
 
0.802   2015-10-23
  * Stop using AnyEvent::Strict in the tests. It creates an
    AE timer at compile time, therefore initializing the event
    loop before we've had a chance to fork our server.
  * Before forking in fork_task_server, assert that the AE
    loop hasn't been initialized.
 
0.801   2014-02-15
  * Bugfix: Fix memory leak of client objects.
  * Change: Make hung worker timeout actually terminate the
    worker to free up resources immediately.
 
0.800   2014-02-15

Changes  view on Meta::CPAN

40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
  * Major documentation updates
 
0.750   2013-04-08
  * Backwards-incompatible change: The behaviour enabled by the
    undocumented client option added in the previous release,
    refork_after_error, is now the default behaviour. Instead
    there is a new option called dont_refork_after_error to get
    back the original behaviour.
  * If log_defer_object is passed in when creating a checkout,
    operations on this checkout are recorded as Log::Defer
    timer events
  * Major documentation updates, including working SYNOPSIS
 
0.720   2013-03-14
  * Integration with Log::Defer so workers can log stuff and
    it shows up in the client logs
  * Complete the implementation of worker error recovery
  * Remove undocumented "sk" protocol feature
  * Big test-suite refactor/improvements
  * Lots more/better docs, including for error handling/logging

README  view on Meta::CPAN

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
Each client maintains a "pool" of connections to worker processes. Every
time a checkout is requested, the request is placed into a first-come,
first-serve queue. Once a worker process becomes available, it is
associated with that checkout until that checkout is garbage collected
which in perl means as soon as it is no longer needed. Each checkout
also maintains a queue of requested method-calls so that as soon as a
worker process is allocated to a checkout, any queued method calls are
filled in order.
 
"timeout" can be passed as a keyword argument to "checkout". Once a
request is queued up on that checkout, a timer of "timout" seconds
(default is 30, undef means infinity) is started. If the request
completes during this timeframe, the timer is cancelled. If the timer
expires, the worker connection is terminated and an exception is thrown
in the dynamic context of the callback (see the "ERROR HANDLING"
section).
 
Note that since timeouts are associated with a checkout, checkouts can
be created before the server is started. As long as the server is
running within "timeout" seconds, no error will be thrown and no
requests will be lost. The client will continually try to acquire worker
processes until a server is available, and once one is available it will
attempt to allocate all queued checkouts.

README  view on Meta::CPAN

338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
 
    AnyEvent::Task::Server->new(
      name => 'sleeper',
      listen => ['unix/', '/tmp/anyevent-task.socket'],
      interface => sub {
        logger->info('about to compute some operation');
        {
          my $timer = logger->timer('computing some operation');
          select undef,undef,undef, 1; ## sleep for 1 second
        }
      },
    )->run;
 
Note: Portable server code should never call "sleep" because on some
systems it will interfere with the recoverable worker timeout feature
implemented with "https://metacpan.org/pod/SIGALRM">SIGALRM".
 
In your client code, pass a Log::Defer object in when you create a

README  view on Meta::CPAN

399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
                            '0.023881061050415',
                            30,
                            'about to compute some operation'
                          ],
                          [
                            '1.025965',
                            30,
                            'finished some operation'
                          ]
                        ],
              'timers' => {
                            'computing some operation' => [
                                                            '0.024089061050415',
                                                            '1.02470206105041'
                                                          ]
                          }
            };
 
ERROR HANDLING
    In a synchronous program, if you expected some operation to throw an
    exception you might wrap it in "eval" like this:

lib/AnyEvent/Task.pm  view on Meta::CPAN

169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
 
 
 
=head1 DESIGN
 
Both client and server are of course built with L<AnyEvent>. However, workers can't use AnyEvent (yet). I've never found a need to do event processing in the worker since if the library you wish to use is already AnyEvent-compatible you can simply us...
 
Each client maintains a "pool" of connections to worker processes. Every time a checkout is requested, the request is placed into a first-come, first-serve queue. Once a worker process becomes available, it is associated with that checkout until that...
 
C<timeout> can be passed as a keyword argument to C<checkout>. Once a request is queued up on that checkout, a timer of C<timout> seconds (default is 30, undef means infinity) is started. If the request completes during this timeframe, the timer is c...
 
Note that since timeouts are associated with a checkout, checkouts can be created before the server is started. As long as the server is running within C<timeout> seconds, no error will be thrown and no requests will be lost. The client will continua...
 
Because of checkout queuing, the maximum number of worker processes a client will attempt to obtain can be limited with the C<max_workers> argument when creating a client object. If there are more live checkouts than C<max_workers>, the remaining che...
 
The C<min_workers> argument determines how many "hot-standby" workers should be pre-forked when creating the client. The default is 2 though note that this may change to 0 in the future.

lib/AnyEvent/Task.pm  view on Meta::CPAN

267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
 
    AnyEvent::Task::Server->new(
      name => 'sleeper',
      listen => ['unix/', '/tmp/anyevent-task.socket'],
      interface => sub {
        logger->info('about to compute some operation');
        {
          my $timer = logger->timer('computing some operation');
          select undef,undef,undef, 1; ## sleep for 1 second
        }
      },
    )->run;
 
 
Note: Portable server code should never call C<sleep> because on some systems it will interfere with the recoverable worker timeout feature implemented with C<SIGALRM>.
 
 
In your client code, pass a L<Log::Defer> object in when you create a checkout:

lib/AnyEvent/Task.pm  view on Meta::CPAN

329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
                '0.023881061050415',
                30,
                'about to compute some operation'
              ],
              [
                '1.025965',
                30,
                'finished some operation'
              ]
            ],
  'timers' => {
                'computing some operation' => [
                                                '0.024089061050415',
                                                '1.02470206105041'
                                              ]
              }
};

lib/AnyEvent/Task.pm  view on Meta::CPAN

514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
 
TODO
 
! max checkout queue size
  - start delivering fatal errors to some (at front of queue or back of queue though?)
  - write test for this
 
! docs: write good error handling examples
 
Make names more consistent between callback::frame backtraces and auto-generated log::defer timers
 
make server not use AnyEvent so don't have to worry about workers unlinking unix socket in dtors
 
In a graceful shutdown scenario, servers wait() on all their children before terminating.
  - Support relinquishing accept() socket during this period?
 
Document hung_worker_timeout and SIGALRM stuff
 
Wire protocol:
  - Support something other than JSON? Sereal?

lib/AnyEvent/Task/Client.pm  view on Meta::CPAN

63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
    my $service = $self->{connect}->[1];
 
    my $worker_guard;
    $self->{connecting_workers}->{0 + $worker_guard} = $worker_guard = tcp_connect $host, $service, sub {
      my $fh = shift;
 
      delete $self->{connecting_workers}->{0 + $worker_guard};
 
      if (!$fh) {
        $self->{total_workers}--;
        $self->install_populate_workers_timer;
        return;
      }
 
      delete $self->{populate_workers_timer};
 
      my $worker; $worker = new AnyEvent::Handle
                              fh => $fh,
                              on_read => sub { }, ## So we always have a read watcher and can instantly detect worker deaths
                              on_error => sub {
                                my ($worker, $fatal, $message) = @_;
 
                                my $checkout = $self->{workers_to_checkouts}->{0 + $worker};
 
                                $checkout->{timeout_timer} = undef; ## timer keeps a circular reference
 
                                $checkout->throw_fatal_error('worker connection suddenly died') if $checkout;
 
                                $self->destroy_worker($worker);
                                $self->populate_workers;
                              };
 
      $self->{worker_checkout_counts}->{0 + $worker} = 0;
 
      $self->make_worker_available($worker);
 
      $self->try_to_fill_pending_checkouts;
    };
  }
 
}
 
 
sub install_populate_workers_timer {
  my ($self) = @_;
 
  return if exists $self->{populate_workers_timer};
 
  $self->{populate_workers_timer} = AE::timer 0.2, 1, sub {
    $self->populate_workers;
  };
}
 
 
sub try_to_fill_pending_checkouts {
  my ($self) = @_;
 
  return unless @{$self->{pending_checkouts}};

lib/AnyEvent/Task/Client/Checkout.pm  view on Meta::CPAN

70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
    my %args = (code => $request->[-1]);
 
    $args{name} = $name if defined $name;
 
    $request->[-1] = frame(%args)
      unless Callback::Frame::is_frame($request->[-1]);
  }
 
  push @{$self->{pending_requests}}, $request;
 
  $self->_install_timeout_timer;
 
  $self->_try_to_fill_requests;
 
  return;
}
 
sub _install_timeout_timer {
  my ($self) = @_;
 
  return if !defined $self->{timeout};
  return if exists $self->{timeout_timer};
 
  $self->{timeout_timer} = AE::timer $self->{timeout}, 0, sub {
    delete $self->{timeout_timer};
 
    $self->{client}->remove_pending_checkout($self);
 
    if (exists $self->{worker}) {
      $self->{client}->destroy_worker($self->{worker});
      delete $self->{worker};
    }
 
    $self->throw_fatal_error("timed out after $self->{timeout} seconds");
  };

lib/AnyEvent/Task/Client/Checkout.pm  view on Meta::CPAN

156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
    return;
  }
 
  my $method_name = $request->[0];
 
  if (!defined $method_name) {
    $method_name = '->()';
    shift @$request;
  }
 
  $self->_install_timeout_timer;
 
  $self->{worker}->push_write( json => [ 'do', {}, @$request, ], );
 
  my $timer;
 
  if ($self->{log_defer_object}) {
    $timer = $self->{log_defer_object}->timer($method_name);
  }
 
  $self->{cmd_handler} = sub {
    my ($handle, $response) = @_;
 
    undef $timer;
 
    my ($response_code, $meta, $response_value) = @$response;
 
    if ($self->{log_defer_object} && $meta->{ld}) {
      $self->{log_defer_object}->merge($meta->{ld});
    }
 
    if ($response_code eq 'ok') {
      local $@ = undef;
      $cb->($self, $response_value);
    } elsif ($response_code eq 'er') {
      $self->_throw_error($response_value);
    } else {
      die "Unrecognized response_code: $response_code";
    }
 
    delete $self->{timeout_timer};
    delete $self->{cmd_handler};
 
    $self->_try_to_fill_requests;
  };
 
  $self->{worker}->push_read( json => $self->{cmd_handler} );
}
 
sub DESTROY {
  my ($self) = @_;

lib/AnyEvent/Task/Client/Checkout.pm  view on Meta::CPAN

216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
    if ($self->{fatal_error} || ($self->{error_occurred} && $self->{client} && !$self->{client}->{dont_refork_after_error})) {
      $self->{client}->destroy_worker($worker) if $self->{client};
      $self->{client}->populate_workers if $self->{client};
    } else {
      $worker->push_write( json => [ 'dn', {} ] );
      $self->{client}->make_worker_available($worker) if $self->{client};
      $self->{client}->try_to_fill_pending_checkouts if $self->{client};
    }
  }
 
  $self->{pending_requests} = $self->{current_cb} = $self->{timeout_timer} = $self->{cmd_handler} = undef;
}
 
 
1;

t/error-clears-checkout-queue.t  view on Meta::CPAN

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
);
 
 
my $client = AnyEvent::Task::Client->new(
               connect => ['unix/', '/tmp/anyevent-task-test.socket'],
             );
 
 
my $cv = AE::cv;
 
my $timeout_watcher = AE::timer 0.5, 0, sub {
  $cv->send;
};
 
my $num_exceptions_caught = 0;
 
frame(code => sub {
  my $checkout = $client->checkout;
 
  $checkout->success(sub { ok(1, "first in checkout queue") });
  $checkout->success(sub { ok(1, "second in checkout queue") });

t/logger.t  view on Meta::CPAN

7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
 
use Test::More tests => 14;
 
 
## The point of this test is to verify Log::Defer integration.
## If log_defer_object is passed in when creating a checkout:
##   1) The server can add log messages, timers, data, etc to
##      this object by using the AnyEvent::Task::Logger::logger
##   2) Every time a request is placed onto a checkout, a timer
##      is started in this object and it is ended once the
##      request is fulfilled.
 
 
 
AnyEvent::Task::Server::fork_task_server(
  listen => ['unix/', '/tmp/anyevent-task-test.socket'],
  interface => { normal =>
                   sub {
                     logger->info("hello from", $$);
                     logger->timer("junk");
                     1;
                   },
                 error =>
                   sub {
                     logger->warn("something weird happened");
                     die "uh oh";
                   },
                 sleep =>
                   sub { select undef,undef,undef,shift; },
               },

t/logger.t  view on Meta::CPAN

52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
my $log_defer_object = Log::Defer->new(sub {
  my $msg = shift;
 
  is($msg->{logs}->[0]->[2], 'hello from', 'message from client');
  is($msg->{logs}->[1]->[2], 'hello from', 'message from worker');
  isnt($msg->{logs}->[0]->[3], $msg->{logs}->[1]->[3], 'pids are different');
  is($msg->{logs}->[2]->[2], 'after', 'order of msgs ok');
  is($msg->{logs}->[3]->[2], 'something weird happened', 'log messages transfered even on error');
 
  is(@{$msg->{timers}}, 5, 'right number of timers');
  is($msg->{timers}->[0]->[0], 'normal', 'normal is timer 1');
  is($msg->{timers}->[1]->[0], 'junk', 'junk is timer 2');
  is($msg->{timers}->[2]->[0], 'sleep', 'sleep is timer 3');
  is($msg->{timers}->[3]->[0], 'sleep', 'sleep is timer 4');
  is($msg->{timers}->[4]->[0], 'error', 'error is timer 5');
});
 
$log_defer_object->info("hello from", $$);
 
$client->checkout(log_defer_object => $log_defer_object)->normal(sub {
  my ($checkout, $ret) = @_;
 
  $log_defer_object->info("after");
 
  $checkout->sleep(0.1, sub {});

t/logger.t  view on Meta::CPAN

87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
 
$cv->recv;
 
 
$cv = AE::cv;
 
$log_defer_object = Log::Defer->new(sub {
  my $msg = shift;
 
  is($msg->{timers}->[0]->[0], '->()', "didn't leak first arg when called as code ref");
});
 
$client->checkout(log_defer_object => $log_defer_object)->('first arg', frame(code => sub {
  die "error not thrown by calling interface as a sub?";
}, catch => sub {
  ok(1, 'error caught');
  $cv->send;
}));
 
$cv->recv;

t/max_checkouts.t  view on Meta::CPAN

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
               connect => ['unix/', '/tmp/anyevent-task-test.socket'],
               max_workers => 1,
               max_checkouts => 2,
             );
 
 
my $cv = AE::cv;
 
my $pid;
 
my $timeout_watcher = AE::timer 1.0, 0, sub {
  print STDERR "hanged, probably because socket was unlinked";
  exit;
};
 
{
  $client->checkout->(sub {
    my ($checkout, $ret) = @_;
    $pid = $ret;
  });

t/timeout-log-defer.t  view on Meta::CPAN

50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
    });
  } frame_catch {
    $ld->warn("keep alive 2");
    my $err = $@;
    ok(1, "timeout hit");
    ok($err =~ /timed out after/, 'correct err msg');
    $error_thrown = 1;
  };
}
 
my $timer = AE::timer 1, 0, sub {
  fail("log defer object destroyed");
  $cv->send;
};
 
$cv->recv;



( run in 0.385 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )