Claude-Agent
view release on metacpan or search on metacpan
lib/Claude/Agent/Query.pm view on Meta::CPAN
package Claude::Agent::Query;
use 5.020;
use strict;
use warnings;
use Claude::Agent::Logger '$log';
use Time::HiRes ();
use Types::Common -types;
# Attribute declarations for the query object.
# NOTE(review): going by the inline comments below, the sigils appended to
# each name appear to mean: '!' = required, '?' = optional, '==.' = read-write
# internal attribute with no constructor argument. Confirm against the Marlin
# documentation before relying on this.
# NOTE(review): a trailing "=> sub { ... }" presumably supplies the
# attribute's default value - confirm.
use Marlin
'prompt!', # Required prompt (string or async generator)
# Options object; presumably defaults to a fresh Claude::Agent::Options
'options' => sub { Claude::Agent::Options->new() },
'loop?', # Optional external IO::Async loop
'_loop==.', # Internal loop reference (rw, no init_arg)
'_process==.', # IO::Async::Process handle
'_stdin==.', # stdin pipe for sending messages
'_messages==.' => sub { [] }, # Message queue
'_pending_futures==.' => sub { [] }, # Futures waiting for messages
'_session_id==.', # Session ID from init message
'_finished==.' => sub { 0 }, # Process finished flag
'_error==.', # Error message if process failed
'_sdk_servers==.' => sub { {} }, # SDK server wrappers (name => SDKServer)
'_hook_executor==.', # Hook executor for Perl callbacks
'_pending_tool_uses==.' => sub { {} }, # Track tool uses awaiting results
'_processing_message==.' => sub { 0 }, # Guard against concurrent message processing
'_cleaned_up==.' => sub { 0 }, # Track if cleanup() has been called
# JSON Lines codec: used to decode lines read from the CLI's stdout and to
# encode the control/user messages written to its stdin.
'_jsonl==.' => sub {
JSON::Lines->new(
utf8 => 1,
# Error callback: report the problem at trace level and return nothing.
error_cb => sub {
my ($action, $error, $data) = @_;
# Only log at trace level since parse errors are common
# with streaming JSON and partial data
$log->trace("JSON::Lines $action error: $error");
return;
},
)
};
use IO::Async::Loop;
use IO::Async::Process;
use Future;
use Future::AsyncAwait;
use JSON::Lines;
use Try::Tiny;
use File::Which qw(which);
use File::Spec;
use Claude::Agent::Options;
use Claude::Agent::Message;
use Claude::Agent::Error;
use Claude::Agent::MCP::SDKServer;
use Claude::Agent::Hook::Executor;
use Claude::Agent::DryRun qw(create_dry_run_hooks);
=head1 NAME
Claude::Agent::Query - Query iterator for Claude Agent SDK
=head1 SYNOPSIS
use Claude::Agent::Query;
use Claude::Agent::Options;
my $query = Claude::Agent::Query->new(
prompt => "Find all TODO comments",
options => Claude::Agent::Options->new(
allowed_tools => ['Read', 'Glob', 'Grep'],
),
);
# Blocking iteration
while (my $msg = $query->next) {
if ($msg->isa('Claude::Agent::Message::Result')) {
print $msg->result, "\n";
last;
}
}
=head1 DESCRIPTION
This module handles communication with the Claude CLI process and provides
both blocking and async iteration over response messages.
=head1 CONSTRUCTOR
my $query = Claude::Agent::Query->new(
prompt => "Find all TODO comments",
options => $options,
loop => $loop, # optional, for async integration
);
=head2 Arguments
lib/Claude/Agent/Query.pm view on Meta::CPAN
push @cmd, '--agents', $json;
}
}
# Add setting sources
if ($opts->has_setting_sources && $opts->setting_sources && @{$opts->setting_sources}) {
push @cmd, '--setting-sources', join(',', @{$opts->setting_sources});
}
# Add JSON schema for structured outputs
if ($opts->has_output_format && $opts->output_format) {
my $format = $opts->output_format;
if (ref $format eq 'HASH' && $format->{schema}) {
my $json = $self->_jsonl->encode([$format->{schema}]);
chomp $json;
push @cmd, '--json-schema', $json;
}
}
# For string prompts, use --print mode with -- separator
# For async generators, use stream-json input format
if (!ref($self->prompt)) {
# Sanitize terminal escape sequences from prompt to prevent injection attacks.
# This uses a comprehensive allowlist approach after stripping known escapes.
# See: https://invisible-island.net/xterm/ctlseqs/ctlseqs.html
my $sanitized_prompt = $self->prompt;
# Strip ANSI CSI sequences: ESC [ ... <letter>
$sanitized_prompt =~ s/\x1b\[[0-9;:?]*[a-zA-Z]//g;
# Strip OSC sequences: ESC ] ... (BEL or ST) - title changes, hyperlinks, etc.
$sanitized_prompt =~ s/\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)//gs;
# Strip DCS/SOS/PM/APC sequences: ESC P/X/^/_ ... ST
$sanitized_prompt =~ s/\x1b[PX^_][^\x1b]*(?:\x1b\\|\x07)//gs;
# Strip SS2/SS3 single shift: ESC N, ESC O followed by character
$sanitized_prompt =~ s/\x1b[NO].//g;
# Strip VT52 mode sequences: ESC <letter>
$sanitized_prompt =~ s/\x1b[a-zA-Z]//g;
# Strip C1 control codes (8-bit escape equivalents: 0x80-0x9F)
# These can be used to bypass ESC-based filtering in some terminals
$sanitized_prompt =~ s/[\x80-\x9f]//g;
# Remove dangerous control chars but preserve tab (\x09), newline (\x0a), carriage return (\x0d)
$sanitized_prompt =~ s/[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]/ /g;
# Final safety: remove any remaining ESC followed by anything
# This catches any escape sequences not matched above
$sanitized_prompt =~ s/\x1b.//g;
# Ultimate safety: strip any remaining bare ESC (0x1B) bytes entirely
# This handles edge cases with multi-byte sequences in exotic terminal emulators
$sanitized_prompt =~ s/\x1b//g;
push @cmd, '--print', '--', $sanitized_prompt;
}
else {
# For streaming input (async generator), use stream-json input format
push @cmd, '--input-format', 'stream-json';
}
return @cmd;
}
# Spawn the Claude CLI as an IO::Async::Process and attach it to the loop.
#
# The command line comes from _build_command(). The child's stdout is split
# into lines and each line is passed to _handle_line(); stderr lines are
# logged at debug level and never treated as fatal. When the child finishes
# (or fails to start) the query is marked finished, an error message is
# recorded where appropriate, and pending futures are resolved.
#
# Stores the process in _process and its stdin stream in _stdin.
# Returns nothing.
sub _start_process {
    my ($self) = @_;

    my @cmd = $self->_build_command();
    $log->debug("Query: Starting Claude CLI process");
    $log->trace(sprintf("Query: Command: %s with %d arguments", $cmd[0], scalar(@cmd) - 1));

    # Shared line splitter for stdout and stderr.
    #
    # The pattern deliberately accepts an EMPTY line ([^\n]*): requiring at
    # least one character would never match a buffer that begins with "\n",
    # so a single blank line would stall all further processing while the
    # buffer grew without bound. Blank lines are consumed and skipped.
    #
    # At EOF any unterminated trailing data is flushed as a final line so
    # that the last record is not silently lost.
    my $drain_lines = sub {
        my ($buffref, $eof, $on_line) = @_;
        while ($$buffref =~ s/\A([^\n]*)\n//) {
            my $line = $1;    # copy before calling out; $1 is volatile
            next unless length $line;
            $on_line->($line);
        }
        if ($eof && length $$buffref) {
            my $tail = $$buffref;
            $$buffref = '';
            $on_line->($tail);
        }
        return;
    };

    my $process = IO::Async::Process->new(
        command => \@cmd,
        stdin   => { via => 'pipe_write' },
        stdout  => {
            on_read => sub {
                my ($stream, $buffref, $eof) = @_;
                $drain_lines->($buffref, $eof, sub { $self->_handle_line($_[0]) });
                return 0;
            },
        },
        stderr => {
            on_read => sub {
                my ($stream, $buffref, $eof) = @_;
                # Log stderr but don't treat as fatal
                $drain_lines->($buffref, $eof, sub {
                    $log->debug(sprintf("Claude CLI stderr: %s", $_[0]));
                });
                return 0;
            },
        },
        on_finish => sub {
            my ($proc, $exitcode) = @_;
            $self->_finished(1);

            # $exitcode is a wait status: the high byte is the exit status,
            # the low 7 bits are the terminating signal number (if any).
            my $exit_status = $exitcode >> 8;
            my $signal      = $exitcode & 127;
            $log->debug(sprintf("Query: Process finished with exit code %d", $exit_status));

            if ($exit_status != 0) {
                $self->_error("Claude CLI exited with code $exit_status");
            }
            elsif ($signal) {
                # A signal death has exit status 0 and would otherwise look
                # like success. Do not report it when cleanup() has already
                # run (the termination is presumably deliberate) or when an
                # earlier, more specific error is already recorded.
                $log->debug(sprintf("Query: Process terminated by signal %d", $signal));
                if (!$self->_cleaned_up && !defined $self->_error) {
                    $self->_error("Claude CLI terminated by signal $signal");
                }
            }

            # Resolve any pending async futures
            $self->_resolve_pending_futures_on_finish();
        },
        on_exception => sub {
            my ($proc, $exception, $errno, $exitcode) = @_;
            $self->_finished(1);
            $log->debug(sprintf("Query: Process exception: %s", $exception));
            $self->_error("Claude CLI exception: $exception");
            # Resolve any pending async futures
            $self->_resolve_pending_futures_on_finish();
        },
    );

    # Store references before adding to loop to avoid race conditions
    $self->_process($process);
    $self->_stdin($process->stdin);
    $self->_loop->add($process);

    # For --print mode, prompt is in argv, stdin can be closed
    if (!ref($self->prompt)) {
        $self->_loop->later(sub {
            $self->_stdin->close_when_empty if $self->_stdin;
        });
    }
    # For ref prompts (streaming input), caller will send messages via send_user_message
    return;
}
sub _handle_line {
my ($self, $line) = @_;
return unless defined $line && length $line;
# Use JSON::Lines decode method for single line
my @decoded = $self->_jsonl->decode($line);
if ($log->is_trace) {
$log->trace(sprintf("Raw line length: %d", length($line)));
$log->trace(sprintf("Raw line: %s", $line));
$log->trace(sprintf("Decoded %d objects", scalar(@decoded)));
$log->trace(sprintf("JSON::Lines buffer remaining: %d chars", length($self->_jsonl->remaining)));
if ($self->_jsonl->remaining) {
$log->trace(sprintf("Buffer content (first 200): %s", substr($self->_jsonl->remaining, 0, 200)));
}
}
# Guard against buffer overflow from accumulated malformed data
# Configurable threshold via environment variable, default 1MB, max 10MB
# Note: Typical JSON messages from Claude CLI are 1-50KB; tool results (especially
# large file reads or grep outputs) may be 100KB-1MB. The minimum of 100KB ensures
# legitimate large tool results are not prematurely truncated.
# Set CLAUDE_AGENT_JSONL_BUFFER_MAX to at least 2x your largest expected message size.
my $buffer_threshold = $ENV{CLAUDE_AGENT_JSONL_BUFFER_MAX} // 1_000_000;
my $min_threshold = 100_000; # Minimum 100KB to accommodate large tool results
my $max_threshold = 10_000_000; # Hard cap at 10MB
if (!defined $buffer_threshold || $buffer_threshold !~ /^\d+$/) {
if (defined $ENV{CLAUDE_AGENT_JSONL_BUFFER_MAX}) {
# Sanitize env var value to prevent log injection (remove control characters)
my $safe_val = $ENV{CLAUDE_AGENT_JSONL_BUFFER_MAX} // '';
$safe_val =~ s/[[:cntrl:]]//g;
$log->warning(sprintf("CLAUDE_AGENT_JSONL_BUFFER_MAX='%s' is not a valid integer, using default 1000000", $safe_val));
}
$buffer_threshold = 1_000_000;
}
elsif ($buffer_threshold < $min_threshold) {
$log->warning(sprintf("CLAUDE_AGENT_JSONL_BUFFER_MAX=%d is below minimum (%d), using %d. "
. "Very small values may truncate legitimate large tool results.",
$buffer_threshold, $min_threshold, $min_threshold));
$buffer_threshold = $min_threshold;
}
elsif ($buffer_threshold > $max_threshold) {
$log->debug(sprintf("CLAUDE_AGENT_JSONL_BUFFER_MAX=%d exceeds maximum (%d), using %d",
$buffer_threshold, $max_threshold, $max_threshold));
$buffer_threshold = $max_threshold;
}
# Check buffer size regardless of decode success to prevent unbounded growth
if ($self->_jsonl->remaining && length($self->_jsonl->remaining) > $buffer_threshold) {
$log->debug(sprintf("JSON::Lines buffer overflow detected (size: %d, threshold: %d), reinitializing",
length($self->_jsonl->remaining), $buffer_threshold));
$self->_jsonl(JSON::Lines->new(
utf8 => 1,
error_cb => sub {
my ($action, $error, $data) = @_;
$log->trace(sprintf("JSON::Lines %s error: %s", $action, $error));
return;
lib/Claude/Agent/Query.pm view on Meta::CPAN
my $id = $query->session_id;
Returns the session ID once available (after init message).
=cut
# Public read-only accessor: hands back whatever is stored in the
# internal _session_id attribute.
sub session_id {
    my $self = shift;
    return $self->_session_id;
}
=head2 is_finished
if ($query->is_finished) { ... }
Returns true once the query has finished, i.e. the CLI process has exited or failed with an exception.
=cut
# Public read-only accessor: reports the value of the internal _finished flag.
sub is_finished {
    my $self = shift;
    return $self->_finished;
}
=head2 error
if (my $err = $query->error) { ... }
Returns the error message if the process failed (for example, it exited with a non-zero code or raised an exception).
=cut
# Public read-only accessor: hands back the message stored in the internal
# _error attribute, if any.
sub error {
    my $self = shift;
    return $self->_error;
}
=head2 interrupt
$query->interrupt;
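Sends an C<interrupt> control message to the Claude CLI over its stdin to abort the current operation. This is a message on the input stream, not an operating-system signal. Does nothing if stdin is unavailable or the process has already finished.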
=cut
# Ask the CLI to abort the current operation by writing an
# { type => 'interrupt' } JSON line to its stdin.
#
# Best-effort: does nothing when there is no stdin stream, when the process
# has already finished, or when encoding produced no output. Always returns
# nothing.
sub interrupt {
    my ($self) = @_;
    return unless $self->_stdin && !$self->_finished;

    my $msg = $self->_jsonl->encode([{ type => 'interrupt' }]);
    return unless defined $msg && length $msg;

    # The stream object can outlive its write handle (in --print mode stdin
    # is closed right after start-up), in which case write() throws. Guard
    # the write the same way send_user_message and set_permission_mode do,
    # so an interrupt request never kills the caller.
    try {
        $self->_stdin->write($msg);
    }
    catch {
        $log->debug(sprintf("interrupt write error: %s", $_));
    };
    return;
}
=head2 send_user_message
$query->send_user_message("Continue with the next step");
Send a follow-up user message during streaming.
=cut
# Write a follow-up user message to the CLI's stdin as one JSON line.
#
# $content is placed verbatim in the message's "content" field.
# Returns nothing when there is no stdin stream or the process has
# finished; otherwise returns whatever the stream's write() returned,
# or 0 if the write threw (the error is logged at debug level).
sub send_user_message {
    my ($self, $content) = @_;

    # No live input channel: nothing to send.
    if (!$self->_stdin || $self->_finished) {
        return;
    }

    my %envelope = (
        type    => 'user',
        message => {
            role    => 'user',
            content => $content,
        },
    );
    my $payload = $self->_jsonl->encode([ \%envelope ]);

    # try() yields the block's value on success and the catch block's
    # value (0) on failure.
    my $outcome = try {
        $self->_stdin->write($payload);
    }
    catch {
        $log->debug(sprintf("send_user_message write error: %s", $_));
        0;
    };

    return $outcome;
}
=head2 set_permission_mode
$query->set_permission_mode('acceptEdits');
Change permission mode during streaming.
=cut
# Request a permission-mode change by writing a 'set_permission_mode'
# JSON line (carrying $mode as permission_mode) to the CLI's stdin.
#
# Silently does nothing when there is no stdin stream or the process has
# finished. Write failures are logged at debug level, not propagated.
# Always returns nothing.
sub set_permission_mode {
    my ($self, $mode) = @_;
    return if !$self->_stdin || $self->_finished;

    my $request = {
        type            => 'set_permission_mode',
        permission_mode => $mode,
    };
    my $payload = $self->_jsonl->encode([$request]);

    try {
        $self->_stdin->write($payload);
    }
    catch {
        $log->debug(sprintf("set_permission_mode write error: %s", $_));
    };

    return;
}
=head2 respond_to_permission
$query->respond_to_permission($tool_use_id, {
behavior => 'allow',
updated_input => $input,
});
Respond to a permission request.
=cut
# Answer a permission request by writing a 'permission_response' JSON line
# to the CLI's stdin.
#
# $tool_use_id - identifier of the tool use being answered
# $response    - response payload, sent as-is in the "response" field
#
# Best-effort: does nothing when there is no stdin stream or the process
# has finished. Always returns nothing.
sub respond_to_permission {
    my ($self, $tool_use_id, $response) = @_;
    return unless $self->_stdin && !$self->_finished;

    my $msg = $self->_jsonl->encode([{
        type        => 'permission_response',
        tool_use_id => $tool_use_id,
        response    => $response,
    }]);

    # write() throws if the stream has lost its write handle; guard it the
    # same way send_user_message and set_permission_mode do, rather than
    # letting the exception escape to the caller.
    try {
        $self->_stdin->write($msg);
    }
    catch {
        $log->debug(sprintf("respond_to_permission write error: %s", $_));
    };
    return;
}
=head2 rewind_files
$query->rewind_files;
Revert file changes to the checkpoint state.
=cut
sub rewind_files {
my ($self) = @_;
return unless $self->_stdin && !$self->_finished;
my $msg = $self->_jsonl->encode([{ type => 'rewind_files' }]);
$self->_stdin->write($msg);
( run in 0.614 second using v1.01-cache-2.11-cpan-140bd7fdf52 )