Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

    utf8::decode($str);

    return $str;
}

sub _decode_binary_data {
    my ($packet, $offs) = @_;

    my $data = unpack("n/a", substr($$packet, $$offs));
    $$offs += 2 + length($data);

    return $data;
}

sub _decode_var_int {
    my ($packet, $offs) = @_;

    my $int = 0;
    my $mult = 1;
    my $byte;

    do {
        $byte = unpack("C", substr($$packet, $$offs, 1));
        $int += ($byte & 0x7F) * $mult;
        $mult *= 128;
        $$offs++;
    } while ($byte & 0x80);

    return $int;
}

sub _encode_var_int {
    return pack("C", $_[0]) if ($_[0] < 128);
    my @a = unpack("C*", pack("w", $_[0]));
    $a[0]  &= 0x7F;
    $a[-1] |= 0x80;
    return pack("C*", reverse @a);
}


sub new {
    my ($class, %args) = @_;

    my $self = {
        bus_id          => undef,
        bus_role        => undef,
        handle          => undef,    # the socket
        hosts           => undef,    # list of all hosts in cluster
        is_connected    => undef,    # true once connected
        try_hosts       => undef,    # list of hosts to try to connect
        connect_err     => undef,    # count of connection errors
        timeout_tmr     => undef,    # timer used for connection timeout
        reconnect_tmr   => undef,    # timer used for connection retry
        connack_cb      => undef,    # connack callback
        error_cb        => undef,    # error callback
        client_id       => undef,    # client id
        server_prop     => {},       # server properties
        server_alias    => {},       # server topic aliases
        client_alias    => {},       # client topic aliases
        subscriptions   => {},       # topic subscriptions
        subscr_cb       => {},       # subscription callbacks
        packet_cb       => {},       # packet callbacks 
        buffers         => {},       # raw mqtt buffers
        packet_seq      => 1,        # sequence used for packet ids
        subscr_seq      => 1,        # sequence used for subscription ids
        alias_seq       => 1,        # sequence used for topic alias ids
        use_alias       => 0,        # topic alias enabled
        config          => \%args,
    };

    $self->{bus_id}   = delete $args{'bus_id'};
    $self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id};
    $self->{error_cb} = delete $args{'on_error'};

    bless $self, $class;
    return $self;
}

sub bus_id   { $_[0]->{bus_id}   }
sub bus_role { $_[0]->{bus_role} }

sub _fatal {
    my ($self, $errstr) = @_;
    die "(" . __PACKAGE__ . ") $errstr\n" unless $self->{error_cb};
    $self->{error_cb}->($errstr);
}

our $BUSY_SINCE = undef;
our $BUSY_TIME  = 0;

sub connect {
    my ($self, %args) = @_;

    $self->{connack_cb} = $args{'on_connack'};
    $self->{connect_cv} = AnyEvent->condvar;

    $self->_connect;

    $self->{connect_cv}->recv if $args{'blocking'};
    $self->{connect_cv} = undef;

    return $args{'blocking'} ? $self->{is_connected} : 1;
}

sub _connect {
    my ($self) = @_;
    weaken($self);

    my $config = $self->{config};

    my $timeout = $config->{'timeout'};
    $timeout = 30 unless defined $timeout;

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # Connection timeout handler
    if ($timeout && !$self->{timeout_tmr}) {
        $self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub { 
            $self->_reset_connection;
            $self->{connect_cv}->send;
            $self->_fatal("Could not connect to MQTT broker after $timeout seconds");

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

            $prop{'message_expiry_interval'} = unpack("N", substr($$packet, $offs, 4));
            $offs += 4;
        }
        elsif ($prop_id == MQTT_TOPIC_ALIAS) {
            # 3.3.2.3.4  Topic Alias  (short int)
            my $alias = unpack("n", substr($$packet, $offs, 2));
            $offs += 2;
            if (length $topic) {
                $self->{server_alias}->{$alias} = $topic;
            }
            else {
                $prop{'topic'} = $self->{server_alias}->{$alias};
            }
        }
        elsif ($prop_id == MQTT_RESPONSE_TOPIC) {
            # 3.3.2.3.5  Response Topic  (utf8 string)
            my $resp_topic = unpack("n/a", substr($$packet, $offs));
            $offs += 2 + length $resp_topic;
            utf8::decode( $resp_topic );
            $prop{'response_topic'} = $resp_topic;
        }
        elsif ($prop_id == MQTT_CORRELATION_DATA) {
            # 3.3.2.3.6  Correlation Data  (binary data)
            $prop{'correlation_data'} = unpack("n/a", substr($$packet, $offs));
            $offs += 2 + length $prop{'correlation_data'};
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.3.2.3.7  User Property  (utf8 string pair)
            my ($key, $val) = unpack("n/a n/a", substr($$packet, $offs));
            $offs += 4 + length($key) + length($val);
            utf8::decode( $key );
            utf8::decode( $val );
            $prop{$key} = $val;
        }
        elsif ($prop_id == MQTT_SUBSCRIPTION_IDENTIFIER) {
            # 3.3.2.3.8  Subscription Identifier  (variable int)
            push @subscr_ids, _decode_var_int($packet, \$offs);
        }
        elsif ($prop_id == MQTT_CONTENT_TYPE) {
            # 3.3.2.3.9  Content Type  (utf8 string)
            my $content_type = unpack("n/a", substr($$packet, $offs));
            $offs += 2 + length $content_type;
            utf8::decode( $content_type );
            $prop{'content_type'} = $content_type;
        }
        else {
            # Protocol error
            $self->_fatal("Received PUBLISH with unknown property $prop_id");
        }
    }

    # Trim variable header from packet, the remaining is the payload
    substr($$packet, 0, $prop_end, '');

    if ($prop{'payload_format'}) {
        # Payload is UTF-8 Encoded Character Data
        utf8::decode( $$packet );
    }

    foreach (@subscr_ids) {
        # Execute subscriptions callbacks

        $self->{subscr_cb}->{$_}->($packet, \%prop);
    }
}


