Beekeeper

 view release on metacpan or  search on metacpan

examples/chat/flood.pl  view on Meta::CPAN

    my $took = time() - $start_on;
    if ($took > 1) {
        my $ovl = int(abs(($took - 1) * 100)); 
        print "Cannot sustain $msg_sec msg/s ($ovl\% overload)\n";
        next;
    }

    my $wait = 1 - $took;
    $cv = AnyEvent->condvar;
    AnyEvent->now_update;
    my $tmr = AnyEvent->timer( after => $wait, cb => $cv);
    $cv->recv;
}

1;

examples/dashboard/js/dashboard.js  view on Meta::CPAN

}}

function realTimeLineChart(id, data, points) { return {

    id:     id,
    data:   data,
    points: points,
    margin: { top: 20, right: 20, bottom: 20, left: 50 },
    width:  null,
    height: null,
    timer:  null,

    draw: function() {

        let target_element = document.getElementById(this.id);
        let cs = getComputedStyle(target_element);

        if (!this.width)  this.width  = cs.width.replace(/px/,"")  || 800;  
        if (!this.height) this.height = cs.height.replace(/px/,"") || 150; 

        let margin = this.margin; 

examples/dashboard/js/dashboard.js  view on Meta::CPAN

            yScale.domain([yMin, yMax]);

            // Refresh axis
            main.select(".x").call(xAxis);
            main.select(".y").call(yAxis);

            // Refresh line
            path.attr("d", line(data) );
        };

        if (this.timer) clearInterval(this.timer);
        this.timer = setInterval( this.refresh, 1000 );
    },

    clear: function() {
        clearInterval(this.timer);
    },

    set_data: function(data) {
        this.data.length = 0;
        this.data.push(data);
        this.refresh();
    },

    add_data: function(data) {
        this.data.push(data);

examples/dashboard/lib/Beekeeper/Service/Dashboard/Worker.pm  view on Meta::CPAN

    $self->{services_15m} ||= [];
    $self->{services_1h}  ||= [];

    my $now  = Time::HiRes::time;
    my $msec = $now - int($now);

    my $offs_1s  = $msec;
    my $offs_5s  = $now % 5  + $msec;
    my $offs_1m  = $now % 60 + $msec;

    $self->{collect_tmr} = AnyEvent->timer( 
        after    => 1 - $offs_1s,
        interval => 1,
        cb => sub {
            Beekeeper::Service::Supervisor->get_services_status_async(
                on_success => sub {
                    my ($resp) = @_;
                    $self->_collect_stats( $resp->result );
                },
                on_error => sub {
                    my ($error) = @_;
                    log_warn $error->message;
                },
            );
        },
    );

    $self->{consolidate_5s_tmr} = AnyEvent->timer( 
        after    => 5 - $offs_5s,
        interval => 5,
        cb => sub {

            # 1 hour in 5 sec resolution
            $self->_consolidate(
                from   => $self->{services_1s},
                into   => $self->{services_5s},
                period => 5,
                keep   => 60 * 60/5, # 720
            );
        },
    );

    $self->{consolidate_1m_tmr} = AnyEvent->timer( 
        after    => 60 - $offs_1m,
        interval => 60,
        cb => sub {

            # 1 day in 2 min resolution
            $self->_consolidate(
                from   => $self->{services_5s},
                into   => $self->{services_2m},
                period => 2 * 60,
                keep   => 24 * 60/2, # 720

examples/flood/flood.pl  view on Meta::CPAN


        if ($rate) {

            printf( "in %6s sec  %6s /sec %6s ms each", $clk, $cps, $avg );

            if ($took <= 1) {

                my $sleep = 1 - $took;
                my $cv = AnyEvent->condvar;
                AnyEvent->now_update;
                my $tmr = AnyEvent->timer( after => $sleep, cb => $cv);
                $cv->recv;
            }
            else {
                my $ovl = int(abs(($took - 1) * 100)); 
                print "   $ovl\% overload";
            }

            print "\n";

            last if $count && $total >= $count;

examples/flood/lib/MyApp/Service/Flood/Worker.pm  view on Meta::CPAN


# Accepts ($self, $params) and does nothing with them: the body only
# unpacks its arguments.
sub message {
    my ($self, $params) = @_;
}

# Send the received $params back as the response to $request, one second
# after the call, using an event loop timer instead of blocking.
sub delayed_echo {
    my ($self, $params, $request) = @_;

    # The response is not produced by this call itself but later, from the timer callback
    $request->async_response;

    # Every pending echo keeps its timer in $self under its own sequence number
    $self->{timer_seq}++;
    my $slot = $self->{timer_seq};

    my $on_elapsed = sub {
        # Drop the stored timer, then answer the request
        delete $self->{$slot};
        $request->send_response( $params );
    };

    $self->{$slot} = AnyEvent->timer( after => 1, cb => $on_elapsed );
}

1;

lib/Beekeeper/Client.pm  view on Meta::CPAN


        $req->{_waiting_response} = $client->{async_cv};
        $req->{_waiting_response}->begin;
    }

    $client->{in_progress}->{$req_id} = $req;

    # Ensure that timeout is set properly when the event loop was blocked
    if ($__now != CORE::time) { $__now = CORE::time; AnyEvent->now_update }

    # Request timeout timer
    my $timeout = $args{'timeout'} || REQ_TIMEOUT;
    $req->{_timeout} = AnyEvent->timer( after => $timeout, cb => sub {
        my $req = delete $client->{in_progress}->{$req_id};
        $req->{_response} = Beekeeper::JSONRPC::Error->request_timeout;
        $req->{_on_error_cb}->($req->{_response}) if $req->{_on_error_cb};
        $req->{_waiting_response}->end;
    });

    bless $req, 'Beekeeper::JSONRPC::Request';
    return $req;
}

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

    my ($class, %args) = @_;

    my $self = {
        bus_id          => undef,
        bus_role        => undef,
        handle          => undef,    # the socket
        hosts           => undef,    # list of all hosts in cluster
        is_connected    => undef,    # true once connected
        try_hosts       => undef,    # list of hosts to try to connect
        connect_err     => undef,    # count of connection errors
        timeout_tmr     => undef,    # timer used for connection timeout
        reconnect_tmr   => undef,    # timer used for connection retry
        connack_cb      => undef,    # connack callback
        error_cb        => undef,    # error callback
        client_id       => undef,    # client id
        server_prop     => {},       # server properties
        server_alias    => {},       # server topic aliases
        client_alias    => {},       # client topic aliases
        subscriptions   => {},       # topic subscriptions
        subscr_cb       => {},       # subscription callbacks
        packet_cb       => {},       # packet callbacks 
        buffers         => {},       # raw mqtt buffers

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

    my $config = $self->{config};

    my $timeout = $config->{'timeout'};
    $timeout = 30 unless defined $timeout;

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # Connection timeout handler
    if ($timeout && !$self->{timeout_tmr}) {
        $self->{timeout_tmr} = AnyEvent->timer( after => $timeout, cb => sub { 
            $self->_reset_connection;
            $self->{connect_cv}->send;
            $self->_fatal("Could not connect to MQTT broker after $timeout seconds");
        });
    }

    unless ($self->{hosts}) {
        # Initialize the list of cluster hosts
        my $hosts = $config->{'host'} || 'localhost';
        my @hosts = (ref $hosts eq 'ARRAY') ? @$hosts : ( $hosts );

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

            $self->_send_connect;
        },
        on_connect_error => sub {
            my ($fh, $errmsg) = @_;
            # Some error occurred while connecting, such as an unresolved hostname
            # or connection refused. Try next host of cluster immediately, or retry
            # in a few seconds if all hosts of the cluster are unresponsive
            $self->{connect_err}++;
            warn "Could not connect to MQTT broker at $host:$port: $errmsg\n" if ($self->{connect_err} <= @{$self->{hosts}});
            my $delay = @{$self->{try_hosts}} ? 0 : $self->{connect_err} / @{$self->{hosts}};
            $self->{reconnect_tmr} = AnyEvent->timer(
                after => ($delay < 10 ? $delay : 10),
                cb    => sub { $self->_connect },
            );
        },
        on_error => sub {
            my ($fh, $fatal, $errmsg) = @_;
            # Some error occurred, such as a read error
            $self->_reset_connection;
            $self->_fatal("Error on connection to MQTT broker at $host:$port: $errmsg");
        },

lib/Beekeeper/MQTT.pm  view on Meta::CPAN


        if ($code == 0) {
            # Success
            my $subs = $self->{subscriptions};
            my $subscr_id = delete $subs->{$topic};
            if ($subscr_id) {
                # Free on_publish callback if not used by another subscription
                my @still_used = grep { $subs->{$_} == $subscr_id } keys %$subs;
                unless (@still_used) {
                    # But not right now, as broker may send some messages *after* unsubscription
                    $self->{_timers}->{"unsub-$subscr_id"} = AnyEvent->timer( after => 60, cb => sub {
                        delete $self->{_timers}->{"unsub-$subscr_id"};
                        delete $self->{subscr_cb}->{$subscr_id};
                    });
                }
            }
        }
        else {
            # Failure
            $success = 0;
            unless ($unsuback_cb) {
                $self->_fatal("Unsubscription to topic '$topic' failed: $reason");

lib/Beekeeper/Service/LogTail/Worker.pm  view on Meta::CPAN

    $self->{max_entries} = $self->{config}->{buffer_entries} || 20000;
    $self->{log_level}   = $self->{config}->{log_level}      || LOG_DEBUG;

    $self->_connect_to_all_brokers;

    $self->accept_remote_calls(
        '_bkpr.logtail.tail' => 'tail',
    );

    # Ping backend brokers to avoid disconnections due to inactivity
    $self->{ping_timer} = AnyEvent->timer(
        after    => 60 * rand(),
        interval => 60,
        cb       => sub { $self->_ping_backend_brokers },
    );

    log_info "Ready";
}

sub _connect_to_all_brokers {
    my $self = shift;

lib/Beekeeper/Service/LogTail/Worker.pm  view on Meta::CPAN


        my $bus; $bus = Beekeeper::MQTT->new( 
            %$config,
            bus_id   => $bus_id,
            timeout  => 300,
            on_error => sub {
                # Reconnect
                my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
                log_error "Connection to $bus_id failed: $errmsg";
                my $delay = $self->{connect_err}->{$bus_id}++;
                $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
                    after => ($delay < 10 ? $delay * 3 : 30),
                    cb => sub {
                        $bus->connect(
                            on_connack => sub {
                                # Setup subscriptions
                                log_warn "Reconnected to $bus_id";
                                $self->_collect_log($bus);
                            }
                        );
                    },

lib/Beekeeper/Service/Router/Worker.pm  view on Meta::CPAN


    $self->{wait_frontends_up} = AnyEvent->condvar;

    # Create a connection to every frontend
    foreach my $config (@$frontends_config) {

        $self->init_frontend_connection( $config );
    }

    # Ping frontend brokers to avoid disconnections due to inactivity
    $self->{ping_timer} = AnyEvent->timer(
        after    => 60 * rand(),
        interval => 60,
        cb       => sub { $self->ping_frontend_brokers },
    );
}

sub init_frontend_connection {
    my ($self, $config) = @_;
    weaken $self;

lib/Beekeeper/Service/Router/Worker.pm  view on Meta::CPAN

        %$config,
        bus_id   => $bus_id,
        timeout  => 60,
        on_error => sub {
            # Reconnect
            my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
            log_alert "Connection to $bus_id failed: $errmsg";
            delete $self->{FRONTEND}->{$bus_id};
            $self->{wait_frontends_up}->end;
            my $delay = $self->{connect_err}->{$bus_id}++;
            $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
                after => ($delay < 10 ? $delay * 3 : 30),
                cb => sub {
                    $bus->connect(
                        on_connack => sub {
                            # Setup routing
                            log_warn "Rerouting: $back_id <--> $bus_id";
                            $self->{FRONTEND}->{$bus_id} = $bus;
                            $self->pull_frontend_requests( frontend => $bus );
                        }
                    );

lib/Beekeeper/Service/Router/Worker.pm  view on Meta::CPAN

            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;
                log_error "Could not unsubscribe from $topic" unless $success;
                $cv->end;
            }
        );
    }

    # 3. Wait for unsubacks, assuring that no more requests or messages are buffered 
    my $tmr = AnyEvent->timer( after => 30, cb => sub { $cv->send });
    $cv->recv;

    # 4. Just in case of pool full stop, wait for workers to finish their current tasks
    my $wait = AnyEvent->condvar;
    $tmr = AnyEvent->timer( after => $self->{shutdown_wait}, cb => sub { $wait->send });
    $wait->recv;

    $cv = AnyEvent->condvar;

    # 5. Stop forwarding responses to frontend
    foreach my $frontend_bus (values %{$self->{FRONTEND}}) {

        my $frontend_id = $frontend_bus->bus_id;

        foreach my $lane (1..QUEUE_LANES) {

lib/Beekeeper/Service/Router/Worker.pm  view on Meta::CPAN

                on_unsuback => sub {
                    my ($success, $prop) = @_;
                    log_error "Could not unsubscribe from $topic" unless $success;
                    $cv->end;
                }
            );
        }
    }

    # 6. Wait for unsubacks, assuring that no more responses are buffered 
    $tmr = AnyEvent->timer( after => 30, cb => sub { $cv->send });
    $cv->recv;

    # Disconnect from all frontends
    my @frontends = values %{$self->{FRONTEND}};
    foreach my $frontend_bus (@frontends) {

        next unless ($frontend_bus->{is_connected});
        $frontend_bus->disconnect;
    }

lib/Beekeeper/Service/Supervisor/Worker.pm  view on Meta::CPAN

        '_bkpr.supervisor.restart_workers' => 'restart_workers',
    );

    $self->accept_remote_calls(
        '_bkpr.supervisor.worker_status'       => 'worker_status',
        '_bkpr.supervisor.worker_exit'         => 'worker_exit',
        '_bkpr.supervisor.get_workers_status'  => 'get_workers_status',
        '_bkpr.supervisor.get_services_status' => 'get_services_status',
    );

    $self->{check_status_tmr} = AnyEvent->timer(
        after    => rand($CHECK_PERIOD), 
        interval => $CHECK_PERIOD, 
        cb => sub {
            $self->check_workers;
            $self->check_queues;
        },
    );
}

sub on_shutdown {

lib/Beekeeper/Service/Supervisor/Worker.pm  view on Meta::CPAN

        foreach my $pid (@worker_pids) {
            kill( 'TERM', $pid );
        }
    }
    else {
        # Slowly restart all workers
        my $delay = $args->{delay};
        my $count = 0;

        foreach my $pid (@worker_pids) {
            $self->{restart_worker_tmr}->{$pid} = AnyEvent->timer(
                after => $delay * $count++, 
                cb => sub {
                    delete $self->{restart_worker_tmr}->{$pid};
                    kill( 'TERM', $pid );
                },
            );
        }
    }
}

lib/Beekeeper/Service/Supervisor/Worker.pm  view on Meta::CPAN

    my $wpool_pid = $self->{_WORKER}->{parent_pid};
    my $delay = $args->{delay};

    if (!$delay) {
        kill( 'HUP', $wpool_pid );
    }
    else {

        my $index = $self->_get_pool_index( $self->{host}, $self->{pool} );

        $self->{restart_pool_tmr} = AnyEvent->timer(
            after => $delay * $index, 
            cb => sub {
                delete $self->{restart_pool_tmr};
                kill( 'HUP', $wpool_pid );
            },
        );
    }
}

sub _get_pool_index {

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN

# Shutdown hook: give connected clients some time to disconnect on their
# own, then close the broker's own bus connection.
sub on_shutdown {
    my $self = shift;

    log_info "Shutting down";

    # Wait for clients to gracefully disconnect: check up to 60 times,
    # pausing half a second between checks
    my $checks_left = 60;

    while ($checks_left-- > 0) {

        # A single remaining connection is our self connection
        my $open = keys %{$self->{connections}};
        last unless $open > 1;

        # Pause without blocking the event loop
        my $pause = AnyEvent->condvar;
        my $tick  = AnyEvent->timer( after => 0.5, cb => $pause );
        $pause->recv;
    }

    # Get rid of our self connection
    $self->{_BUS}->disconnect;

    log_info "Stopped";
}

sub authorize_request {

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN

            on_error => sub {
                log_error "$_[2]\n";
                $self->remove_client($fh);
                delete $self->{connections}->{"$fh"};
            }
        );

        $self->{connections}->{"$fh"} = $fh;

        #TODO: Close connection on login timeout
        # my $login_tmr = AnyEvent->timer( after => 5, cb => sub {
        #     $self->_shutdown($fh) unless $self->get_client($fh);
        # });
    });
}

sub _receive_connect {
    my ($self, $fh, $packet) = @_;

    my %prop;
    my $offs = 0;

lib/Beekeeper/Worker.pm  view on Meta::CPAN


# Run the worker startup hook, report status once, and schedule the
# recurring status report.
sub __init_worker {
    my ($self) = @_;

    # Startup hook first, then an immediate first status report
    $self->on_startup;
    $self->__report_status;

    # Refresh the event loop's notion of current time before scheduling
    AnyEvent->now_update;

    # The first periodic report fires after a random fraction of one period,
    # later ones every full period
    my $first_report = rand( $REPORT_STATUS_PERIOD );
    my $report       = sub { $self->__report_status };

    $self->{_WORKER}->{report_status_timer} = AnyEvent->timer(
        after    => $first_report,
        interval => $REPORT_STATUS_PERIOD,
        cb       => $report,
    );
}


sub on_startup {
    # Placeholder, intended to be overridden
    my $class = ref $_[0];

lib/Beekeeper/Worker.pm  view on Meta::CPAN


        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new notifications could be in flight or be 
        # already queued. We must wait for unsubscription completion, and then until the 
        # notification queue is empty to ensure that all received ones were processed. And 
        # even then wait a bit more, as some brokers may send messages *after* unsubscription.
        my $postpone = sub {

           my $unsub_tmr; $unsub_tmr = AnyEvent->timer( 
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{"msg.$fq_meth"};
                    undef $unsub_tmr;
                }
            );
        };

        $self->{_BUS}->unsubscribe(
            topic       => $topic,

lib/Beekeeper/Worker.pm  view on Meta::CPAN

        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new requests could be in flight or be already 
        # queued. We must wait for unsubscription completion, and then until the task queue 
        # is empty to ensure that all received requests were processed. And even then wait a
        # bit more, as some brokers may send requests *after* unsubscription.
        my $postpone = sub {

            $worker->{stop_cv}->begin;

            my $unsub_tmr; $unsub_tmr = AnyEvent->timer( 
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{$_} foreach @cb_keys;
                    delete $worker->{subscriptions}->{$service};
                    undef $unsub_tmr;

                    return unless $worker->{shutting_down};

                    if ($worker->{in_progress} > 0) {

                        # The task queue is empty now, but an asynchronous method handler is
                        # still busy processing some requests received previously. Wait for
                        # these requests to be completed before telling _work_forever to stop

                        my $wait_time = 60;
                        $worker->{stop_cv}->begin;

                        my $busy_tmr; $busy_tmr = AnyEvent->timer( after => 1, interval => 1, cb => sub {
                            unless ($worker->{in_progress} > 0 && --$wait_time > 0) {
                                undef $busy_tmr;
                                $worker->{stop_cv}->end;
                            }
                        });
                    }

                    # Tell _work_forever to stop
                    $worker->{stop_cv}->end;
                }

lib/Beekeeper/Worker.pm  view on Meta::CPAN


This handler processes requests concurrently:

  sub increment {
      my ($self, $params, $req) = @_;
  
      my $number = $params->{number};
  
      $req->async_response;
  
      my $t; $t = AnyEvent->timer( after => 1, cb => sub {
          undef $t;
          $req->send_response( $number + 1 );
      });
  }

Note that callback closures will not be executed in Beekeeper scope but in the event loop
one, so uncaught exceptions in these closures will cause the worker to die and be respawned.

Asynchronous method handlers use system resources more efficiently, but are significantly 
harder to write and debug.

lib/Beekeeper/Worker/Extension/SharedCache.pm  view on Meta::CPAN


    $self->_connect_to_all_brokers($worker);

    my $Self = $self;
    weaken $Self;

    AnyEvent->now_update;

    if ($self->{max_age}) {

        $self->{gc_timer} = AnyEvent->timer(
            after    => $self->{max_age} * rand() + 60,
            interval => $self->{max_age},
            cb       => sub { $Self->_gc },
        );
    }

    if ($self->{refresh}) {

        $self->{refresh_timer} = AnyEvent->timer(
            after    => $self->{refresh} * rand() + 60,
            interval => $self->{refresh},
            cb       => sub { $Self->_send_sync_request },
        );
    }

    # Ping backend brokers to avoid disconnections due to inactivity
    $self->{ping_timer} = AnyEvent->timer(
        after    => 60 * rand(),
        interval => 60,
        cb       => sub { $Self->_ping_backend_brokers },
    );

    return $self;
}

sub _connect_to_all_brokers {
    my ($self, $worker) = @_;

lib/Beekeeper/Worker/Extension/SharedCache.pm  view on Meta::CPAN


        my $bus; $bus = Beekeeper::MQTT->new( 
            %$config,
            bus_id   => $bus_id,
            timeout  => 300,
            on_error => sub {
                # Reconnect
                my $errmsg = $_[0] || ""; $errmsg =~ s/\s+/ /sg;
                log_error "Connection to $bus_id failed: $errmsg";
                my $delay = $self->{connect_err}->{$bus_id}++;
                $self->{reconnect_tmr}->{$bus_id} = AnyEvent->timer(
                    after => ($delay < 10 ? $delay * 3 : 30),
                    cb => sub { 
                        $bus->connect(
                            on_connack => sub {
                                log_warn "Reconnected to $bus_id";
                                $self->_setup_sync_listeners($bus);
                                $self->_accept_sync_requests($bus) if $self->{synced};
                            }
                        );
                    },

lib/Beekeeper/Worker/Extension/SharedCache.pm  view on Meta::CPAN


    $bus->publish(
        topic          => "req/$local_bus/_sync/$cache_id/dump",
        response_topic => "priv/$client_id/sync-$cache_id",
    );

    # Ensure that timeout is set properly when the event loop was blocked
    AnyEvent->now_update;

    # When a fresh pool is started there is no master to reply sync requests
    $self->{_sync_timeout} = AnyEvent->timer(
        after => SYNC_REQUEST_TIMEOUT,
        cb    => sub { $self->_sync_completed(0) },
    );
}

sub _sync_completed {
    my ($self, $success) = @_;

    delete $self->{_sync_timeout};

lib/Beekeeper/Worker/Extension/SharedCache.pm  view on Meta::CPAN

        $bus->publish(
            topic    => "msg/$local_bus/_sync/$cache_id/set",
            payload  => \$json,
        );
    }

    unless (defined $value) {
        # Postpone delete because it is necessary to keep the versioning 
        # of this modification until it is propagated to all workers

        # Ensure that timer is set properly when the event loop was blocked
        if ($_now != time) { $_now = time; AnyEvent->now_update }

        $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
            delete $self->{_destroy}->{$key};
            delete $self->{data}->{$key};
            delete $self->{vers}->{$key};
            delete $self->{time}->{$key};
        });
    }

    return 1;
}

lib/Beekeeper/Worker/Extension/SharedCache.pm  view on Meta::CPAN

        $self->{vers}->{$key} = $keep->{vers};
        $self->{time}->{$key} = $keep->{time};

        $self->{on_update}->($key, $keep->{data}, $old) if $self->{on_update};
    }

    unless (defined $self->{data}->{$key}) {
        # Postpone delete because it is necessary to keep the versioning 
        # of this modification until it is propagated to all workers

        # Ensure that timer is set properly when the event loop was blocked
        if ($_now != time) { $_now = time; AnyEvent->now_update }

        $self->{_destroy}->{$key} = AnyEvent->timer( after => 60, cb => sub {
            delete $self->{_destroy}->{$key};
            delete $self->{data}->{$key};
            delete $self->{vers}->{$key};
            delete $self->{time}->{$key};
        });
    }
}

sub dump {
    my $self = shift;

t/lib/Tests/Examples.pm  view on Meta::CPAN

    }

    $test_end->recv;

    return $output;
}

