App-Chart
view release on metacpan or search on metacpan
lib/App/Chart/Glib/Ex/DirBroadcast.pm view on Meta::CPAN
use File::Spec;
use Class::Singleton 1.03; # 1.03 for _new_instance()
use base 'Class::Singleton';
*_new_instance = \&new;
use constant MAXLEN => 16384;
sub new {
my ($class, $directory) = @_;
return bless { directory => $directory }, $class;
}
sub DESTROY {
my ($self) = @_;
delete $self->{'listen_source_ids'};
# close socket before removing file
delete $self->{'listen_sock'};
if (my $filename = delete $self->{'listen_filename'}) {
### DirBroadcast remove: $filename
unlink ($filename);
}
}
sub directory {
my ($self, $newval) = @_;
ref($self) or $self = $self->instance;
if (@_ < 2) { return $self->{'directory'}; }
if ($self->{'listen_source_ids'}) {
croak 'DirBroadcast: cannot set directory after listen';
}
$self->{'directory'} = $newval;
}
# connections is a hashref of key to arrayref of subrs, ie.
#
# $self->{'connections'} = { 'foo' => [ \&handler1, \&handler2 ],
# 'bar' => [ \&handler3, \&handler4 ] };
#
sub connect {
my ($self, $key, $subr) = @_;
ref($self) or $self = $self->instance;
my $aref = ($self->{'connections'}->{$key} ||= []);
push @$aref, $subr;
}
sub connect_first {
my ($self, $key, $subr) = @_;
ref($self) or $self = $self->instance;
my $aref = ($self->{'connections'}->{$key} ||= []);
unshift @$aref, $subr;
}
sub connect_for_object {
my ($self, $key, $subr, $obj) = @_;
ref($self) or $self = $self->instance;
require Scalar::Util;
Scalar::Util::weaken ($obj);
my $csubr;
$csubr = sub {
if ($obj) {
$subr->($obj, @_);
} else {
_disconnect ($self, $key, $csubr);
}
};
$self->connect ($key, $csubr);
}
sub _disconnect {
my ($self, $key, $subr) = @_;
if (my $aref = $self->{'connections'}->{$key}) {
@$aref = grep {$_ != $subr} @$aref;
}
}
sub send_locally {
my ($self, $key, @data) = @_;
ref($self) or $self = $self->instance;
if ($self->{'hold'}) {
push @{$self->{'hold_list'}}, sub { send_locally ($self, $key, @data); };
} else {
if (my $aref = $self->{'connections'}->{$key}) {
foreach my $subr (@$aref) {
$subr->(@data);
}
}
}
}
sub listen {
my ($self) = @_;
ref($self) or $self = $self->instance;
if ($self->{'listen_source_ids'}) { return; } # already done
my $directory = $self->{'directory'};
if (! defined $directory) {
croak 'DirBroadcast cannot listen until broadcast directory is set';
}
require File::Path;
File::Path::mkpath ($directory);
require Sys::Hostname;
my $hostname = Sys::Hostname::hostname();
my $listen_filename = $self->{'listen_filename'}
= File::Spec->catfile ($directory, "$hostname.$$");
unlink ($listen_filename); # possible previous leftover
# as usual socket() and friends get FD_CLOEXEC set automatically, no need
# to do anything special to avoid propagating $listen_sock fd down to
# subprocess jobs
require Socket;
require IO::Socket;
my $listen_sock = $self->{'listen_sock'}
= do { local $^F = 0; # ensure close-on-exec for the socket
lib/App/Chart/Glib/Ex/DirBroadcast.pm view on Meta::CPAN
});
my $frozen;
foreach my $filename (@filenames) {
if ($filename !~ $pattern) { next; }
$filename = File::Spec->catfile ($directory, $filename);
if ($filename eq ($self->{'listen_filename'}||'')) {
next; # ourselves
}
### DirBroadcast to: $filename
# send_sock created on first of any DirBroadcast instance and then kept
# open
$send_sock ||= do {
require IO::Socket;
require Socket;
my $sock = do {
local $^F = 0; # ensure close-on-exec for the socket
IO::Socket->new (Domain => Socket::AF_UNIX(),
Type => Socket::SOCK_DGRAM());
};
$sock->blocking(0);
binmode ($sock, ':raw') or die;
$sock
};
# put off freezing until we find someone to send to
if (! defined $frozen) {
require Storable;
$frozen = Storable::freeze ([$key, @data]);
if (length ($frozen) > MAXLEN) {
croak 'DirBroadcast: message too long: ',length($frozen);
}
}
my $sun = Socket::sockaddr_un ($filename);
my $sent = $send_sock->send ($frozen, 0, $sun);
if (! defined $sent || $sent != length($frozen)) {
### send: (! defined $sent && "removing, error $!") || "removing, short send $sent bytes"
unlink ($filename);
}
}
}
sub hold {
my ($self) = @_;
ref($self) or $self = $self->instance;
return App::Chart::Glib::Ex::DirBroadcast::Hold->new ($self);
}
package App::Chart::Glib::Ex::DirBroadcast::Hold;
use strict;
use warnings;
sub new {
my ($class, $dirb) = @_;
my $self = bless { }, $class;
$self->{'target'} = $dirb;
require Scalar::Util;
Scalar::Util::weaken ($self->{'target'});
$dirb->{'hold'} ++;
return $self;
}
sub DESTROY {
my ($self) = @_;
my $dirb = delete $self->{'target'} || return;
if (-- $dirb->{'hold'}) { return; }
my $hold_list = $dirb->{'hold_list'};
### DirBroadcast::Hold now run: $hold_list
while (my $subr = shift @$hold_list) {
$subr->();
}
}
1;
__END__
=head1 NAME
App::Chart::Glib::Ex::DirBroadcast -- broadcast messages through a directory of named pipes
=head1 SYNOPSIS
use App::Chart::Glib::Ex::DirBroadcast;
App::Chart::Glib::Ex::DirBroadcast->directory ('/my/directory');
App::Chart::Glib::Ex::DirBroadcast->listen;
App::Chart::Glib::Ex::DirBroadcast->connect ('my-key', sub { print @_; });
App::Chart::Glib::Ex::DirBroadcast->send ('my-key', "hello\n");
=head1 DESCRIPTION
DirBroadcast is a message broadcasting system based on named pipes in a
given directory, with a Glib main loop IO watch listening and calling
connected handlers. It's intended for use between multiple running copies
of a single application so they can notify each other of changes to files
etc.
Messages have a string "key" which is a name or type decided by the
application, and then any parameters which Storable can handle
(L<Storable>). You can have either a single broadcast directory used for
all purposes, or create multiple DirBroadcast objects. The method functions
described below take either the class name C<App::Chart::Glib::Ex::DirBroadcast> for the
single global, or a DirBroadcast object.
=head1 FUNCTIONS
=over 4
=item C<< App::Chart::Glib::Ex::DirBroadcast->new ($directory) >>
Create and return a new DirBroadcast object communicating through the given
C<$directory>. C<$directory> is created if it doesn't already exist (with a
C<croak> if that fails).
my $dirb = App::Chart::Glib::Ex::DirBroadcast->new ('/var/run/myapp')
( run in 0.880 second using v1.01-cache-2.11-cpan-39bf76dae61 )