sub puback {
    my ($self, %args) = @_;

    croak "Missing packet_id" unless $args{'packet_id'};

    my $raw_mqtt = pack( 
        "C C n C", 
        MQTT_PUBACK << 4,           # 3.4.1    Packet type 
        3,                          # 3.4.1    Remaining length
        $args{'packet_id'},         # 3.4.2    Packet identifier
        $args{'reason_code'} || 0,  # 3.4.2.1  Reason code
    );

    if ($args{'buffer_id'}) {
        # Do not send right now, wait until flush_buffer
        $self->{buffers}->{$args{'buffer_id'}}->{raw_mqtt} .= $raw_mqtt;
        return 1;
    }

    $self->{handle}->push_write( $raw_mqtt );

    1;
}

sub _receive_puback {
    my ($self, $packet) = @_;

    my ($packet_id, $reason_code) = unpack("n C", $$packet);
    $reason_code = 0 unless defined $reason_code;

    #TODO: 3.5.2.2  Properties

    my $puback_cb = delete $self->{packet_cb}->{$packet_id};
    return unless defined $puback_cb;

    $puback_cb->($reason_code);
}

sub pubrec {
    my ($self, %args) = @_;

    croak "Missing packet_id" unless $args{'packet_id'};

    my $raw_mqtt = pack( 
        "C C n C", 
        MQTT_PUBREC << 4,           # 3.5.1    Packet type 
        3,                          # 3.5.1    Remaining length
        $args{'packet_id'},         # 3.5.2    Packet identifier
        $args{'reason_code'} || 0,  # 3.5.2.1  Reason code
    );

    #TODO: set PUBREL callback

    $self->{handle}->push_write( $raw_mqtt );

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

            $prop{'authentication_data'} = _decode_binary_data($packet, \$offs);
        }
        elsif ($prop_id == MQTT_REASON_STRING) {
            # 3.15.2.2.4  Reason String  (utf8 string)
            $prop{'reason_string'} = _decode_utf8_str($packet, \$offs);
        }
        elsif ($prop_id == MQTT_USER_PROPERTY) {
            # 3.15.2.2.5  User Property  (utf8 string pair)
            my $key = _decode_utf8_str($packet, \$offs);
            my $val = _decode_utf8_str($packet, \$offs);
            $prop{$key} = $val;
        }
        else {
            # Protocol error
            $self->_fatal("Received AUTH with unexpected property $prop_id");
        }
    }

    my $auth_cb = delete $self->{packet_cb}->{'auth'};

    $auth_cb->(\%prop) if $auth_cb;
}


sub flush_buffer {
    my ($self, %args) = @_;

    my $buffer = delete $self->{buffers}->{$args{'buffer_id'}};

    # Nothing to do if nothing was buffered
    return unless $buffer;

    $self->{handle}->push_write( $buffer->{raw_mqtt} );

    if (defined $self->{handle}->{wbuf} && length $self->{handle}->{wbuf} > 0) {

        # Kernel write buffer is full, see publish() above

        # Make AnyEvent allow one level of recursive condvar blocking
        $AE_WAITING && Carp::confess "Recursive condvar blocking wait attempted";
        local $AE_WAITING = 1;
        local $AnyEvent::CondVar::Base::WAITING = 0;

        my $flushed = AnyEvent->condvar;
        $self->{handle}->on_drain( $flushed );
        $flushed->recv;
        $self->{handle}->on_drain(); # clear
    }

    1;
}

sub discard_buffer {
    my ($self, %args) = @_;

    my $buffer = delete $self->{buffers}->{$args{'buffer_id'}};

    # Nothing to do if nothing was buffered
    return unless $buffer;

    # Remove all pending puback callbacks, as those will never be executed
    foreach my $packet_id (keys %{$buffer->{packet_ids}}) {
        delete $self->{packet_cb}->{$packet_id};
    }

    1;
}


sub DESTROY {
    my $self = shift;
    # Disconnect gracefully from server if already connected
    return unless defined $self->{handle};
    $self->disconnect;
}

1;

__END__

=pod

=encoding utf8

=head1 NAME
 
Beekeeper::MQTT - Asynchronous MQTT 5.0 client
 
=head1 VERSION
 
Version 0.09

=head1 SYNOPSIS

  my $mqtt = Beekeeper::MQTT->new(
      host     => 'localhost',
      username => 'guest',
      password => 'guest',
  );
  
  $mqtt->connect( 
      blocking => 1,
      on_connack => sub {
          my ($success, $properties) = @_;
          die $properties->{reason_string} unless $success;
      },
  );
  
  $mqtt->subscribe(
      topic => 'foo/bar',
      on_publish => sub {
          my ($payload, $properties) = @_;
          print "Got a message: $$payload";
      },
  );
  
  $mqtt->publish(
      topic   => 'foo/bar',
      payload => 'Hello',
  );
  



( run in 1.665 second using v1.01-cache-2.11-cpan-5837b0d9d2c )