view release on metacpan or search on metacpan
## [v0.0.3] - 2026-02-17
### Changed
- Adding an optional timeout to `await_read` and `await_write`.
- Allow fibers to return complex data (AV*, HV*).
## [v0.0.2] - 2026-02-17
### Fixed
- Fixed segfault in `coro_yield` by adding NULL checks for destroyed or missing fibers.
- Resolved stall in exception handling by introducing `last_sender` tracking to prevent `parent_id` cycles.
### Changed
- Made unit tests a lot more noisy
## [v0.0.1] - 2026-02-16
### Changes
- It exists! It shouldn't but it does.
[Unreleased]: https://github.com/sanko/Acme-Parataxis.pm/compare/v0.0.10...HEAD
## Full Coroutines
Fibers in Parataxis are "full coroutines." This means they can suspend from anywhere in the callstack. You can call
`yield( )` from deeply nested functions, and the entire fiber stack will be suspended until the fiber is resumed.
## Transferring Control
While `call( )` and `yield( )` manage a stack-like chain of execution, `transfer( )` provides an unstructured way to
switch between fibers. When you transfer to a fiber, the current one is suspended, and the target fiber resumes. Unlike
`call( )`, transferring does not establish a parent/child relationship. It's more like a `goto` for execution
contexts.
```
$other_fiber->transfer( );
```
## Fibers vs. Threads
In Parataxis, your **Perl code** always runs on a single OS thread. However, when you call an `await_*` function, the
current fiber is suspended, and the actual blocking work is performed on a **different** OS thread in a native pool.
```perl
my $fiber = Acme::Parataxis->new(code => sub {
my $arg = Acme::Parataxis->yield("Initial data");
return "Done with $arg";
});
```
## `call( @args )`
Explicitly switches control to the fiber and passes `@args`. Arguments can be scalars, hash/array references, or
objects. This establishes a parent/child relationship: when the fiber yields or completes, control returns to the
caller.
## `transfer( @args )`
A "symmetric" switch. Suspends the current context and moves directly to the target fiber. No parent/child relationship
is established. Like `call`, it supports passing arbitrary Perl data via `@args`.
# PREEMPTION
## `maybe_yield( )`
Increments an internal operation counter for the current fiber. If the counter reaches the threshold set by
`set_preempt_threshold`, the fiber automatically yields.
```perl
# INTEGRATING SYNCHRONOUS MODULES
To use synchronous modules (like `HTTP::Tiny`) in a non-blocking way, you can subclass their handle or transport
methods and use a `while` loop combined with `yield('WAITING')`. This ensures the fiber yields control until the
underlying I/O is ready.
```perl
# Example: A cooperative HTTP::Tiny subclass
{
package My::HTTP;
use parent 'HTTP::Tiny';
sub _open_handle {
my ($self, $request, $scheme, $host, $port, $peer) = @_;
return My::HTTP::Handle->new(
timeout => $self->{timeout},
keep_alive => $self->{keep_alive},
keep_alive_timeout => $self->{keep_alive_timeout}
)->connect($scheme, $host, $port, $peer);
}
sub request {
my ($self, $method, $url, $args) = @_;
$content .= $data;
return 1;
};
my $res = $self->SUPER::request($method, $url, \%new_args);
$res->{content} = $content unless $orig_cb;
return $res;
}
}
{
package My::HTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
use Time::HiRes qw[time];
sub _do_timeout {
my ($self, $type, $timeout) = @_;
$timeout //= $self->{timeout} // 60;
my $start = time;
while (1) {
# Check for readiness NOW (0 timeout)
return 1 if $self->SUPER::_do_timeout($type, 0);
# Check for overall timeout
my $elapsed = time - $start;
exceptions within fibers.
## Signal Handling
Signals are delivered to the main process thread. Perl handles these at 'safe points,' which in this module typically
occur during a context switch (yield, transfer, or call). If you send a signal while a fiber is suspended, it will
generally be processed when the fiber is resumed and hits the next internal Perl opcode.
## The 'Final Transfer' Requirement
In a symmetric coroutine model (using `transfer( )`), fibers don't have a natural 'parent' to return to. I've added
fallback logic to return to the `last_sender` or the main thread on exit but it's good practice to explicitly
`transfer( )` back to a partner fiber or the `root( )` context to ensure your application logic remains predictable.
Leaving a fiber to just 'fall off the end' is like walking out of a room without closing the door; eventually, the
draft will bother someone.
## `is_done( )` vs. Destruction
A fiber being `is_done( )` simply means its Perl code has finished executing. The underlying C-level memory (stacks,
context, etc.) is not immediately freed until the `Acme::Parataxis` object is destroyed or the runtime performs its
final `cleanup( )`. This is why you might see memory usage stay flat even after a fiber finishes, until the garbage
eg/http_tiny.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis qw[:all];
$|++;
#
package My::HTTP {
use parent 'HTTP::Tiny';
sub _open_handle( $self, $request, $scheme, $host, $port, $peer ) {
My::HTTP::Handle->new(
timeout => $self->{timeout},
keep_alive => $self->{keep_alive},
keep_alive_timeout => $self->{keep_alive_timeout},
SSL_options => $self->{SSL_options},
verify_SSL => $self->{verify_SSL}
)->connect( $scheme, $host, $port, $peer );
}
eg/http_tiny.pl view on Meta::CPAN
$content .= $data;
return 1;
};
my $res = $self->SUPER::request( $method, $url, \%new_args );
$res->{content} = $content unless $orig_cb;
return $res;
}
}
#
package My::HTTP::Handle {
use parent -norequire, 'HTTP::Tiny::Handle';
use Time::HiRes qw[time];
use Acme::Parataxis qw[await_read await_write];
sub _do_timeout ( $self, $type, $timeout //= $self->{timeout} // 60 ) {
if ( $self->{fh} ) {
my $start = time;
while (1) {
# Check for readiness NOW (0 timeout)
return 1 if $self->SUPER::_do_timeout( $type, 0 );
lib/Acme/Parataxis.c view on Meta::CPAN
HV * defstash; /**< Default package stash */
SV * errors; /**< Outstanding queued errors */
SV * user_cv; /**< The Perl sub/coderef this fiber is running */
SV * self_ref; /**< The Acme::Parataxis Perl object wrapper */
SV * transfer_data; /**< Arguments or return values passed during yield/transfer */
int id; /**< Numeric ID of this fiber */
int finished; /**< Flag: 1 if the fiber has completed its entry_point */
int parent_id; /**< ID of the fiber that 'called' this one (asymmetric) */
int last_sender; /**< ID of the fiber that last switched control to this one */
} para_fiber_t;
/** @name Job Status Constants */
///@{
#define JOB_FREE 0 /**< Slot is available for new tasks */
#define JOB_NEW 1 /**< Task is submitted but not yet picked up by a worker */
#define JOB_BUSY 2 /**< Task is currently being processed by a worker thread */
#define JOB_DONE 3 /**< Task has completed and results are ready */
///@}
lib/Acme/Parataxis.c view on Meta::CPAN
* last resumed or called this fiber.
*
* @param ret_val The Perl SV to "return" to the caller.
* @return SV* The value passed in when this fiber is eventually resumed.
*/
DLLEXPORT SV * coro_yield(SV * ret_val) {
dTHX;
if (current_fiber_id == -1)
return &PL_sv_undef;
para_fiber_t * self = fibers[current_fiber_id];
int parent = self->parent_id;
if (parent != -1 && (!fibers[parent] || fibers[parent]->finished))
parent = self->last_sender;
else if (parent == -1)
parent = self->last_sender;
if (parent >= 0 && (!fibers[parent] || fibers[parent]->finished))
parent = -1;
para_fiber_t * caller = (parent == -1) ? &main_context : fibers[parent];
/* Pass return value to caller */
if (caller->transfer_data != ret_val) {
if (caller->transfer_data && caller->transfer_data != &PL_sv_undef)
SvREFCNT_dec(caller->transfer_data);
caller->transfer_data = ret_val;
if (ret_val && ret_val != &PL_sv_undef)
SvREFCNT_inc(ret_val);
}
perform_switch(parent);
/* Retrieve value passed back during resume */
SV * res = self->transfer_data;
self->transfer_data = &PL_sv_undef;
if (res && res != &PL_sv_undef)
sv_2mortal(res);
return res;
}
/**
lib/Acme/Parataxis.c view on Meta::CPAN
if (!c)
return -3;
memset(c, 0, sizeof(para_fiber_t));
c->user_cv = user_code;
if (user_code && user_code != &PL_sv_undef)
SvREFCNT_inc(user_code);
c->self_ref = self_ref;
if (self_ref && self_ref != &PL_sv_undef)
SvREFCNT_inc(self_ref);
c->id = idx;
c->parent_id = -1;
c->last_sender = -1;
c->transfer_data = &PL_sv_undef;
fibers[idx] = c;
/* Initialize Perl stacks */
init_perl_stacks(c);
#ifdef _WIN32
c->context = CreateFiber(0, fiber_entry, c);
#else
lib/Acme/Parataxis.c view on Meta::CPAN
c->context.uc_link = &main_context.context;
makecontext(&c->context, (void (*)())posix_entry, 1, c->id);
#endif
return idx;
}
/**
* @brief Resumes a fiber (asymmetric call).
*
* Suspends the caller and switches execution to the specified fiber.
* Sets the caller as the 'parent' for future yields.
*
* @param fiber_id Fiber ID to call.
* @param args Perl SV (usually arrayref) to pass as arguments to the fiber.
* @return SV* Result yielded by the fiber.
*/
DLLEXPORT SV * coro_call(int fiber_id, SV * args) {
dTHX;
if (fiber_id < 0 || fiber_id >= MAX_FIBERS || !fibers[fiber_id] || fibers[fiber_id]->finished)
return &PL_sv_undef;
if (fibers[fiber_id]->transfer_data != args) {
if (fibers[fiber_id]->transfer_data && fibers[fiber_id]->transfer_data != &PL_sv_undef)
SvREFCNT_dec(fibers[fiber_id]->transfer_data);
fibers[fiber_id]->transfer_data = args;
if (args && args != &PL_sv_undef)
SvREFCNT_inc(args);
}
fibers[fiber_id]->parent_id = current_fiber_id;
perform_switch(fiber_id);
if (fibers[fiber_id] && fibers[fiber_id]->finished) {
if (fibers[fiber_id]->transfer_data && fibers[fiber_id]->transfer_data != &PL_sv_undef) {
SvREFCNT_dec(fibers[fiber_id]->transfer_data);
fibers[fiber_id]->transfer_data = &PL_sv_undef;
}
}
para_fiber_t * me = (current_fiber_id == -1) ? &main_context : fibers[current_fiber_id];
SV * res = me->transfer_data;
me->transfer_data = &PL_sv_undef;
if (res && res != &PL_sv_undef)
sv_2mortal(res);
return res;
}
/**
* @brief Transfers control directly to another fiber (symmetric).
*
* Suspends the current fiber and switches directly to the target. No
* parent/child relationship is established.
*
* @param target_id Fiber ID to transfer to.
* @param args Arguments to pass to the target.
* @return SV* Data eventually transferred back to this fiber.
*/
DLLEXPORT SV * coro_transfer(int target_id, SV * args) {
dTHX;
if (target_id < -1 || (target_id >= 0 && (target_id >= MAX_FIBERS || !fibers[target_id])))
return &PL_sv_undef;
if (target_id >= 0 && fibers[target_id]->finished)
lib/Acme/Parataxis.pod view on Meta::CPAN
=head2 Full Coroutines
Fibers in Parataxis are "full coroutines." This means they can suspend from anywhere in the callstack. You can call
C<yield( )> from deeply nested functions, and the entire fiber stack will be suspended until the fiber is resumed.
=head2 Transferring Control
While C<call( )> and C<yield( )> manage a stack-like chain of execution, C<transfer( )> provides an unstructured way to
switch between fibers. When you transfer to a fiber, the current one is suspended, and the target fiber resumes. Unlike
C<call( )>, transferring does not establish a parent/child relationship. It's more like a C<goto> for execution
contexts.
$other_fiber->transfer( );
=head2 Fibers vs. Threads
In Parataxis, your B<Perl code> always runs on a single OS thread. However, when you call an C<await_*> function, the
current fiber is suspended, and the actual blocking work is performed on a B<different> OS thread in a native pool.
Once the task completes, your fiber is automatically queued for resumption on the main thread.
lib/Acme/Parataxis.pod view on Meta::CPAN
Instantiates a new fiber. The C<code> argument must be a subroutine reference.
my $fiber = Acme::Parataxis->new(code => sub {
my $arg = Acme::Parataxis->yield("Initial data");
return "Done with $arg";
});
=head2 C<call( @args )>
Explicitly switches control to the fiber and passes C<@args>. Arguments can be scalars, hash/array references, or
objects. This establishes a parent/child relationship: when the fiber yields or completes, control returns to the
caller.
=head2 C<transfer( @args )>
A "symmetric" switch. Suspends the current context and moves directly to the target fiber. No parent/child relationship
is established. Like C<call>, it supports passing arbitrary Perl data via C<@args>.
=head1 PREEMPTION
=head2 C<maybe_yield( )>
Increments an internal operation counter for the current fiber. If the counter reaches the threshold set by
C<set_preempt_threshold>, the fiber automatically yields.
while (my $row = $sth->fetch) {
lib/Acme/Parataxis.pod view on Meta::CPAN
=head1 INTEGRATING SYNCHRONOUS MODULES
To use synchronous modules (like C<HTTP::Tiny>) in a non-blocking way, you can subclass their handle or transport
methods and use a C<while> loop combined with C<yield('WAITING')>. This ensures the fiber yields control until the
underlying I/O is ready.
# Example: A cooperative HTTP::Tiny subclass
{
package My::HTTP;
use parent 'HTTP::Tiny';
sub _open_handle {
my ($self, $request, $scheme, $host, $port, $peer) = @_;
return My::HTTP::Handle->new(
timeout => $self->{timeout},
keep_alive => $self->{keep_alive},
keep_alive_timeout => $self->{keep_alive_timeout}
)->connect($scheme, $host, $port, $peer);
}
sub request {
my ($self, $method, $url, $args) = @_;
lib/Acme/Parataxis.pod view on Meta::CPAN
$content .= $data;
return 1;
};
my $res = $self->SUPER::request($method, $url, \%new_args);
$res->{content} = $content unless $orig_cb;
return $res;
}
}
{
package My::HTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
use Time::HiRes qw[time];
sub _do_timeout {
my ($self, $type, $timeout) = @_;
$timeout //= $self->{timeout} // 60;
my $start = time;
while (1) {
# Check for readiness NOW (0 timeout)
return 1 if $self->SUPER::_do_timeout($type, 0);
# Check for overall timeout
my $elapsed = time - $start;
lib/Acme/Parataxis.pod view on Meta::CPAN
exceptions within fibers.
=head2 Signal Handling
Signals are delivered to the main process thread. Perl handles these at 'safe points,' which in this module typically
occur during a context switch (yield, transfer, or call). If you send a signal while a fiber is suspended, it will
generally be processed when the fiber is resumed and hits the next internal Perl opcode.
=head2 The 'Final Transfer' Requirement
In a symmetric coroutine model (using C<transfer( )>), fibers don't have a natural 'parent' to return to. I've added
fallback logic to return to the C<last_sender> or the main thread on exit but it's good practice to explicitly
C<transfer( )> back to a partner fiber or the C<root( )> context to ensure your application logic remains predictable.
Leaving a fiber to just 'fall off the end' is like walking out of a room without closing the door; eventually, the
draft will bother someone.
=head2 C<is_done( )> vs. Destruction
A fiber being C<is_done( )> simply means its Perl code has finished executing. The underlying C-level memory (stacks,
context, etc.) is not immediately freed until the C<Acme::Parataxis> object is destroyed or the runtime performs its
final C<cleanup( )>. This is why you might see memory usage stay flat even after a fiber finishes, until the garbage
t/001_basic.t view on Meta::CPAN
$|++;
#
diag '$Acme::Parataxis::VERSION = ' . $Acme::Parataxis::VERSION;
diag 'Testing basic asymmetric coroutine flow...';
subtest 'Asymmetric Coroutines' => sub {
diag 'Creating a new Acme::Parataxis object (Fiber)...';
my $f = Acme::Parataxis->new(
code => sub ($name) {
diag "Inside fiber. Hello, $name!";
is( $name, 'Alice', 'Received arg' );
diag 'Yielding back to parent...';
my $val = Acme::Parataxis->yield( 'Hello ' . $name );
diag "Resumed in fiber. Received: $val";
is( $val, 'Bob', 'Received resume arg' );
return 'Goodbye ' . $name;
}
);
diag "Calling fiber with 'Alice'...";
my $res1 = $f->call('Alice');
is( $res1, 'Hello Alice', 'First call result' );
diag "First result: $res1";
t/002_exceptions.t view on Meta::CPAN
my $fiber2 = Acme::Parataxis->new(
code => sub {
diag 'Coro2: About to die...';
die 'Deep death';
}
);
$fiber2->call();
return 'Coro2 survived?';
}
);
like dies { $fiber1->call() }, qr/Deep death/, 'Caught deep death in top-level parent';
};
done_testing();
t/003_signals.t view on Meta::CPAN
code => sub {
diag 'Inside fiber: Yielding READY...';
Acme::Parataxis->yield('READY');
diag 'Inside fiber: Resumed after signal delivery.';
return 'Finished';
}
);
diag 'Calling fiber (First step)...';
my $y = $fiber->call();
is $y, 'READY', 'Fiber suspended at yield';
diag 'Sending signal to parent process...';
kill 'INT', $$;
diag 'Signal count before resume: ' . $signaled;
diag 'Resuming fiber...';
$fiber->call();
is $signaled, 1, 'Signal handled between yield and resume';
diag "Signal count final: $signaled";
};
done_testing();
t/007_data_types.t view on Meta::CPAN
subtest 'Returning object from fiber' => sub {
my $res;
{
my $fiber = Acme::Parataxis->new( code => sub { Local::Destructor->new('B') } );
$res = $fiber->call();
isa_ok $res, ['Local::Destructor'], 'Fiber returned object';
# Fiber is technically done, but we manually flag it to ensure
# the Perl-side wrapper drops its internal references.
$fiber->is_done();
is $DESTROYED, 0, 'Object still alive in parent var';
}
$res = undef;
# Force a stack cycle to clear the mortal reference returned by the XS call
flush_stack();
is $DESTROYED, 1, 'Object destroyed in parent after release';
};
};
#
done_testing();
t/008_preemption.t view on Meta::CPAN
code => sub {
for ( 1 .. 10 ) {
$log .= 'B';
Acme::Parataxis->maybe_yield();
}
}
);
# Call C1. It should run 5 times and then maybe_yield will switch back to main
# because the threshold is hit.
# WAIT: who is the parent? Main.
diag 'Calling C1...';
my $res1 = $c1->call();
is $log, 'AAAAA', 'C1 yielded after 5 iterations';
diag 'Calling C2...';
my $res2 = $c2->call();
is $log, 'AAAAABBBBB', 'C2 yielded after 5 iterations';
diag 'Resuming C1...';
$c1->call();
is $log, 'AAAAABBBBBAAAAA', 'C1 finished its remaining iterations';
diag 'Resuming C2...';
t/009_http_tiny.t view on Meta::CPAN
use blib;
use Acme::Parataxis;
use HTTP::Tiny;
use IO::Socket::INET;
use Time::HiRes qw[time];
use Socket qw[SHUT_WR];
use POSIX ();
$|++;
#
package Acme::Parataxis::Test::HTTP {
use parent 'HTTP::Tiny';
sub _open_handle {
my ( $self, $request, $scheme, $host, $port, $peer ) = @_;
my $handle = Acme::Parataxis::Test::HTTP::Handle->new(
timeout => $self->{timeout},
SSL_options => $self->{SSL_options},
verify_SSL => $self->{verify_SSL},
);
return $handle->connect( $scheme, $host, $port, $peer );
}
t/009_http_tiny.t view on Meta::CPAN
return 1;
};
my $res = $self->SUPER::request( $method, $url, $args );
$res->{content} = $content unless $orig_cb;
return $res;
}
}
{
package Acme::Parataxis::Test::HTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
sub _do_timeout {
my ( $self, $type, $timeout ) = @_;
$timeout //= $self->{timeout};
if ( $self->{fh} ) {
my $start = time();
while (1) {
# Immediate check using original select (0 timeout)
return 1 if $self->SUPER::_do_timeout( $type, 0 );
t/013_real_http.t view on Meta::CPAN
# Check for network connectivity
my $http_check = HTTP::Tiny->new( timeout => 2 );
if ( !$http_check->get('http://www.google.com')->{success} && !$http_check->get('http://www.example.com')->{success} ) {
skip_all('No network connectivity detected');
}
#
{
package Acme::Parataxis::Test::RealHTTP;
use parent 'HTTP::Tiny';
sub _open_handle {
my ( $self, $request, $scheme, $host, $port, $peer ) = @_;
my $handle = Acme::Parataxis::Test::RealHTTP::Handle->new(
timeout => $self->{timeout},
SSL_options => $self->{SSL_options},
verify_SSL => $self->{verify_SSL},
keep_alive => $self->{keep_alive},
keep_alive_timeout => $self->{keep_alive_timeout},
);
t/013_real_http.t view on Meta::CPAN
return 1;
};
my $res = $self->SUPER::request( $method, $url, \%new_args );
$res->{content} = $content unless $orig_cb;
return $res;
}
}
{
package Acme::Parataxis::Test::RealHTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
sub _do_timeout {
my ( $self, $type, $timeout ) = @_;
$timeout //= $self->{timeout} // 60;
if ( $self->{fh} ) {
my $start = time();
while (1) {
return 1 if $self->SUPER::_do_timeout( $type, 0 );
my $elapsed = time() - $start;
return 0 if $elapsed > $timeout;
t/014_http_pool.t view on Meta::CPAN
# Check for network connectivity
my $http_check = HTTP::Tiny->new( timeout => 2 );
if ( !$http_check->get('http://www.google.com')->{success} && !$http_check->get('http://www.example.com')->{success} ) {
skip_all('No network connectivity detected');
}
#
{
package Acme::Parataxis::Test::PoolHTTP;
use parent 'HTTP::Tiny';
sub _open_handle {
my ( $self, $request, $scheme, $host, $port, $peer ) = @_;
my $handle = Acme::Parataxis::Test::PoolHTTP::Handle->new(
timeout => $self->{timeout},
SSL_options => $self->{SSL_options},
verify_SSL => $self->{verify_SSL},
keep_alive => $self->{keep_alive},
keep_alive_timeout => $self->{keep_alive_timeout},
);
t/014_http_pool.t view on Meta::CPAN
};
no warnings 'uninitialized';
my $res = $self->SUPER::request( $method, $url, \%new_args );
$res->{content} = $content unless $orig_cb;
return $res;
}
}
{
package Acme::Parataxis::Test::PoolHTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
sub _do_timeout {
my ( $self, $type, $timeout ) = @_;
$timeout //= $self->{timeout} // 60;
if ( $self->{fh} ) {
my $start = time();
while (1) {
return 1 if $self->SUPER::_do_timeout( $type, 0 );
my $elapsed = time() - $start;
return 0 if $elapsed > $timeout;
t/015_http_mock_pool.t view on Meta::CPAN
use v5.40;
use Test2::V1 -ipP;
use blib;
use Acme::Parataxis;
use HTTP::Tiny;
use IO::Socket::INET;
use Time::HiRes qw[time];
use POSIX ();
package Acme::Parataxis::Test::MockPoolHTTP {
use parent 'HTTP::Tiny';
sub _open_handle {
my ( $self, $request, $scheme, $host, $port, $peer ) = @_;
my $handle = Acme::Parataxis::Test::MockPoolHTTP::Handle->new(
timeout => $self->{timeout},
SSL_options => $self->{SSL_options},
verify_SSL => $self->{verify_SSL},
keep_alive => $self->{keep_alive},
keep_alive_timeout => $self->{keep_alive_timeout}
);
t/015_http_mock_pool.t view on Meta::CPAN
$content .= $data;
return 1;
};
my $res = $self->SUPER::request( $method, $url, \%new_args );
$res->{content} = $content unless $orig_cb;
return $res;
}
}
package Acme::Parataxis::Test::MockPoolHTTP::Handle {
use parent -norequire, 'HTTP::Tiny::Handle';
sub _do_timeout {
my ( $self, $type, $timeout ) = @_;
$timeout //= $self->{timeout} // 60;
if ( $self->{fh} ) {
my $start = time();
while (1) {
return 1 if $self->SUPER::_do_timeout( $type, 0 );
my $elapsed = time() - $start;
return 0 if $elapsed > $timeout;