Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

sub _fatal {
    my ($self, $errstr) = @_;
    die "(" . __PACKAGE__ . ") $errstr\n" unless $self->{error_cb};
    $self->{error_cb}->($errstr);
}

our $BUSY_SINCE = undef;
our $BUSY_TIME  = 0;

sub connect {
    my ($self, %args) = @_;

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    $self->{connect_cv}->recv if $args{'blocking'};
    $self->{connect_cv} = undef;

    return $args{'blocking'} ? $self->{is_connected} : 1;
}

sub _connect {
    my ($self) = @_;
    weaken($self);

    my $config = $self->{config};

    my $timeout = $config->{'timeout'};
    $timeout = 30 unless defined $timeout;

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # Connection timeout handler
    if ($timeout && !$self->{timeout_tmr}) {
        $self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub { 
            $self->_reset_connection;
            $self->{connect_cv}->send;
            $self->_fatal("Could not connect to MQTT broker after $timeout seconds");
        });
    }

    unless ($self->{hosts}) {
        # Initialize the list of cluster hosts
        my $hosts = $config->{'host'} || 'localhost';
        my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
        $self->{hosts} = [ shuffle @hosts ];
    }

    # Determine next host of cluster to connect to
    my $try_hosts = $self->{try_hosts} ||= [];
    @$try_hosts = @{$self->{hosts}} unless @$try_hosts;

    # TCP connection args
    my $host = shift @$try_hosts;
    my $tls  = $config->{'tls'}  || 0;
    my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );

    ($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
    ($port) = ($port =~ m/^([0-9]+)$/s);

    $self->{handle} = AnyEvent::Handle->new(
        connect    => [ $host, $port ],
        tls        => $tls ? 'connect' : undef,
        keepalive  => 1,
        no_delay   => 1,
        on_connect => sub {
            my ($fh, $host, $port) = @_;
            # Send CONNECT packet
            $self->{server_prop}->{host} = $host;
            $self->{server_prop}->{port} = $port;
            $self->_send_connect;
        },
        on_connect_error => sub {
            my ($fh, $errmsg) = @_;
            # Some error occurred while connection, such as an unresolved hostname
            # or connection refused. Try next host of cluster immediately, or retry
            # in few seconds if all hosts of the cluster are unresponsive
            $self->{connect_err}++;
            warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
            my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
            $self->{reconnect_tmr} = AnyEvent->timer(
                after => ($delay < 10 ? $delay : 10),
                cb    => sub { $self->_connect },
            );
        },
        on_error => sub {
            my ($fh, $fatal, $errmsg) = @_;
            # Some error occurred, such as a read error
            $self->_reset_connection;
            $self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");
        },
        on_eof => sub {
            my ($fh) = @_;
            # The server has closed the connection without sending DISCONNECT
            $self->_reset_connection;
            $self->_fatal("MQTT broker at $host:$port has gone away");
        },
        on_read => sub {
            my ($fh) = @_;

            my $packet_type;
            my $packet_flags;

            my $rbuff_len;
            my $packet_len;

            my $mult;
            my $offs;
            my $byte;

            my $timing_packets;

            unless (defined $BUSY_SINCE) {
                # Measure time elapsed while processing incoming packets
                $BUSY_SINCE = Time::HiRes::time;
                $timing_packets = 1; 
            }



( run in 0.856 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )