App-bsky
view release on metacpan or search on metacpan
lib/App/bsky.pm view on Meta::CPAN
}
#~ warn ref $post;
#~ use Data::Dump;
#~ ddx $post;
# TODO: Support image embeds as raw links
$self->say(
'%s%s%s%s%s (%s)',
' ' x ( $depth * 4 ),
color('red'), $post->{author}{handle},
color('reset'),
defined $post->{author}{displayName} ? ' [' . $post->{author}{displayName} . ']' : '',
$post->{record}{createdAt}
);
if ( $post->{embed} && defined $post->{embed}{images} ) { # TODO: Check $post->embed->$type to match 'app.bsky.embed.images#view'
$self->say( '%s%s', ' ' x ( $depth * 4 ), $_->{fullsize} ) for @{ $post->{embed}{images} };
}
$self->say( '%s%s', ' ' x ( $depth * 4 ), $post->{record}{text} );
$self->say(
'%s â¤ï¸ %d ð¬ %d ð %d %s',
' ' x ( $depth * 4 ),
$post->{likeCount}, $post->{replyCount}, $post->{repostCount},
( builtin::blessed $post->{uri} ? $post->{uri}->as_string : $post->{uri} )
);
$self->say( '%s', ' ' x ( $depth * 4 ) );
}
# Print the home timeline fetched through $bsky->getTimeline().
#
# Options parsed out of @args:
#   --json / --no-json   dump the feed as JSON instead of rendering each post
#
# When a feed item carries a reply parent, the parent is rendered first at
# depth 0 and the post itself is rendered one level deeper.
#
# Returns the number of feed items on success; on the early-exit paths it
# returns whatever $self->err / $self->say return.
method cmd_timeline (@args) {
    my $json;
    GetOptionsFromArray( \@args, 'json!' => \$json );

    my $timeline = $bsky->getTimeline();

    # The client hands back an At::Error object instead of throwing.
    return $self->err( "Error fetching timeline: " . $timeline->message )
        if builtin::blessed($timeline) && $timeline->isa('At::Error');

    # No response at all, or a response without a feed.
    return $self->say("Timeline is empty.")
        unless $timeline && $timeline->{feed};

    my $feed = $timeline->{feed};

    if ($json) {
        $self->say( JSON::Tiny::to_json($feed) );
        return scalar @$feed;
    }

    for my $entry (@$feed) {

        # Only look at {parent} when {reply} is set, to avoid autovivifying it.
        my $parent = $entry->{reply} && $entry->{reply}{parent};
        $self->_dump_post( 0, $parent ) if $parent;
        $self->_dump_post( ( $parent ? 1 : 0 ), $entry->{post} );
    }
    return scalar @$feed;
}
# Short alias for cmd_timeline: forwards @args untouched and hands back
# whatever cmd_timeline returns.
method cmd_tl (@args) {
    return $self->cmd_timeline(@args);
}
method cmd_stream(@args) {
GetOptionsFromArray( \@args, 'json|j' => \my $json );
require Mojo::IOLoop; # Ensure Mojo is available for the event loop
require Archive::CAR::CID;
require Archive::CAR;
require Codec::CBOR;
# Keep the loop alive even if the connection drops briefly
my $keepalive = Mojo::IOLoop->recurring( 60 => sub { $self->say("[DEBUG] Firehose loop keepalive...") if $ENV{DEBUG}; } );
my %profile_cache;
my @profile_lru;
my $MAX_CACHE = 1000;
my $cache_profile = sub ($p) {
my $did = $p->{did};
if ( exists $profile_cache{$did} ) {
@profile_lru = grep { $_ ne $did } @profile_lru;
}
push @profile_lru, $did;
$profile_cache{$did} = $p;
if ( @profile_lru > $MAX_CACHE ) {
my $oldest = shift @profile_lru;
delete $profile_cache{$oldest};
}
};
my @post_queue;
my %dids_to_resolve;
my %did_fail_count;
my $render_queue = sub {
my @to_resolve = grep { ( $did_fail_count{$_} // 0 ) < 5 } keys %dids_to_resolve;
if (@to_resolve) {
say "[DEBUG] Resolving " . scalar(@to_resolve) . " DIDs..." if $ENV{DEBUG};
while (@to_resolve) {
my @chunk = splice @to_resolve, 0, 25;
my $res = $bsky->getProfiles( actors => \@chunk );
if ( ref $res eq 'ARRAY' || ( ref $res eq 'HASH' && $res->{profiles} ) ) {
my @profiles = ref $res eq 'ARRAY' ? @$res : @{ $res->{profiles} };
say "[DEBUG] Resolved " . scalar(@profiles) . " profiles" if $ENV{DEBUG};
for my $p (@profiles) {
$cache_profile->($p);
delete $dids_to_resolve{ $p->{did} };
delete $did_fail_count{ $p->{did} };
}
# If some didn't come back in the response, they might be invalid or deleted
# We'll increment their fail count if they are still in dids_to_resolve
for my $did (@chunk) {
if ( exists $dids_to_resolve{$did} ) {
$did_fail_count{$did}++;
}
}
}
else {
say "[DEBUG] getProfiles failed: " . ( $res // 'undef' ) if $ENV{DEBUG};
# Increment fail count for the whole chunk
for my $did (@chunk) {
$did_fail_count{$did}++;
}
}
}
}
if (@post_queue) {
say "[DEBUG] Processing post queue with " . scalar(@post_queue) . " items" if $ENV{DEBUG};
@post_queue = sort { $a->{record}{createdAt} cmp $b->{record}{createdAt} } @post_queue;
}
while (@post_queue) {
my $item = shift @post_queue;
my $repo = $item->{repo};
my $record = $item->{record};
my $ts = $item->{ts};
my $author = $profile_cache{$repo};
my $handle = ( ref $author eq 'HASH' ) ? ( $author->{handle} // $repo ) : $repo;
my $name = ( ref $author eq 'HASH' ) ? ( $author->{displayName} // '' ) : '';
my $text = $record->{text} // '[no text]';
my $reply_info = '';
if ( $record->{reply} && $record->{reply}{parent} ) {
my $parent_uri = $record->{reply}{parent}{uri};
if ( $parent_uri =~ m[^at://(did:[^/]+)] ) {
my $parent_did = $1;
my $parent_profile = $profile_cache{$parent_did};
my $parent_handle = ( ref $parent_profile eq 'HASH' ) ? $parent_profile->{handle} : $parent_did;
$reply_info = color('white') . " [in reply to \@" . $parent_handle . "]";
}
}
try {
$self->say( '%s%s %s (%s)%s%s', color('white'), $ts, $name, '@' . $handle, $reply_info, color('reset') );
my $indented = $text;
$indented =~ s/^/ /mg;
$self->say($indented);
$self->say("");
}
catch ($e) {
try {
my $out = $text;
utf8::encode($out) if utf8::is_utf8($out);
$out =~ s/[^\x20-\x7E]/ /g;
$self->say( '%s%s %s (%s)%s [sanitized]', color('white'), $ts, $name, '@' . $handle, $reply_info );
my $indented = $out;
$indented =~ s/^/ /mg;
$self->say($indented);
$self->say("");
}
catch ($e2) { }
}
}
};
# Trigger rendering every 5 seconds
Mojo::IOLoop->recurring( 5 => sub { $render_queue->() } );
my $start_stream;
$start_stream = sub {
$self->say('[DEBUG] Starting firehose stream...') if $ENV{DEBUG} || 1;
my $fh = $bsky->firehose(
sub ( $header, $body, $err ) {
try {
if ( defined $err ) {
warn 'Firehose error: ' . $err;
# Always try to reconnect if not explicitly fatal
if ( !$err->fatal ) {
$self->say('[DEBUG] Attempting to reconnect in 5 seconds...') if $ENV{DEBUG} || 1;
Mojo::IOLoop->timer( 5 => sub { $start_stream->() } );
}
else {
$self->say('[DEBUG] Fatal firehose error. Exiting.') if $ENV{DEBUG} || 1;
Mojo::IOLoop->remove($keepalive);
Mojo::IOLoop->stop;
}
return;
}
if ($json) {
$self->say( JSON::Tiny::to_json( { header => $header, body => $body } ) );
return;
}
# Only process commit events for now
unless ( defined $header->{t} && $header->{t} eq '#commit' ) {
return;
}
for my $op ( @{ $body->{ops} } ) {
next unless $op->{action} eq 'create';
next unless $op->{path} =~ /^app\.bsky\.feed\.post\//;
try {
# Decode the blocks to find the record
require Archive::CAR::v1;
my $car = Archive::CAR::v1->new();
open my $cfh, '<:raw', \$body->{blocks};
my %blocks = map { $_->{cid}->to_string => $_->{data} } $car->read($cfh)->blocks->@*;
require Archive::CAR::CID; # Ensure it's loaded for conversion
my $cid_raw = $op->{cid};
if ( ref $cid_raw eq 'HASH' && exists $cid_raw->{cid_raw} ) {
$cid_raw = $cid_raw->{cid_raw};
}
my $target_cid_obj = Archive::CAR::CID->from_raw($cid_raw);
my $record_bytes = $blocks{ $target_cid_obj->to_string };
next unless $record_bytes;
require Codec::CBOR;
my $codec = Codec::CBOR->new();
my $record = $codec->decode($record_bytes);
next unless $record;
my $repo = $body->{repo};
my $ts = $record->{createdAt} // '';
$ts =~ s/T/ /;
$ts =~ s/\..*Z//;
# Queue for later rendering
push @post_queue, { repo => $repo, record => $record, ts => $ts };
$dids_to_resolve{$repo} = 1 unless exists $profile_cache{$repo};
if ( $record->{reply} && $record->{reply}{parent} ) {
my $parent_uri = $record->{reply}{parent}{uri};
if ( $parent_uri =~ m[^at://(did:[^/]+)] ) {
my $parent_did = $1;
$dids_to_resolve{$parent_did} = 1 unless exists $profile_cache{$parent_did};
}
}
}
catch ($e) {
warn "CAR/CBOR decoding error for op on repo " . $body->{repo} . ": $e";
}
}
}
catch ($e) {
warn "Error processing firehose event: $e";
}
}
);
( run in 1.596 second using v1.01-cache-2.11-cpan-df04353d9ac )