Festival-Client-Async

 view release on metacpan or  search on metacpan

Async.pm  view on Meta::CPAN

}

# Put the socket into blocking mode.
#
# Reads the handle's current file status flags, clears O_NONBLOCK and
# writes them back, so any other status flags already set on the handle
# are left untouched.  Dies if either fcntl call fails.  Records the
# mode in $self->{blocked}.
sub block {
    my $self = shift;

    # fcntl hands the flags back as its return value ("0 but true" when
    # they are zero); it does not store them in its third argument.
    my $flags = fcntl($self->{sock}, F_GETFL, 0)
        or die "fcntl(F_GETFL) failed: $!";
    fcntl($self->{sock}, F_SETFL, $flags & ~O_NONBLOCK)
        or die "fcntl(F_SETFL) failed: $!";
    $self->{blocked} = 1;
}

# Put the socket into non-blocking mode.
#
# Reads the handle's current file status flags, adds O_NONBLOCK and
# writes them back, so any other status flags already set on the handle
# are left untouched.  Dies if either fcntl call fails.  Records the
# mode in $self->{blocked}.
sub unblock {
    my $self = shift;

    # fcntl hands the flags back as its return value ("0 but true" when
    # they are zero); it does not store them in its third argument.
    my $flags = fcntl($self->{sock}, F_GETFL, 0)
        or die "fcntl(F_GETFL) failed: $!";
    fcntl($self->{sock}, F_SETFL, $flags | O_NONBLOCK)
        or die "fcntl(F_SETFL) failed: $!";
    $self->{blocked} = 0;
}

# Protocol encoding
# KEY is the marker string that read_more and server_eval_sync_old
# search for in incoming data: it ends a block of tagged data, unless it
# is immediately followed by an 'X', in which case those routines drop
# the 'X' and keep the marker as literal data.
use constant KEY     => "ft_StUfF_key";
# Length of KEY, used to step past the marker inside a buffer.
use constant KEYLEN  => length KEY;

# Flush queued Lisp expressions to the socket.
#
# Moves everything waiting in $self->{outq}{LP} onto the end of
# $self->{outbuf}, then writes the buffer out in pieces of at most 4096
# bytes.  Writing stops when syswrite returns undef or 0, or - in
# blocking mode - after the first short write.
#
# Returns the total number of bytes written, or undef if nothing was
# written at all.
sub write_more {
    my $self = shift;

    # Drain the queue up to the first undefined entry.
    for (;;) {
        my $expr = shift @{ $self->{outq}{LP} };
        last unless defined $expr;
        $self->{outbuf} .= $expr;
    }

    my $total;
    while (1) {
        my $written = syswrite($self->{sock}, $self->{outbuf}, 4096);
        last unless defined $written;
        print "wrote $written bytes\n" if DEBUG;
        last if $written == 0;

        $total += $written;
        # Drop what has just been sent from the front of the buffer.
        substr($self->{outbuf}, 0, $written, "");
        # A short write on a blocking socket means the buffer is empty.
        last if $self->{blocked} && $written < 4096;
    }

    return $total;
}

