Beekeeper

 view release on metacpan or  search on metacpan

bin/bkpr  view on Meta::CPAN

#!/usr/bin/perl -wT

use strict;
use warnings;

our $VERSION = '0.10';

BEGIN {

    # Runs at compile time, before the 'use' of any Beekeeper module below.
    # Both environment variables are checked with defined() before being
    # matched: the script runs with -w, so matching an unset variable would
    # emit an "uninitialized value" warning on every start.

    # Untaint PATH (an unset PATH ends up as an empty string, as it did before)
    my $path = defined $ENV{'PATH'} ? $ENV{'PATH'} : '';
    ($ENV{'PATH'}) = ($path =~ m/(.*)/);

    # PERL5LIB is ignored when taint mode is enabled, so its colon-separated
    # entries are put in front of @INC by hand (the regex capture untaints them)
    if (defined $ENV{'PERL5LIB'}) {
        unshift @INC, ($ENV{'PERL5LIB'} =~ m/([^:]+)/g);
    }
}

use Beekeeper::WorkerPool;

# Create the worker pool object and call its run method
Beekeeper::WorkerPool->new->run;


__END__

examples/dashboard/lib/Beekeeper/Service/Dashboard/Worker.pm  view on Meta::CPAN

    if (@$stats_dest > $keep) {
        # Discard old data
        shift @$stats_dest;
    }
}

sub _save_state {
    my ($self) = @_;

    my $pool_id = $self->{_WORKER}->{pool_id};
    ($pool_id) = ($pool_id =~ m/^([\w-]+)$/); # untaint
    my $tmp_file = "/tmp/beekeeper-dashboard-$pool_id-stats.dump";

    # Avoid stampede when several workers are exiting simultaneously
    return if (-e $tmp_file && (stat($tmp_file))[9] == time());

    # Lock file because several workers may try to write simultaneously to it
    sysopen(my $fh, $tmp_file, O_RDWR|O_CREAT) or return;
    flock($fh, LOCK_EX | LOCK_NB) or return;
    truncate($fh, 0) or return;

examples/dashboard/lib/Beekeeper/Service/Dashboard/Worker.pm  view on Meta::CPAN

        $self->{services_1h},
    ]);

    close($fh);
}

sub _load_state {
    my ($self) = @_;

    my $pool_id = $self->{_WORKER}->{pool_id};
    ($pool_id) = ($pool_id =~ m/^([\w-]+)$/); # untaint
    my $tmp_file = "/tmp/beekeeper-dashboard-$pool_id-stats.dump";

    return unless (-e $tmp_file);

    local($/);
    open(my $fh, '<', $tmp_file) or die "Couldn't read $tmp_file: $!";
    my $data = <$fh>;
    close($fh);

    local $@;

lib/Beekeeper/AnyEvent.pm  view on Meta::CPAN

    # and it does not ignore exceptions thrown from within callbacks

    $ENV{'PERL_ANYEVENT_MODEL'} ||= 'Perl' unless $AnyEvent::MODEL;
}

UNTAINT_IP_ADDR: {

    no strict 'refs';
    no warnings 'redefine';

    # AnyEvent::DNS returns tainted addresses, and using them under taint
    # mode fails with an "Insecure dependency in connect" error.
    # Untainting them blindly is acceptable here because the wrapped
    # functions, parse_ipv4 and parse_ipv6, validate addresses properly

    foreach my $parser (qw( parse_ipv4 parse_ipv6 )) {

        my $symbol   = "AnyEvent::Socket::$parser";
        my $original = \&{$symbol};

        # Replace the function with a wrapper which untaints its argument
        # in place and then delegates to the original implementation
        *{$symbol} = sub ($) {
            ($_[0]) = $_[0] =~ m/(.*)/s; # untaint addr
            $original->(@_);
        };
    }
}

1;

__END__

=pod

lib/Beekeeper/AnyEvent.pm  view on Meta::CPAN


=over

=item *

Prefer the pure perl backend over default EV, as it is fast enough and
it does not ignore exceptions thrown from within callbacks.

=item *

Addresses resolved by AnyEvent::DNS are tainted, causing an "Insecure
dependency in connect" error as Beekeeper runs with taint mode enabled.
This module untaints resolved addresses, which can be done safely because
AnyEvent validates these addresses properly before using them.

