FunctionalPerl

 view release on metacpan or  search on metacpan

lib/FP/IOStream.pm  view on Meta::CPAN

# bundled with this file.
#

=head1 NAME

FP::IOStream

=head1 SYNOPSIS

    use FP::IOStream ':all'; # xdirectory_items, xdirectory_paths
    use FP::Stream; # stream_map
    use FP::List ':all'; # first
    my $paths = stream_map sub { my ($item) = @_; "$base/$item" },
                          xdirectory_items $base;
    # which is the same as: my $paths = xdirectory_paths $base;
    my $firstpath = first $paths;
    # ...

=head1 DESCRIPTION

Lazy IO (well, input), by reading items lazily as stream items.

(It's arguable whether that is a good idea; Haskell uses different
approaches nowadays. But it's still a nice way to do things if you're
careful.)

=head1 NOTE

This is alpha software! Read the status section in the package README
or on the L<website|http://functional-perl.org/>.

=cut

package FP::IOStream;
use strict;
use warnings;
use warnings FATAL => 'uninitialized';
use Exporter "import";

# Nothing is exported by default; callers import the functions they
# need by name, or request all of them via the ':all' tag.
our @EXPORT    = qw();
our @EXPORT_OK = qw(maybeIO_to_stream
    fh_to_stream
    perhaps_directory_items
    perhaps_directory_paths
    xdirectory_items
    xdirectory_paths
    xfile_lines xfile_lines0 xfile_lines0chop xfile_lines_chomp
    xfile_chars
    fh_to_lines
    fh_to_chunks
    timestream
    xstream_print
    xstream_to_file
    xfile_replace_lines
);
# ':all' is the concatenation of @EXPORT and @EXPORT_OK, i.e. every
# name listed above.
our %EXPORT_TAGS = (all => [@EXPORT, @EXPORT_OK]);

use FP::Lazy;
use Chj::xopendir qw(perhaps_opendir);
use FP::List ':all';
use FP::Stream qw(stream_map weaken Weakened);
use FP::PureArray qw(array_to_purearray);
use FP::Array_sort;
use FP::Ops 'the_method';
use Carp;
use Chj::singlequote ":all";
use Chj::xopen qw(
    xopen_read
    xopen_write
    xopen_append
    xopen_update
    possibly_fh_to_fh
    glob_to_fh
);
use Chj::xtmpfile qw(xtmpfile);
use FP::Carp;

# XX use this for the definitions further below instead of re-coding
# it each time?

# maybeIO_to_stream($maybeIO, $maybe_close): build a stream from a
# polling function.
#
# $maybeIO is a code reference that is called with no arguments to
# fetch the next item: a defined return value becomes the next stream
# element, undef ends the stream. $maybe_close is optional; if it is
# defined, it is called (with no arguments) when $maybeIO has returned
# undef, just before the terminating null is produced.
#
# Returns the value of the first generator call, which is whatever
# lazyT (from FP::Lazy) produces for the block below.
sub maybeIO_to_stream {
    my ($maybeIO, $maybe_close) = @_;
    my $next;
    $next = sub {
        # Per-invocation copy of the generator reference; this inner
        # $next is the one the lazyT block closes over.
        # NOTE(review): presumably this keeps the generator reachable
        # from the pending cell after the outer $next has been
        # weakened at the bottom of this sub -- confirm against
        # FP::Stream.
        my $next = $next;
        lazyT {
            if (defined(my $v = &$maybeIO())) {
                # `&$next` without parentheses: called with the
                # current @_ instead of a fresh argument list.
                cons($v, &$next)
            } else {
                if (defined $maybe_close) {
                    &$maybe_close()
                }
                null
            }
        }
        # Second argument to lazyT. NOTE(review): looks like the class
        # the forced value is expected to belong to -- confirm in
        # FP::Lazy.
        "FP::List::List"
    };
    # Weakened is imported from FP::Stream. NOTE(review): presumably it
    # weakens the outer $next (breaking the closure's reference to
    # itself) and returns a reference that can still be called here --
    # confirm. The `&{ ... }` form without parentheses invokes the
    # generator with this sub's own @_.
    &{ Weakened $next}
}

