KinoSearch1

 view release on metacpan or  search on metacpan

lib/KinoSearch1/Util/SortExternal.pm  view on Meta::CPAN


# Constructor.  Checks the supplied named arguments against %instance_vars,
# removes any existing "<seg_name>.srt" file from the invindex, opens a new
# outstream on that filename and passes everything on to the XS-level _new().
sub new {
    my ( $either, @params ) = @_;
    verify_args( \%instance_vars, @params );
    my %args = ( %instance_vars, @params );

    # the invocant may be either a class name or an existing object
    my $class = ref($either) || $either;

    my $invindex = $args{invindex};
    my $filename = "$args{seg_name}.srt";

    # never append to a sort file left over from an earlier session
    if ( $invindex->file_exists($filename) ) {
        $invindex->delete_file($filename);
    }
    my $outstream = $invindex->open_outstream($filename);

    return _new(
        $class,         $outstream,
        $args{invindex}, $args{seg_name}, $args{mem_threshold},
    );
}

# Finish the feeding phase and get ready for fetching sorted results: deal
# with whatever is still cached, then swap the outstream for an instream on
# the same ".srt" file and turn fetching on.
sub sort_all {
    my ($self) = @_;

    my $num_runs = $self->_get_num_runs;
    if ( $num_runs != 0 ) {
        # runs already exist, so the cache contents become one more run
        $self->_sort_run;
    }
    else {
        # mem_threshold was never exceeded; an in-memory sort is enough
        $self->_sort_cache;
    }

    # no more elements will be added: close the file for writing...
    my $outstream = $self->_get_outstream;
    $outstream->close;

    # ...and reopen it for reading
    my $seg_name = $self->_get_seg_name;
    my $filename = $seg_name . ".srt";
    my $invindex = $self->_get_invindex();
    my $instream = $invindex->open_instream($filename);
    $self->_set_instream($instream);

    # everything is in place, so fetching may begin
    $self->_enable_fetch;
}

# Close the current instream (the one sort_all installs via _set_instream).
sub close {
    my $self     = shift;
    my $instream = $self->_get_instream();
    return $instream->close;
}

1;

__END__

__XS__

MODULE = KinoSearch1    PACKAGE = KinoSearch1::Util::SortExternal

void
_new(class, outstream_sv, invindex_sv, seg_name_sv, mem_threshold)
    char         *class;
    SV           *outstream_sv;
    SV           *invindex_sv;
    SV           *seg_name_sv;
    I32           mem_threshold;
PREINIT:
    SortExternal *sortex;
PPCODE:
    /* Build the C-level SortExternal struct from the supplied SVs and the
     * memory threshold. */
    sortex = Kino1_SortEx_new(outstream_sv, invindex_sv, seg_name_sv,
        mem_threshold);
    /* Return exactly one value: a mortal reference blessed into `class`
     * whose referent stores the struct pointer. */
    ST(0)  = sv_newmortal();
    sv_setref_pv( ST(0), class, (void*)sortex );
    XSRETURN(1);

=for comment

Add one or more items to the sort pool.

=cut

void
feed(sortex, ...)
    SortExternal *sortex;
PREINIT:
    I32      arg_tick;
PPCODE:
    /* ST(0) is the invocant; each argument after it is a candidate item. */
    for (arg_tick = 1; arg_tick < items; arg_tick++) {
        SV const * item_sv = ST(arg_tick);
        /* Only scalars flagged as holding a string are fed; anything else
         * is passed over without comment. */
        if (SvPOK(item_sv)) {
            sortex->feed(sortex, SvPVX(item_sv), SvCUR(item_sv));
        }
    }

=for comment

Fetch the next sorted item from the sort pool.  sort_all must be called first.

=cut

SV*
fetch(sortex)
    SortExternal *sortex;
PREINIT:
    ByteBuf *bb;
