App-bsky

 view release on metacpan or  search on metacpan

lib/App/bsky.pm  view on Meta::CPAN

            }

            #~ warn ref $post;
            #~ use Data::Dump;
            #~ ddx $post;
            # TODO: Support image embeds as raw links
            $self->say(
                '%s%s%s%s%s (%s)',
                ' ' x ( $depth * 4 ),
                color('red'), $post->{author}{handle},
                color('reset'),
                defined $post->{author}{displayName} ? ' [' . $post->{author}{displayName} . ']' : '',
                $post->{record}{createdAt}
            );
            if ( $post->{embed} && defined $post->{embed}{images} ) {    # TODO: Check $post->embed->$type to match 'app.bsky.embed.images#view'
                $self->say( '%s%s', ' ' x ( $depth * 4 ), $_->{fullsize} ) for @{ $post->{embed}{images} };
            }
            $self->say( '%s%s', ' ' x ( $depth * 4 ), $post->{record}{text} );
            $self->say(
                '%s ❤️ %d 💬 %d 🔄 %d %s',
                ' ' x ( $depth * 4 ),
                $post->{likeCount}, $post->{replyCount}, $post->{repostCount},
                ( builtin::blessed $post->{uri} ? $post->{uri}->as_string : $post->{uri} )
            );
            $self->say( '%s', ' ' x ( $depth * 4 ) );
        }

        # Fetch the timeline via $bsky->getTimeline() and print it.
        #
        # Options parsed out of @args:
        #   --json / --no-json   print the feed as JSON instead of formatted posts
        #
        # Returns:
        #   - the result of $self->err(...) when getTimeline hands back an At::Error
        #   - the result of $self->say(...) when the response has no feed at all
        #   - otherwise the number of items in the feed (0 for an empty feed)
        method cmd_timeline (@args) {
            GetOptionsFromArray( \@args, 'json!' => \my $json );
            my $tl = $bsky->getTimeline();
            if ( builtin::blessed $tl && $tl->isa('At::Error') ) {
                return $self->err( "Error fetching timeline: " . $tl->message );
            }
            unless ( $tl && $tl->{feed} ) {
                return $self->say("Timeline is empty.");
            }
            my $feed = $tl->{feed};
            if ($json) {

                # JSON consumers get the feed verbatim, including an empty list
                $self->say( JSON::Tiny::to_json($feed) );
            }
            elsif ( !@$feed ) {

                # An empty array ref is true in Perl, so the guard above only
                # catches a missing feed; report a zero-length feed here as well
                # instead of silently printing nothing.
                $self->say("Timeline is empty.");
            }
            else {
                for my $item (@$feed) {
                    my $depth = 0;

                    # For replies, show the parent first and indent the reply under it
                    if ( $item->{reply} && $item->{reply}{parent} ) {
                        $self->_dump_post( $depth, $item->{reply}{parent} );
                        $depth = 1;
                    }
                    $self->_dump_post( $depth, $item->{post} );
                }
            }
            return scalar @$feed;
        }
        # Short alias: `tl` forwards its arguments to cmd_timeline unchanged
        # and hands back whatever cmd_timeline returns.
        method cmd_tl (@args) {
            return $self->cmd_timeline(@args);
        }

        method cmd_stream(@args) {
            GetOptionsFromArray( \@args, 'json|j' => \my $json );
            require Mojo::IOLoop;    # Ensure Mojo is available for the event loop
            require Archive::CAR::CID;
            require Archive::CAR;
            require Codec::CBOR;

            # Keep the loop alive even if the connection drops briefly
            my $keepalive = Mojo::IOLoop->recurring( 60 => sub { $self->say("[DEBUG] Firehose loop keepalive...") if $ENV{DEBUG}; } );
            my %profile_cache;
            my @profile_lru;
            my $MAX_CACHE     = 1000;
            my $cache_profile = sub ($p) {
                my $did = $p->{did};
                if ( exists $profile_cache{$did} ) {
                    @profile_lru = grep { $_ ne $did } @profile_lru;
                }
                push @profile_lru, $did;
                $profile_cache{$did} = $p;
                if ( @profile_lru > $MAX_CACHE ) {
                    my $oldest = shift @profile_lru;
                    delete $profile_cache{$oldest};
                }
            };
            my @post_queue;
            my %dids_to_resolve;
            my %did_fail_count;
            my $render_queue = sub {
                my @to_resolve = grep { ( $did_fail_count{$_} // 0 ) < 5 } keys %dids_to_resolve;
                if (@to_resolve) {
                    say "[DEBUG] Resolving " . scalar(@to_resolve) . " DIDs..." if $ENV{DEBUG};
                    while (@to_resolve) {
                        my @chunk = splice @to_resolve, 0, 25;
                        my $res   = $bsky->getProfiles( actors => \@chunk );
                        if ( ref $res eq 'ARRAY' || ( ref $res eq 'HASH' && $res->{profiles} ) ) {
                            my @profiles = ref $res eq 'ARRAY' ? @$res : @{ $res->{profiles} };
                            say "[DEBUG] Resolved " . scalar(@profiles) . " profiles" if $ENV{DEBUG};
                            for my $p (@profiles) {
                                $cache_profile->($p);
                                delete $dids_to_resolve{ $p->{did} };
                                delete $did_fail_count{ $p->{did} };
                            }

                            # If some didn't come back in the response, they might be invalid or deleted
                            # We'll increment their fail count if they are still in dids_to_resolve
                            for my $did (@chunk) {
                                if ( exists $dids_to_resolve{$did} ) {
                                    $did_fail_count{$did}++;
                                }
                            }
                        }
                        else {
                            say "[DEBUG] getProfiles failed: " . ( $res // 'undef' ) if $ENV{DEBUG};

                            # Increment fail count for the whole chunk
                            for my $did (@chunk) {
                                $did_fail_count{$did}++;
                            }
                        }
                    }
                }
                if (@post_queue) {
                    say "[DEBUG] Processing post queue with " . scalar(@post_queue) . " items" if $ENV{DEBUG};
                    @post_queue = sort { $a->{record}{createdAt} cmp $b->{record}{createdAt} } @post_queue;
                }
                while (@post_queue) {
                    my $item       = shift @post_queue;
                    my $repo       = $item->{repo};
                    my $record     = $item->{record};
                    my $ts         = $item->{ts};
                    my $author     = $profile_cache{$repo};
                    my $handle     = ( ref $author eq 'HASH' ) ? ( $author->{handle}      // $repo ) : $repo;
                    my $name       = ( ref $author eq 'HASH' ) ? ( $author->{displayName} // '' )    : '';
                    my $text       = $record->{text} // '[no text]';
                    my $reply_info = '';

                    if ( $record->{reply} && $record->{reply}{parent} ) {
                        my $parent_uri = $record->{reply}{parent}{uri};
                        if ( $parent_uri =~ m[^at://(did:[^/]+)] ) {
                            my $parent_did     = $1;
                            my $parent_profile = $profile_cache{$parent_did};
                            my $parent_handle  = ( ref $parent_profile eq 'HASH' ) ? $parent_profile->{handle} : $parent_did;
                            $reply_info = color('white') . " [in reply to \@" . $parent_handle . "]";
                        }
                    }
                    try {
                        $self->say( '%s%s    %s (%s)%s%s', color('white'), $ts, $name, '@' . $handle, $reply_info, color('reset') );
                        my $indented = $text;
                        $indented =~ s/^/   /mg;
                        $self->say($indented);
                        $self->say("");
                    }
                    catch ($e) {
                        try {
                            my $out = $text;
                            utf8::encode($out) if utf8::is_utf8($out);
                            $out =~ s/[^\x20-\x7E]/ /g;
                            $self->say( '%s%s    %s (%s)%s [sanitized]', color('white'), $ts, $name, '@' . $handle, $reply_info );
                            my $indented = $out;
                            $indented =~ s/^/   /mg;
                            $self->say($indented);
                            $self->say("");
                        }
                        catch ($e2) { }
                    }
                }
            };

            # Trigger rendering every 5 seconds
            Mojo::IOLoop->recurring( 5 => sub { $render_queue->() } );
            my $start_stream;
            $start_stream = sub {
                $self->say('[DEBUG] Starting firehose stream...') if $ENV{DEBUG} || 1;
                my $fh = $bsky->firehose(
                    sub ( $header, $body, $err ) {
                        try {
                            if ( defined $err ) {
                                warn 'Firehose error: ' . $err;

                                # Always try to reconnect if not explicitly fatal
                                if ( !$err->fatal ) {
                                    $self->say('[DEBUG] Attempting to reconnect in 5 seconds...') if $ENV{DEBUG} || 1;
                                    Mojo::IOLoop->timer( 5 => sub { $start_stream->() } );
                                }
                                else {
                                    $self->say('[DEBUG] Fatal firehose error. Exiting.') if $ENV{DEBUG} || 1;
                                    Mojo::IOLoop->remove($keepalive);
                                    Mojo::IOLoop->stop;
                                }
                                return;
                            }
                            if ($json) {
                                $self->say( JSON::Tiny::to_json( { header => $header, body => $body } ) );
                                return;
                            }

                            # Only process commit events for now
                            unless ( defined $header->{t} && $header->{t} eq '#commit' ) {
                                return;
                            }
                            for my $op ( @{ $body->{ops} } ) {
                                next unless $op->{action} eq 'create';
                                next unless $op->{path} =~ /^app\.bsky\.feed\.post\//;
                                try {
                                    # Decode the blocks to find the record
                                    require Archive::CAR::v1;
                                    my $car = Archive::CAR::v1->new();
                                    open my $cfh, '<:raw', \$body->{blocks};
                                    my %blocks = map { $_->{cid}->to_string => $_->{data} } $car->read($cfh)->blocks->@*;
                                    require Archive::CAR::CID;    # Ensure it's loaded for conversion
                                    my $cid_raw = $op->{cid};
                                    if ( ref $cid_raw eq 'HASH' && exists $cid_raw->{cid_raw} ) {
                                        $cid_raw = $cid_raw->{cid_raw};
                                    }
                                    my $target_cid_obj = Archive::CAR::CID->from_raw($cid_raw);
                                    my $record_bytes   = $blocks{ $target_cid_obj->to_string };
                                    next unless $record_bytes;
                                    require Codec::CBOR;
                                    my $codec  = Codec::CBOR->new();
                                    my $record = $codec->decode($record_bytes);
                                    next unless $record;
                                    my $repo = $body->{repo};
                                    my $ts   = $record->{createdAt} // '';
                                    $ts =~ s/T/ /;
                                    $ts =~ s/\..*Z//;

                                    # Queue for later rendering
                                    push @post_queue, { repo => $repo, record => $record, ts => $ts };
                                    $dids_to_resolve{$repo} = 1 unless exists $profile_cache{$repo};
                                    if ( $record->{reply} && $record->{reply}{parent} ) {
                                        my $parent_uri = $record->{reply}{parent}{uri};
                                        if ( $parent_uri =~ m[^at://(did:[^/]+)] ) {
                                            my $parent_did = $1;
                                            $dids_to_resolve{$parent_did} = 1 unless exists $profile_cache{$parent_did};
                                        }
                                    }
                                }
                                catch ($e) {
                                    warn "CAR/CBOR decoding error for op on repo " . $body->{repo} . ": $e";
                                }
                            }
                        }
                        catch ($e) {
                            warn "Error processing firehose event: $e";
                        }
                    }
                );



( run in 1.596 second using v1.01-cache-2.11-cpan-df04353d9ac )