Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/FileStream.pm  view on Meta::CPAN

#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk

package IO::Async::FileStream;

use strict;
use warnings;

our $VERSION = '0.70';

use base qw( IO::Async::Stream );

use IO::Async::File;

use Carp;
use Fcntl qw( SEEK_SET SEEK_CUR );

=head1 NAME

C<IO::Async::FileStream> - read the tail of a file

=head1 SYNOPSIS

 use IO::Async::FileStream;

 use IO::Async::Loop;
 my $loop = IO::Async::Loop->new;

 open my $logh, "<", "var/logs/daemon.log" or
    die "Cannot open logfile - $!";

 my $filestream = IO::Async::FileStream->new(
    read_handle => $logh,

    on_initial => sub {
       my ( $self ) = @_;
       $self->seek_to_last( "\n" );
    },

    on_read => sub {
       my ( $self, $buffref ) = @_;

       while( $$buffref =~ s/^(.*\n)// ) {
          print "Received a line $1";
       }

       return 0;
    },
 );

 $loop->add( $filestream );

 $loop->run;

=head1 DESCRIPTION

This subclass of L<IO::Async::Stream> allows reading the end of a regular file
which is being appended to by some other process. It invokes the C<on_read>
event when more data has been added to the file. 

This class provides an API identical to L<IO::Async::Stream> when given a
C<read_handle>; it should be treated similarly. In particular, it can be given
an C<on_read> handler, or subclassed to provide an C<on_read> method, or even
used as the C<transport> for an L<IO::Async::Protocol::Stream> object.

It will not support writing.

To watch a file, directory, or other filesystem entity for updates of other
properties, such as C<mtime>, see also L<IO::Async::File>.

=cut

=head1 EVENTS

The following events are invoked, either using subclass methods or CODE
references in parameters.

Because this is a subclass of L<IO::Async::Stream> in read-only mode, all the
events supported by C<Stream> relating to the read handle are supported here.
This is not a full list; see also the documentation relating to
L<IO::Async::Stream>.

=head2 $ret = on_read \$buffer, $eof

Invoked when more data is available in the internal receiving buffer.

Note that C<$eof> only indicates that all the data currently available in the
file has now been read; in contrast to a regular L<IO::Async::Stream>, this
object will not stop watching after this condition. Instead, it will continue
watching the file for updates.

=head2 on_truncated

local/lib/perl5/IO/Async/FileStream.pm  view on Meta::CPAN

Normally this would be used to seek to the end of the file, for example

 on_initial => sub {
    my ( $self, $filesize ) = @_;
    $self->seek( $filesize );
 }

=cut

sub seek
{
   my $self = shift;
   my ( $offset, $whence ) = @_;

   $self->{running_initial} or croak "Cannot ->seek except during on_initial";

   defined $whence or $whence = SEEK_SET;

   sysseek( $self->read_handle, $offset, $whence );
}

=head2 seek_to_last

   $success = $filestream->seek_to_last( $str_pattern, %opts )

Callable only during the C<on_initial> event. Attempts to move the read
position in the filehandle to just after the last occurance of a given match.
C<$str_pattern> may be a literal string or regexp pattern. 

Returns a true value if the seek was successful, or false if not. Takes the
following named arguments:

=over 8

=item blocksize => INT

Optional. Read the file in blocks of this size. Will take a default of 8KiB if
not defined.

=item horizon => INT

Optional. Give up looking for a match after this number of bytes. Will take a
default value of 4 times the blocksize if not defined.

To force it to always search through the entire file contents, set this
explicitly to C<0>.

=back

Because regular file reading happens synchronously, this entire method
operates entirely synchronously. If the file is very large, it may take a
while to read back through the entire contents. While this is happening no
other events can be invoked in the process.

When looking for a string or regexp match, this method appends the
previously-read buffer to each block read from the file, in case a match
becomes split across two reads. If C<blocksize> is reduced to a very small
value, take care to ensure it isn't so small that a match may not be noticed.

This is most likely useful for seeking after the last complete line in a
line-based log file, to commence reading from the end, while still managing to
capture any partial content that isn't yet a complete line.

 on_initial => sub {
    my $self = shift;
    $self->seek_to_last( "\n" );
 }

=cut

sub seek_to_last
{
   my $self = shift;
   my ( $str_pattern, %opts ) = @_;

   $self->{running_initial} or croak "Cannot ->seek_to_last except during on_initial";

   my $offset = $self->{last_size};

   my $blocksize = $opts{blocksize} || 8192;

   defined $opts{horizon} or $opts{horizon} = $blocksize * 4;
   my $horizon = $opts{horizon} ? $offset - $opts{horizon} : 0;
   $horizon = 0 if $horizon < 0;

   my $re = ref $str_pattern ? $str_pattern : qr/\Q$str_pattern\E/;

   my $prev = "";
   while( $offset > $horizon ) {
      my $len = $blocksize;
      $len = $offset if $len > $offset;
      $offset -= $len;

      sysseek( $self->read_handle, $offset, SEEK_SET );
      sysread( $self->read_handle, my $buffer, $blocksize );

      # TODO: If $str_pattern is a plain string this could be more efficient
      # using rindex
      if( () = ( $buffer . $prev ) =~ m/$re/sg ) {
         # $+[0] will be end of last match
         my $pos = $offset + $+[0];
         $self->seek( $pos );
         return 1;
      }

      $prev = $buffer;
   }

   $self->seek( $horizon );
   return 0;
}

=head1 TODO

=over 4

=item *

Move the actual file update watching code into L<IO::Async::Loop>, possibly as
a new watch/unwatch method pair C<watch_file>.



( run in 1.072 second using v1.01-cache-2.11-cpan-39bf76dae61 )