view release on metacpan or search on metacpan
inc/Spiffy.pm view on Meta::CPAN
sub_start =>
"sub {\n",
set_default =>
" \$_[0]->{%s} = %s\n unless exists \$_[0]->{%s};\n",
init =>
" return \$_[0]->{%s} = do { my \$self = \$_[0]; %s }\n" .
" unless \$#_ > 0 or defined \$_[0]->{%s};\n",
weak_init =>
" return do {\n" .
" \$_[0]->{%s} = do { my \$self = \$_[0]; %s };\n" .
" Scalar::Util::weaken(\$_[0]->{%s}) if ref \$_[0]->{%s};\n" .
" \$_[0]->{%s};\n" .
" } unless \$#_ > 0 or defined \$_[0]->{%s};\n",
return_if_get =>
" return \$_[0]->{%s} unless \$#_ > 0;\n",
set =>
" \$_[0]->{%s} = \$_[1];\n",
weaken =>
" Scalar::Util::weaken(\$_[0]->{%s}) if ref \$_[0]->{%s};\n",
sub_end =>
" return \$_[0]->{%s};\n}\n",
);
sub field {
my $package = caller;
my ($args, @values) = do {
no warnings;
local *boolean_arguments = sub { (qw(-weak)) };
local *paired_arguments = sub { (qw(-package -init)) };
Spiffy->parse_arguments(@_);
};
my ($field, $default) = @values;
$package = $args->{-package} if defined $args->{-package};
die "Cannot have a default for a weakened field ($field)"
if defined $default && $args->{-weak};
return if defined &{"${package}::$field"};
require Scalar::Util if $args->{-weak};
my $default_string =
( ref($default) eq 'ARRAY' and not @$default )
? '[]'
: (ref($default) eq 'HASH' and not keys %$default )
? '{}'
: default_as_code($default);
my $code = $code{sub_start};
if ($args->{-init}) {
my $fragment = $args->{-weak} ? $code{weak_init} : $code{init};
$code .= sprintf $fragment, $field, $args->{-init}, ($field) x 4;
}
$code .= sprintf $code{set_default}, $field, $default_string, $field
if defined $default;
$code .= sprintf $code{return_if_get}, $field;
$code .= sprintf $code{set}, $field;
$code .= sprintf $code{weaken}, $field, $field
if $args->{-weak};
$code .= sprintf $code{sub_end}, $field;
my $sub = eval $code;
die $@ if $@;
no strict 'refs';
*{"${package}::$field"} = $sub;
return $code if defined wantarray;
}
lib/AnyEvent/Gearman/Client/Connection.pm view on Meta::CPAN
package AnyEvent::Gearman::Client::Connection;
use Any::Moose;
use Scalar::Util 'weaken';
extends 'AnyEvent::Gearman::Connection';
no Any::Moose;
sub add_task {
my ($self, $task, $on_complete, $on_error, $type) = @_;
$self->add_on_ready(
sub {
push @{ $self->_need_handle }, $task;
$self->handler->push_write( $task->pack_req($type) );
$on_complete->();
},
$on_error,
);
weaken($self);
return;
}
sub process_work { # common handler for WORK_*
my ($self, $len, $cb) = @_;
my $handle = $self->handler;
$handle->unshift_read( line => "\0", sub {
my $job_handle = $_[1];
lib/AnyEvent/Gearman/Client/Connection.pm view on Meta::CPAN
my $handle = $self->handler;
$handle->unshift_read( chunk => $len, sub {
my $job_handle = $_[1];
my $task = shift @{ $self->_need_handle } or return;
$task->job_handle($job_handle);
$self->_job_handles->{ $job_handle } = $task;
$task->event( 'on_created' );
});
weaken $self;
}
sub process_packet_12 { # WORK_STATUS
my ($self, $len) = @_;
my $handle = $self->handler;
$handle->unshift_read( line => "\0", sub {
my $job_handle = $_[1];
$len -= length($_[1]) + 1;
lib/AnyEvent/Gearman/Client/Connection.pm view on Meta::CPAN
$len -= length($_[1]) + 1;
$_[0]->unshift_read( chunk => $len, sub {
my $denominator = $_[1];
my $task = $self->_job_handles->{ $job_handle } or return;
$task->event( on_status => $numerator, $denominator );
});
});
});
weaken $self;
}
sub process_packet_13 { # WORK_COMPLETE
my ($self) = @_;
push @_, sub {
my ($job_handle, $data) = @_;
my $task = delete $self->_job_handles->{ $job_handle } or return;
$task->event( on_complete => $data );
};
weaken $self;
goto \&process_work;
}
sub process_packet_14 { # WORK_FAIL
my ($self, $len) = @_;
my $handle = $self->handler;
$handle->unshift_read( chunk => $len, sub {
my $job_handle = $_[1];
my $task = delete $self->_job_handles->{ $job_handle } or return;
$task->event('on_fail');
});
weaken $self;
}
sub process_packet_25 { # WORK_EXCEPTION
my ($self) = @_;
push @_, sub {
my ($job_handle, $data) = @_;
my $task = $self->_job_handles->{ $job_handle } or return;
$task->event( on_exception => $data );
};
Scalar::Util::weaken($self);
goto \&process_work;
}
sub process_packet_28 { # WORK_DATA
my ($self) = @_;
push @_, sub {
my ($job_handle, $data) = @_;
my $task = $self->_job_handles->{ $job_handle } or return;
$task->event( on_data => $data );
};
weaken $self;
goto \&process_work;
}
sub process_packet_29 { # WORK_WARNING
my ($self) = @_;
push @_, sub {
my ($job_handle, $data) = @_;
my $task = $self->_job_handles->{ $job_handle } or return;
$task->event( on_warning => $data );
};
weaken $self;
goto \&process_work;
}
__PACKAGE__->meta->make_immutable;
__END__
=head1 NAME
lib/AnyEvent/Gearman/Connection.pm view on Meta::CPAN
package AnyEvent::Gearman::Connection;
use Any::Moose;
use Scalar::Util 'weaken';
use AnyEvent::Socket;
use AnyEvent::Handle;
has hostspec => (
is => 'ro',
isa => 'Str',
required => 1,
);
lib/AnyEvent/Gearman/Connection.pm view on Meta::CPAN
}
else {
warn sprintf("Connection failed: %s", $!);
$self->mark_dead;
$_->() for map { $_->[1] } @{ $self->on_connect_callbacks };
}
$self->on_connect_callbacks( [] );
};
weaken $self;
$self->_con_guard($g);
$self;
}
sub connected {
!!shift->handler;
}
sub add_on_ready {
lib/AnyEvent/Gearman/Connection.pm view on Meta::CPAN
unless ($packet_handler) {
# Ignore unimplement packet
$_[0]->unshift_read( chunk => $len, sub {} ) if $len;
return;
}
$packet_handler->( $self, $len );
});
});
weaken $self;
}
__PACKAGE__->meta->make_immutable;
__END__
=head1 NAME
AnyEvent::Gearman::Connection - common base class to handle connection
lib/AnyEvent/Gearman/Worker/Connection.pm view on Meta::CPAN
package AnyEvent::Gearman::Worker::Connection;
use Any::Moose;
use Scalar::Util 'weaken';
require bytes;
use AnyEvent::Gearman::Constants;
use AnyEvent::Gearman::Job;
extends 'AnyEvent::Gearman::Connection';
has grabbing => (
is => 'rw',
isa => 'Bool',
lib/AnyEvent/Gearman/Worker/Connection.pm view on Meta::CPAN
$self->add_on_ready(
sub {
$self->handler->push_write(
"\0REQ" . pack('NN', $type, bytes::length($args)) . $args
);
},
sub {
warn sprintf 'Failed to send request to "%s": %s', $self->hostspec, $!;
},
);
weaken $self;
}
sub register_function {
my ($self, $func_name) = @_;
my $prefix = $self->context->prefix;
$func_name = "$prefix\t$func_name" if $prefix;
$self->can_do($func_name);
$self->grab_job unless $self->grabbing;
lib/AnyEvent/Gearman/Worker/Connection.pm view on Meta::CPAN
},
on_warning => sub {
my ($job, $warning) = @_;
$self->request(WORK_WARNING, "$job_handle\0$warning");
},
);
$self->work( $job );
});
});
});
weaken $self;
}
sub work {
my ($self, $job) = @_;
my $cb = $self->context->functions->{ $job->function } or return;
$cb->($job);
}
__PACKAGE__->meta->make_immutable;