Festival-Client-Async
view release on metacpan or search on metacpan
}
# Put the socket into blocking mode.
#
# Clears O_NONBLOCK on $self->{sock} while leaving every other file status
# flag as it was, then records the mode in $self->{blocked}.
# Dies if either fcntl call fails.
sub block {
    my $self = shift;
    # F_GETFL hands the flags back as fcntl's return value; the third
    # argument is not filled in.  fcntl yields "0 but true" when no flags
    # are set, so a false result here really does mean failure.
    my $flags = fcntl($self->{sock}, F_GETFL, 0)
        or die "fcntl(F_GETFL) failed: $!";
    fcntl($self->{sock}, F_SETFL, $flags & ~O_NONBLOCK)
        or die "fcntl(F_SETFL) failed: $!";
    $self->{blocked} = 1;
}
# Put the socket into non-blocking mode.
#
# Sets O_NONBLOCK on $self->{sock} in addition to whatever file status
# flags are already set, then records the mode in $self->{blocked}.
# Dies if either fcntl call fails.
sub unblock {
    my $self = shift;
    # F_GETFL hands the flags back as fcntl's return value; the third
    # argument is not filled in.  fcntl yields "0 but true" when no flags
    # are set, so a false result here really does mean failure.
    my $flags = fcntl($self->{sock}, F_GETFL, 0)
        or die "fcntl(F_GETFL) failed: $!";
    fcntl($self->{sock}, F_SETFL, $flags | O_NONBLOCK)
        or die "fcntl(F_SETFL) failed: $!";
    $self->{blocked} = 0;
}
# Protocol encoding
# KEY is the marker searched for in incoming data to find the end of a
# tagged payload; KEYLEN is its length in characters.
use constant KEY    => 'ft_StUfF_key';
use constant KEYLEN => length(KEY);
# Send as much pending output to the server as possible.
#
# Every expression queued in $self->{outq}{LP} is first appended to
# $self->{outbuf}; the buffer is then written to $self->{sock} in pieces of
# at most 4096 bytes, and whatever was written is removed from the buffer.
#
# Returns the total number of bytes written, or undef if nothing was written.
sub write_more {
    my ($self) = @_;

    # Move queued expressions into the output buffer.  This stops at (and
    # drops) the first undefined queue entry.
    while (1) {
        my $expr = shift @{ $self->{outq}{LP} };
        last unless defined $expr;
        $self->{outbuf} .= $expr;
    }

    my $total;    # stays undef unless at least one byte goes out
    while (1) {
        my $sent = syswrite($self->{sock}, $self->{outbuf}, 4096);
        last unless defined $sent;
        print "wrote $sent bytes\n" if DEBUG;
        last if $sent == 0;

        $total += $sent;
        substr($self->{outbuf}, 0, $sent, "");

        # In blocking mode a short write ends the loop.
        last if $self->{blocked} and $sent < 4096;
    }

    return $total;
}
# Read one chunk from the server and parse everything buffered so far.
#
# Performs a single sysread of up to 4096 bytes on $self->{sock}, appends the
# result to $self->{inbuf}, and then loops over the buffer:
#
#   * Outside a tag, a leading "WV\n", "LP\n", "ER\n" or "OK\n" is stripped.
#     OK and ER push the current time onto $self->{inq}{OK} / {ER}; WV and
#     LP become the current tag in $self->{intag}.
#   * Inside a tag, data is pushed onto $self->{inq}{<tag>} up to the
#     terminating KEY.  A KEY followed by 'X' is an escaped, literal KEY:
#     the 'X' is removed and the KEY stays part of the data.  If no
#     terminator is buffered yet, what is there is queued as a partial piece.
#
# Returns the number of bytes accounted for by this call (tag names plus
# queued data), which is 0 when nothing could be parsed.
sub read_more {
    my $self = shift;
    my $fh = $self->{sock};
    my $count = 0;

    my $nread = sysread $fh, my($rbuf), 4096;
    print "read $nread bytes\n" if DEBUG and defined $nread;
    # sysread returns undef on error; append nothing in that case instead of
    # an undefined value, but still parse whatever is already buffered.
    $self->{inbuf} .= (defined $nread and defined $rbuf) ? $rbuf : "";

    # Offset at which to resume searching for KEY.  It is moved past every
    # escaped KEY that has just been unescaped, so that the same occurrence
    # is not found again on the next pass and mistaken for a terminator.
    my $search_from = 0;

  CHUNK:
    while (length($self->{inbuf}) > 0) {
        # In the middle of a tag?
        if ($self->{intag}) {
            # Look for the stuff key
            my $i = index($self->{inbuf}, KEY, $search_from);
            if ($i != -1) {
                # NOTE(review): if KEY is the very last thing in the buffer,
                # a following 'X' may simply not have arrived yet; as before,
                # that case is treated as a terminator.
                if (substr($self->{inbuf}, $i + KEYLEN, 1) eq 'X') {
                    # If there's an X at the end, it's literal
                    substr($self->{inbuf}, $i + KEYLEN, 1) = "";
                    $search_from = $i + KEYLEN;
                } else {
                    # Otherwise, we've got a complete waveform/expr/whatever
                    push @{$self->{inq}{$self->{intag}}},
                        substr($self->{inbuf}, 0, $i);
                    print "queued $i bytes of $self->{intag}\n" if DEBUG;
                    substr($self->{inbuf}, 0, $i + KEYLEN) = "";
                    $self->{intag} = "";
                    $count += $i;
                    $search_from = 0;
                }
            } else {
                # Maybe we got *part* of the stuff key at the end of this
                # block.  Try the longest candidate first: a prefix of KEY
                # such as "ft_StUf" also ends in "f", and matching only that
                # "f" would queue the rest of the partial key as data.
                my $leftover = "";
                my $longest = length($self->{inbuf});
                $longest = KEYLEN - 1 if $longest > KEYLEN - 1;
              PARTIAL:
                for my $sub (reverse 1 .. $longest) {
                    my $prefix = substr(KEY, 0, $sub);
                    if (substr($self->{inbuf}, -$sub) eq $prefix) {
                        substr($self->{inbuf}, -$sub) = "";
                        $leftover = $prefix;
                        last PARTIAL;
                    }
                }
                # In any case we don't have any more data
                push @{$self->{inq}{$self->{intag}}}, $self->{inbuf};
                print "queued ", length($self->{inbuf}), " bytes of $self->{intag}\n"
                    if DEBUG;
                $count += length($self->{inbuf});
                $self->{inbuf} = $leftover;
                # But don't keep looping if we left some stuff in there!
                last CHUNK if $leftover;
            }
        } else {
            if ($self->{inbuf} =~ s/^(WV|LP|ER|OK)\n//) {
                my $tag = $1;
                print "got tag $tag\n" if DEBUG;
                $count += length($tag);
                # We got a tag, so a new type of data is coming
                if ($tag eq 'OK') {
                    push @{$self->{inq}{OK}}, time;
                } elsif ($tag eq 'ER') {
                    push @{$self->{inq}{ER}}, time;
                } else {
                    $self->{intag} = $tag;
                }
            } else {
                # Should not actually be fatal, it's always possible
                # we just got the middle of a tag.
                last CHUNK;
            }
        }
    }
    return $count;
}
# Evaluate a Lisp expression on the server and wait for the outcome.
#
# Arguments:
#   $lisp    - the expression, queued through server_eval
#   $actions - hash reference; the optional WV and LP entries are code
#              references called with each dequeued wave / Lisp item
#
# The socket is switched to blocking mode for the duration of the call and
# switched back before returning.  Returns undef if nothing could be written
# or an error was dequeued, and 1 otherwise.
sub server_eval_sync {
    my ($self, $lisp, $actions) = @_;

    $self->block;
    $self->server_eval($lisp);

    my $status = 1;
    if ($self->write_more) {
      RESPONSE:
        while ($self->read_more) {
            # Hand over (or silently drop) everything queued so far.
            while (1) {
                my $wav = $self->dequeue_wave;
                last unless defined $wav;
                if (exists $actions->{WV}) {
                    $actions->{WV}->($wav);
                }
            }
            while (1) {
                my $expr = $self->dequeue_lisp;
                last unless defined $expr;
                if (exists $actions->{LP}) {
                    $actions->{LP}->($expr);
                }
            }

            # An error ends the exchange with failure, an OK with success.
            if (defined $self->dequeue_error) {
                $status = undef;
                last RESPONSE;
            }
            last RESPONSE if defined $self->dequeue_ok;
        }
    }
    else {
        $status = undef;
    }

    $self->unblock;
    return $status;
}
# Don't mix this with async operations :(
# Older, line-oriented synchronous evaluation.
#
# Arguments:
#   $lisp    - text printed to the server socket
#   $actions - hash reference mapping a tag (e.g. WV, LP) to a code reference
#              that is called with each piece of data read under that tag
#
# The socket is put into blocking mode for the duration of the call and
# restored to non-blocking mode afterwards.  Reading stops at an OK or ER
# tag, or at end of input.  Returns true only if the last tag seen was OK.
sub server_eval_sync_old {
    my ($self, $lisp, $actions) = @_;
    my $fh = $self->{sock};
    $self->block;
    # $| applies to the currently selected output handle, so select the
    # socket while turning autoflush on.  Otherwise the request could sit in
    # the output buffer while we block waiting for the reply.
    my $selected = select($fh);
    $| = 1;
    select($selected);
    my ($rbuf, $rest, $tag);
    print $fh $lisp;
    # Data left over after a terminating key is processed before the next
    # line is read from the socket.
    while (defined($rbuf = $rest) or defined($rbuf = <$fh>)) {
        undef $rest;
        if ($rbuf =~ s/^(WV|LP|ER|OK)\n$//s) {
            $tag = $1;
            last if $tag eq 'OK' or $tag eq 'ER';
        }
        if ((my $i = index($rbuf, KEY)) != -1) {
            if (substr($rbuf, $i+KEYLEN, 1) eq 'X') {
                # A key followed by X is literal data: drop only the X.
                substr($rbuf, $i+KEYLEN, 1) = "";
            } else {
                # The key ends the current item; keep what follows for the
                # next pass and cut the key off the data.
                $rest = substr($rbuf, $i+KEYLEN);
                substr($rbuf, $i) = "";
            }
        }
        if (defined $tag and exists $actions->{$tag}) {
            $actions->{$tag}->($rbuf);
        }
    }
    $self->unblock;
    return defined($tag) && ($tag eq 'OK');
}
# Queue one or more Lisp expressions for sending to the server.
# Nothing is written here; the expressions are appended to
# $self->{outq}{LP}.
sub server_eval {
    my ($self, @exprs) = @_;
    push @{ $self->{outq}{LP} }, @exprs;
}
# Return the Lisp expressions still queued in $self->{outq}{LP}
# (their number when called in scalar context).
sub write_pending {
    my ($self) = @_;
    return @{ $self->{outq}{LP} };
}
# Return the entries currently queued in $self->{inq}{WV}
# (their number when called in scalar context).
sub wave_pending {
    my ($self) = @_;
    return @{ $self->{inq}{WV} };
}
# Return the entries currently queued in $self->{inq}{LP}
# (their number when called in scalar context).
sub lisp_pending {
    my ($self) = @_;
    return @{ $self->{inq}{LP} };
}
# Return the entries currently queued in $self->{inq}{OK}
# (their number when called in scalar context).
sub ok_pending {
    my ($self) = @_;
    return @{ $self->{inq}{OK} };
}
# Return the entries currently queued in $self->{inq}{ER}
# (their number when called in scalar context).
sub error_pending {
    my ($self) = @_;
    return @{ $self->{inq}{ER} };
}
# Remove and return the oldest entry of $self->{inq}{WV};
# undef when the queue is empty.
sub dequeue_wave {
    my ($self) = @_;
    return shift @{ $self->{inq}{WV} };
}
# Remove and return the oldest entry of $self->{inq}{LP};
# undef when the queue is empty.
sub dequeue_lisp {
    my ($self) = @_;
    return shift @{ $self->{inq}{LP} };
}
sub dequeue_ok {
my $self = shift;
shift @{$self->{inq}{OK}};
( run in 1.061 second using v1.01-cache-2.11-cpan-5b529ec07f3 )