# _perhaps_opendir_stream($path): try to open the directory at $path
# and return its entries as a stream.
#
# Takes exactly one argument (fp_croak_arity is invoked otherwise).
# Returns the empty list when perhaps_opendir returns no value for
# $path; otherwise returns the value of the first generator call, as
# produced by lazyT. The items are the defined values returned by the
# directory handle's xnread method; once xnread returns undef, the
# handle is closed via xclose and the stream ends with null.
sub _perhaps_opendir_stream {
    @_ == 1 or fp_croak_arity 1;
    my ($path) = @_;
    # List assignment in boolean context: true iff perhaps_opendir
    # returned at least one value.
    if (my ($d) = perhaps_opendir $path) {
        my $next;
        $next = sub {
            # Per-invocation copy of the generator reference, captured
            # by the lazyT block. NOTE(review): presumably needed so
            # the pending cell keeps the generator alive after the
            # outer $next is weakened below -- confirm against
            # FP::Stream.
            my $next = $next;
            lazyT {
                if (defined(my $item = $d->xnread)) {
                    # `&$next` without parentheses reuses the current
                    # @_ rather than passing a new argument list.
                    cons $item, &$next
                } else {
                    $d->xclose;
                    null
                }
            }
            "FP::List::List"
        };
        # NOTE(review): Weakened (from FP::Stream) presumably weakens
        # the outer $next and returns a callable reference -- confirm.
        &{ Weakened $next}
    } else {
        # Empty list (not undef): lets callers use the same
        # `if (my ($x) = ...)` idiom as above.
        ()
    }
}

lib/FP/IOStream.pm  view on Meta::CPAN

    = make_open_stream(\&xopen_read, the_method("xreadline0_chop"));

# Forward declaration, so the parser already knows xfile_lines_chomp
# as a sub (e.g. for calls without parentheses) before the glob
# assignment below runs.
sub xfile_lines_chomp;
# Install the code reference returned by make_open_stream (not visible
# in this part of the file) under that name. NOTE(review): presumably
# the installed sub takes a path, opens it via xopen_read, and yields
# the results of the handle's xreadline_chomp method as stream items
# -- confirm against make_open_stream.
*xfile_lines_chomp
    = make_open_stream(\&xopen_read, the_method("xreadline_chomp"));

# Same construction, but items are obtained via the handle's xreadchar
# method.
sub xfile_chars;
*xfile_chars = make_open_stream(\&xopen_read, the_method("xreadchar"));

# Clojure calls this line-seq
#  (http://clojure.github.io/clojure/clojure.core-api.html#clojure.core/line-seq)
# fh_to_lines($fh): turn a filehandle into a stream of its lines.
#
# The argument is first passed through possibly_fh_to_fh; the result
# is handed to fh_to_stream together with a reader that calls the
# handle's xreadline method and a closer that calls its xclose
# method. Takes exactly one argument.
sub fh_to_lines {
    fp_croak_arity 1 unless @_ == 1;
    my ($handle) = @_;
    fh_to_stream(
        possibly_fh_to_fh($handle),
        the_method("xreadline"),
        the_method("xclose"),
    )
}

# read filehandle in chunks, although the chunk size, even of the
# chunks before the last one, is only guaranteed to be non-zero, not
# bufsiz (since only xsysreadcompletely would guarantee to fill size,
# but would die on mid-chunk EOF)

# fh_to_chunks($fh, $bufsiz): stream of chunks read from a filehandle.
#
# $fh is passed through possibly_fh_to_fh once, and the handle that
# comes back is used for everything: it is read from here, and it is
# the handle given to fh_to_stream along with an xclose method call.
# (Reading from the raw argument instead would call xsysread on a
# different value than the one that gets closed. NOTE(review):
# presumably possibly_fh_to_fh turns plain globs into objects that
# provide the x* methods -- confirm against Chj::xopen.)
#
# Each read asks xsysread for up to $bufsiz bytes; the filled buffer
# is the stream item. A read count of 0 is reported to fh_to_stream
# as undef.
#
# Takes exactly two arguments (fp_croak_arity is invoked otherwise).
sub fh_to_chunks {
    @_ == 2 or fp_croak_arity 2;
    my ($fh, $bufsiz) = @_;
    # Convert once; the reader closure and fh_to_stream share this
    # handle.
    my ($in) = possibly_fh_to_fh($fh);
    fh_to_stream(
        $in,
        sub {
            my $buf;
            my $n = $in->xsysread($buf, $bufsiz);
            $n == 0 ? undef : $buf
        },
        the_method("xclose")
    );
}

