App-bsky
view release on metacpan or search on metacpan
lib/App/bsky.pm view on Meta::CPAN
say "[DEBUG] Processing post queue with " . scalar(@post_queue) . " items" if $ENV{DEBUG};
@post_queue = sort { $a->{record}{createdAt} cmp $b->{record}{createdAt} } @post_queue;
}
# Drain the queue: for each post print a header line (timestamp, display name,
# handle and, for replies, the parent's handle) followed by the indented text.
while (@post_queue) {
    my $item   = shift @post_queue;
    my $repo   = $item->{repo};
    my $record = $item->{record};
    my $ts     = $item->{ts};

    # A profile that is not cached as a hash falls back to the bare DID.
    my $author = $profile_cache{$repo};
    my $handle = ( ref $author eq 'HASH' ) ? ( $author->{handle} // $repo ) : $repo;
    my $name   = ( ref $author eq 'HASH' ) ? ( $author->{displayName} // '' ) : '';
    my $text   = $record->{text} // '[no text]';

    # Replies get an "[in reply to @handle]" suffix. Every level is checked so
    # that a malformed reply structure or a missing uri cannot raise an error
    # outside the try block below.
    my $reply_info = '';
    my $reply      = $record->{reply};
    my $parent     = ( ref $reply eq 'HASH' ) ? $reply->{parent} : undef;
    my $parent_uri = ( ref $parent eq 'HASH' ) ? $parent->{uri} : undef;
    if ( defined $parent_uri && $parent_uri =~ m[^at://(did:[^/]+)] ) {
        my $parent_did     = $1;
        my $parent_profile = $profile_cache{$parent_did};

        # Same fallback as for the author: a cached profile without a handle
        # is shown by its DID instead of an undefined value.
        my $parent_handle = ( ref $parent_profile eq 'HASH' ) ? ( $parent_profile->{handle} // $parent_did ) : $parent_did;
        $reply_info = color('white') . " [in reply to \@" . $parent_handle . "]";
    }
    try {
        $self->say( '%s%s %s (%s)%s%s', color('white'), $ts, $name, '@' . $handle, $reply_info, color('reset') );
        $self->say( $text =~ s/^/ /mgr );
        $self->say("");
    }
    catch ($e) {
        # Printing failed; retry with the strings reduced to printable ASCII
        # so the post is still shown in some form.
        warn "[DEBUG] Rendering failed, retrying sanitized: $e" if $ENV{DEBUG};
        try {
            # Encode to bytes, then blank out everything that is not printable
            # ASCII. Newlines are kept so multi-line text keeps its line
            # structure and the per-line indent below still applies.
            my $to_ascii = sub {
                my ($str) = @_;
                utf8::encode($str) if utf8::is_utf8($str);
                $str =~ s/[^\x0A\x20-\x7E]/ /g;
                return $str;
            };

            # Header fields must stay on a single line.
            my $safe_name   = $to_ascii->($name)   =~ tr/\n/ /r;
            my $safe_handle = $to_ascii->($handle) =~ tr/\n/ /r;

            # Reset the colour at the end, exactly like the normal header.
            $self->say(
                '%s%s %s (%s)%s [sanitized]%s',
                color('white'), $ts, $safe_name, '@' . $safe_handle, $reply_info, color('reset')
            );
            $self->say( $to_ascii->($text) =~ s/^/ /mgr );
            $self->say("");
        }
        catch ($e2) {
            # Best effort only: the post is dropped, but leave a trace when
            # debugging instead of failing silently.
            warn "[DEBUG] Dropping unprintable post: $e2" if $ENV{DEBUG};
        }
    }
}
};
# Trigger rendering every 5 seconds. The wrapper sub invokes $render_queue
# with an empty argument list, so nothing handed to the timer callback by the
# event loop is forwarded to it.
Mojo::IOLoop->recurring( 5 => sub { $render_queue->() } );
my $start_stream;
$start_stream = sub {
# Diagnostic notice only: shown when DEBUG is set (the former "|| 1" made the
# guard always true).
$self->say('[DEBUG] Starting firehose stream...') if $ENV{DEBUG};
my $fh = $bsky->firehose(
sub ( $header, $body, $err ) {
try {
# Error path of the firehose callback: report the error, then either schedule
# a reconnect or shut the event loop down, and stop processing this event.
if ( defined $err ) {
    warn 'Firehose error: ' . $err;

    # Always try to reconnect if not explicitly fatal. Only an error object
    # that provides a fatal() method can declare itself fatal; a plain string
    # or unblessed value is treated as recoverable rather than dying on the
    # method call.
    require Scalar::Util;
    my $is_fatal = Scalar::Util::blessed($err) && $err->can('fatal') && $err->fatal;
    if ($is_fatal) {
        $self->say('[DEBUG] Fatal firehose error. Exiting.') if $ENV{DEBUG};
        Mojo::IOLoop->remove($keepalive);
        Mojo::IOLoop->stop;
    }
    else {
        $self->say('[DEBUG] Attempting to reconnect in 5 seconds...') if $ENV{DEBUG};
        Mojo::IOLoop->timer( 5 => sub { $start_stream->() } );
    }
    return;
}
if ($json) {
$self->say( JSON::Tiny::to_json( { header => $header, body => $body } ) );
return;
}
# Only process commit events for now
unless ( defined $header->{t} && $header->{t} eq '#commit' ) {
return;
}
# CID string => raw block bytes for this commit, built on first use. The CAR
# payload in $body->{blocks} does not depend on the op, so it is decoded at
# most once per commit instead of once per matching op. If decoding dies the
# cache stays undefined, so the next matching op retries (and warns again).
my $commit_blocks;
for my $op ( ( $body->{ops} // [] )->@* ) {

    # Only newly created feed posts are queued.
    next unless ( $op->{action} // '' ) eq 'create';
    next unless ( $op->{path}   // '' ) =~ /^app\.bsky\.feed\.post\//;
    try {
        # Decode the blocks to find the record
        $commit_blocks //= do {
            require Archive::CAR::v1;
            my $car = Archive::CAR::v1->new();
            open my $cfh, '<:raw', \$body->{blocks} or die "Cannot open in-memory CAR data: $!";
            my %blocks = map { $_->{cid}->to_string => $_->{data} } $car->read($cfh)->blocks->@*;
            \%blocks;
        };

        # The op's CID may arrive wrapped in a hash under the cid_raw key.
        require Archive::CAR::CID;    # Ensure it's loaded for conversion
        my $cid_raw = $op->{cid};
        if ( ref $cid_raw eq 'HASH' && exists $cid_raw->{cid_raw} ) {
            $cid_raw = $cid_raw->{cid_raw};
        }
        my $target_cid_obj = Archive::CAR::CID->from_raw($cid_raw);
        my $record_bytes   = $commit_blocks->{ $target_cid_obj->to_string };
        next unless $record_bytes;

        require Codec::CBOR;
        my $record = Codec::CBOR->new()->decode($record_bytes);
        next unless $record;

        my $repo = $body->{repo};

        # Display timestamp: the "T" separator becomes a space and a trailing
        # "Z" is dropped together with any fractional seconds before it, so
        # timestamps with and without fractions look the same.
        my $ts = $record->{createdAt} // '';
        $ts =~ s/T/ /;
        $ts =~ s/(?:\.\d+)?Z\z//;

        # Queue for later rendering
        push @post_queue, { repo => $repo, record => $record, ts => $ts };
        $dids_to_resolve{$repo} = 1 unless exists $profile_cache{$repo};

        # For replies, also mark the DID found in the parent URI for
        # resolution. Each level is checked so a missing or malformed reply
        # structure is skipped instead of raising a warning or error.
        my $reply      = $record->{reply};
        my $parent     = ( ref $reply eq 'HASH' ) ? $reply->{parent} : undef;
        my $parent_uri = ( ref $parent eq 'HASH' ) ? $parent->{uri} : undef;
        if ( defined $parent_uri && $parent_uri =~ m[^at://(did:[^/]+)] ) {
            my $parent_did = $1;
            $dids_to_resolve{$parent_did} = 1 unless exists $profile_cache{$parent_did};
        }
    }
    catch ($e) {
        warn "CAR/CBOR decoding error for op on repo " . ( $body->{repo} // '[unknown]' ) . ": $e";
    }
}
}
catch ($e) {
( run in 2.546 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )