Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

        connack_cb      => undef,    # connack callback
        error_cb        => undef,    # error callback
        client_id       => undef,    # client id
        server_prop     => {},       # server properties
        server_alias    => {},       # server topic aliases
        client_alias    => {},       # client topic aliases
        subscriptions   => {},       # topic subscriptions
        subscr_cb       => {},       # subscription callbacks
        packet_cb       => {},       # packet callbacks 
        buffers         => {},       # raw mqtt buffers
        packet_seq      => 1,        # sequence used for packet ids
        subscr_seq      => 1,        # sequence used for subscription ids
        alias_seq       => 1,        # sequence used for topic alias ids
        use_alias       => 0,        # topic alias enabled
        config          => \%args,
    };

    $self->{bus_id}   = delete $args{'bus_id'};
    $self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id};
    $self->{error_cb} = delete $args{'on_error'};

    bless $self, $class;
    return $self;
}

sub bus_id   { $_[0]->{bus_id}   }
sub bus_role { $_[0]->{bus_role} }

sub _fatal {
    my ($self, $errstr) = @_;
    die "(" . __PACKAGE__ . ") $errstr\n" unless $self->{error_cb};
    $self->{error_cb}->($errstr);
}

our $BUSY_SINCE = undef;
our $BUSY_TIME  = 0;

sub connect {
    my ($self, %args) = @_;

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    $self->{connect_cv}->recv if $args{'blocking'};
    $self->{connect_cv} = undef;

    return $args{'blocking'} ? $self->{is_connected} : 1;
}

sub _connect {
    my ($self) = @_;
    weaken($self);

    my $config = $self->{config};

    my $timeout = $config->{'timeout'};
    $timeout = 30 unless defined $timeout;

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # Connection timeout handler
    if ($timeout && !$self->{timeout_tmr}) {
        $self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub { 
            $self->_reset_connection;
            $self->{connect_cv}->send;
            $self->_fatal("Could not connect to MQTT broker after $timeout seconds");
        });
    }

    unless ($self->{hosts}) {
        # Initialize the list of cluster hosts
        my $hosts = $config->{'host'} || 'localhost';
        my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );
        $self->{hosts} = [ shuffle @hosts ];
    }

    # Determine next host of cluster to connect to
    my $try_hosts = $self->{try_hosts} ||= [];
    @$try_hosts = @{$self->{hosts}} unless @$try_hosts;

    # TCP connection args
    my $host = shift @$try_hosts;
    my $tls  = $config->{'tls'}  || 0;
    my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );

    ($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
    ($port) = ($port =~ m/^([0-9]+)$/s);

    $self->{handle} = AnyEvent::Handle->new(
        connect    => [ $host, $port ],
        tls        => $tls ? 'connect' : undef,
        keepalive  => 1,
        no_delay   => 1,
        on_connect => sub {
            my ($fh, $host, $port) = @_;
            # Send CONNECT packet
            $self->{server_prop}->{host} = $host;
            $self->{server_prop}->{port} = $port;
            $self->_send_connect;
        },
        on_connect_error => sub {
            my ($fh, $errmsg) = @_;
            # Some error occurred while connection, such as an unresolved hostname
            # or connection refused. Try next host of cluster immediately, or retry
            # in few seconds if all hosts of the cluster are unresponsive
            $self->{connect_err}++;
            warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
            my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
            $self->{reconnect_tmr} = AnyEvent->timer(
                after => ($delay < 10 ? $delay : 10),
                cb    => sub { $self->_connect },
            );
        },
        on_error => sub {
            my ($fh, $fatal, $errmsg) = @_;
            # Some error occurred, such as a read error
            $self->_reset_connection;
            $self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");



( run in 1.684 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )