IO-Lambda

 view release on metacpan or  search on metacpan

lib/IO/Lambda.pm  view on Meta::CPAN

	do {} while yield;
	set_frame(@frame);
}

#
# Part II - Procedural interface to the lambda-style programming
#
#################################################################

sub _lambda_restart { die "lambda() is not restartable" }
sub lambda(&)
{
	my $cb  = _subname(lambda => $_[0]);
	my $l   = __PACKAGE__-> new( sub {
		# initial lambda code is usually executed by tail/tails inside another lambda,
		# so protect the upper-level context
		local *__ANON__ = "IO::Lambda::lambda::callback";
		local $THIS     = shift;
		local @CONTEXT  = ();
		local $CALLBACK = $cb;
		local $METHOD   = \&_lambda_restart;

lib/IO/Lambda.pm  view on Meta::CPAN

}

sub restartable
{
	my $name = @_ ? $_[0] : join(':', caller);
	$THIS->{frames}->{$name} = [ $METHOD, $CALLBACK, @CONTEXT ];
	return $name;
}

# exceptions and backtracing
sub catch(&$)
{
	my ( $cb, $event) = @_;
	my $who = (caller(1))[3];
	my @ctx = @CONTEXT;
	confess "catch callback already defined" if $event-> [WATCH_CANCEL];
	$event->[WATCH_CANCEL] = $cb ? sub {
		local *__ANON__ = "$who\:\:catch" if $DEBUG_CALLER;
		$THIS     = shift;
		local $THIS-> {cancelled_event} = shift;
		local $THIS-> {cancelling} = 1;

lib/IO/Lambda.pm  view on Meta::CPAN

			@CONTEXT  = @ctx;
			$METHOD   = $method;
			$CALLBACK = $cb;
			$cb ? $cb-> (@_) : @_;
		}, 
		($AGAIN ? delete($self-> {cancel}) : undef),
	)
}

# rwx($flags,$handle,$deadline)
sub rwx(&)
{
	return $THIS-> override_handler('rwx', \&rwx, shift)
		if $THIS-> {override}->{rwx};

	$THIS-> add_watch( 
		_subname(rwx => shift), \&rwx,
		@CONTEXT[0,1,2,0,1,2]
	)
}

# readable($handle,$deadline)
sub readable(&)
{
	return $THIS-> override_handler('readable', \&readable, shift)
		if $THIS-> {override}->{readable};

	$THIS-> add_watch( 
		_subname(readable => shift), \&readable, IO_READ, 
		@CONTEXT[0,1,0,1]
	)
}

# writable($handle,$deadline)
sub writable(&)
{
	return $THIS-> override_handler('writable', \&writable, shift)
		if $THIS-> {override}->{writable};
	
	$THIS-> add_watch( 
		_subname(writable => shift), \&writable, IO_WRITE, 
		@CONTEXT[0,1,0,1]
	)
}

lib/IO/Lambda.pm  view on Meta::CPAN

			@CONTEXT  = @ctx;
			$METHOD   = $method;
			$CALLBACK = $cb;
			$cb ? $cb-> (@_) : @_;
		},
		($AGAIN ? delete($self-> {cancel}) : undef),
	)
}

# timeout($deadline)
sub timeout(&)
{
	return $THIS-> override_handler('timeout', \&timeout, shift)
		if $THIS-> {override}->{timeout};
	$THIS-> add_timer( _subname(timeout => shift), \&timeout, @CONTEXT[0,0])
}

# common wrapper for declaration of single lambda-watching user conditions
sub add_tail
{
	my ($self, $cb, $method, $lambda, @ctx) = @_;

lib/IO/Lambda.pm  view on Meta::CPAN

	$THIS-> watch_lambda( IO::Lambda-> new, sub {
		local *__ANON__ = "IO::Lambda::".$name."::callback";
		@CONTEXT  = @ctx;
		$METHOD   = $method;
		$CALLBACK = $cb;
		$cb-> ();
	}) if $cb;
}

# tail( $lambda, @param) -- initialize $lambda with @param, and wait for it
sub tail(&)
{
	return $THIS-> override_handler('tail', \&tail, shift)
		if $THIS-> {override}->{tail};
	
	my ( $lambda, @param) = context;
	return _empty(tail => \&tail, shift) unless $lambda;

	$lambda-> reset
		if $lambda-> is_stopped and $lambda-> autorestart;
	if ( @param) {
		$lambda-> call( @param);
	} else {
		$lambda-> call unless $lambda-> is_active;
	}
	$THIS-> add_tail( _subname(tail => shift), \&tail, $lambda, $lambda, @param);
}


# tails(@lambdas) -- wait for all lambdas to finish
sub tails(&)
{
	return $THIS-> override_handler('tails', \&tails, shift)
		if $THIS-> {override}->{tails};
	
	my $cb = _subname tails => $_[0];
	my @lambdas = context;
	my $n = $#lambdas;
	return _empty(tails => \&tails, $cb) unless @lambdas;

	my @ret;

lib/IO/Lambda.pm  view on Meta::CPAN

		@CONTEXT  = @lambdas;
		$METHOD   = \&tails;
		$CALLBACK = $cb;
		$cb ? $cb-> (@ret) : @ret;
	};
	my $this = $THIS;
	$this-> watch_lambda( $_, $watcher) for @lambdas;
}

# tailo(@lambdas) -- wait for all lambdas to finish, return ordered results
sub tailo(&)
{
	return $THIS-> override_handler('tailo', \&tailo, shift)
		if $THIS-> {override}->{tailo};
	
	my $cb = _subname tailo => $_[0];
	my @lambdas = context;
	my $n = $#lambdas;
	return _empty(tailo => \&tailo, $cb) unless @lambdas;

	my @ret;

lib/IO/Lambda.pm  view on Meta::CPAN

	for ( my $i = 0; $i < @lambdas; $i++) {
		my $d = $i;
		$this-> watch_lambda(
			$lambdas[$i], 
			sub { $watcher->($d, @_) }
		);
	};
}

# any_tail($deadline,@lambdas) -- wait for any lambda to finish within time
sub any_tail(&)
{
	return $THIS-> override_handler('any_tail', \&any_tail, shift)
		if $THIS-> {override}->{any_tail};
	
	my $cb = _subname any_tail => $_[0];
	my ( $deadline, @lambdas) = context;
	my $n = $#lambdas;
	return _empty(any_tail => \&any_tail, $cb) unless @lambdas;

	my ( @ret, @watchers);

lib/IO/Lambda.pm  view on Meta::CPAN

	tail {
		push @ret, $p if shift;
		return @ret unless @p;
		$p = shift @p;
		context $lambda, $p;
		again;
	}}
}

# curry(@a -> $l) :: @a -> @b
sub curry(&)
{
	my $cb = $_[0];
	lambda {
		context $cb->(@_);
		&tail();
	}
}

# seq() :: (@l) -> @m
sub seq { mapcar curry { shift } }

lib/IO/Lambda/DNS.pm  view on Meta::CPAN

		# behave like inet_aton, return single IP address
		for ( $packet-> answer) {
			return $_-> address if $_-> type eq 'A';
		}
		return 'response doesn\'t contain an IP address';
	}

	return $packet;
}}}

sub dns(&) { IO::Lambda::DNS-> new(context)-> condition(shift, \&dns, 'dns') }

1;

__DATA__

=pod

=head1 NAME

IO::Lambda::DNS - DNS queries lambda style

lib/IO/Lambda/Flock.pm  view on Meta::CPAN

{
	my ( $expired, $fh, $shared) = @_;
	if ( CORE::flock( $fh, LOCK_NB | ($shared ? LOCK_SH : LOCK_EX) )) {
		warn "flock $fh obtained\n" if $DEBUG;
		return 1, 1;
	}
	return 1, 0 if $expired;
	return 0;
}

sub flock(&)
{
	return this-> override_handler('flock', \&flock, shift)
		if this-> {override}->{flock};

	my $cb = _subname flock => shift;
	my ($fh, %opt) = context;
	my $deadline = exists($opt{timeout}) ? $opt{timeout} : $opt{deadline};

	poll_event(
		$cb, \&flock, \&poll_flock, 

lib/IO/Lambda/Fork.pm  view on Meta::CPAN

use IO::Lambda qw(:all :dev);
use IO::Lambda::Signal qw(pid);

our @ISA = qw(Exporter);
our @EXPORT_OK = qw(new_process process new_forked forked);
our %EXPORT_TAGS = (all => \@EXPORT_OK);

sub _d { "forked(" . _o($_[0]) . ")" }

# return pid and socket
sub new_process(&)
{
	my $cb = shift;
	
	my $r = IO::Handle-> new;
	my $w = IO::Handle-> new;
	socketpair( $r, $w, AF_UNIX, SOCK_STREAM, PF_UNSPEC);
	$w-> blocking(0);

	my $pid = fork;
	unless ( defined $pid) {

lib/IO/Lambda/Fork.pm  view on Meta::CPAN

	}
		
	warn "forked pid=$pid\n" if $DEBUG;

	close($r);

	return ($pid, $w);
}

# simple fork, return only $? and $!
sub process(&)
{
	my $cb = shift;

	lambda { 
		my $pid = fork;
		return undef, $! unless defined $pid;
		unless ( $pid) {
			warn "process($$) started\n" if $DEBUG;
			eval { $cb->(); };
			warn "process($$) ended\n" if $DEBUG;

lib/IO/Lambda/Fork.pm  view on Meta::CPAN

		}

		warn "forked pid=$pid\n" if $DEBUG;
		context $pid;
		&pid();
	}
	
}

# return output from a subprocess
sub new_forked(&)
{
	my $cb = shift;

	my ( $pid, $r) = new_process {
		my @ret;
		my $socket = shift;
		eval { @ret = $cb-> () if $cb };
		my $msg = $@ ? [ 0, $@ ] : [ 1, @ret ];
		warn "process($$) ended: [@$msg]\n" if $DEBUG > 1;
		print $socket freeze($msg);

lib/IO/Lambda/Fork.pm  view on Meta::CPAN

		}

		context $pid;
	pid {
		warn "pid($pid): exitcode=$?, [@ret]\n" if $DEBUG > 1;
		return shift, @ret;
	}}}
}

# simpler version of new_forked
sub forked(&)
{
	my $cb = shift;
	lambda {
		context &new_forked($cb);
	tail {
		my ( $pid, $ok, @ret) = @_;
		return @ret;
	}}
}

lib/IO/Lambda/HTTP.pm  view on Meta::CPAN

package IO::Lambda::HTTP;
use vars qw(@ISA @EXPORT_OK);
@ISA = qw(Exporter);
@EXPORT_OK = qw(http_request http_server);

use strict;
use warnings;
use IO::Lambda qw(:lambda);
use IO::Lambda::HTTP::Client;

sub http_request(&) 
{
#	Carp::carp "IO::Lambda::HTTP is deprecated, use IO::Lambda::HTTP::Client instead";
	IO::Lambda::HTTP::Client-> new(context)-> 
		condition(shift, \&http_request, 'http_request')
}

