Claude-Agent

 view release on metacpan or  search on metacpan

lib/Claude/Agent/Query.pm  view on Meta::CPAN

package Claude::Agent::Query;

use 5.020;
use strict;
use warnings;

use Claude::Agent::Logger '$log';
use Time::HiRes ();
use Types::Common -types;
# Attribute declarations for Claude::Agent::Query, expressed as a Marlin
# import list. Names without a leading underscore are the constructor-facing
# attributes; the underscore-prefixed ones hold internal state.
# Where a name is paired with a coderef, that coderef supplies the default.
# The classes named inside the default subs (Claude::Agent::Options,
# JSON::Lines) are loaded by `use` statements further down this file; that
# ordering is fine because `use` is resolved at compile time while the subs
# themselves only run later, at runtime.
use Marlin
    'prompt!',                                     # Required prompt (string or async generator)
    'options' => sub { Claude::Agent::Options->new() },  # Defaults to a fresh Options object
    'loop?',                                       # Optional external IO::Async loop
    '_loop==.',                                    # Internal loop reference (rw, no init_arg)
    '_process==.',                                 # IO::Async::Process handle
    '_stdin==.',                                   # stdin pipe for sending messages
    '_messages==.' => sub { [] },                  # Message queue
    '_pending_futures==.' => sub { [] },           # Futures waiting for messages
    '_session_id==.',                              # Session ID from init message
    '_finished==.' => sub { 0 },                   # Process finished flag
    '_error==.',                                   # Error message if process failed
    '_sdk_servers==.' => sub { {} },               # SDK server wrappers (name => SDKServer)
    '_hook_executor==.',                            # Hook executor for Perl callbacks
    '_pending_tool_uses==.' => sub { {} },          # Track tool uses awaiting results
    '_processing_message==.' => sub { 0 },           # Guard against concurrent message processing
    '_cleaned_up==.' => sub { 0 },                     # Track if cleanup() has been called
    # JSON Lines codec used both to decode CLI output and to encode messages
    # written to the CLI's stdin. Errors are reported through error_cb rather
    # than being allowed to propagate from this callback.
    '_jsonl==.' => sub {
        JSON::Lines->new(
            utf8     => 1,
            error_cb => sub {
                my ($action, $error, $data) = @_;
                # Only log at trace level since parse errors are common
                # with streaming JSON and partial data
                $log->trace("JSON::Lines $action error: $error");
                # Bare return: the callback hands nothing back to JSON::Lines.
                return;
            },
        )
    };

use IO::Async::Loop;
use IO::Async::Process;
use Future;
use Future::AsyncAwait;
use JSON::Lines;
use Try::Tiny;
use File::Which qw(which);
use File::Spec;

use Claude::Agent::Options;
use Claude::Agent::Message;
use Claude::Agent::Error;
use Claude::Agent::MCP::SDKServer;
use Claude::Agent::Hook::Executor;
use Claude::Agent::DryRun qw(create_dry_run_hooks);

=head1 NAME

Claude::Agent::Query - Query iterator for Claude Agent SDK

=head1 SYNOPSIS

    use Claude::Agent::Query;
    use Claude::Agent::Options;

    my $query = Claude::Agent::Query->new(
        prompt  => "Find all TODO comments",
        options => Claude::Agent::Options->new(
            allowed_tools => ['Read', 'Glob', 'Grep'],
        ),
    );

    # Blocking iteration
    while (my $msg = $query->next) {
        if ($msg->isa('Claude::Agent::Message::Result')) {
            print $msg->result, "\n";
            last;
        }
    }

=head1 DESCRIPTION

This module handles communication with the Claude CLI process and provides
both blocking and async iteration over response messages.

=head1 CONSTRUCTOR

    my $query = Claude::Agent::Query->new(
        prompt  => "Find all TODO comments",
        options => $options,
        loop    => $loop,    # optional, for async integration
    );

=head2 Arguments

lib/Claude/Agent/Query.pm  view on Meta::CPAN

            push @cmd, '--agents', $json;
        }
    }

    # Add setting sources
    if ($opts->has_setting_sources && $opts->setting_sources && @{$opts->setting_sources}) {
        push @cmd, '--setting-sources', join(',', @{$opts->setting_sources});
    }

    # Add JSON schema for structured outputs
    if ($opts->has_output_format && $opts->output_format) {
        my $format = $opts->output_format;
        if (ref $format eq 'HASH' && $format->{schema}) {
            my $json = $self->_jsonl->encode([$format->{schema}]);
            chomp $json;
            push @cmd, '--json-schema', $json;
        }
    }

    # For string prompts, use --print mode with -- separator
    # For async generators, use stream-json input format
    if (!ref($self->prompt)) {
        # Sanitize terminal escape sequences from prompt to prevent injection attacks.
        # This uses a comprehensive allowlist approach after stripping known escapes.
        # See: https://invisible-island.net/xterm/ctlseqs/ctlseqs.html
        my $sanitized_prompt = $self->prompt;

        # Strip ANSI CSI sequences: ESC [ ... <letter>
        $sanitized_prompt =~ s/\x1b\[[0-9;:?]*[a-zA-Z]//g;

        # Strip OSC sequences: ESC ] ... (BEL or ST) - title changes, hyperlinks, etc.
        $sanitized_prompt =~ s/\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)//gs;

        # Strip DCS/SOS/PM/APC sequences: ESC P/X/^/_ ... ST
        $sanitized_prompt =~ s/\x1b[PX^_][^\x1b]*(?:\x1b\\|\x07)//gs;

        # Strip SS2/SS3 single shift: ESC N, ESC O followed by character
        $sanitized_prompt =~ s/\x1b[NO].//g;

        # Strip VT52 mode sequences: ESC <letter>
        $sanitized_prompt =~ s/\x1b[a-zA-Z]//g;

        # Strip C1 control codes (8-bit escape equivalents: 0x80-0x9F)
        # These can be used to bypass ESC-based filtering in some terminals
        $sanitized_prompt =~ s/[\x80-\x9f]//g;

        # Remove dangerous control chars but preserve tab (\x09), newline (\x0a), carriage return (\x0d)
        $sanitized_prompt =~ s/[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]/ /g;

        # Final safety: remove any remaining ESC followed by anything
        # This catches any escape sequences not matched above
        $sanitized_prompt =~ s/\x1b.//g;

        # Ultimate safety: strip any remaining bare ESC (0x1B) bytes entirely
        # This handles edge cases with multi-byte sequences in exotic terminal emulators
        $sanitized_prompt =~ s/\x1b//g;

        push @cmd, '--print', '--', $sanitized_prompt;
    }
    else {
        # For streaming input (async generator), use stream-json input format
        push @cmd, '--input-format', 'stream-json';
    }

    return @cmd;
}

# Spawn the Claude CLI child process and wire its pipes into the event loop.
#
# Builds the argv via _build_command(), creates an IO::Async::Process whose
# stdout lines are passed to _handle_line() and whose stderr lines are logged
# at debug level, stores the process and its stdin stream on the object, and
# adds the process to $self->_loop. When the prompt is a plain string the
# prompt travels in argv, so stdin is scheduled to be closed.
#
# Takes no arguments besides the invocant and returns nothing.
sub _start_process {
    my ($self) = @_;

    my @cmd = $self->_build_command();

    $log->debug("Query: Starting Claude CLI process");
    $log->trace(sprintf("Query: Command: %s with %d arguments", $cmd[0], scalar(@cmd) - 1));

    # Factory for on_read handlers: splits the stream buffer into lines and
    # passes every non-empty line to $on_line.
    #
    # The pattern uses [^\n]* (not +) so that an empty line is consumed and
    # skipped. Requiring at least one character would make a buffer that
    # starts with "\n" unmatchable, which stalls all later lines and lets the
    # buffer grow without bound.
    my $line_reader = sub {
        my ($on_line) = @_;
        return sub {
            my ($stream, $buffref, $eof) = @_;

            while ($$buffref =~ s/\A([^\n]*)\n//) {
                # Copy the capture before calling out; the callee may run
                # its own regexes and clobber $1.
                my $line = $1;
                $on_line->($line) if length $line;
            }

            # At EOF no more data will arrive, so a final line that lacks a
            # trailing newline must be delivered now or it is lost.
            if ($eof && length $$buffref) {
                my $tail = $$buffref;
                $$buffref = '';
                $on_line->($tail);
            }

            # 0 tells the stream to call again only when more data arrives.
            return 0;
        };
    };

    my $process = IO::Async::Process->new(
        command => \@cmd,
        stdin  => { via => 'pipe_write' },
        stdout => {
            on_read => $line_reader->(sub {
                my ($line) = @_;
                $self->_handle_line($line);
            }),
        },
        stderr => {
            # Log stderr but don't treat as fatal
            on_read => $line_reader->(sub {
                my ($line) = @_;
                $log->debug(sprintf("Claude CLI stderr: %s", $line));
            }),
        },
        on_finish => sub {
            my ($proc, $exitcode) = @_;
            $self->_finished(1);
            # Extract actual exit status (WEXITSTATUS equivalent)
            # NOTE(review): a child terminated by a signal has a zero high
            # byte, so it is reported here as exit code 0 and no error is
            # recorded - confirm whether that is intended.
            my $exit_status = $exitcode >> 8;
            $log->debug(sprintf("Query: Process finished with exit code %d", $exit_status));
            if ($exit_status != 0) {
                $self->_error("Claude CLI exited with code $exit_status");
            }
            # Resolve any pending async futures
            $self->_resolve_pending_futures_on_finish();
        },
        on_exception => sub {
            my ($proc, $exception, $errno, $exitcode) = @_;
            $self->_finished(1);
            $log->debug(sprintf("Query: Process exception: %s", $exception));
            $self->_error("Claude CLI exception: $exception");
            # Resolve any pending async futures
            $self->_resolve_pending_futures_on_finish();
        },
    );

    # Store references before adding to loop to avoid race conditions
    $self->_process($process);
    $self->_stdin($process->stdin);
    $self->_loop->add($process);

    # For --print mode, prompt is in argv, stdin can be closed
    if (!ref($self->prompt)) {
        # Deferred via the loop rather than closed immediately.
        $self->_loop->later(sub {
            $self->_stdin->close_when_empty if $self->_stdin;
        });
    }
    # For ref prompts (streaming input), caller will send messages via send_user_message
    return;
}

sub _handle_line {
    my ($self, $line) = @_;

    return unless defined $line && length $line;

    # Use JSON::Lines decode method for single line
    my @decoded = $self->_jsonl->decode($line);
    if ($log->is_trace) {
        $log->trace(sprintf("Raw line length: %d", length($line)));
        $log->trace(sprintf("Raw line: %s", $line));
        $log->trace(sprintf("Decoded %d objects", scalar(@decoded)));
        $log->trace(sprintf("JSON::Lines buffer remaining: %d chars", length($self->_jsonl->remaining)));
        if ($self->_jsonl->remaining) {
            $log->trace(sprintf("Buffer content (first 200): %s", substr($self->_jsonl->remaining, 0, 200)));
        }
    }

    # Guard against buffer overflow from accumulated malformed data
    # Configurable threshold via environment variable, default 1MB, max 10MB
    # Note: Typical JSON messages from Claude CLI are 1-50KB; tool results (especially
    # large file reads or grep outputs) may be 100KB-1MB. The minimum of 100KB ensures
    # legitimate large tool results are not prematurely truncated.
    # Set CLAUDE_AGENT_JSONL_BUFFER_MAX to at least 2x your largest expected message size.
    my $buffer_threshold = $ENV{CLAUDE_AGENT_JSONL_BUFFER_MAX} // 1_000_000;
    my $min_threshold = 100_000;  # Minimum 100KB to accommodate large tool results
    my $max_threshold = 10_000_000;  # Hard cap at 10MB
    if (!defined $buffer_threshold || $buffer_threshold !~ /^\d+$/) {
        if (defined $ENV{CLAUDE_AGENT_JSONL_BUFFER_MAX}) {
            # Sanitize env var value to prevent log injection (remove control characters)
            my $safe_val = $ENV{CLAUDE_AGENT_JSONL_BUFFER_MAX} // '';
            $safe_val =~ s/[[:cntrl:]]//g;
            $log->warning(sprintf("CLAUDE_AGENT_JSONL_BUFFER_MAX='%s' is not a valid integer, using default 1000000", $safe_val));
        }
        $buffer_threshold = 1_000_000;
    }
    elsif ($buffer_threshold < $min_threshold) {
        $log->warning(sprintf("CLAUDE_AGENT_JSONL_BUFFER_MAX=%d is below minimum (%d), using %d. "
            . "Very small values may truncate legitimate large tool results.",
            $buffer_threshold, $min_threshold, $min_threshold));
        $buffer_threshold = $min_threshold;
    }
    elsif ($buffer_threshold > $max_threshold) {
        $log->debug(sprintf("CLAUDE_AGENT_JSONL_BUFFER_MAX=%d exceeds maximum (%d), using %d",
            $buffer_threshold, $max_threshold, $max_threshold));
        $buffer_threshold = $max_threshold;
    }

    # Check buffer size regardless of decode success to prevent unbounded growth
    if ($self->_jsonl->remaining && length($self->_jsonl->remaining) > $buffer_threshold) {
        $log->debug(sprintf("JSON::Lines buffer overflow detected (size: %d, threshold: %d), reinitializing",
            length($self->_jsonl->remaining), $buffer_threshold));
        $self->_jsonl(JSON::Lines->new(
            utf8     => 1,
            error_cb => sub {
                my ($action, $error, $data) = @_;
                $log->trace(sprintf("JSON::Lines %s error: %s", $action, $error));
                return;

lib/Claude/Agent/Query.pm  view on Meta::CPAN

    my $id = $query->session_id;

Returns the session ID once available (after init message).

=cut

sub session_id {
    # Public read-only view of the internal _session_id attribute.
    my $self = shift;
    return $self->_session_id;
}

=head2 is_finished

    if ($query->is_finished) { ... }

Returns true if the query has finished (process exited).

=cut

sub is_finished {
    # Public read-only view of the internal _finished flag.
    my $self = shift;
    return $self->_finished;
}

=head2 error

    if (my $err = $query->error) { ... }

Returns error message if the process failed.

=cut

sub error {
    # Public read-only view of the internal _error attribute.
    my $self = shift;
    return $self->_error;
}

=head2 interrupt

    $query->interrupt;

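Write an C<interrupt> message to the CLI's stdin to abort the current
operation. Does nothing if stdin is unavailable or the process has already
finished.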

=cut

sub interrupt {
    my ($self) = @_;

    # Nothing to interrupt once stdin is gone or the process has exited.
    return unless $self->_stdin && !$self->_finished;

    my $msg = $self->_jsonl->encode([{ type => 'interrupt' }]);
    return unless defined $msg && length $msg;

    # Interrupting is best-effort. The stdin stream object may still be set
    # after it has been asked to close, and a write to it can then throw.
    # Log the failure instead of letting it propagate, as
    # send_user_message and set_permission_mode do.
    try {
        $self->_stdin->write($msg);
    }
    catch {
        $log->debug(sprintf("interrupt write error: %s", $_));
    };
    return;
}

=head2 send_user_message

    $query->send_user_message("Continue with the next step");

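Send a follow-up user message during streaming. Returns the value of the
underlying stdin write on success, C<0> if the write throws, and nothing if
stdin is unavailable or the process has already finished.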

=cut

sub send_user_message {
    my ($self, $content) = @_;

    # Nothing to write to once stdin is gone or the process has exited.
    return if !$self->_stdin || $self->_finished;

    my %envelope = (
        type    => 'user',
        message => {
            role    => 'user',
            content => $content,
        },
    );
    my $payload = $self->_jsonl->encode([ \%envelope ]);

    # try() yields the write's own return value on success; when the write
    # throws, the catch block logs it and its trailing 0 becomes the result.
    my $outcome = try {
        $self->_stdin->write($payload);
    }
    catch {
        $log->debug(sprintf("send_user_message write error: %s", $_));
        0;
    };
    return $outcome;
}

=head2 set_permission_mode

    $query->set_permission_mode('acceptEdits');

Change permission mode during streaming.

=cut

sub set_permission_mode {
    my ($self, $mode) = @_;

    # Nothing to write to once stdin is gone or the process has exited.
    return if !$self->_stdin || $self->_finished;

    my $payload = $self->_jsonl->encode([
        {
            type            => 'set_permission_mode',
            permission_mode => $mode,
        },
    ]);

    # Best-effort write: a failure is logged at debug level, not rethrown.
    try {
        $self->_stdin->write($payload);
    }
    catch {
        $log->debug(sprintf("set_permission_mode write error: %s", $_));
    };
    return;
}

=head2 respond_to_permission

    $query->respond_to_permission($tool_use_id, {
        behavior      => 'allow',
        updated_input => $input,
    });

Respond to a permission request.

=cut

sub respond_to_permission {
    my ($self, $tool_use_id, $response) = @_;

    # Nothing to write to once stdin is gone or the process has exited.
    return unless $self->_stdin && !$self->_finished;

    my $msg = $self->_jsonl->encode([{
        type        => 'permission_response',
        tool_use_id => $tool_use_id,
        response    => $response,
    }]);

    # $response is caller-supplied data, so encoding can fail. Apply the
    # same guard interrupt() uses rather than writing an undefined or empty
    # payload to the CLI.
    unless (defined $msg && length $msg) {
        $log->debug("respond_to_permission: could not encode response, nothing sent");
        return;
    }

    # Guard the write the same way send_user_message and
    # set_permission_mode do, so a write failure is logged rather than
    # thrown at the caller.
    try {
        $self->_stdin->write($msg);
    }
    catch {
        $log->debug(sprintf("respond_to_permission write error: %s", $_));
    };
    return;
}

=head2 rewind_files

    $query->rewind_files;

Revert file changes to the checkpoint state.

=cut

sub rewind_files {
    my ($self) = @_;

    return unless $self->_stdin && !$self->_finished;

    my $msg = $self->_jsonl->encode([{ type => 'rewind_files' }]);
    $self->_stdin->write($msg);



( run in 0.614 second using v1.01-cache-2.11-cpan-140bd7fdf52 )