=back

=head1 AUTHOR

José Micó, C<jose.mico@gmail.com>

=head1 COPYRIGHT AND LICENSE

lib/Beekeeper/Logger.pm  view on Meta::CPAN

        $service =~ s/-worker$//;

        $self->{service} = $service;
    }

    unless ($self->{log_file}) {
        # Use a single log file per service
        my $dir  = '/var/log';
        my $user = getpwuid($>);
        my $file = $self->{service} . '.log';
        ($user) = ($user =~ m/(\w+)/); # untaint

        $self->{log_file} = (-d "$dir/$user") ? "$dir/$user/$file" : "$dir/$file";
    }

    unless ($self->{foreground}) {

        my $log_file = $self->{log_file};

        # If running as root temporarily restore uid and gid to allow opening
        local $> = $< if ($< == 0);

lib/Beekeeper/MQTT.pm  view on Meta::CPAN


    # Determine next host of cluster to connect to
    my $try_hosts = $self->{try_hosts} ||= [];
    @$try_hosts = @{$self->{hosts}} unless @$try_hosts;

    # TCP connection args
    my $host = shift @$try_hosts;
    my $tls  = $config->{'tls'}  || 0;
    my $port = $config->{'port'} || ( $tls ? 8883 : 1883 );

    ($host) = ($host =~ m/^([a-zA-Z0-9\-\.]+)$/s); # untaint
    ($port) = ($port =~ m/^([0-9]+)$/s);

    $self->{handle} = AnyEvent::Handle->new(
        connect    => [ $host, $port ],
        tls        => $tls ? 'connect' : undef,
        keepalive  => 1,
        no_delay   => 1,
        on_connect => sub {
            my ($fh, $host, $port) = @_;
            # Send CONNECT packet

lib/Beekeeper/Service/Supervisor/Worker.pm  view on Meta::CPAN

    );

    log_info "Restarting workers" . ($args->{class} ? " $args->{class}..." : "...");

    my @worker_pids;

    foreach my $worker (@$workers) {
        # Do not restart supervisor
        next if ($worker->{class} eq 'Beekeeper::Service::Supervisor::Worker');

        my ($pid) = ($worker->{pid} =~ m/^(\d+)$/);  # untaint
        push @worker_pids, $pid if ($pid);
    }

    if (!$args->{delay}) {
        # Restart all workers at once
        foreach my $pid (@worker_pids) {
            kill( 'TERM', $pid );
        }
    }
    else {

lib/Beekeeper/Service/ToyBroker/Worker.pm  view on Meta::CPAN


sub start_listener {
    my ($self, $listener) = @_;
    weaken($self);

    my $max_packet_size = $listener->{'max_packet_size'};

    my $addr = $listener->{'listen_addr'} || '127.0.0.1';  # Must be an IPv4 or IPv6 address
    my $port = $listener->{'listen_port'} ||  1883;

    ($addr) = ($addr =~ m/^([\w\.:]+)$/);  # untaint
    ($port) = ($port =~ m/^(\d+)$/);

    log_info "Listening on $addr:$port";

    $self->{"listener-$addr-$port"} = tcp_server ($addr, $port, sub {
        my ($FH, $host, $port) = @_;

        my $packet_type;
        my $packet_flags;

lib/Beekeeper/Worker/Extension/SharedCache.pm  view on Meta::CPAN

        $self->delete( $key );
    }
}

sub _save_state {
    my $self = shift;

    # Only a cache flagged as synced gets dumped
    $self->{synced} or return;

    # Capturing the pool id through the regex untaints it
    my ($pool_id) = ($self->{pool_id} =~ m/^([\w-]+)$/);
    my $id        = $self->{id};
    my $dump_path = "/tmp/beekeeper-sharedcache-$pool_id-$id.dump";

    # Several workers may be exiting simultaneously: skip the dump when the
    # file was already modified during the current second
    if (-e $dump_path) {
        my $mtime = (stat($dump_path))[9];
        return if ($mtime == time());
    }

    # Open without clobbering and take an exclusive, non-blocking lock, as
    # several workers may try to write the file at once. The file is emptied
    # only after the lock is held; any failure silently abandons the dump
    sysopen(my $out, $dump_path, O_RDWR|O_CREAT) or return;

    unless (flock($out, LOCK_EX | LOCK_NB)) {
        return;
    }

    unless (truncate($out, 0)) {
        return;
    }

    my $json = encode_json( $self->dump );
    print $out $json;

    close($out);
}

sub _load_state {
    my $self = shift;

    my $id = $self->{id};
    my ($pool_id) = ($self->{pool_id} =~ m/^([\w-]+)$/); # untaint
    my $tmp_file = "/tmp/beekeeper-sharedcache-$pool_id-$id.dump";

    return unless (-e $tmp_file);

    # Do not load stale dumps
    return if ($self->{max_age} && (stat($tmp_file))[9] < time() - $self->{max_age});

    local($/);
    open(my $fh, '<', $tmp_file) or die "Couldn't read $tmp_file: $!";
    my $data = <$fh>;

lib/Beekeeper/WorkerPool.pm  view on Meta::CPAN

    my $self = $class->SUPER::new(
        daemon_name  => "beekeeper",
        description  => "worker pool",
        get_options  => [ "pool=s", "config-dir=s", "debug" ],
        %args,
    );

    $self->parse_options;

    my $pool_id = $self->{options}->{'pool'} || '';
    ($pool_id) = ($pool_id =~ m/^([\w-]+)$/); # untaint

    unless ($pool_id) {
        print "Mandatory parameter --pool was not specified.\n\n";
        #ENHACEMENT: list available pools
        $self->cmd_help;
        CORE::exit(1);
    }

    $self->{config}->{'pool_id'}   = $pool_id;
    $self->{config}->{daemon_name} = "beekeeper-$pool_id";
    $self->{config}->{description} = "worker pool $pool_id";

    # Pool cannot be started without a proper config file
    $self->load_config || CORE::exit(1);

    unless ($self->{config}->{log_file}) {
        my $file = "$pool_id-pool.log";
        my $dir  = '/var/log';
        my $user = $self->{options}->{'user'} || getpwuid($>);
        ($user) = ($user =~ m/^(\w+)$/); # untaint
        $self->{config}->{log_file} = (-d "$dir/$user") ? "$dir/$user/$file" : "$dir/$file";
    }

    return $self;
}

sub cmd_help {
    my $self = shift;

    my $progname = $0;

lib/Beekeeper/WorkerPool/Daemon.pm  view on Meta::CPAN

      }
  }

Then, the daemon can be executed with a script like this:

  #!/usr/bin/perl -wT
  use strict;
  use warnings;
  use MyDaemon;
  
  $ENV{PATH} = '/bin'; # untaint
  
  my $daemon = MyDaemon->new->run;

=head1 DESCRIPTION

This is a base module for creating daemons. It takes care of daemonization tasks
commonly found in init.d scripts: forking, redirecting output, writing pid files, 
start/stop/restart control commands, etc.

It is used by the command line tool C<bkpr> to daemonize itself.

t/lib/Tests/Service/Worker.pm  view on Meta::CPAN

}

sub catchall {
    # Forward the received params to the signal() method and hand its
    # result back to the caller
    my $self   = shift;
    my $params = shift;

    return $self->signal($params);
}

sub signal {
    # Send a signal to a process after an optional delay.
    #
    # $params is a hashref with the following keys:
    #   signal - signal to send; the first run of word characters is used
    #   pid    - target process; the first run of digits is used
    #   after  - optional delay passed to sleep before signalling; when the
    #            key is absent a random value below 2 is used instead
    #
    # Returns the result of kill, or 0 when signal or pid could not be
    # extracted from the params.
    my ($self, $params) = @_;

    my $raw_signal = defined $params->{signal} ? $params->{signal} : '';
    my $raw_pid    = defined $params->{pid}    ? $params->{pid}    : '';

    my ($signal) = $raw_signal =~ m/(\w+)/;  # untaint
    my ($pid)    = $raw_pid    =~ m/(\d+)/;

    # Never call kill with an undefined pid: it would be treated as 0, which
    # signals every process in the process group of the caller
    return 0 unless (defined $signal && defined $pid);

    my $sleep = exists $params->{after} ? $params->{after} : rand() * 2;

    sleep $sleep;

    kill( $signal, $pid );
}

sub fail {



( run in 0.353 second using v1.01-cache-2.11-cpan-d6f9594c0a5 )