sub new {
	shift;
#	Carp::carp "IO::Lambda::HTTP is deprecated, use IO::Lambda::HTTP::Client instead";
	IO::Lambda::HTTP::Client->new(@_);

lib/IO/Lambda/HTTP/Client.pm  view on Meta::CPAN

use warnings;
use Socket;
use Errno;
use Exporter;
use IO::Socket;
use HTTP::Response;
use IO::Lambda qw(:lambda :stream);
use IO::Lambda::Socket qw(connect);
use Time::HiRes qw(time);

sub http_request(&)
{
	__PACKAGE__-> new(context)->
		condition(shift, \&http_request, 'http_request')
}

sub new
{
	my ( $class, $req, %options) = @_;

	my $self = bless {}, $class;

lib/IO/Lambda/HTTP/Server.pm  view on Meta::CPAN

		warn length($resp), " bytes written\n" if $DEBUG > 1;
		return this->start if $keep_alive && !($self->{shutdown} && !length($buf));

		warn "[$session_data->{remote}] disconnect\n" if $DEBUG;
		if ( !close($conn)) {
			warn "error during response:$!\n" if $DEBUG;
		}
	}}}}}}
}

sub http_server(&$;@)
{
	my ( $cb, $listen, %opt) = @_;
	
	my $port = 80;
	unless ( ref $listen ) {
		($listen, $port) = ($1, $2) if $listen =~ /^(.*)\:(\d+)$/;
		$listen = IO::Socket::INET->new(
			Listen => 5,
			LocalAddr => $listen,
			LocalPort => $port,

lib/IO/Lambda/Loop/POE.pm  view on Meta::CPAN

			$poe_kernel-> $meth( $f-> {handle}, 'io', $f);
			warn "$meth charged for ", fileno($f-> {handle}), "\n" if $DEBUG;
		} elsif ( $f-> {mask} & $_) {
			$poe_kernel-> $meth( $f-> {handle} );
			warn "$meth cleared for ", fileno($f-> {handle}), "\n" if $DEBUG;
		}
	}
	$f-> {mask} = $mask;
}

sub purge_filenos(&)
{
	my $sub = shift;
	my @kill;
	while ( my ( $fileno, $r) = each %filenos) {
		my @xr = grep &$sub, @{$r->{rec}};
		next if @xr == @{$r->{rec}};
		$r-> {rec} = \@xr;
		push @kill, $fileno unless @xr;
		reset_mask( $r);
	}

lib/IO/Lambda/Message.pm  view on Meta::CPAN

	# won't end until we call resolve
	my $outer = IO::Lambda-> new;
	my $bind  = $outer-> bind;
	CORE::push @{ $self-> {queue} }, [ $outer, $bind, $msg, $deadline ];

	$self-> push if 1 == @{$self-> {queue}} and not $self-> is_listening;

	return $outer;
}

sub message(&) { new_message(context)-> condition( shift, \&message, 'message') }

package IO::Lambda::Message::Simple;

my $debug = $IO::Lambda::DEBUG{message} || 0;

sub _d { "simple_msg($_[0])" }

sub new
{
	my ( $class, $r, $w) = @_;

lib/IO/Lambda/Mutex.pm  view on Meta::CPAN


	my $lambda = shift @{$self-> {queue}};

	warn "$self gives ownership to $lambda\n" if $DEBUG;
	$lambda-> {__already_removed} = 1;
	$lambda-> terminate(undef);
}

sub DESTROY { $_-> terminate('dead') for @{shift-> {queue}} }

sub mutex(&)
{
	my ( $self, $timeout) = context;
	$self-> waiter($timeout)-> condition(shift, \&mutex, 'mutex')
}