# Read up to 4096 bytes from the socket and parse as much of the
# accumulated input buffer as possible.
#
# Input consists of tag lines ("WV\n", "LP\n", "ER\n", "OK\n").  An OK
# or ER tag pushes a timestamp onto $self->{inq}{OK} / {ER}.  A WV or LP
# tag starts a run of data that is pushed, possibly in several pieces,
# onto $self->{inq}{WV} / {LP} until the stuff key (KEY) is seen.  A key
# immediately followed by 'X' is literal data: the 'X' is dropped and
# the key is kept.
#
# Returns the number of bytes accounted for in this call (queued data
# plus the length of each tag name); 0 when nothing could be processed.
sub read_more {
    my $self = shift;
    my $fh = $self->{sock};

    my $count = 0;
    my $burf = sysread $fh, my($rbuf), 4096;
    print "read $burf bytes\n" if DEBUG;
    $self->{inbuf} .= $rbuf;

    # Offset in inbuf at which to start looking for the next stuff key.
    # It is moved past every literal (un-escaped) key so that the same
    # key is not found again and mistaken for the end of the data.
    my $scan_from = 0;

 CHUNK:
    while (length($self->{inbuf}) > 0) {
        # In the middle of a tag?
        if ($self->{intag}) {
            # Look for the stuff key
            my $i = index($self->{inbuf}, KEY, $scan_from);
            if ($i >= 0) {
                # NOTE(review): a key sitting at the very end of the
                # buffer is treated as a terminator even though an 'X'
                # could still arrive with the next read.
                if (substr($self->{inbuf}, $i+KEYLEN, 1) eq 'X') {
                    # If there's an X at the end, it's literal: drop the
                    # X and resume the search after this key.
                    substr($self->{inbuf}, $i+KEYLEN, 1) = "";
                    $scan_from = $i + KEYLEN;
                } else {
                    # Otherwise, we've got a complete waveform/expr/whatever
                    push @{$self->{inq}{$self->{intag}}},
                        substr($self->{inbuf}, 0, $i);
                    print "queued $i bytes of $self->{intag}\n" if DEBUG;
                    substr($self->{inbuf}, 0, $i+KEYLEN) = "";
                    $self->{intag} = "";
                    $count += $i;
                    $scan_from = 0;
                }
            } else {
                # Maybe we got *part* of the stuff key at the end of
                # this block.  Only look at bytes after any literal key
                # already accepted, and never at more than KEYLEN-1.
                my $leftover = "";
                my $max = length($self->{inbuf}) - $scan_from;
                $max = KEYLEN-1 if $max > KEYLEN-1;

                # Try the longest candidate first: a prefix of the key
                # can itself end in a shorter prefix ("ft_StUf" ends in
                # "f"), and keeping only the short one would push the
                # start of the key out as data.
                for my $sub (reverse 1 .. $max) {
                    if (substr($self->{inbuf}, -$sub) eq substr(KEY, 0, $sub)) {
                        $leftover = substr($self->{inbuf}, -$sub, $sub, "");
                        last;
                    }
                }

                # In any case we don't have any more data
                push @{$self->{inq}{$self->{intag}}}, $self->{inbuf};
                print "queued ", length($self->{inbuf}), " bytes of $self->{intag}\n"
                    if DEBUG;
                $count += length($self->{inbuf});
                $self->{inbuf} = $leftover;
                $scan_from = 0;

                # But don't keep looping if we left some stuff in there!
                last CHUNK if length $leftover;
            }
        } else {
            if ($self->{inbuf} =~ s/^(WV|LP|ER|OK)\n//) {
                print "got tag $1\n" if DEBUG;
                $count += length($1);
                # We got a tag, so a new type of data is coming
                if ($1 eq 'OK') {
                    push @{$self->{inq}{OK}}, time;
                } elsif ($1 eq 'ER') {
                    push @{$self->{inq}{ER}}, time;
                } else {
                    $self->{intag} = $1;
                }
            } else {
                # Should not actually be fatal, it's always possible
                # we just got the middle of a tag.
                last CHUNK;
            }
        }
    }

    return $count;
}

# Evaluate a Lisp expression and wait for the server to finish.
#
# Switches the socket to blocking mode, queues and writes $lisp, then
# reads until an OK or ER tag has been received.  $actions is an
# optional hash reference; its WV and LP entries, when present, are code
# references called with each waveform / Lisp result as it is dequeued.
# The socket is switched back to non-blocking mode before returning.
#
# Returns 1 once OK has been received.  Returns undef if nothing could
# be written, if the server reported an error, or if read_more stopped
# delivering data before an OK was seen.
sub server_eval_sync {
    my ($self, $lisp, $actions) = @_;
    $self->block;
    $self->server_eval($lisp);

    unless ($self->write_more) {
        $self->unblock;
        return undef;
    }

    my $got_ok = 0;
    while ($self->read_more) {
        while (defined(my $wav = $self->dequeue_wave)) {
            $actions->{WV}->($wav) if exists $actions->{WV};
        }
        while (defined(my $result = $self->dequeue_lisp)) {
            $actions->{LP}->($result) if exists $actions->{LP};
        }
        if (defined($self->dequeue_error)) {
            $self->unblock;
            return undef;
        }
        if (defined($self->dequeue_ok)) {
            $got_ok = 1;
            last;
        }
    }
    $self->unblock;

    # Leaving the loop without an OK means read_more ran dry first;
    # that must not be reported as success.
    return $got_ok ? 1 : undef;
}