CODE:
    bb = sortex->fetch(sortex);
    if (bb != NULL) {
        /* Copy the item's bytes into a new Perl scalar, then dispose of the
         * ByteBuf that carried them. */
        RETVAL = newSVpvn(bb->ptr, bb->size);
        Kino1_BB_destroy(bb);
    }
    else {
        /* No item was handed back: return an undefined scalar. */
        RETVAL = newSV(0);
    }
OUTPUT: RETVAL

=for comment

Sort all items currently in memory.

=cut

void
_sort_cache(sortex)
    SortExternal *sortex;
PPCODE:
    /* Thin wrapper: all the work happens in the C-level routine; nothing is
     * returned to Perl. */
    Kino1_SortEx_sort_cache(sortex);

=for comment

Sort everything in memory and write the sorted elements to disk, creating a
SortExRun C object.

=cut

void
_sort_run(sortex)
    SortExternal *sortex;
PPCODE:
    /* Thin wrapper: all the work happens in the C-level routine; nothing is
     * returned to Perl. */
    Kino1_SortEx_sort_run(sortex);

=for comment

Turn on fetching.

=cut

void
_enable_fetch(sortex)
    SortExternal *sortex;
PPCODE:
    /* Thin wrapper: all the work happens in the C-level routine; nothing is
     * returned to Perl. */
    Kino1_SortEx_enable_fetch(sortex);
    
SV*
_set_or_get(sortex, ...)
    SortExternal *sortex;
ALIAS:
    _set_outstream = 1
    _get_outstream = 2
    _set_instream  = 3
    _get_instream  = 4
    _set_num_runs  = 5
    _get_num_runs  = 6
    _set_invindex  = 7
    _get_invindex  = 8
    _set_seg_name  = 9
    _get_seg_name  = 10
CODE:
{
    /* Combined accessor.  The ALIAS number selects the struct member: odd
     * numbers are setters, even numbers the matching getters.  Each setter
     * case falls through into its getter, so a set call also returns the
     * (new) value.
     * NOTE(review): KINO_START_SET_OR_GET_SWITCH / KINO_END_SET_OR_GET_SWITCH
     * are defined outside this section -- presumably they validate the
     * argument count and open/close the switch on the alias index; confirm
     * against their definition. */
    KINO_START_SET_OR_GET_SWITCH

    /* outstream: drop the reference to the old SV, keep a copy of the new
     * one, and refresh the cached OutStream pointer from it. */
    case 1:  SvREFCNT_dec(sortex->outstream_sv);
             sortex->outstream_sv = newSVsv( ST(1) );
             Kino1_extract_struct(sortex->outstream_sv, sortex->outstream, 
                OutStream*, "KinoSearch1::Store::OutStream");
             /* fall through */
    case 2:  RETVAL = newSVsv(sortex->outstream_sv);
             break;
             
    /* instream: same pattern as outstream. */
    case 3:  SvREFCNT_dec(sortex->instream_sv);
             sortex->instream_sv = newSVsv( ST(1) );
             Kino1_extract_struct(sortex->instream_sv, sortex->instream, 
                InStream*, "KinoSearch1::Store::InStream");
             /* fall through */
    case 4:  RETVAL = newSVsv(sortex->instream_sv);
             break;

    /* num_runs, invindex and seg_name are read-only from Perl: their setter
     * aliases only raise an error (presumably Kino1_confess does not
     * return -- confirm). */
    case 5:  Kino1_confess("can't set num_runs");
             /* fall through */
    case 6:  RETVAL = newSViv(sortex->num_runs);
             break;

    case 7:  Kino1_confess("can't set_invindex");
             /* fall through */
    case 8:  RETVAL = newSVsv(sortex->invindex_sv);
             break;
             
    case 9:  Kino1_confess("can't set_seg_name");
             /* fall through */
    case 10: RETVAL = newSVsv(sortex->seg_name_sv);
             break;

    KINO_END_SET_OR_GET_SWITCH
}
OUTPUT: RETVAL

void
DESTROY(sortex)
    SortExternal *sortex;
PPCODE:
    /* Hand the struct to the C-level destructor when the Perl object is
     * destroyed. */
    Kino1_SortEx_destroy(sortex);

__H__