sub pipeline
{
	my ( $self, $lambda, $timeout) = @_;
	lambda {
		my @p = @_;

lib/IO/Lambda/Poll.pm  view on Meta::CPAN


sub poll_handler
{
	my ( $expired, $cb, @opt) = @_;
	my @res = $cb->(@opt);
	return 1,@res if $res[0];
	return 1,0 if $expired;
	return 0;
}

sub poller(&)
{
	my $cb = _subname poller => shift;

	lambda {
		my %opt = @_;
		poll_event(
			undef, undef, \&poll_handler, 
			exists($opt{timeout}) ? $opt{timeout} : $opt{deadline},
			$opt{frequency}, 
			$cb, %opt

lib/IO/Lambda/Socket.pm  view on Meta::CPAN

use Socket;
use Exporter;
use IO::Lambda qw(:all :dev);
use Time::HiRes qw(time);
use vars       qw(@ISA @EXPORT_OK %EXPORT_TAGS);
@ISA         = qw(Exporter);
%EXPORT_TAGS = (all => \@EXPORT_OK);
@EXPORT_OK   = qw(connect accept send recv);
use subs       qw(connect accept send recv);

sub connect(&)
{
	return this-> override_handler('connect', \&connect, shift)
		if this-> {override}->{connect};

	my $cb = _subname connect => shift;
	my ($socket, $deadline) = context;

	return this-> add_constant( $cb, \&connect, "Bad socket") unless $socket;

	my $w32 = $^O eq 'MSWin32';

lib/IO/Lambda/Socket.pm  view on Meta::CPAN

				my $e = $w32 ? \ $^E : \ $!;
				$$e = unpack('i', getsockopt( $socket, SOL_SOCKET, SO_ERROR));
				@param = ($$e) if $$e;
			}
			local *__ANON__ = "IO::Lambda::Socket::connect::callback";
			$cb ? $cb-> (@param) : @param;
		}
	);
}

sub accept(&)
{
	return this-> override_handler('accept', \&accept, shift)
		if this-> {override}->{accept};

	my $cb = _subname accept => shift;
	my ($socket, $deadline) = context;

	return this-> add_constant( $cb, \&connect, "Bad socket") unless $socket;

	this-> watch_io(

lib/IO/Lambda/Socket.pm  view on Meta::CPAN

					($h) : ($!)
				);
			}
			local *__ANON__ = "IO::Lambda::Socket::accept::callback";
			$cb ? $cb-> (@param) : @param;
		}
	);
}

# recv($fh, $length, $flags, $deadline) -> (address,msg|undef,error)
sub recv(&)
{
	return this-> override_handler('recv', \&recv, shift)
		if this-> {override}->{recv};

	my $cb = _subname( recv => shift );
	my ($socket, $length, $flags, $deadline) = context;

	return this-> add_constant( $cb, \&recv, undef, "Bad socket")
		unless $socket;

lib/IO/Lambda/Socket.pm  view on Meta::CPAN

					@param = ( undef, $!);
				}
			}
			local *__ANON__ = "IO::Lambda::Socket::recv::callback";
			$cb ? $cb-> (@param) : @param;
		}
	);
}

# send($fh, $msg, $flags, $to, $deadline) -> ioresult
sub send(&)
{
	return this-> override_handler('send', \&send, shift)
		if this-> {override}->{send};

	my $cb = _subname send => shift;
	my ($socket, $msg, $flags, $to, $deadline) = context;

	return this-> add_constant( $cb, \&send, undef, "Bad socket")
		unless $socket;

lib/IO/Lambda/Thread.pm  view on Meta::CPAN


	close($self->{socket}) if $self-> {socket};
	delete @{$self}{qw(socket thread)};

	$self-> SUPER::DESTROY;
}

sub thread { $_[0]-> {thread} }
sub socket { $_[0]-> {socket} }

sub threaded(&)
{
	my $cb = shift;

	# use overridden IO::Lambda, because we need 
	# give the caller a chance to join
	# for it, if the lambda gets terminated
	__PACKAGE__-> new( sub { 
		# Save context. This is needed because the caller
		# may have his own this. lambda(&) does the same
		# protection

t/13_synthetic.t  view on Meta::CPAN

sub f
{
	my @b = @_;
	return lambda {
		my @c = @_;
		return "$a0/@b/@c";
	};
}

# test synthetic conditions
sub new_condition(&)
{ 
	f($a0++)-> call($b0++)-> condition( shift, \&new_condition) 
}

my $a2 = 0;
this lambda {
	context 'a';
	new_condition { join('', @_, $a2++, context) }
};



( run in 0.893 second using v1.01-cache-2.11-cpan-49f99fa48dc )