# Wait $time seconds by running the event loop until a timer fires.
sub async_wait {
    my ($self, $time) = @_;

    my $elapsed = AnyEvent->condvar;

    # Keep the timer in a lexical until recv returns; the condvar itself
    # is passed as the timer callback
    my $watcher = AnyEvent->timer(
        after => $time,
        cb    => $elapsed,
    );

    return $elapsed->recv;
}


sub check_01_supported_os : Test(startup => 1) {
    my ($self) =  @_;

    unless ($^O eq 'linux') {
        $self->BAILOUT("OS unsupported");
    }

t/lib/Tests/Mqtt.pm  view on Meta::CPAN


    $bus_config = Beekeeper::Config->get_bus_config( bus_id => 'test' );

    ok( $bus_config->{host}, "Read bus config, connecting to " . $bus_config->{host});
}

# Wait $time seconds by running the event loop until a timer fires.
# The wait is ten times longer when $self->automated_testing is true.
sub async_wait {
    my ($self, $time) = @_;

    my $delay = $self->automated_testing ? $time * 10 : $time;

    my $elapsed = AnyEvent->condvar;

    # Keep the timer in a lexical until recv returns; the condvar itself
    # is passed as the timer callback
    my $watcher = AnyEvent->timer(
        after => $delay,
        cb    => $elapsed,
    );

    return $elapsed->recv;
}

sub test_01_topic : Test(3) {
    my $self = shift;

    my $bus1 = Beekeeper::MQTT->new( %$bus_config );
    my $bus2 = Beekeeper::MQTT->new( %$bus_config );

    $bus1->connect( blocking => 1 );



( run in 0.746 second using v1.01-cache-2.11-cpan-49f99fa48dc )