Couchbase-Client
view release on metacpan or search on metacpan
lib/Couchbase/Test/Async/Loop.pm view on Meta::CPAN
use Log::Fu { level => "info" };
use Devel::Peek;
use Array::Assign;
use base qw(POE::Sugar::Attributes);
# Invocant for every POE kernel call made in this file: the kernel methods
# are called on this class name (e.g. $poe_kernel->delay_set(...)).
my $poe_kernel = "POE::Kernel";
# Session start handler: takes the object returned by the heap's `object`
# accessor and calls connect() on it.
sub cbc_connect :Start {
    my $loop_state = $_[HEAP];
    my $client     = $loop_state->object;
    $client->connect();
}
# Catch-all handler registered for the _default event: logs the value
# received in ARG0 as the name of the unknown event.
sub unhandled :Event(_default) {
    my $event_name = $_[ARG0];
    log_errf("Got unknown event %s", $event_name);
}
# Event handler for errors reported to this session.
#
#   ARG0 - numeric error code
#   ARG1 - error string
#
# Logs the error, then passes both values to the on_error callback stored on
# the heap object, if one was supplied.
sub got_error :Event {
    my ($errnum, $errstr) = @_[ARG0, ARG1];
    log_errf("Got errnum=%d, errstr=%s", $errnum, $errstr);

    # on_error is a plain get/set accessor (declared through
    # Class::XSAccessor in this file). Calling it *with* arguments would
    # store the error number in the slot instead of running the callback,
    # so fetch the code ref first and invoke it explicitly.
    my $on_error = $_[HEAP]->on_error;
    $on_error->($errnum, $errstr) if $on_error;
    return;
}
# This would be an event-loop specific implementation of update_event.
# The map pairs each COUCHBASE_*_EVENT flag with the suffix ("read"/"write")
# used to build the name of the kernel method that handles it, e.g.
# "select_" . "read".
my %EVMETH_MAP = (
    COUCHBASE_WRITE_EVENT() => "write",
    COUCHBASE_READ_EVENT()  => "read",
);
# Registers a kernel select watcher on $dupfh for every event in
# %EVMETH_MAP whose bit is set in $cbc_flags.
#
#   $cbc_flags - bitmask of event flags to start watching
#   $dupfh     - filehandle to watch
#   $opaque    - value passed through to the "dispatch_event" handler
#
# Each watcher is registered with "dispatch_event" as its event name and
# ($ev, $opaque) as additional arguments.
sub _activate_events {
    my ($cbc_flags, $dupfh, $opaque) = @_;

    # Iterate with keys() instead of each(): each() keeps its position inside
    # the shared %EVMETH_MAP, so an exception thrown in the loop body would
    # leave the iterator half-way and make the next caller skip entries.
    for my $ev (keys %EVMETH_MAP) {
        next unless $cbc_flags & $ev;

        my $method = "select_" . $EVMETH_MAP{$ev};
        log_debugf("Activating event %d on dupfd %d", $ev, fileno($dupfh));
        $poe_kernel->$method($dupfh, "dispatch_event", $ev, $opaque);
    }
    return;
}
# Calls the kernel's select_read/select_write on $dupfh with no event name
# for every event in %EVMETH_MAP whose bit is set in $cbc_flags.
#
#   $cbc_flags - bitmask of event flags to stop watching
#   $dupfh     - filehandle whose watchers are being removed
#
# NOTE(review): relies on POE treating a select_* call without an event name
# as "stop watching" -- confirm against the POE::Kernel documentation.
sub _deactivate_events {
    my ($cbc_flags, $dupfh) = @_;

    # Iterate with keys() instead of each(): each() keeps its position inside
    # the shared %EVMETH_MAP, so an exception thrown in the loop body would
    # leave the iterator half-way and make the next caller skip entries.
    for my $ev (keys %EVMETH_MAP) {
        next unless $cbc_flags & $ev;

        my $method = "select_" . $EVMETH_MAP{$ev};
        log_debugf("Deactivating event %d on dupfd %d", $ev, fileno($dupfh));
        $poe_kernel->$method($dupfh);
    }
    return;
}
# Invokes "<prefix>_<read|write>" on the kernel for every event in
# %EVMETH_MAP whose bit is set in $events.
#
#   $events - bitmask of event flags to act on
#   $prefix - method name prefix; update_event passes "select_pause" or
#             "select_resume"
#   $dupfh  - filehandle the watchers are attached to
sub _startstop_events {
    my ($events, $prefix, $dupfh) = @_;

    # Iterate with keys() instead of each(): each() keeps its position inside
    # the shared %EVMETH_MAP, so an exception thrown in the loop body would
    # leave the iterator half-way and make the next caller skip entries.
    for my $ev (keys %EVMETH_MAP) {
        next unless $events & $ev;

        my $meth   = $EVMETH_MAP{$ev};
        my $method = $prefix . "_" . $meth;
        log_debugf("Invoking $prefix: $meth on dupfd %d", fileno($dupfh));
        $poe_kernel->$method($dupfh);
    }
    return;
}
# Event handler that starts, changes, stops, pauses or resumes I/O watching
# for one descriptor.
#
#   ARG0 - $evdata: array ref indexed by the EVIDX_* constants
#   ARG1 - $action: one of the EVACTION_* constants
#   ARG2 - $flags:  bitmask of events to watch (used by EVACTION_WATCH)
#
# Dies if the descriptor cannot be duplicated or if $action is unknown.
# NOTE(review): EVIDX_WATCHFLAGS is only read here; presumably whoever posts
# this event keeps it up to date -- confirm.
sub update_event :Event {
    my ($evdata, $action, $flags) = @_[ARG0..ARG2];
    my $dupfh = $evdata->[EVIDX_DUPFH];

    if ($action == EVACTION_WATCH) {
        if (!$dupfh) {
            # First watch request for this entry: duplicate the descriptor
            # into a Perl filehandle and remember it for later requests.
            # The open is checked so a failed dup is reported here instead
            # of storing a dead handle in the event data.
            my $fd = $evdata->[EVIDX_FD];
            open(my $new_fh, ">&", $fd)
                or die("Couldn't dup fd $fd: $!");
            $dupfh = $new_fh;
            _activate_events($flags, $dupfh, $evdata->[EVIDX_OPAQUE]);
            $evdata->[EVIDX_DUPFH] = $dupfh;
        } else {
            # Already watching: enable everything now requested and drop
            # whatever was watched before but is absent from the new mask.
            my $events_do_delete = $evdata->[EVIDX_WATCHFLAGS] & (~$flags);
            log_debugf("Old events=%x, new events = %x, delete events %x",
                $evdata->[EVIDX_WATCHFLAGS], $flags, $events_do_delete);
            _activate_events($flags, $dupfh, $evdata->[EVIDX_OPAQUE]);
            _deactivate_events($events_do_delete, $dupfh);
        }
    } elsif ($action == EVACTION_UNWATCH) {
        if (!$dupfh) {
            warn("Unwatch requested on undefined dup'd filehandle");
            return;
        }
        _deactivate_events($evdata->[EVIDX_WATCHFLAGS], $dupfh);
    } elsif ($action == EVACTION_SUSPEND || $action == EVACTION_RESUME) {
        if (!$dupfh) {
            warn("suspend/resume requested on undefined dup'd filehandle. ".
                "fd=".$evdata->[EVIDX_FD]);
            # Nothing to pause or resume without a handle; bail out like the
            # unwatch branch does instead of using the undefined handle.
            return;
        }
        my $prefix = $action == EVACTION_SUSPEND ? "pause" : "resume";
        $prefix = "select_" . $prefix;
        _startstop_events($evdata->[EVIDX_WATCHFLAGS], $prefix, $dupfh);
    } else {
        die("Unhandled action $action");
    }
    return;
}
# Event handler that schedules, reschedules or removes the timer attached
# to an event-data entry.
#
#   ARG0 - $evdata: array ref indexed by the EVIDX_* constants; the timer id
#                   is kept in the EVIDX_PLDATA slot
#   ARG1 - $action: EVACTION_WATCH (re)arms the timer, any other value
#                   removes it
#   ARG2 - $usecs:  delay in microseconds (used by EVACTION_WATCH)
#
# When the timer fires the kernel posts "dispatch_timeout" with the entry's
# EVIDX_OPAQUE value as its argument.
sub update_timer :Event {
    my ($evdata, $action, $usecs) = @_[ARG0..ARG2];
    my $timer_id = $evdata->[EVIDX_PLDATA];

    # Always hand the kernel a defined delay: a missing or zero $usecs means
    # "fire as soon as possible", not "undefined seconds".
    my $seconds = $usecs ? $usecs / (1000 * 1000) : 0;

    if ($action == EVACTION_WATCH) {
        if (defined $timer_id) {
            log_debugf("Rescheduling timer %d in %0.5f seconds from now",
                $timer_id, $seconds);
            return if $poe_kernel->delay_adjust($timer_id, $seconds);

            # The adjust failed (for instance the stored id refers to a timer
            # that is no longer pending), so fall through and arm a new one
            # rather than silently losing the timeout.
            log_debugf("Timer %d could not be adjusted (%s); scheduling a new one",
                $timer_id, "$!");
        }

        $timer_id = $poe_kernel->delay_set(
            "dispatch_timeout", $seconds, $evdata->[EVIDX_OPAQUE]);
        $evdata->[EVIDX_PLDATA] = $timer_id;
        log_debugf("Scheduling timer %d for %0.5f seconds from now",
            $timer_id, $seconds);
    } else {
        if (defined $timer_id) {
            log_debug("Deletion requested for timer $timer_id.");
            $poe_kernel->alarm_remove($timer_id);
            $evdata->[EVIDX_PLDATA] = undef;
        } else {
            log_debug("Requested to delete non-existent timer ID");
        }
    }
    return;
}
# This is how the event loop tells libcouchbase that an event has been
# received. The handler name is the one _activate_events registers with the
# kernel; ARG2 and ARG3 are read as the event flags and the opaque value and
# handed to Couchbase::Client::Async->HaveEvent.
sub dispatch_event :Event {
    my $event_bits = $_[ARG2];
    my $cookie     = $_[ARG3];
    log_debugf("Flags=%d, opaque=%x", $event_bits, $cookie);
    Couchbase::Client::Async->HaveEvent($event_bits, $cookie);
}
# Timer handler: reports the expiry to Couchbase::Client::Async->HaveEvent
# with a flags value of 0 and the opaque value received in ARG0.
sub dispatch_timeout :Event {
    my $no_flags = 0;
    my $cookie   = $_[ARG0];
    log_debugf("Dispatching timer.. opaque=%x", $cookie);
    Couchbase::Client::Async->HaveEvent($no_flags, $cookie);
}
#### External interface
# Generates the constructor `new` and combined get/set accessors for the
# object, alias, on_ready and on_error fields. Because the accessors also
# act as setters, call them without arguments when only reading a value.
use Class::XSAccessor {
constructor => 'new',
accessors => [qw(object alias on_ready on_error)]
};
sub spawn {
my ($cls,$session_name,%options) = @_;
my $cb_ready = delete $options{on_ready}
or die ("Must have on_ready callback");
my $user_error_callback = delete $options{on_error};
my $async = Couchbase::Client::Async->new({
( run in 0.442 second using v1.01-cache-2.11-cpan-39bf76dae61 )