At

 view release on metacpan or  search on metacpan

lib/At/Protocol/Firehose.pm  view on Meta::CPAN

class At::Protocol::Firehose 1.0 {
    use At::Error;
    use Codec::CBOR;
    field $at       : param;
    field $url      : param : reader //= 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos';
    field $callback : param;
    field $codec;
    ADJUST {
        # Ensure url is absolute
        $url   = 'wss://' . $url if $url !~ m[^wss?://];
        $codec = Codec::CBOR->new();
    }

    method start() {
        $at->http->websocket(
            $url => sub ( $msg, $err ) {
                return if !$msg && !defined $err;
                if ( defined $err ) {
                    $callback->( undef, undef, $err );
                    return;
                }
                return unless defined $msg && length $msg;
                try {
                    my @objects = $codec->decode_sequence($msg);
                    if ( @objects >= 2 ) {
                        $callback->( $objects[0], $objects[1], undef );
                    }
                    elsif ( @objects == 1 ) {
                        $callback->( $objects[0], undef, At::Error->new( message => 'Incomplete firehose message', fatal => 0 ) );
                    }
                }
                catch ($e) {
                    $callback->( undef, undef, At::Error->new( message => "Firehose decode failed: $e", fatal => 0 ) );
                }
            }
        );
    }
}
1;
__END__

=pod

=encoding utf-8

=head1 NAME

At::Protocol::Firehose - AT Protocol Firehose Client

=head1 SYNOPSIS

    my $fh = $at->firehose(sub ( $header, $body, $err ) {
        return warn $err if $err;
        say "Event type: " . $header->{t};
    });

    $fh->start();

=head1 DESCRIPTION

C<At::Protocol::Firehose> handles the real-time streaming of events from an AT Protocol relay or PDS. It decodes the
binary DAG-CBOR messages into Perl data structures using L<Codec::CBOR>.

Each message from the firehose consists of two parts:

=over

=item 1. B<Header>: A map containing the message type (C<t>) and optional operation (C<op>).

=item 2. B<Body>: The actual event data, which varies by message type.

=back

=head1 Methods

=head2 C<new( at =E<gt> $at, callback =E<gt> $cb, [ url =E<gt> $url ] )>

Constructor. C<url> defaults to the global Bluesky relay firehose.

=head2 C<start( )>

Starts the WebSocket connection. This is non-blocking and requires an event loop (like L<Mojo::IOLoop>) to be running.

=head1 SEE ALSO

L<At>, L<https://docs.bsky.app/docs/advanced-guides/firehose>

=head1 AUTHOR

Sanko Robinson E<lt>sanko@cpan.orgE<gt>

=head1 LICENSE

Copyright (c) 2026 Sanko Robinson. License: Artistic License 2.0.

=cut



( run in 0.718 second using v1.01-cache-2.11-cpan-140bd7fdf52 )