Langertha
view release on metacpan or search on metacpan
lib/Langertha/Role/Chat.pm view on Meta::CPAN
push @out, $self->_normalize_content_blocks($msg);
}
return \@out;
}
# Convert a message whose content is an ArrayRef into the engine's wire
# format for multimodal content blocks.
#
# Returns $msg itself (same reference) when its content is not an ArrayRef
# or holds no Langertha::Content object. Otherwise returns a NEW HashRef:
#   - content_format 'gemini': { role => ..., parts => [...] }, with the
#     'assistant' role renamed to 'model' and role defaulting to 'user';
#   - any other format: a shallow copy of $msg with content replaced.
# Content objects are serialized via their to_<format> method, plain
# strings become text blocks, and any other reference passes through.
sub _normalize_content_blocks {
    my ( $self, $msg ) = @_;
    my $content = $msg->{content};

    return $msg if ref $content ne 'ARRAY';

    # Only rebuild the message when at least one Content object is present.
    my $has_content_object = 0;
    for my $block (@$content) {
        next unless blessed($block) && $block->does('Langertha::Content');
        $has_content_object = 1;
        last;
    }
    return $msg if !$has_content_object;

    my $fmt       = $self->content_format;
    my $to_wire   = "to_$fmt";
    my $is_gemini = $fmt eq 'gemini';

    my @blocks;
    for my $item (@$content) {
        if ( blessed($item) && $item->does('Langertha::Content') ) {
            push @blocks, $item->$to_wire;
        }
        elsif ( ref $item ) {
            # Already a wire-shaped block (e.g. a HashRef): keep as-is.
            push @blocks, $item;
        }
        elsif ($is_gemini) {
            push @blocks, { text => $item };
        }
        else {
            push @blocks, { type => 'text', text => $item };
        }
    }

    if ($is_gemini) {
        my $role = $msg->{role} // 'user';
        $role = 'model' if $role eq 'assistant';
        return { role => $role, parts => \@blocks };
    }
    return { %$msg, content => \@blocks };
}
# Synchronous chat: build the request via chat(), send it with the engine's
# user agent, and return whatever the request's response_call produces.
#
# When the engine tracks rate limits (has_rate_limit) and the result is a
# Langertha::Response, the result is cloned with the current rate_limit
# attached.
sub simple_chat {
    my ( $self, @messages ) = @_;
    $log->debugf("[%s] simple_chat with %d message(s), model=%s",
        ref $self, scalar @messages, $self->chat_model // 'default');
    my $request  = $self->chat(@messages);
    my $response = $self->user_agent->request($request);
    my $result   = $request->response_call->($response);
    # blessed(), not ref(): response_call may return a plain Hash/ArrayRef,
    # and calling ->isa on an unblessed reference dies.
    if ( $self->can('has_rate_limit') && $self->has_rate_limit
        && blessed($result) && $result->isa('Langertha::Response') ) {
        $result = $result->clone_with( rate_limit => $self->rate_limit );
    }
    return $result;
}
# Build (but do not send) a streaming chat request.
#
# The messages are normalised through chat_messages and handed to the
# engine's chat_stream_request. Croaks "<class> does not support streaming"
# when the engine provides no chat_stream_request method.
sub chat_stream {
    my ( $self, @messages ) = @_;
    if ( !$self->can('chat_stream_request') ) {
        croak ref($self) . " does not support streaming";
    }
    my @normalised = $self->chat_messages(@messages);
    return $self->chat_stream_request(@normalised);
}
# Synchronous streaming chat.
#
# $callback (a CodeRef, required) is invoked by execute_streaming_request
# for each chunk. Returns all chunk contents concatenated into one string.
# Croaks if the first argument is not a CodeRef.
sub simple_chat_stream {
    my ( $self, $callback, @messages ) = @_;
    if ( ref $callback ne 'CODE' ) {
        croak "simple_chat_stream requires a callback as first argument";
    }
    $log->debugf("[%s] simple_chat_stream (%s format)", ref $self, $self->stream_format);

    my $stream_request = $self->chat_stream(@messages);
    my $collected      = $self->execute_streaming_request($stream_request, $callback);

    my $chunk_count = scalar @$collected;
    $log->debugf("[%s] Stream completed: %d chunks", ref $self, $chunk_count);
    return join q{}, map { $_->content } @$collected;
}
# Synchronous streaming chat exposed as an iterator.
#
# The whole streaming response is fetched and buffered first; the returned
# Langertha::Stream then walks the collected chunks in order.
sub simple_chat_stream_iterator {
    my ( $self, @messages ) = @_;
    require Langertha::Stream;

    my $stream_request = $self->chat_stream(@messages);
    my $buffered       = $self->execute_streaming_request($stream_request);

    my $iterator = Langertha::Stream->new( chunks => $buffered );
    return $iterator;
}
# Future-based async methods
#
# Event loop used by the *_f methods. It is built on first access, and
# IO::Async::Loop is only required at that point.
has _async_loop => (
    is         => 'ro',
    lazy_build => 1,
);
sub _build__async_loop {
    require IO::Async::Loop;
    my $loop = IO::Async::Loop->new;
    return $loop;
}
# HTTP client used by the *_f methods. It is built on first access, at
# which point Net::Async::HTTP is required and the client is added to
# _async_loop.
has _async_http => (
    is         => 'ro',
    lazy_build => 1,
);
sub _build__async_http {
    my $self = shift;
    require Net::Async::HTTP;
    my $client = Net::Async::HTTP->new;
    # Register the client with the shared loop that drives its requests.
    $self->_async_loop->add($client);
    return $client;
}
# Async counterpart of simple_chat: forwards the positional messages to
# chat_f and resolves to whatever chat_f resolves to.
async sub simple_chat_f {
    my $self     = shift;
    my @messages = @_;
    $log->debugf( "[%s] simple_chat_f with %d message(s)",
        ref $self, scalar(@messages) );
    my @result = await $self->chat_f( messages => \@messages );
    return @result;
}
# Async single-turn chat with named arguments.
#
#   messages => ArrayRef of messages (a single non-ArrayRef value is
#               treated as one message)
#   any other key (tools, tool_choice, response_format, ...) is passed
#   through to chat_request
#
# Resolves to the result of the request's response_call (normally a
# Langertha::Response). Dies with "<class> request failed: <status line>"
# on a non-success HTTP status.
async sub chat_f {
    my ( $self, %opts ) = @_;
    my $messages = delete $opts{messages} // [];
    my @messages = ref $messages eq 'ARRAY' ? @$messages : ($messages);

    # Auto-fallback: forced named tool on an engine that cannot do
    # native named-tool-forcing but supports json_schema response_format.
    # Rewrite tools+tool_choice into a response_format and remember the
    # tool name so we can synthesize a tool_calls entry afterwards.
    my $synth_tool_name;
    if ( exists $opts{tool_choice}
        && exists $opts{tools}
        && !$self->supports('tool_choice_named')
        && $self->supports('response_format_json_schema')
    ) {
        my $tc = Langertha::ToolChoice->from_hash( $opts{tool_choice} );
        if ( $tc && $tc->type eq 'tool' && defined $tc->name && length $tc->name ) {
            my $name = $tc->name;
            # "// ''" keeps tools without a name from raising an
            # uninitialized-value warning; '' can never equal $name.
            my ($tool) =
                grep { defined $_ && ( $_->name // '' ) eq $name }
                map  { Langertha::Tool->from_hash($_) }
                @{ $opts{tools} };
            if ($tool) {
                delete $opts{tools};
                delete $opts{tool_choice};
                $opts{response_format} = {
                    type        => 'json_schema',
                    json_schema => {
                        %{ $tool->to_json_schema },
                        strict => JSON->true,
                    },
                };
                $synth_tool_name = $name;
                $log->debugf("[%s] forced-tool fallback: tool '%s' rerouted via response_format",
                    ref $self, $name);
            }
        }
    }

    my $request  = $self->chat_request( $self->chat_messages(@messages), %opts );
    my $response = await $self->_async_http->do_request( request => $request );
    unless ($response->is_success) {
        die "".(ref $self)." request failed: ".$response->status_line;
    }
    my $result = $request->response_call->($response);

    # Turn the fallback's JSON answer back into a (synthetic) tool call.
    # Use defined() rather than truthiness: a tool named "0" passes the
    # length check above and must still be synthesized.
    if ( defined $synth_tool_name && blessed($result) && $result->isa('Langertha::Response') ) {
        my $args = $self->decode_loose_json( $result->content );
        if ( defined $args ) {
            $result = $result->clone_with(
                tool_calls => [{
                    name      => $synth_tool_name,
                    arguments => $args,
                    synthetic => 1,
                }],
            );
        }
    }

    # blessed(), not ref(): response_call may return an unblessed reference,
    # and calling ->isa on one dies.
    if ( $self->can('has_rate_limit') && $self->has_rate_limit
        && blessed($result) && $result->isa('Langertha::Response') ) {
        $result = $result->clone_with( rate_limit => $self->rate_limit );
    }
    return $result;
}
# Async streaming without a real-time callback: delegates to
# simple_chat_stream_realtime_f with an undefined callback and returns its
# Future unchanged.
sub simple_chat_stream_f {
    my $self     = shift;
    my @messages = @_;
    my $no_callback;
    return $self->simple_chat_stream_realtime_f( $no_callback, @messages );
}
# Async streaming chat with an optional per-chunk callback.
#
# $chunk_callback (CodeRef or undef) is invoked with each parsed chunk as
# body data arrives. Resolves to ($content, \@chunks), where $content is
# the concatenation of every chunk's content. Croaks if the engine has no
# chat_stream_request; dies with "<class> streaming request failed: ..."
# on a non-success HTTP status.
async sub simple_chat_stream_realtime_f {
    my ($self, $chunk_callback, @messages) = @_;
    croak "".(ref $self)." does not support streaming"
        unless $self->can('chat_stream_request');
    my $request = $self->chat_stream_request($self->chat_messages(@messages));
    my @all_chunks;
    my $buffer = '';
    my $format = $self->stream_format;
    my $response_status;

    # Parse whatever is buffered and hand each resulting chunk to the caller.
    # $final asks the parser to flush an unterminated trailing frame.
    my $emit = sub {
        my ($final) = @_;
        my $chunks = $self->_process_stream_buffer(\$buffer, $format, $final);
        for my $chunk (@$chunks) {
            push @all_chunks, $chunk;
            $chunk_callback->($chunk) if $chunk_callback;
        }
    };

    await $self->_async_http->do_request(
        request   => $request,
        on_header => sub {
            my ($response) = @_;
            $response_status = $response;
            my $is_ok = $response->is_success;
            # Return a callback that handles each body chunk
            return sub {
                my ($data) = @_;
                return unless defined $data; # undef signals end of body
                # An error body (HTML, plain text, a JSON error object) is not
                # a stream of frames. Parsing it could die with a JSON error or
                # pass bogus chunks to the callback, hiding the HTTP status
                # error raised below.
                return unless $is_ok;
                $buffer .= $data;
                $emit->(0);
            };
        },
    );

    unless ($response_status && $response_status->is_success) {
        die "".(ref $self)." streaming request failed: "
            .($response_status ? $response_status->status_line : 'no response received');
    }

    # Process remaining buffer
    $emit->(1) if $buffer ne '';

    my $content = join('', map { $_->content } @all_chunks);
    return ($content, \@all_chunks);
}
# Collect the tool calls carried by a list of stream chunks.
#
# $chunks must be an ArrayRef; anything else yields []. Each element is
# asked has_tool_calls inside an eval, so elements that cannot answer
# (plain strings, unblessed refs, objects without the method) are skipped.
# Returns a flat ArrayRef of every chunk's tool_calls, in chunk order.
sub aggregate_tool_calls {
    my ( $self, $chunks ) = @_;
    return [] if ref($chunks) ne 'ARRAY';
    my @calls = map {
        my $chunk = $_;
        ( eval { $chunk->has_tool_calls } ) ? @{ $chunk->tool_calls } : ();
    } @$chunks;
    return \@calls;
}
# Split buffered stream data into complete frames, JSON-decode each frame
# and turn it into chunks via parse_stream_chunk.
#
#   $buffer_ref - reference to the receive buffer. Consumed frames are
#                 removed in place; an incomplete tail is kept for the
#                 next call.
#   $format     - 'sse' (events separated by a blank line, payload on
#                 "data:" lines, ':' comments and "[DONE]" ignored) or
#                 'ndjson' (one JSON document per line). Any other value
#                 yields no chunks.
#   $final      - true when no more data will arrive. The unterminated tail
#                 is then processed as a last frame and the buffer emptied.
#
# Returns an ArrayRef of the truthy values returned by parse_stream_chunk.
# Dies if a frame is not valid JSON ($self->json->decode).
sub _process_stream_buffer {
    my ($self, $buffer_ref, $format, $final) = @_;
    my @chunks;

    # CRLF line endings are legal in SSE and are sent by some servers and
    # proxies. Normalise them so the LF-based framing below applies. A CR at
    # the very end stays in place until its LF arrives with the next read.
    $$buffer_ref =~ s/\r\n/\n/g;

    if ($format eq 'sse') {
        my $handle_event = sub {
            my ($block) = @_;
            for my $line (split /\n/, $block) {
                next if $line eq '' || $line =~ /^:/;
                next unless $line =~ /^data:\s*(.*)$/;
                my $json_data = $1;
                next if $json_data eq '[DONE]' || $json_data eq '';
                my $parsed = $self->json->decode($json_data);
                my $chunk  = $self->parse_stream_chunk($parsed);
                push @chunks, $chunk if $chunk;
            }
        };
        while ($$buffer_ref =~ s/^(.*?)\n\n//s) {
            $handle_event->($1);
        }
        # The terminating blank line of the last event may never arrive.
        if ($final && $$buffer_ref ne '') {
            my $tail = $$buffer_ref;
            $$buffer_ref = '';
            $handle_event->($tail);
        }
    } elsif ($format eq 'ndjson') {
        my $handle_line = sub {
            my ($line) = @_;
            return if $line eq '';
            my $parsed = $self->json->decode($line);
            my $chunk  = $self->parse_stream_chunk($parsed);
            push @chunks, $chunk if $chunk;
        };
        while ($$buffer_ref =~ s/^(.*?)\n//s) {
            $handle_line->($1);
        }
        # The last document is not necessarily newline-terminated.
        if ($final && $$buffer_ref ne '') {
            my $tail = $$buffer_ref;
            $$buffer_ref = '';
            $handle_line->($tail);
        }
    }
    return \@chunks;
}
# Compose the ThinkTag and Langfuse roles into this role, after all of
# its own methods have been defined.
with qw(
    Langertha::Role::ThinkTag
    Langertha::Role::Langfuse
);
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Langertha::Role::Chat - Role for APIs with normal chat functionality
=head1 VERSION
version 0.502
=head1 SYNOPSIS
# Synchronous chat
my $response = $engine->simple_chat('Hello, how are you?');
# Streaming with callback
$engine->simple_chat_stream(sub {
my ($chunk) = @_;
print $chunk->content;
}, 'Tell me a story');
# Streaming with iterator
my $stream = $engine->simple_chat_stream_iterator('Tell me a story');
while (my $chunk = $stream->next) {
print $chunk->content;
}
# Async with Future (traditional style)
my $future = $engine->simple_chat_f('Hello');
my $response = $future->get;
# Async with Future::AsyncAwait (recommended)
use Future::AsyncAwait;
async sub chat_example {
my ($engine) = @_;
my $response = await $engine->simple_chat_f('Hello');
say $response;
}
# Async streaming with real-time callback
async sub stream_example {
my ($engine) = @_;
my ($content, $chunks) = await $engine->simple_chat_stream_realtime_f(
sub { print shift->content },
'Tell me a story'
);
say "\nTotal chunks: ", scalar @$chunks;
}
=head1 DESCRIPTION
This role provides chat functionality for LLM engines. It includes both
synchronous and asynchronous (L<Future>-based) methods for chat and streaming.
The Future-based C<_f> methods are implemented using L<Future::AsyncAwait> and
L<Net::Async::HTTP>. L<Net::Async::HTTP> (together with L<IO::Async::Loop>) is
loaded lazily, only when a C<_f> method first needs it, so synchronous-only
usage never loads it. L<Future::AsyncAwait> itself is needed when the role is
compiled, because several C<_f> methods are declared with C<async sub>.
=head2 chat_model
The model name used for chat requests. Lazily defaults to C<default_chat_model>
if the engine provides it, otherwise falls back to the general C<model>
attribute from L<Langertha::Role::Models>.
=head2 chat
my $request = $engine->chat(@messages);
Builds and returns a chat HTTP request object. Messages may be plain strings
(treated as C<user> role) or HashRefs with C<role> and C<content> keys. A
system prompt from L<Langertha::Role::SystemPrompt> is prepended automatically.
=head2 chat_messages
my $messages = $engine->chat_messages(@messages);
Normalises C<@messages> into the canonical ArrayRef-of-HashRef format expected
by C<chat_request>. Plain strings become C<{ role =E<gt> 'user', content =E<gt>
$string }>. If the engine has a C<system_prompt> set it is prepended as a
C<system> message.
=head2 simple_chat
my $response = $engine->simple_chat(@messages);
my $response = $engine->simple_chat('Hello, how are you?');
Sends a synchronous chat request and returns the response text. Blocks until
the request completes.
=head2 chat_stream
my $request = $engine->chat_stream(@messages);
Builds and returns a streaming chat HTTP request object. Croaks if the engine
does not implement C<chat_stream_request>. Use L</simple_chat_stream> or
L</simple_chat_stream_iterator> to execute the request.
=head2 simple_chat_stream
my $content = $engine->simple_chat_stream($callback, @messages);
$engine->simple_chat_stream(sub {
my ($chunk) = @_;
print $chunk->content;
}, 'Tell me a story');
Sends a synchronous streaming chat request. Calls C<$callback> with each
L<Langertha::Stream::Chunk> as it arrives. Returns the complete concatenated
content string when done. Blocks until the stream completes.
=head2 simple_chat_stream_iterator
my $stream = $engine->simple_chat_stream_iterator(@messages);
while (my $chunk = $stream->next) {
print $chunk->content;
}
Returns a L<Langertha::Stream> iterator. The full response is fetched
synchronously and buffered; iteration yields each L<Langertha::Stream::Chunk>
in order.
=head2 simple_chat_f
# Traditional Future style
my $response = $engine->simple_chat_f(@messages)->get;
# With async/await (recommended)
use Future::AsyncAwait;
async sub my_chat {
my $response = await $engine->simple_chat_f(@messages);
return $response;
}
Async version of L</simple_chat>. Returns a L<Future> that resolves to the
response text. Uses L<Net::Async::HTTP> internally; loaded lazily on first call.
For requests that need named arguments (tools, tool_choice,
response_format, etc.) use L</chat_f>; C<simple_chat_f> delegates to it.
=head2 chat_f
my $response = await $engine->chat_f(
messages => [ ... ],
tools => [ $tool, ... ],
tool_choice => { type => 'tool', name => 'extract' },
response_format => { ... },
# any other engine-specific extras pass straight through
);
Async I<single-turn> chat with named arguments. Returns a L<Future>
resolving to a L<Langertha::Response>. The caller is responsible for
acting on any C<tool_calls> the engine emits — C<chat_f> does not
loop. For the multi-turn MCP tool-calling loop use
L<Langertha::Role::Tools/chat_with_tools_f> instead.
C<tools> in C<chat_f> can be a mix of provider-shape HashRefs
(OpenAI, Anthropic, MCP, Gemini); the engine's C<chat_request> handles
the per-provider serialization. The L<Langertha::Tool> value object is
the canonical normalizer (C<from_hash> accepts every shape, the
C<to_PROVIDER> methods produce the wire payload).
When the caller asks for a forced named tool on an engine that cannot
do native named-tool-forcing but supports C<json_schema>
response_format (currently L<Langertha::Engine::Perplexity>), the
request is automatically rewritten to use the JSON Schema path and the
response is loose-parsed; the resulting L<Langertha::Response> exposes
the parsed arguments via L<Langertha::Response/tool_call_args> with
C<synthetic =E<gt> 1> on the synthesized tool_call entry.
=head2 simple_chat_stream_f
my ($content, $chunks) = $engine->simple_chat_stream_f(@messages)->get;
Async streaming without a real-time callback. Convenience wrapper around
L</simple_chat_stream_realtime_f> with C<undef> as the callback. Returns a
L<Future> that resolves to C<($content, \@chunks)>.
=head2 aggregate_tool_calls
my $tool_calls = $engine->aggregate_tool_calls( $chunks );
Walks an ArrayRef of L<Langertha::Stream::Chunk> objects and returns
the flat list of L<Langertha::ToolCall> objects collected from any
chunks that carry C<tool_calls>. Returns an empty ArrayRef if none of
the chunks emitted tool calls.
This is the streaming counterpart to L<Langertha::Response/tool_calls>.
Engines that need to assemble fragmented tool-call deltas (OpenAI's
C<delta.tool_calls> stream, Anthropic's C<input_json_delta>) are
expected to do that assembly inside C<parse_stream_chunk> and attach
the finished L<Langertha::ToolCall> to the relevant chunk; this
helper just collects them.
=head2 simple_chat_stream_realtime_f
# With async/await (recommended)
use Future::AsyncAwait;
async sub my_stream {
my ($content, $chunks) = await $engine->simple_chat_stream_realtime_f(
sub { print shift->content },
@messages
);
return $content;
}
# Traditional Future style
my $future = $engine->simple_chat_stream_realtime_f($callback, @messages);
my ($content, $chunks) = $future->get;
Async streaming with real-time callback. C<$callback> is called with each
L<Langertha::Stream::Chunk> as it arrives from the server. Returns a L<Future>
that resolves to C<($content, \@chunks)> where C<$content> is the full
concatenated text.
This is the recommended method for real-time streaming in async applications.
Pass C<undef> as the callback (or use L</simple_chat_stream_f>) if you only
need the final result.
=head2 content_format
my $fmt = $engine->content_format; # 'openai' | 'anthropic' | 'gemini'
Wire format for multimodal content blocks. Controls how
L<Langertha::Content> objects embedded in a message's C<content> arrayref
are serialized during L</chat_messages>. Defaults to C<'openai'>; overridden
by L<Langertha::Engine::AnthropicBase> and L<Langertha::Engine::Gemini>.
=head2 engine_capabilities
my $caps = $engine->engine_capabilities;
if ( $caps->{tool_choice_named} ) { ... }
Returns a HashRef of capability flags so callers can avoid passing
parameters the engine cannot honour.
The base implementation reports only what L<Langertha::Role::Chat>
itself provides (C<chat>). Every other capability-bearing role
(L<Langertha::Role::Tools>, L<Langertha::Role::ResponseFormat>,
L<Langertha::Role::Streaming>, L<Langertha::Role::Embedding>,
L<Langertha::Role::Transcription>, L<Langertha::Role::ImageGeneration>,
L<Langertha::Role::HermesTools>, L<Langertha::Role::Temperature>,
L<Langertha::Role::Seed>, L<Langertha::Role::ContextSize>,
L<Langertha::Role::ResponseSize>, L<Langertha::Role::SystemPrompt>,
L<Langertha::Role::ParallelToolUse>) hangs its own contribution into
this method via C<around engine_capabilities>. Engines override (also
via C<around>) when the wire reality differs from the role inventory
— for example to clear C<tool_choice_named> on providers that only
accept string forms.
Common keys produced by the bundled roles:
=over
=item * C<chat> — C<simple_chat>/C<simple_chat_f> work
=item * C<streaming> — C<chat_stream_request> is wired up
=item * C<tools_native> — engine accepts a C<tools> array on the wire
=item * C<tools_hermes> — tools are injected via Hermes-style XML
prompt rather than (or in addition to) the native API
=item * C<tool_choice_auto> / C<tool_choice_any> / C<tool_choice_none> —
which string-form C<tool_choice> values are accepted
=item * C<tool_choice_named> — C<{type =E<gt> 'tool', name =E<gt> '...'}>
forcing works (possibly translated internally — Gemini routes named
tools through C<allowed_function_names>, for example)
=item * C<response_format_json_object> — C<{type =E<gt> 'json_object'}>
=item * C<response_format_json_schema> — JSON Schema structured output
=item * C<embedding>, C<transcription>, C<image_generation> — auxiliary
capabilities matching the corresponding roles
=item * C<temperature>, C<seed>, C<context_size>, C<response_size>,
C<system_prompt>, C<parallel_tool_use> — generation-parameter knobs
the engine will honour
=back
Callers should treat the hash as advisory — a missing key means
"unknown / unsupported", a true value means "the engine claims it
will honour this".
=head1 SEE ALSO
=over
=item * L<Langertha::Role::Langfuse> - Observability integration (composed by this role)
=item * L<Langertha::Role::SystemPrompt> - System prompt injection
=item * L<Langertha::Role::Streaming> - Stream parsing (SSE / NDJSON)
=item * L<Langertha::Role::Tools> - Tool calling on top of chat
=item * L<Langertha::Role::Models> - Model selection
=item * L<Langertha::Stream> - Stream iterator
=item * L<Langertha::Stream::Chunk> - Individual stream chunk
=back
=head1 SUPPORT
=head2 Issues
Please report bugs and feature requests on GitHub at
L<https://github.com/Getty/langertha/issues>.
=head2 IRC
Join C<#langertha> on C<irc.perl.org> or message Getty directly.
( run in 1.031 second using v1.01-cache-2.11-cpan-5a3173703d6 )