# A stream of floating-point unix timestamps representing the time
# when each cell is being forced. Optional argument in seconds
# (floating point) to sleep before returning the next element.

# timestream([$maybe_sleep]): never-ending stream of Time::HiRes::time()
# values, each taken at the moment the respective cell's block runs.
#
# $maybe_sleep is optional; when it is true, Time::HiRes::sleep is
# called with it before each timestamp is taken (undef and 0 both
# skip the sleep). Time::HiRes is loaded on first use.
sub timestream {
    # `@_ >= 0` always holds; the effective check is "at most one
    # argument".
    @_ >= 0 and @_ <= 1 or fp_croak_arity "0-1";
    my ($maybe_sleep) = @_;
    require Time::HiRes;
    my $lp;
    $lp = sub {
        lazyT {
            Time::HiRes::sleep($maybe_sleep) if $maybe_sleep;
            # Every cell is a pair: there is no branch that ends the
            # stream.
            cons(Time::HiRes::time(), &$lp())
        }
        "FP::List::Pair"
    };
    # NOTE(review): the generator closes over the outer $lp directly
    # (no per-call copy), and Weakened presumably weakens that same
    # variable -- confirm against FP::Stream that the generator stays
    # alive for as long as unforced cells exist.
    Weakened($lp)->();
}

# xstream_print($s [, $maybe_fh]): print every element of the stream
# to $maybe_fh, or to STDOUT (wrapped by glob_to_fh) when no handle
# is given. Croaks on the first print that fails. Accepts one or two
# arguments.
sub xstream_print {
    fp_croak_arity "1 or 2" unless (@_ == 1 || @_ == 2);
    my ($stream, $maybe_fh) = @_;
    my $fh = defined($maybe_fh) ? $maybe_fh : glob_to_fh(*STDOUT);

    # Weaken the caller's argument slot before iterating.
    # NOTE(review): presumably so the caller does not keep the head of
    # the stream alive while it is being walked -- confirm against
    # FP::Stream.
    weaken $_[0];

    my $emit = sub {
        my ($item) = @_;
        print {$fh} $item or croak "xstream_print: writing to $fh: $!";
    };
    $stream->for_each($emit);
}

# xstream_to_file($s, $path [, $maybe_mode]): write all elements of
# the stream to a file obtained from xtmpfile($path), close it, then
# call its xputback method with $maybe_mode. Accepts two or three
# arguments.
sub xstream_to_file {
    fp_croak_arity "2 or 3" unless (@_ == 2 || @_ == 3);
    my ($stream, $path, $maybe_mode) = @_;
    my $tmp = xtmpfile($path);

    # Weaken the caller's argument slot before the stream is consumed.
    weaken $_[0];

    # $stream is passed as a variable (not a copy), so xstream_print's
    # $_[0] aliases it.
    xstream_print($stream, $tmp);
    $tmp->xclose;
    $tmp->xputback($maybe_mode);
}

# read and write back a file, passing its lines as a stream to the
# given function; written to temp file that's renamed into place upon
# successful completion.
# xfile_replace_lines($path, $fn): call $fn with the stream returned
# by xfile_lines($path) and write what $fn returns back to $path via
# xstream_to_file. Takes exactly two arguments.
sub xfile_replace_lines {
    fp_croak_arity 2 unless @_ == 2;
    my ($path, $transform) = @_;

    # The stream is passed straight through as an argument, without
    # storing it in a lexical of this sub first.
    xstream_to_file($transform->(xfile_lines($path)), $path);
}

1



( run in 1.290 second using v1.01-cache-2.11-cpan-39bf76dae61 )