Eixo-Docker

 view release on metacpan or  search on metacpan

lib/Eixo/Docker/RequestRawStream.pm  view on Meta::CPAN


	action=>undef,

	host=>undef,

	method=>undef,

	url_args=>[],

	query_args=>[],

	args=>{},

	f_process=>$Eixo::Docker::IDENTITY_FUNC,

	f_line=> $Eixo::Docker::IDENTITY_FUNC,

	f_stdin=>undef,

	f_stdout=>undef,

	f_stderr=>undef,

	f_end=>undef,

	queue_in=>undef,

	queue_out=>undef,

	jobs => [],

    line_termination => "\n",

    timeout => 60,

    tty_mode => undef,
);


#
# Launch the request on a detached worker thread and give the caller the
# closure(s) needed to exchange data with it.
#
# Arguments:
#   $multiplexed - forwarded untouched to $self->_process in the worker.
#
# Returns:
#   * when $self->args->{stdin} is true, a list of two closures:
#       1. sender - takes a payload, enqueues [job id, payload] on
#                   queue_in, records a matching Eixo::Docker::Job in
#                   $self->jobs and returns the job id;
#       2. waiter - takes a job id and delegates to wait_for_job.
#   * otherwise a single closure that drains queue_out (see below).
#
sub process{
    my ($self, $multiplexed) = @_;

    # Every call gets its own pair of queues for talking to the worker.
    $self->queue_in(Thread::Queue->new);
    $self->queue_out(Thread::Queue->new);

    # The worker only runs _process; it is detached, so it is never joined.
    my $worker = sub {
        my ($request, $is_multiplexed) = @_;

        $request->_process($is_multiplexed);
    };

    threads->new($worker, $self, $multiplexed)->detach;

    # Without stdin there is nothing to send: hand back a reader only.
    unless($self->args->{stdin}){

        return sub {
            my $last_payload = undef;

            while(1){
                my $message = $self->queue_out->dequeue();

                # An undefined item or the "END" marker finishes the stream
                # (in perl 5.18 there is a q->end).
                last unless(defined($message));
                last if($message eq "END");

                # Stream mode: pass every payload to the f_process callback
                # and remember the most recent one.
                $self->f_process->($message->[1]);
                $last_payload = $message->[1];
            }

            return $last_payload;
        };
    }

    # Sender: queue the payload for the worker and track it as a job.
    my $send_message = sub {

        $self->queue_in->enqueue([++$JOB_ID, $_[0]]);

        push @{$self->jobs}, Eixo::Docker::Job->new(
            id               => $JOB_ID,
            params           => $_[0],
            status           => 'SEND',
            wait_for_results => ($self->args->{stdout} || $self->args->{stderr})
        );

        return $JOB_ID;
    };

    # Waiter: collect the response|ack for a previously sent job.
    my $wait_for_reply = sub {
        return $self->wait_for_job($_[0]);
    };

    return ($send_message, $wait_for_reply);
}


sub wait_for_job{
    my ($self, $job_id) = @_;

    # if no job_id was passed, wait for all
    $job_id = '' unless(defined($job_id));

    # job could be finished before wait
    if(my ($job) = grep {$_->finished && $_->id eq $job_id} @{$self->jobs}){

        return $job->results;
    }  


    # wait to finish the job
    while(grep { !$_->finished } @{$self->jobs}){



( run in 0.508 second using v1.01-cache-2.11-cpan-99c4e6809bf )