App-bsky

 view release on metacpan or  search on metacpan

lib/App/bsky.pm  view on Meta::CPAN

                    say "[DEBUG] Processing post queue with " . scalar(@post_queue) . " items" if $ENV{DEBUG};
                    @post_queue = sort { $a->{record}{createdAt} cmp $b->{record}{createdAt} } @post_queue;
                }
                # Drain the queue: each post is printed as a header line (timestamp, display
                # name, handle, optional reply marker) followed by its text indented by three
                # spaces and a blank separator line.
                while (@post_queue) {
                    my $item   = shift @post_queue;
                    my $repo   = $item->{repo};
                    my $record = $item->{record};
                    my $ts     = $item->{ts};

                    # Records come off the network; anything that is not a hash cannot be rendered.
                    next unless ref $record eq 'HASH';

                    # Without a cached profile hash the repo identifier stands in for the handle.
                    my $author     = $profile_cache{$repo};
                    my $handle     = ( ref $author eq 'HASH' ) ? ( $author->{handle}      // $repo ) : $repo;
                    my $name       = ( ref $author eq 'HASH' ) ? ( $author->{displayName} // '' )    : '';
                    my $text       = $record->{text} // '[no text]';
                    my $reply_info = '';

                    # Check the shape of the reply structure before descending into it, and
                    # tolerate a missing parent uri instead of matching against undef.
                    my $reply  = $record->{reply};
                    my $parent = ( ref $reply eq 'HASH' ) ? $reply->{parent} : undef;
                    if ( ref $parent eq 'HASH' ) {
                        my $parent_uri = $parent->{uri} // '';
                        if ( $parent_uri =~ m[^at://(did:[^/]+)] ) {
                            my $parent_did     = $1;
                            my $parent_profile = $profile_cache{$parent_did};

                            # Same fallback as for the author: a cached profile without a
                            # handle is shown by its DID rather than as an empty string.
                            my $parent_handle = ( ref $parent_profile eq 'HASH' ) ? ( $parent_profile->{handle} // $parent_did ) : $parent_did;
                            $reply_info = color('white') . " [in reply to \@" . $parent_handle . "]";
                        }
                    }
                    try {
                        $self->say( '%s%s    %s (%s)%s%s', color('white'), $ts, $name, '@' . $handle, $reply_info, color('reset') );
                        my $indented = $text;
                        $indented =~ s/^/   /mg;
                        $self->say($indented);
                        $self->say("");
                    }
                    catch ($e) {

                        # Second attempt: blank out everything that is not printable ASCII.
                        # The display name is sanitized as well, otherwise a name that broke
                        # the first attempt would break this one in exactly the same way.
                        try {
                            my $out        = $text;
                            my $plain_name = $name;
                            utf8::encode($out)        if utf8::is_utf8($out);
                            utf8::encode($plain_name) if utf8::is_utf8($plain_name);

                            # Newlines are kept so that the per-line indentation below still applies.
                            $out        =~ s/[^\x20-\x7E\n]/ /g;
                            $plain_name =~ s/[^\x20-\x7E]/ /g;

                            # Reset the colour at the end of the header, as the normal path does.
                            $self->say( '%s%s    %s (%s)%s [sanitized]%s', color('white'), $ts, $plain_name, '@' . $handle, $reply_info, color('reset') );
                            my $indented = $out;
                            $indented =~ s/^/   /mg;
                            $self->say($indented);
                            $self->say("");
                        }
                        catch ($e2) {

                            # Rendering stays best-effort, but the failure is visible when debugging.
                            warn "Failed to render post from $repo: $e2" if $ENV{DEBUG};
                        }
                    }
                }
            };

            # Trigger rendering every 5 seconds: the recurring timer invokes $render_queue,
            # which is called without arguments. In non-JSON mode the firehose callback
            # below only pushes posts onto @post_queue, so nothing it queues is printed
            # until this timer fires.
            Mojo::IOLoop->recurring( 5 => sub { $render_queue->() } );
            # Declared before it is assigned so that the closure can refer to itself
            # when it schedules a reconnect.
            my $start_stream;
            $start_stream = sub {
                # Debug-only banner. The guard used to be `$ENV{DEBUG} || 1`, which is always
                # true and printed this line unconditionally, including when JSON output is requested.
                $self->say('[DEBUG] Starting firehose stream...') if $ENV{DEBUG};
                my $fh = $bsky->firehose(
                    sub ( $header, $body, $err ) {
                        try {
                            # Error path: report the error, then either schedule a reconnect or
                            # shut the event loop down. No event data is processed in this case.
                            if ( defined $err ) {
                                warn 'Firehose error: ' . $err;

                                # Always try to reconnect if not explicitly fatal. Only an object
                                # that offers a fatal() method can be explicitly fatal; calling
                                # ->fatal on anything else (e.g. a plain string) would throw
                                # and neither branch below would run.
                                require Scalar::Util;
                                my $is_fatal = Scalar::Util::blessed($err) && $err->can('fatal') && $err->fatal;
                                if ( !$is_fatal ) {
                                    $self->say('[DEBUG] Attempting to reconnect in 5 seconds...') if $ENV{DEBUG};
                                    Mojo::IOLoop->timer( 5 => sub { $start_stream->() } );
                                }
                                else {
                                    $self->say('[DEBUG] Fatal firehose error. Exiting.') if $ENV{DEBUG};
                                    Mojo::IOLoop->remove($keepalive);
                                    Mojo::IOLoop->stop;
                                }
                                return;
                            }
                            # When $json is set, the event is emitted as a single JSON document
                            # and no further processing happens.
                            if ($json) {
                                my $event = { header => $header, body => $body };
                                $self->say( JSON::Tiny::to_json($event) );
                                return;
                            }

                            # Everything except '#commit' events is ignored for now.
                            return if !defined $header->{t} || $header->{t} ne '#commit';
                            # All ops of one commit share the CAR archive in $body->{blocks}, so it is
                            # decoded at most once per event (on the first post-creating op) and the
                            # resulting CID => data map is reused for the remaining ops.
                            my $blocks_by_cid;
                            for my $op ( @{ $body->{ops} } ) {

                                # Only newly created feed posts are of interest; a missing action or
                                # path simply does not match.
                                next unless ( $op->{action} // '' ) eq 'create';
                                next unless ( $op->{path}   // '' ) =~ /^app\.bsky\.feed\.post\//;
                                try {
                                    # Decode the blocks to find the record
                                    if ( !$blocks_by_cid ) {
                                        require Archive::CAR::v1;
                                        open my $cfh, '<:raw', \$body->{blocks} or die "Cannot read CAR blocks from memory: $!";
                                        my $car = Archive::CAR::v1->new();
                                        $blocks_by_cid = { map { $_->{cid}->to_string => $_->{data} } $car->read($cfh)->blocks->@* };
                                    }
                                    require Archive::CAR::CID;    # Ensure it's loaded for conversion

                                    # The op's CID may arrive wrapped in a hash under the key 'cid_raw'.
                                    my $cid_raw = $op->{cid};
                                    if ( ref $cid_raw eq 'HASH' && exists $cid_raw->{cid_raw} ) {
                                        $cid_raw = $cid_raw->{cid_raw};
                                    }
                                    my $target_cid_obj = Archive::CAR::CID->from_raw($cid_raw);
                                    my $record_bytes   = $blocks_by_cid->{ $target_cid_obj->to_string };
                                    next unless $record_bytes;
                                    require Codec::CBOR;
                                    my $record = Codec::CBOR->new()->decode($record_bytes);

                                    # The code below and the renderer index into the record as a hash,
                                    # so any other decoded value is skipped.
                                    next unless ref $record eq 'HASH';
                                    my $repo = $body->{repo};

                                    # Turn the createdAt value into "date time": the 'T' separator
                                    # becomes a space and a fractional-seconds-plus-'Z' suffix is dropped.
                                    my $ts = $record->{createdAt} // '';
                                    $ts =~ s/T/ /;
                                    $ts =~ s/\..*Z//;

                                    # Queue for later rendering
                                    push @post_queue, { repo => $repo, record => $record, ts => $ts };
                                    $dids_to_resolve{$repo} = 1 unless exists $profile_cache{$repo};

                                    # Also mark the author of the parent post for resolution; that
                                    # DID is the first component of the parent's at:// URI.
                                    my $reply  = $record->{reply};
                                    my $parent = ( ref $reply eq 'HASH' ) ? $reply->{parent} : undef;
                                    if ( ref $parent eq 'HASH' ) {
                                        my $parent_uri = $parent->{uri} // '';
                                        if ( $parent_uri =~ m[^at://(did:[^/]+)] ) {
                                            my $parent_did = $1;
                                            $dids_to_resolve{$parent_did} = 1 unless exists $profile_cache{$parent_did};
                                        }
                                    }
                                }
                                catch ($e) {
                                    warn "CAR/CBOR decoding error for op on repo " . $body->{repo} . ": $e";
                                }
                            }
                        }
                        catch ($e) {



( run in 2.546 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )