view release on metacpan or search on metacpan
lib/IO/Lambda.pm view on Meta::CPAN
do {} while yield;
set_frame(@frame);
}
#
# Part II - Procedural interface to the lambda-style programming
#
#################################################################
sub _lambda_restart { die "lambda() is not restartable" }
sub lambda(&)
{
my $cb = _subname(lambda => $_[0]);
my $l = __PACKAGE__-> new( sub {
# initial lambda code is usually executed by tail/tails inside another lambda,
# so protect the upper-level context
local *__ANON__ = "IO::Lambda::lambda::callback";
local $THIS = shift;
local @CONTEXT = ();
local $CALLBACK = $cb;
local $METHOD = \&_lambda_restart;
lib/IO/Lambda.pm view on Meta::CPAN
}
sub restartable
{
my $name = @_ ? $_[0] : join(':', caller);
$THIS->{frames}->{$name} = [ $METHOD, $CALLBACK, @CONTEXT ];
return $name;
}
# exceptions and backtracing
sub catch(&$)
{
my ( $cb, $event) = @_;
my $who = (caller(1))[3];
my @ctx = @CONTEXT;
confess "catch callback already defined" if $event-> [WATCH_CANCEL];
$event->[WATCH_CANCEL] = $cb ? sub {
local *__ANON__ = "$who\:\:catch" if $DEBUG_CALLER;
$THIS = shift;
local $THIS-> {cancelled_event} = shift;
local $THIS-> {cancelling} = 1;
lib/IO/Lambda.pm view on Meta::CPAN
@CONTEXT = @ctx;
$METHOD = $method;
$CALLBACK = $cb;
$cb ? $cb-> (@_) : @_;
},
($AGAIN ? delete($self-> {cancel}) : undef),
)
}
# rwx($flags,$handle,$deadline)
sub rwx(&)
{
return $THIS-> override_handler('rwx', \&rwx, shift)
if $THIS-> {override}->{rwx};
$THIS-> add_watch(
_subname(rwx => shift), \&rwx,
@CONTEXT[0,1,2,0,1,2]
)
}
# readable($handle,$deadline)
sub readable(&)
{
return $THIS-> override_handler('readable', \&readable, shift)
if $THIS-> {override}->{readable};
$THIS-> add_watch(
_subname(readable => shift), \&readable, IO_READ,
@CONTEXT[0,1,0,1]
)
}
# writable($handle,$deadline)
sub writable(&)
{
return $THIS-> override_handler('writable', \&writable, shift)
if $THIS-> {override}->{writable};
$THIS-> add_watch(
_subname(writable => shift), \&writable, IO_WRITE,
@CONTEXT[0,1,0,1]
)
}
lib/IO/Lambda.pm view on Meta::CPAN
@CONTEXT = @ctx;
$METHOD = $method;
$CALLBACK = $cb;
$cb ? $cb-> (@_) : @_;
},
($AGAIN ? delete($self-> {cancel}) : undef),
)
}
# timeout($deadline)
sub timeout(&)
{
return $THIS-> override_handler('timeout', \&timeout, shift)
if $THIS-> {override}->{timeout};
$THIS-> add_timer( _subname(timeout => shift), \&timeout, @CONTEXT[0,0])
}
# common wrapper for declaration of single lambda-watching user conditions
sub add_tail
{
my ($self, $cb, $method, $lambda, @ctx) = @_;
lib/IO/Lambda.pm view on Meta::CPAN
$THIS-> watch_lambda( IO::Lambda-> new, sub {
local *__ANON__ = "IO::Lambda::".$name."::callback";
@CONTEXT = @ctx;
$METHOD = $method;
$CALLBACK = $cb;
$cb-> ();
}) if $cb;
}
# tail( $lambda, @param) -- initialize $lambda with @param, and wait for it
sub tail(&)
{
return $THIS-> override_handler('tail', \&tail, shift)
if $THIS-> {override}->{tail};
my ( $lambda, @param) = context;
return _empty(tail => \&tail, shift) unless $lambda;
$lambda-> reset
if $lambda-> is_stopped and $lambda-> autorestart;
if ( @param) {
$lambda-> call( @param);
} else {
$lambda-> call unless $lambda-> is_active;
}
$THIS-> add_tail( _subname(tail => shift), \&tail, $lambda, $lambda, @param);
}
# tails(@lambdas) -- wait for all lambdas to finish
sub tails(&)
{
return $THIS-> override_handler('tails', \&tails, shift)
if $THIS-> {override}->{tails};
my $cb = _subname tails => $_[0];
my @lambdas = context;
my $n = $#lambdas;
return _empty(tails => \&tails, $cb) unless @lambdas;
my @ret;
lib/IO/Lambda.pm view on Meta::CPAN
@CONTEXT = @lambdas;
$METHOD = \&tails;
$CALLBACK = $cb;
$cb ? $cb-> (@ret) : @ret;
};
my $this = $THIS;
$this-> watch_lambda( $_, $watcher) for @lambdas;
}
# tailo(@lambdas) -- wait for all lambdas to finish, return ordered results
sub tailo(&)
{
return $THIS-> override_handler('tailo', \&tailo, shift)
if $THIS-> {override}->{tailo};
my $cb = _subname tailo => $_[0];
my @lambdas = context;
my $n = $#lambdas;
return _empty(tailo => \&tailo, $cb) unless @lambdas;
my @ret;
lib/IO/Lambda.pm view on Meta::CPAN
for ( my $i = 0; $i < @lambdas; $i++) {
my $d = $i;
$this-> watch_lambda(
$lambdas[$i],
sub { $watcher->($d, @_) }
);
};
}
# any_tail($deadline,@lambdas) -- wait for any lambda to finish within time
sub any_tail(&)
{
return $THIS-> override_handler('any_tail', \&any_tail, shift)
if $THIS-> {override}->{any_tail};
my $cb = _subname any_tail => $_[0];
my ( $deadline, @lambdas) = context;
my $n = $#lambdas;
return _empty(any_tail => \&any_tail, $cb) unless @lambdas;
my ( @ret, @watchers);
lib/IO/Lambda.pm view on Meta::CPAN
tail {
push @ret, $p if shift;
return @ret unless @p;
$p = shift @p;
context $lambda, $p;
again;
}}
}
# curry(@a -> $l) :: @a -> @b
sub curry(&)
{
my $cb = $_[0];
lambda {
context $cb->(@_);
&tail();
}
}
# seq() :: (@l) -> @m
sub seq { mapcar curry { shift } }
lib/IO/Lambda/DNS.pm view on Meta::CPAN
# behave like inet_aton, return single IP address
for ( $packet-> answer) {
return $_-> address if $_-> type eq 'A';
}
return 'response doesn\'t contain an IP address';
}
return $packet;
}}}
sub dns(&) { IO::Lambda::DNS-> new(context)-> condition(shift, \&dns, 'dns') }
1;
__DATA__
=pod
=head1 NAME
IO::Lambda::DNS - DNS queries lambda style
lib/IO/Lambda/Flock.pm view on Meta::CPAN
{
my ( $expired, $fh, $shared) = @_;
if ( CORE::flock( $fh, LOCK_NB | ($shared ? LOCK_SH : LOCK_EX) )) {
warn "flock $fh obtained\n" if $DEBUG;
return 1, 1;
}
return 1, 0 if $expired;
return 0;
}
sub flock(&)
{
return this-> override_handler('flock', \&flock, shift)
if this-> {override}->{flock};
my $cb = _subname flock => shift;
my ($fh, %opt) = context;
my $deadline = exists($opt{timeout}) ? $opt{timeout} : $opt{deadline};
poll_event(
$cb, \&flock, \&poll_flock,
lib/IO/Lambda/Fork.pm view on Meta::CPAN
use IO::Lambda qw(:all :dev);
use IO::Lambda::Signal qw(pid);
our @ISA = qw(Exporter);
our @EXPORT_OK = qw(new_process process new_forked forked);
our %EXPORT_TAGS = (all => \@EXPORT_OK);
sub _d { "forked(" . _o($_[0]) . ")" }
# return pid and socket
sub new_process(&)
{
my $cb = shift;
my $r = IO::Handle-> new;
my $w = IO::Handle-> new;
socketpair( $r, $w, AF_UNIX, SOCK_STREAM, PF_UNSPEC);
$w-> blocking(0);
my $pid = fork;
unless ( defined $pid) {
lib/IO/Lambda/Fork.pm view on Meta::CPAN
}
warn "forked pid=$pid\n" if $DEBUG;
close($r);
return ($pid, $w);
}
# simple fork, return only $? and $!
sub process(&)
{
my $cb = shift;
lambda {
my $pid = fork;
return undef, $! unless defined $pid;
unless ( $pid) {
warn "process($$) started\n" if $DEBUG;
eval { $cb->(); };
warn "process($$) ended\n" if $DEBUG;
lib/IO/Lambda/Fork.pm view on Meta::CPAN
}
warn "forked pid=$pid\n" if $DEBUG;
context $pid;
&pid();
}
}
# return output from a subprocess
sub new_forked(&)
{
my $cb = shift;
my ( $pid, $r) = new_process {
my @ret;
my $socket = shift;
eval { @ret = $cb-> () if $cb };
my $msg = $@ ? [ 0, $@ ] : [ 1, @ret ];
warn "process($$) ended: [@$msg]\n" if $DEBUG > 1;
print $socket freeze($msg);
lib/IO/Lambda/Fork.pm view on Meta::CPAN
}
context $pid;
pid {
warn "pid($pid): exitcode=$?, [@ret]\n" if $DEBUG > 1;
return shift, @ret;
}}}
}
# simpler version of new_forked
sub forked(&)
{
my $cb = shift;
lambda {
context &new_forked($cb);
tail {
my ( $pid, $ok, @ret) = @_;
return @ret;
}}
}
lib/IO/Lambda/HTTP.pm view on Meta::CPAN
package IO::Lambda::HTTP;
use vars qw(@ISA @EXPORT_OK);
@ISA = qw(Exporter);
@EXPORT_OK = qw(http_request http_server);
use strict;
use warnings;
use IO::Lambda qw(:lambda);
use IO::Lambda::HTTP::Client;
sub http_request(&)
{
# Carp::carp "IO::Lambda::HTTP is deprecated, use IO::Lambda::HTTP::Client instead";
IO::Lambda::HTTP::Client-> new(context)->
condition(shift, \&http_request, 'http_request')
}
sub new {
shift;
# Carp::carp "IO::Lambda::HTTP is deprecated, use IO::Lambda::HTTP::Client instead";
IO::Lambda::HTTP::Client->new(@_);
lib/IO/Lambda/HTTP/Client.pm view on Meta::CPAN
use warnings;
use Socket;
use Errno;
use Exporter;
use IO::Socket;
use HTTP::Response;
use IO::Lambda qw(:lambda :stream);
use IO::Lambda::Socket qw(connect);
use Time::HiRes qw(time);
sub http_request(&)
{
__PACKAGE__-> new(context)->
condition(shift, \&http_request, 'http_request')
}
sub new
{
my ( $class, $req, %options) = @_;
my $self = bless {}, $class;
lib/IO/Lambda/HTTP/Server.pm view on Meta::CPAN
warn length($resp), " bytes written\n" if $DEBUG > 1;
return this->start if $keep_alive && !($self->{shutdown} && !length($buf));
warn "[$session_data->{remote}] disconnect\n" if $DEBUG;
if ( !close($conn)) {
warn "error during response:$!\n" if $DEBUG;
}
}}}}}}
}
sub http_server(&$;@)
{
my ( $cb, $listen, %opt) = @_;
my $port = 80;
unless ( ref $listen ) {
($listen, $port) = ($1, $2) if $listen =~ /^(.*)\:(\d+)$/;
$listen = IO::Socket::INET->new(
Listen => 5,
LocalAddr => $listen,
LocalPort => $port,
lib/IO/Lambda/Loop/POE.pm view on Meta::CPAN
$poe_kernel-> $meth( $f-> {handle}, 'io', $f);
warn "$meth charged for ", fileno($f-> {handle}), "\n" if $DEBUG;
} elsif ( $f-> {mask} & $_) {
$poe_kernel-> $meth( $f-> {handle} );
warn "$meth cleared for ", fileno($f-> {handle}), "\n" if $DEBUG;
}
}
$f-> {mask} = $mask;
}
sub purge_filenos(&)
{
my $sub = shift;
my @kill;
while ( my ( $fileno, $r) = each %filenos) {
my @xr = grep &$sub, @{$r->{rec}};
next if @xr == @{$r->{rec}};
$r-> {rec} = \@xr;
push @kill, $fileno unless @xr;
reset_mask( $r);
}
lib/IO/Lambda/Message.pm view on Meta::CPAN
# won't end until we call resolve
my $outer = IO::Lambda-> new;
my $bind = $outer-> bind;
CORE::push @{ $self-> {queue} }, [ $outer, $bind, $msg, $deadline ];
$self-> push if 1 == @{$self-> {queue}} and not $self-> is_listening;
return $outer;
}
sub message(&) { new_message(context)-> condition( shift, \&message, 'message') }
package IO::Lambda::Message::Simple;
my $debug = $IO::Lambda::DEBUG{message} || 0;
sub _d { "simple_msg($_[0])" }
sub new
{
my ( $class, $r, $w) = @_;
lib/IO/Lambda/Mutex.pm view on Meta::CPAN
my $lambda = shift @{$self-> {queue}};
warn "$self gives ownership to $lambda\n" if $DEBUG;
$lambda-> {__already_removed} = 1;
$lambda-> terminate(undef);
}
sub DESTROY { $_-> terminate('dead') for @{shift-> {queue}} }
sub mutex(&)
{
my ( $self, $timeout) = context;
$self-> waiter($timeout)-> condition(shift, \&mutex, 'mutex')
}
sub pipeline
{
my ( $self, $lambda, $timeout) = @_;
lambda {
my @p = @_;
lib/IO/Lambda/Poll.pm view on Meta::CPAN
sub poll_handler
{
my ( $expired, $cb, @opt) = @_;
my @res = $cb->(@opt);
return 1,@res if $res[0];
return 1,0 if $expired;
return 0;
}
sub poller(&)
{
my $cb = _subname poller => shift;
lambda {
my %opt = @_;
poll_event(
undef, undef, \&poll_handler,
exists($opt{timeout}) ? $opt{timeout} : $opt{deadline},
$opt{frequency},
$cb, %opt
lib/IO/Lambda/Socket.pm view on Meta::CPAN
use Socket;
use Exporter;
use IO::Lambda qw(:all :dev);
use Time::HiRes qw(time);
use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS);
@ISA = qw(Exporter);
%EXPORT_TAGS = (all => \@EXPORT_OK);
@EXPORT_OK = qw(connect accept send recv);
use subs qw(connect accept send recv);
sub connect(&)
{
return this-> override_handler('connect', \&connect, shift)
if this-> {override}->{connect};
my $cb = _subname connect => shift;
my ($socket, $deadline) = context;
return this-> add_constant( $cb, \&connect, "Bad socket") unless $socket;
my $w32 = $^O eq 'MSWin32';
lib/IO/Lambda/Socket.pm view on Meta::CPAN
my $e = $w32 ? \ $^E : \ $!;
$$e = unpack('i', getsockopt( $socket, SOL_SOCKET, SO_ERROR));
@param = ($$e) if $$e;
}
local *__ANON__ = "IO::Lambda::Socket::connect::callback";
$cb ? $cb-> (@param) : @param;
}
);
}
sub accept(&)
{
return this-> override_handler('accept', \&accept, shift)
if this-> {override}->{accept};
my $cb = _subname accept => shift;
my ($socket, $deadline) = context;
return this-> add_constant( $cb, \&connect, "Bad socket") unless $socket;
this-> watch_io(
lib/IO/Lambda/Socket.pm view on Meta::CPAN
($h) : ($!)
);
}
local *__ANON__ = "IO::Lambda::Socket::accept::callback";
$cb ? $cb-> (@param) : @param;
}
);
}
# recv($fh, $length, $flags, $deadline) -> (address,msg|undef,error)
sub recv(&)
{
return this-> override_handler('recv', \&recv, shift)
if this-> {override}->{recv};
my $cb = _subname( recv => shift );
my ($socket, $length, $flags, $deadline) = context;
return this-> add_constant( $cb, \&recv, undef, "Bad socket")
unless $socket;
lib/IO/Lambda/Socket.pm view on Meta::CPAN
@param = ( undef, $!);
}
}
local *__ANON__ = "IO::Lambda::Socket::recv::callback";
$cb ? $cb-> (@param) : @param;
}
);
}
# send($fh, $msg, $flags, $to, $deadline) -> ioresult
sub send(&)
{
return this-> override_handler('send', \&send, shift)
if this-> {override}->{send};
my $cb = _subname send => shift;
my ($socket, $msg, $flags, $to, $deadline) = context;
return this-> add_constant( $cb, \&send, undef, "Bad socket")
unless $socket;
lib/IO/Lambda/Thread.pm view on Meta::CPAN
close($self->{socket}) if $self-> {socket};
delete @{$self}{qw(socket thread)};
$self-> SUPER::DESTROY;
}
sub thread { $_[0]-> {thread} }
sub socket { $_[0]-> {socket} }
sub threaded(&)
{
my $cb = shift;
# use overridden IO::Lambda, because we need
# give the caller a chance to join
# for it, if the lambda gets terminated
__PACKAGE__-> new( sub {
# Save context. This is needed because the caller
# may have his own this. lambda(&) does the same
# protection
t/13_synthetic.t view on Meta::CPAN
sub f
{
my @b = @_;
return lambda {
my @c = @_;
return "$a0/@b/@c";
};
}
# test synthetic conditions
sub new_condition(&)
{
f($a0++)-> call($b0++)-> condition( shift, \&new_condition)
}
my $a2 = 0;
this lambda {
context 'a';
new_condition { join('', @_, $a2++, context) }
};