Langertha

 view release on metacpan or  search on metacpan

lib/Langertha/Plugin/Langfuse.pm  view on Meta::CPAN

      $opts{metadata}    ? ( metadata    => $opts{metadata} )    : (),
      $opts{tags}        ? ( tags        => $opts{tags} )        : (),
      $opts{user_id}     ? ( userId      => $opts{user_id} )     : (),
      $opts{session_id}  ? ( sessionId   => $opts{session_id} )  : (),
    },
  };
  return $id;
}


# Append a 'generation-create' event to the pending batch.
#
# Named options:
#   trace_id  - required; stored as traceId in the event body
#   id        - optional id for the event body; one is generated via _id()
#               when it is missing or false
#   name      - optional; defaults to 'generation'
#   input, output
#             - optional payloads; included whenever they are defined, so
#               legitimate false values such as 0, "0" or "" are kept
#   model, usage, start_time, end_time, parent_observation_id,
#   model_parameters
#             - optional; included only when they hold a true value
#
# Returns the body id. Returns nothing when the plugin is not enabled.
# Croaks when trace_id is undefined (checked only while enabled).
sub create_generation {
  my ( $self, %opts ) = @_;
  return unless $self->enabled;

  my $id       = $opts{id} || _id();
  my $trace_id = $opts{trace_id} // croak("create_generation requires trace_id");

  push @{$self->_batch}, {
    id        => _id(),        # id of the event envelope, distinct from the body id
    type      => 'generation-create',
    timestamp => _timestamp(),
    body      => {
      id       => $id,
      traceId  => $trace_id,
      name     => $opts{name} // 'generation',
      $opts{model}                ? ( model              => $opts{model} )              : (),
      # input/output are payload data: test definedness, not truthiness,
      # so that a value of 0 or an empty string is not silently dropped.
      defined( $opts{input} )     ? ( input              => $opts{input} )              : (),
      defined( $opts{output} )    ? ( output             => $opts{output} )             : (),
      $opts{usage}                ? ( usage              => $opts{usage} )              : (),
      $opts{start_time}           ? ( startTime          => $opts{start_time} )         : (),
      $opts{end_time}             ? ( endTime            => $opts{end_time} )           : (),
      $opts{parent_observation_id}? ( parentObservationId => $opts{parent_observation_id} ) : (),
      $opts{model_parameters}     ? ( modelParameters    => $opts{model_parameters} )   : (),
    },
  };
  return $id;
}


# Append a 'span-create' event to the pending batch.
#
# Named options:
#   trace_id  - required; stored as traceId in the event body
#   id        - optional id for the event body; one is generated via _id()
#               when it is missing or false
#   input, output
#             - optional payloads; included whenever they are defined, so
#               legitimate false values such as 0, "0" or "" are kept
#   name, start_time, end_time, parent_observation_id, metadata
#             - optional; included only when they hold a true value
#
# Returns the body id. Returns nothing when the plugin is not enabled.
# Croaks when trace_id is undefined (checked only while enabled).
sub create_span {
  my ( $self, %opts ) = @_;
  return unless $self->enabled;

  my $id       = $opts{id} || _id();
  my $trace_id = $opts{trace_id} // croak("create_span requires trace_id");

  push @{$self->_batch}, {
    id        => _id(),        # id of the event envelope, distinct from the body id
    type      => 'span-create',
    timestamp => _timestamp(),
    body      => {
      id      => $id,
      traceId => $trace_id,
      $opts{name}                 ? ( name               => $opts{name} )               : (),
      # input/output are payload data: test definedness, not truthiness,
      # so that a value of 0 or an empty string is not silently dropped.
      defined( $opts{input} )     ? ( input              => $opts{input} )              : (),
      defined( $opts{output} )    ? ( output             => $opts{output} )             : (),
      $opts{start_time}           ? ( startTime          => $opts{start_time} )         : (),
      $opts{end_time}             ? ( endTime            => $opts{end_time} )           : (),
      $opts{parent_observation_id}? ( parentObservationId => $opts{parent_observation_id} ) : (),
      $opts{metadata}             ? ( metadata           => $opts{metadata} )           : (),
    },
  };
  return $id;
}


# Append an event carrying new output and/or metadata for an existing trace
# to the pending batch.
#
# Named options:
#   id        - required; id of the trace to update
#   output    - optional payload; included whenever it is defined, so
#               legitimate false values such as 0, "0" or "" are kept
#   metadata  - optional; included only when it holds a true value
#
# Returns the trace id. Returns nothing when the plugin is not enabled.
# Croaks when id is undefined (checked only while enabled).
sub update_trace {
  my ( $self, %opts ) = @_;
  return unless $self->enabled;
  my $id = $opts{id} // croak("update_trace requires id");

  push @{$self->_batch}, {
    id        => _id(),        # id of the event envelope, distinct from the trace id
    # NOTE(review): the update is sent with the 'trace-create' type;
    # presumably the server upserts on the body id - confirm against the API docs.
    type      => 'trace-create',
    timestamp => _timestamp(),
    body      => {
      id => $id,
      # output is payload data: test definedness, not truthiness,
      # so that a value of 0 or an empty string is not silently dropped.
      defined( $opts{output} ) ? ( output   => $opts{output} )   : (),
      $opts{metadata}          ? ( metadata => $opts{metadata} ) : (),
    },
  };
  return $id;
}


# POST every event in the pending batch to <url>/api/public/ingestion as a
# single JSON document of the form { "batch": [ ... ] }.
#
# Returns nothing when the plugin is not enabled or the batch is empty.
# Otherwise returns the response object produced by LWP::UserAgent.
#
# The batch is emptied as soon as the request has been attempted, whether
# or not it succeeded; a non-success response is reported with warn only.
sub flush {
  my ( $self ) = @_;
  return unless $self->enabled;
  my $batch = $self->_batch;
  return unless @$batch;

  # Loaded at call time rather than at compile time.
  require LWP::UserAgent;
  require HTTP::Request;
  my $ua = LWP::UserAgent->new(agent => 'Langertha-Plugin-Langfuse/'.$VERSION);

  # HTTP Basic credentials in the form "public_key:secret_key". The empty
  # second argument stops encode_base64 from appending a line break.
  my $auth = encode_base64(
    $self->public_key . ':' . $self->secret_key, ''
  );

  my $body = $self->_json->encode({ batch => $batch });

  # Strip trailing slashes so a base URL such as "https://host/" does not
  # produce "https://host//api/public/ingestion".
  ( my $base_url = $self->url ) =~ s{/+\z}{};

  my $request = HTTP::Request->new(
    POST => $base_url . '/api/public/ingestion',
    [
      'Content-Type'  => 'application/json',
      'Authorization' => 'Basic ' . $auth,
    ],
    $body,
  );

  my $response = $ua->request($request);

  # Best-effort delivery: the events are discarded even when the request failed.
  $self->_batch([]);

  unless ($response->is_success) {
    warn "Langfuse ingestion failed: " . $response->status_line;
  }

  return $response;
}


# Clear the per-trace state held on the object by setting both the stored
# trace id and the stored iteration start to undef.
#
# Returns whatever the _iter_start setter returns.
sub reset_trace {
  my $self = shift;

  $self->_trace_id(undef);
  return $self->_iter_start(undef);
}





( run in 1.728 second using v1.01-cache-2.11-cpan-fe3c2283af0 )