AnyEvent-Gearman

 view release on metacpan or  search on metacpan

lib/AnyEvent/Gearman/Connection.pm  view on Meta::CPAN

    # already connected
    return if $self->handler;

    my $g = tcp_connect $self->_host, $self->_port, sub {
        my ($fh) = @_;

        if ($fh) {
            my $handle = AnyEvent::Handle->new(
                fh       => $fh,
                on_read  => sub { $self->process_packet },
                on_error => sub {
                    my @undone = @{ $self->_need_handle },
                                 values %{ $self->_job_handles };
                    $_->event('on_fail') for @undone;

                    $self->_need_handle([]);
                    $self->_job_handles({});
                    $self->mark_dead;
                },
            );
            $self->handler( $handle );
            $_->() for map { $_->[0] } @{ $self->on_connect_callbacks };
        }
        else {
            warn sprintf("Connection failed: %s", $!);
            $self->mark_dead;
            $_->() for map { $_->[1] } @{ $self->on_connect_callbacks };
        }

        $self->on_connect_callbacks( [] );
    };

    weaken $self;
    $self->_con_guard($g);

    $self;
}

sub connected {
    !!shift->handler;
}

sub add_on_ready {
    my ($self, $cb, $eb) = @_;

    if ($self->connected) {
        $cb->();
    }
    else {
        push @{ $self->on_connect_callbacks }, [ $cb, $eb ];
        $self->connect;
    }
}

sub mark_dead {
    my ($self) = @_;
    $self->dead_time( time + 10 );
    $self->clear_handler;
}

sub alive {
    my ($self) = @_;
    $self->dead_time <= time;
}

sub process_packet {
    my $self   = shift;
    my $handle = $self->handler;

    $handle->unshift_read(chunk => 4, sub {
        unless ($_[1] eq "\0RES") {
            die qq[invalid packet: $_[1]"];
        }

        $_[0]->unshift_read( chunk => 8, sub {
            my ($type, $len)   = unpack 'NN', $_[1];
            my $packet_handler = $self->can("process_packet_$type");

            unless ($packet_handler) {
                # Ignore unimplement packet
                $_[0]->unshift_read( chunk => $len, sub {} ) if $len;
                return;
            }

            $packet_handler->( $self, $len );
        });
    });
    weaken $self;
}

__PACKAGE__->meta->make_immutable;

__END__

=head1 NAME

AnyEvent::Gearman::Connection - common base class to handle connection

=head1 SEE ALSO

L<AnyEvent::Gearman>, L<AnyEvent::Gearman::Client>, L<AnyEvent::Gearman::Worker>.

=head1 METHODS

=head2 connect

=head2 connected

=head2 add_on_ready

=head2 mark_dead

=head2 alive

=head2 process_packet

=head1 AUTHOR

Daisuke Murase <typester@cpan.org>

Pedro Melo <melo@cpan.org>

=head1 COPYRIGHT AND LICENSE

Copyright (c) 2009 by KAYAC Inc.

This program is free software; you can redistribute
it and/or modify it under the same terms as Perl itself.

The full text of the license can be found in the
LICENSE file included with this module.

=cut



( run in 1.628 second using v1.01-cache-2.11-cpan-df04353d9ac )