# Don't mix this with async operations :(
#
# Older synchronous evaluation.  Switches the socket to blocking mode,
# prints $lisp to it and then reads the reply line by line with <$fh>,
# bypassing the inbuf/inq machinery used by read_more.  $actions is a
# hash reference mapping a tag name to a code reference that is called
# with each piece of data read while that tag is current.
#
# Returns true only if the last tag seen was 'OK'.
sub server_eval_sync_old {
    my ($self, $lisp, $actions) = @_;
    my $fh = $self->{sock};
    $self->block;

    # NOTE(review): $| affects the currently selected output handle,
    # which is not necessarily $fh - confirm this is what was intended.
    local $|=1;

    # $rbuf is the piece being processed, $rest is whatever followed a
    # stuff key on the previous line, $tag is the most recent tag seen.
    my ($rbuf, $rest, $tag);
    print $fh $lisp;
    # Process text left over after a key first; otherwise read a line.
    while (defined($rbuf = $rest) or defined($rbuf = <$fh>)) {
	undef $rest;
	# A line consisting only of a tag switches the current tag;
	# OK and ER end the exchange.
	if ($rbuf =~ s/^(WV|LP|ER|OK)\n$//s) {
	    $tag = $1;
	    last if $tag eq 'OK' or $tag eq 'ER';
	}

	# $[ is the index base, so $[-1 is what index returns when the
	# key is not found.
	if ((my $i = index($rbuf, KEY)) != $[-1) {
	    if (substr($rbuf, $i+KEYLEN, 1) eq 'X') {
		# Key followed by X is literal data: drop the X.
		substr($rbuf, $i+KEYLEN, 1) = "";
	    } else {
		# End of this block: keep what follows the key for the
		# next iteration and cut the key off the current piece.
		$rest = substr($rbuf, $i+KEYLEN);
		substr($rbuf, $i) = "";
	    }
	}

	# Hand the piece to the handler registered for the current tag.
	if (defined $tag and exists $actions->{$tag}) {
	    $actions->{$tag}->($rbuf);
	}
    }

    $self->unblock;
    return defined($tag) && ($tag eq 'OK');
}

# Queue one or more Lisp expressions on $self->{outq}{LP}.  Nothing is
# sent here; write_more moves the queue to the socket.  Returns the
# number of expressions now in the queue.
sub server_eval {
    my ($self, @exprs) = @_;
    push @{ $self->{outq}{LP} }, @exprs;
}

# Lisp expressions queued in $self->{outq}{LP}: the expressions
# themselves in list context, how many there are in scalar context.
sub write_pending {
    my ($self) = @_;
    return @{ $self->{outq}{LP} };
}

# Entries waiting in $self->{inq}{WV}: the entries themselves in list
# context, how many there are in scalar context.
sub wave_pending {
    my ($self) = @_;
    return @{ $self->{inq}{WV} };
}

# Entries waiting in $self->{inq}{LP}: the entries themselves in list
# context, how many there are in scalar context.
sub lisp_pending {
    my ($self) = @_;
    return @{ $self->{inq}{LP} };
}

# Entries waiting in $self->{inq}{OK}: the entries themselves in list
# context, how many there are in scalar context.
sub ok_pending {
    my ($self) = @_;
    return @{ $self->{inq}{OK} };
}

# Entries waiting in $self->{inq}{ER}: the entries themselves in list
# context, how many there are in scalar context.
sub error_pending {
    my ($self) = @_;
    return @{ $self->{inq}{ER} };
}

# Remove and return the oldest entry of $self->{inq}{WV}; undef when
# the queue is empty.
sub dequeue_wave {
    my ($self) = @_;
    return shift @{ $self->{inq}{WV} };
}

# Remove and return the oldest entry of $self->{inq}{LP}; undef when
# the queue is empty.
sub dequeue_lisp {
    my ($self) = @_;
    return shift @{ $self->{inq}{LP} };
}

sub dequeue_ok {
    my $self = shift;
    shift @{$self->{inq}{OK}};



( run in 1.061 second using v1.01-cache-2.11-cpan-5b529ec07f3 )