#ifndef H_KINOSEARCH_UTIL_SORT_EXTERNAL
#define H_KINOSEARCH_UTIL_SORT_EXTERNAL 1

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include "KinoSearch1StoreInStream.h"
#include "KinoSearch1StoreOutStream.h"
#include "KinoSearch1UtilByteBuf.h"
#include "KinoSearch1UtilCClass.h"
#include "KinoSearch1UtilMemManager.h"

/* Bookkeeping for a single sorted run.
 * NOTE(review): the per-field notes below are inferred from the field names
 * and from the parallel members of SortExternal; confirm them against the
 * implementation file. */
typedef struct sortexrun {
    double     start;        /* presumably where the run begins in the file */
    double     file_pos;     /* presumably the current read position */
    double     end;          /* presumably where the run ends in the file */
    ByteBuf  **cache;        /* presumably items read in from this run */
    I32        cache_cap;    /* presumably allocated limit for cache */
    I32        cache_elems;  /* presumably number of elems in cache */
    I32        cache_pos;    /* presumably index of current elem in cache */
    I32        slice_size;   /* meaning not visible here -- TODO confirm */
} SortExRun;

/* State behind a KinoSearch1::Util::SortExternal object.  The XS layer stores
 * a pointer to this struct inside the blessed Perl reference. */
typedef struct sortexternal {
    ByteBuf   **cache;            /* item cache, both incoming and outgoing */
    I32         cache_cap;        /* allocated limit for cache */
    I32         cache_elems;      /* number of elems in cache */ 
    I32         cache_pos;        /* index of current element in cache */
    ByteBuf   **scratch;          /* memory for use by mergesort */
    I32         scratch_cap;      /* allocated limit for scratch */
    I32         mem_threshold;    /* bytes of mem allowed for cache */
    I32         cache_bytes;      /* bytes of mem occupied by cache */
    I32         run_cache_limit;  /* bytes of mem allowed each run cache */
    SortExRun **runs;             /* array of runs; num_runs gives the count
                                   * (presumably -- TODO confirm) */
    I32         num_runs;         /* read-only from Perl via _get_num_runs */
    SV         *outstream_sv;     /* Perl-side OutStream object */
    OutStream  *outstream;        /* struct extracted from outstream_sv */
    SV         *instream_sv;      /* Perl-side InStream object */
    InStream   *instream;         /* struct extracted from instream_sv */
    SV         *invindex_sv;      /* Perl-side invindex object */
    SV         *seg_name_sv;      /* segment name as a Perl scalar */
    /* Method pointers invoked by the XS feed() and fetch() wrappers. */
    void      (*feed) (struct sortexternal*, char*, I32);
    ByteBuf*  (*fetch)(struct sortexternal*);
} SortExternal;

/* C-level API.  The XS wrappers call _new, enable_fetch, sort_cache, sort_run
 * and destroy directly; feed and fetch are reached through the function
 * pointers stored in the SortExternal struct. */

/* Arguments, in the order the XS constructor passes them: outstream SV,
 * invindex SV, seg_name SV, mem_threshold. */
SortExternal* Kino1_SortEx_new(SV*, SV*, SV*, I32);
/* Add one item, given as a pointer to its bytes and a length. */
void          Kino1_SortEx_feed(SortExternal*, char*, I32);
/* Return the next item; the XS fetch wrapper treats NULL as "no item". */
ByteBuf*      Kino1_SortEx_fetch(SortExternal*);
/* NOTE(review): presumably the fetch implementation installed until fetching
 * has been enabled, raising an error -- confirm in the implementation. */
ByteBuf*      Kino1_SortEx_fetch_death(SortExternal*);
void          Kino1_SortEx_enable_fetch(SortExternal*);
void          Kino1_SortEx_sort_cache(SortExternal*);
void          Kino1_SortEx_sort_run(SortExternal*);
void          Kino1_SortEx_destroy(SortExternal*);

#endif /* include guard */



( run in 0.483 second using v1.01-cache-2.11-cpan-5511b514fd6 )