At
view release on metacpan or search on metacpan
@lexicon_paths = map { path($_) } ( ref $lexicon_paths_param eq 'ARRAY' ? @$lexicon_paths_param : ($lexicon_paths_param) );
if ( !defined $http ) {
my $ua_class;
try {
require Mojo::UserAgent;
$ua_class = 'At::UserAgent::Mojo';
}
catch ($e) {
$ua_class = 'At::UserAgent::Tiny';
}
$http = $ua_class->new();
}
$host = 'https://' . $host unless $host =~ /^https?:/;
$host = URI->new($host) unless builtin::blessed $host;
}
# OAuth Implementation
# Return the DPoP signing key, creating it on first use.
# A fresh Crypt::PK::ECC key on the secp256r1 curve is generated and cached in $dpop_key;
# every later call hands back the cached object.
method _get_dpop_key() {
    return $dpop_key if $dpop_key;
    $dpop_key = Crypt::PK::ECC->new();
    $dpop_key->generate_key('secp256r1');
    return $dpop_key;
}
# Discover the OAuth endpoints that serve an account.
#
# Steps: resolve $handle to a DID, look up the PDS for that DID, fetch the PDS's
# '/.well-known/oauth-protected-resource' document, then fetch
# '/.well-known/oauth-authorization-server' from the first authorization server it lists.
#
# Returns a hashref with the keys pds, auth_server, metadata and did, or nothing when a step
# yields no usable data. At::Error results are re-thrown with ->throw, and an unresolvable
# PDS dies with a message naming the DID.
method oauth_discover ($handle) {
    my $res = $self->resolve_handle($handle);
    if ( builtin::blessed($res) && $res->isa('At::Error') ) { $res->throw; }
    return unless $res && $res->{did};
    my $pds = $self->pds_for_did( $res->{did} );
    unless ($pds) { die 'Could not resolve PDS for DID: ' . $res->{did}; }
    my ($protected) = $http->get( $pds . '/.well-known/oauth-protected-resource' );
    if ( builtin::blessed($protected) && $protected->isa('At::Error') ) { $protected->throw; }
    return unless $protected && $protected->{authorization_servers};

    # An empty or malformed list would otherwise give an undefined server and a request to a
    # relative '/.well-known/...' URL, so treat it as a failed discovery instead.
    my $servers = $protected->{authorization_servers};
    return unless ( builtin::reftype($servers) // '' ) eq 'ARRAY';
    my $auth_server = $servers->[0];
    return unless defined $auth_server && length $auth_server;
    my ($metadata) = $http->get( $auth_server . '/.well-known/oauth-authorization-server' );
    if ( builtin::blessed($metadata) && $metadata->isa('At::Error') ) { $metadata->throw; }

    # Callers index straight into the metadata for endpoints; without a hash there is nothing to use.
    return unless $metadata && ( builtin::reftype($metadata) // '' ) eq 'HASH';
    return { pds => $pds, auth_server => $auth_server, metadata => $metadata, did => $res->{did} };
}
# Begin the OAuth authorization-code flow for $handle and return the URL the user must visit.
#
# Parameters:
#   $handle       - account handle passed to oauth_discover
#   $client_id    - OAuth client identifier
#   $redirect_uri - callback URL that will receive 'code' and 'state'
#   $scope        - requested scopes (defaults to 'atproto')
#
# The method creates a code verifier with its S256 challenge and a random state value, stores
# them in $oauth_state for oauth_callback, switches the user agent to DPoP, and posts a pushed
# authorization request (PAR). The returned URL carries client_id and the request_uri from the
# PAR response.
#
# Dies when discovery fails, when the server metadata lacks a required endpoint, when the PAR
# request returns an error object, or when the PAR response carries no request_uri.
method oauth_start ( $handle, $client_id, $redirect_uri, $scope = 'atproto' ) {
    my $discovery = $self->oauth_discover($handle);
    die 'Failed to discover OAuth metadata for ' . $handle unless $discovery;

    # Both endpoints come from remote metadata; fail with a clear message rather than posting to
    # an undefined URL or returning an empty authorization link.
    my $par_endpoint  = $discovery->{metadata}{pushed_authorization_request_endpoint};
    my $auth_endpoint = $discovery->{metadata}{authorization_endpoint};
    die 'OAuth metadata for ' . $handle . ' has no pushed_authorization_request_endpoint' unless defined $par_endpoint && length $par_endpoint;
    die 'OAuth metadata for ' . $handle . ' has no authorization_endpoint'                unless defined $auth_endpoint && length $auth_endpoint;

    # Code verifier and state are drawn from the unreserved URL character set.
    my $chars          = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~';
    my $code_verifier  = Crypt::PRNG::random_string_from( $chars, 43 );
    my $code_challenge = encode_base64url( sha256($code_verifier) );
    $code_challenge =~ s/=+$//;
    my $state = Crypt::PRNG::random_string_from( $chars, 16 );

    # Remembered for oauth_callback, which checks the state and replays these values.
    $oauth_state = {
        discovery     => $discovery,
        code_verifier => $code_verifier,
        state         => $state,
        redirect_uri  => $redirect_uri,
        client_id     => $client_id,
        handle        => $handle,
        scope         => $scope
    };

    # Prepare UA for DPoP
    $http->set_tokens( undef, undef, 'DPoP', $self->_get_dpop_key() );
    my $par_content = {
        client_id             => $client_id,
        response_type         => 'code',
        code_challenge        => $code_challenge,
        code_challenge_method => 'S256',
        redirect_uri          => $redirect_uri,
        state                 => $state,
        scope                 => $scope,
        aud                   => $discovery->{pds},
    };
    say '[DEBUG] [At] PAR request: ' . JSON::PP->new->ascii->encode($par_content) if $ENV{DEBUG};
    my ($par_res) = $http->post(
        $par_endpoint => {
            headers  => { DPoP => $http->_generate_dpop_proof( $par_endpoint, 'POST', 1 ) },
            encoding => 'form',
            content  => $par_content,
            skip_ath => 1
        }
    );
    die 'PAR failed: ' . ( $par_res . '' ) if builtin::blessed $par_res;
    say '[DEBUG] [At] PAR response: ' . JSON::PP->new->ascii->encode($par_res) if $ENV{DEBUG};

    # Without a request_uri the authorization URL would be unusable; report it here instead.
    die 'PAR failed: response did not include a request_uri' unless ref $par_res eq 'HASH' && defined $par_res->{request_uri};
    my $auth_uri = URI->new($auth_endpoint);
    $auth_uri->query_form( client_id => $client_id, request_uri => $par_res->{request_uri} );
    return $auth_uri->as_string;
}
# Finish the OAuth flow started by oauth_start.
#
# Parameters:
#   $code  - authorization code delivered to the redirect URI
#   $state - state value delivered with it; must equal the one stored by oauth_start
#
# Exchanges the code at the token endpoint using a DPoP proof, builds the
# At::Protocol::Session, points the client at the discovered PDS and installs the tokens on the
# user agent. Returns whatever $http->set_tokens returns.
#
# Dies on a missing or mismatched state, on an error object from the token endpoint, or when
# the response carries no access token.
method oauth_callback ( $code, $state ) {

    # An undefined $state must fail the check rather than be compared as an empty string.
    die 'OAuth state mismatch' unless $oauth_state && defined $state && $state eq $oauth_state->{state};
    my $pending        = $oauth_state;
    my $token_endpoint = $pending->{discovery}{metadata}{token_endpoint};
    my $key            = $self->_get_dpop_key();
    my ($token_res)    = $http->post(
        $token_endpoint => {
            headers  => { DPoP => $http->_generate_dpop_proof( $token_endpoint, 'POST', 1 ) },
            encoding => 'form',
            content  => {
                grant_type    => 'authorization_code',
                code          => $code,
                client_id     => $pending->{client_id},
                redirect_uri  => $pending->{redirect_uri},
                code_verifier => $pending->{code_verifier},
                aud           => $pending->{discovery}{pds}
            },
            skip_ath => 1
        }
    );
    die 'Token exchange failed: ' . ( $token_res . '' ) if builtin::blessed $token_res;
    say '[DEBUG] [At] Token response: ' . JSON::PP->new->ascii->encode($token_res) if $ENV{DEBUG};
    die 'Token exchange failed: response did not include an access token' unless ref $token_res eq 'HASH' && defined $token_res->{access_token};

    # The state and code verifier are single-use: drop them once the exchange has succeeded so
    # the same callback parameters cannot be replayed against this object.
    $oauth_state = undef;
    $session     = At::Protocol::Session->new(
        did          => $token_res->{sub},
        accessJwt    => $token_res->{access_token},
        refreshJwt   => $token_res->{refresh_token},
        handle       => $pending->{handle},
        token_type   => 'DPoP',
        dpop_key_jwk => $key->export_key_jwk('private'),
        client_id    => $pending->{client_id},
        scope        => $token_res->{scope},
        pds          => $pending->{discovery}{pds}
    );
    $self->set_host( $pending->{discovery}{pds} );
    return $http->set_tokens( $token_res->{access_token}, $token_res->{refresh_token}, 'DPoP', $key );
}
# Refresh a DPoP (OAuth) session using its refresh token.
#
# Returns nothing when there is no session, no refresh token, the session is not a DPoP
# session, or discovery for the session's handle yields nothing. Otherwise posts a
# refresh_token grant to the token endpoint, rebuilds the session, points the client at the
# discovered PDS and returns whatever $http->set_tokens returns.
#
# Dies when the token endpoint returns an error object or a response without an access token.
method oauth_refresh() {
    return unless $session && $session->refreshJwt && $session->token_type eq 'DPoP';
    my $discovery = $self->oauth_discover( $session->handle );
    return unless $discovery;
    my $token_endpoint  = $discovery->{metadata}{token_endpoint};
    my $key             = $self->_get_dpop_key();
    my $refresh_content = {
        grant_type    => 'refresh_token',
        refresh_token => $session->refreshJwt,
        client_id     => $session->client_id // '',
        aud           => $discovery->{pds},
    };
    say '[DEBUG] [At] Refresh request: ' . JSON::PP->new->ascii->encode($refresh_content) if $ENV{DEBUG};
    my ($token_res) = $http->post(
        $token_endpoint => {
            headers  => { DPoP => $http->_generate_dpop_proof( $token_endpoint, 'POST', 1 ) },
            encoding => 'form',
            content  => $refresh_content,
            skip_ath => 1
        }
    );
    die 'Refresh failed: ' . ( $token_res . '' ) if builtin::blessed $token_res;
    die 'Refresh failed: response did not include an access token' unless ref $token_res eq 'HASH' && defined $token_res->{access_token};

    # A server may answer without a new refresh token; in that case the current one stays
    # valid and must be kept, otherwise the session could never be refreshed again.
    my $refresh_token = $token_res->{refresh_token} // $session->refreshJwt;
    $session = At::Protocol::Session->new(
        did          => $token_res->{sub},
        accessJwt    => $token_res->{access_token},
        refreshJwt   => $refresh_token,
        handle       => $session->handle,
        token_type   => 'DPoP',
        dpop_key_jwk => $key->export_key_jwk('private'),
        client_id    => $session->client_id,
        scope        => $token_res->{scope},
        pds          => $discovery->{pds}
    );
    $self->set_host( $discovery->{pds} );
    return $http->set_tokens( $token_res->{access_token}, $refresh_token, 'DPoP', $key );
}
# Build a granular repo scope string of the form "repo:<collection>?action=<action>".
# $action defaults to 'create'.
method collection_scope ( $collection, $action = 'create' ) {
    my $scope = 'repo:' . $collection . '?action=' . $action;
    return $scope;
}
# Legacy Auth
# Legacy password login through com.atproto.server.createSession.
# Always emits a deprecation warning in the 'At' warnings category first.
#
# A true, unblessed response is wrapped in At::Protocol::Session; any other response is stored
# in $session unchanged. When the stored value is true the access and refresh tokens are
# installed on the user agent and the result of set_tokens is returned; otherwise the stored
# (false) value is returned.
method login( $identifier, $password ) {
    warnings::warnif( At => 'login() (com.atproto.server.createSession) is deprecated. Please use OAuth instead.' );
    my %credentials = ( identifier => $identifier, password => $password );
    my $res         = $self->post( 'com.atproto.server.createSession' => \%credentials );
    $session = ( $res && !builtin::blessed($res) ) ? At::Protocol::Session->new(%$res) : $res;
    return $session unless $session;
    return $http->set_tokens( $session->accessJwt, $session->refreshJwt, undef, undef );
}
method resume ( $accessJwt, $refreshJwt, $token_type = 'Bearer', $dpop_key_jwk = (), $client_id = (), $handle = (), $pds = (), $scope = () ) {
my $access = $self->_decode_token($accessJwt);
my $refresh = $self->_decode_token($refreshJwt);
return unless $access;
my $key;
if ( $token_type eq 'DPoP' && $dpop_key_jwk ) {
$key = Crypt::PK::ECC->new();
$key->import_key( \$dpop_key_jwk );
$dpop_key = $key;
}
if ( $refresh && time > $access->{exp} && time < $refresh->{exp} ) {
if ( $token_type eq 'DPoP' ) { return $self->oauth_refresh(); }
else {
my $res = $self->post( 'com.atproto.server.refreshSession' => { refreshJwt => $refreshJwt } );
if ( $res && !builtin::blessed($res) ) { $session = At::Protocol::Session->new(%$res); }
else { $session = $res; }
return $session ? $http->set_tokens( $session->accessJwt, $session->refreshJwt, $token_type, $key ) : $session;
}
}
$session = At::Protocol::Session->new(
did => $access->{sub},
accessJwt => $accessJwt,
refreshJwt => $refreshJwt,
token_type => $token_type,
dpop_key_jwk => $dpop_key_jwk,
client_id => $client_id,
}
# Derive a libp2p PeerID from the first verification method in a DID document.
#
# Returns nothing when the document cannot be resolved to a hash with a verificationMethod
# entry, or when that first entry has no publicKeyMultibase. Otherwise the multibase key is
# decoded, its first two bytes are dropped, and the remainder is wrapped in a two-field
# protobuf message that is handed to Net::Libp2p::Crypto->peer_id_from_public_key.
method peer_id_for_did ($did) {
    my $doc = $self->resolve_did($did);
    return unless $doc && ref $doc eq 'HASH' && $doc->{verificationMethod};

    # Only the first listed key is considered (usually the primary signing key).
    my $first_method  = $doc->{verificationMethod}[0];
    my $multibase_key = $first_method->{publicKeyMultibase};
    return unless defined $multibase_key;

    # In atproto the value usually starts with 'z' (base58btc) and carries a multicodec prefix.
    require InterPlanetary::Multibase;
    my $decoded = InterPlanetary::Multibase->decode($multibase_key);
    require Net::Libp2p::Crypto;
    require InterPlanetary::Utils;

    # Simplified: the key is assumed to be secp256k1, whose multicodec prefix (0xe7) is usually
    # two bytes long; what follows is taken as the 33-byte compressed key.
    my $compressed_key = substr( $decoded, 2 );

    # Protobuf tag byte: field number in the upper bits, wire type in the low three.
    my $tag = sub ( $field, $wire_type ) { pack( 'C', ( $field << 3 ) | $wire_type ) };

    # Field 1 (varint): key type, 2 = Secp256k1. Field 2 (length-delimited): the key bytes.
    my $type_field = $tag->( 1, 0 ) . InterPlanetary::Utils::encode_varint(2);
    my $data_field = $tag->( 2, 2 ) . InterPlanetary::Utils::encode_varint( length($compressed_key) ) . $compressed_key;
    return Net::Libp2p::Crypto->peer_id_from_public_key( $type_field . $data_field );
}
# Return the current session, fetching it from com.atproto.server.getSession the first time
# it is requested while $session is still undefined.
method session() {
    $session = $self->get('com.atproto.server.getSession') unless defined $session;
    return $session;
}
# Fetch a raw block by CID string.
#
# Without an $ipfs_node the request goes straight to _get_block_http. With one, the local
# blockstore is consulted first; on a miss, and when $target_peer_id is given, the peer is
# dialled for Bitswap, and a failure of that chain falls back to _get_block_http. When no peer
# is given the HTTP path is used directly.
#
# The blockstore hit and the HTTP path hand back a Future; the Bitswap path returns the chained
# result of the dial.
method get_block ( $cid_str, $target_peer_id = undef, $did = undef ) {
    return $self->_get_block_http( $cid_str, $did ) unless $ipfs_node;
    my $cid   = InterPlanetary::CID->from_string($cid_str);
    my $local = $ipfs_node->blockstore->get($cid);
    return Future->done($local)                     if $local;
    return $self->_get_block_http( $cid_str, $did ) unless $target_peer_id;

    #~ say "[IPFS] Block $cid_str not found locally, attempting Bitswap from $target_peer_id...";
    my $on_stream = sub ($ss) {

        # The local bitswap handler needs to 'own' this stream to read the response
        $bitswap->handle_stream($ss);
        return $bitswap->request_block( $ss, $cid );
    };
    my $on_failure = sub {

        #~ say "[IPFS] Bitswap failed. Falling back to HTTP...";
        return $self->_get_block_http( $cid_str, $did );
    };
    return $ipfs_node->host->dial( $target_peer_id, '/ipfs/bitswap/1.2.0' )->then($on_stream)->else($on_failure);
}
# Fetch a single block over HTTP via com.atproto.sync.getBlocks.
#
# Parameters:
#   $cid_str - CID of the wanted block, as a string
#   $did     - repository DID; without it the sync endpoint cannot be queried
#
# Returns a Future. It resolves to the block data, or to undef when no DID was given, the
# request produced nothing, or the returned CAR file holds no block with a matching CID.
# An exception while fetching or parsing becomes a failed Future.
method _get_block_http ( $cid_str, $did ) {

    # If no DID provided, we can't fallback to sync endpoints
    return Future->done(undef) unless $did;

    #~ say "[HTTP] Fetching block $cid_str for $did via com.atproto.sync.getBlocks...";
    # Future->call requires the callback itself to return a Future; a plain return value is
    # turned into a failed Future, so every exit below wraps its result in Future->done.
    return Future->call(
        sub {
            # com.atproto.sync.getBlocks returns a CAR file
            my $car_data = $self->get( 'com.atproto.sync.getBlocks' => { did => $did, cids => [$cid_str] } );
            return Future->done(undef) unless $car_data;
            require Archive::CAR;
            require Archive::CAR::v1;
            my $car = Archive::CAR::v1->new();
            open my $fh, '<', \$car_data or die "Cannot open CAR data for $cid_str: $!";
            $car->read($fh);

            # Archive::CAR blocks is an arrayref of { cid => CIDobj, data => ... }
            for my $block ( @{ $car->blocks } ) {

                # Plain string comparison: a CID stored in a different multibase encoding than
                # $cid_str will not match.
                return Future->done( $block->{data} ) if $block->{cid}->to_string eq $cid_str;
            }
            return Future->done(undef);
        }
    );
}
# Query com.atproto.sync.getHead for the given DID and return the response as-is.
method get_repo_head ($did) {
    my %query = ( did => $did );
    return $self->get( 'com.atproto.sync.getHead' => \%query );
}
# Current time as a Time::Moment object.
sub _now {
    return Time::Moment->now();
}
# Format a number of seconds as "<n> seconds", using the absolute value.
# Any false input (0, '', undef) yields '0 seconds'.
method _duration ($seconds) {
    return '0 seconds' unless $seconds;
    my $magnitude = abs $seconds;
    return $magnitude . ' seconds';
}
# Record the rate-limit headers of a response.
#
# Parameters:
#   $headers - hashref of response headers; names are matched case-insensitively
#   $type    - key under which the limits are stored in %ratelimits
#   $meta    - optional sub-key; when defined the limits go to $ratelimits{$type}{$meta}
#
# Returns the stored hashref (limit, remaining, reset, policy), or nothing when the response
# has no 'ratelimit-limit' header.
method ratelimit_ ( $headers, $type, $meta //= () ) {
    my %h = map { lc($_) => $headers->{$_} } keys %$headers;
    return unless exists $h{'ratelimit-limit'};
    my $rate = {
        limit     => $h{'ratelimit-limit'},
        remaining => $h{'ratelimit-remaining'},
        reset     => $h{'ratelimit-reset'},
        policy    => $h{'ratelimit-policy'},
    };

    # Spelled out as if/else: assignments inside a statement-level ternary parse as
    # "(cond ? (a = x) : b) = x", which only happens to do the right thing.
    if ( defined $meta ) {
        $ratelimits{$type}{$meta} = $rate;
    }
    else {
        $ratelimits{$type} = $rate;
    }
    return $rate;
}
# Warn (category 'At') when the recorded rate limit for $type is exhausted or nearly so.
#
# Parameters:
#   $type - key the limits were stored under
#   $meta - optional sub-key, matching the one used when the limits were recorded
#
# Nothing happens when no limits are recorded, when they carry no reset time, or when that
# reset time has already passed. Returns nothing.
method _ratecheck( $type, $meta //= () ) {
    my $rate = $ratelimits{$type};

    # Step into the per-meta entry without autovivifying $ratelimits{$type} on a plain read.
    if ( defined $meta ) { $rate = ref $rate eq 'HASH' ? $rate->{$meta} : undef; }
    return unless $rate && $rate->{reset};
    my $now = time;

    # Once the reset time has passed the stored counters describe a finished window; warning
    # about them would report a limit that no longer applies.
    return if $now >= $rate->{reset};
    if ( $rate->{remaining} <= 0 ) {
        my $wait = $rate->{reset} - $now;
        warnings::warnif( At => "Rate limit exceeded for $type. Reset in $wait seconds." );
    }
    elsif ( $rate->{remaining} < ( $rate->{limit} * 0.1 ) ) {
        warnings::warnif( At => "Approaching rate limit for $type ($rate->{remaining} remaining)." );
    }
    return;
}
}
1;
__END__
=pod
=encoding utf-8
See L<https://docs.bsky.app/docs/advanced-guides/rate-limits>
=head1 Getting Started
If you are new to the AT Protocol, the first thing to understand is that it is decentralized. Your data lives on a
Personal Data Server (PDS), but your identity is portable.
=head2 Identity (Handles and DIDs)
=over
=item * B<Handle>: A human-friendly name like C<alice.bsky.social>.
=item * B<DID>: A persistent, machine-friendly identifier like C<did:plc:z72i7...>.
=back
=head1 Authentication and Session Management
There are two ways to authenticate: the modern OAuth system and the legacy password system. Once authenticated, all
other methods (like C<get>, C<post>, and C<subscribe>) work the same way.
Developers of new code should be aware that the AT protocol is transitioning to OAuth and this library strongly
encourages its use.
=head2 The OAuth System (Recommended)
OAuth is the secure, modern way to authenticate. It uses DPoP (Demonstrating Proof-of-Possession) to ensure tokens
cannot be stolen and reused. It's a three step process:
=over
=item 1. Start the flow:
my $auth_url = $at->oauth_start(
'user.bsky.social',
'http://localhost', # Client ID
'http://127.0.0.1:8888/callback', # Redirect URI
'atproto transition:generic' # Scopes
);
=item 2. Redirect the user:
Open C<$auth_url> in a browser. After they approve, they will be redirected to your callback URL with C<code> and
C<state> parameters.
=item 3. Complete the callback:
$at->oauth_callback( $code, $state );
See the demonstration scripts C<eg/bsky_oauth.pl> and C<eg/mojo_oauth.pl> for CLI and web-based examples.
=back
Once authenticated, you should store your session data securely so you can resume it later without requiring the user
to log in again.
=head3 Resuming an OAuth Session
You need to store the tokens, the DPoP key, and the PDS endpoint. The C<_raw> method on the session object provides a
simple hash for this purpose:
# After login, save the session
my $data = $at->session->_raw;
# ... store $data securely ...
# Later, resume the session
$at->resume(
$data->{accessJwt},
$data->{refreshJwt},
$data->{token_type},
$data->{dpop_key_jwk},
$data->{client_id},
$data->{handle},
$data->{pds}
);
=head2 The Legacy System (App Passwords)
Legacy authentication is simpler but less secure. It uses a single call to C<login>. B<Never use your main password;
always use an App Password.>
$at->login( 'user.bsky.social', 'your-app-password' );
Once authenticated, you should store your session data securely so you can resume it later without requiring the user
to log in again.
=head3 Resuming a Legacy Session
Legacy sessions only require the access and refresh tokens:
$at->resume( $access_jwt, $refresh_jwt );
B<Note:> In both cases, if the access token has expired, C<resume()> will automatically attempt to refresh it using the
refresh token.
=head1 Account Management
=head2 Creating an Account
You can create a new account using C<com.atproto.server.createAccount>. Note that PDS instances I<may> require an
invite code.
my $res = $at->post( 'com.atproto.server.createAccount' => {
handle => 'newuser.bsky.social',
email => 'user@example.com',
password => 'secure-password',
inviteCode => 'bsky-social-abcde'
});
=head1 Working With Data: Records and Repositories
Data in the AT Protocol is stored in 'repositories' as 'records'. Each record belongs to a 'collection' (defined by a
Lexicon).
=head2 Creating a Post
Posts are records in, for example, the C<app.bsky.feed.post> collection.
$at->post( 'com.atproto.repo.createRecord' => {
repo => $at->did,
collection => 'app.bsky.feed.post',
record => {
'$type' => 'app.bsky.feed.post',
text => 'Content of the post',
createdAt => At::_now->to_string,
}
});
=head2 Listing Records
To see what's in a collection:
my $res = $at->get( 'com.atproto.repo.listRecords' => {
repo => $at->did,
collection => 'app.bsky.feed.post',
limit => 10
});
for my $record (@{$res->{records}}) {
say $record->{value}{text};
}
=head1 Drinking from the Firehose: Real-time Streaming
The Firehose is a real-time stream of B<all> events happening on the network (or a specific PDS). This includes new
posts, likes, handle changes, deletions, and more.
=head2 Subscribing to the Firehose
my $fh = $at->firehose(sub ( $header, $body, $err ) {
if ($err) {
warn "Firehose error: $err";
return;
}
if ($header->{t} eq '#commit') {
say 'New commit in repo: ' . $body->{repo};
}
});
$fh->start();
B<Note:> The Firehose requires L<Codec::CBOR> and an async event loop to keep the connection alive. Currently, At.pm
supports L<Mojo::UserAgent> so you should usually use L<Mojo::IOLoop>:
use Mojo::IOLoop;
# ... setup firehose ...
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
=head1 Lexicon Caching
The AT Protocol defines its API endpoints using "Lexicons" (JSON schemas). This library uses these schemas to
automatically coerce API responses into Perl objects.
=head2 How it works
When you call a method like C<app.bsky.actor.getProfile>, the library:
=over
=item 1. B<Checks user-provided paths:> It looks in any directories passed to C<lexicon_paths>.
=item 2. B<Checks local storage:> It looks for the schema in the distribution's C<share> directory.
=item 3. B<Checks user cache:> It looks in C<~/.cache/atproto/lexicons/>.
=item 4. B<Downloads if missing:> If not found, it automatically downloads the schema from the
official AT Protocol repository and saves it to your user cache.
=back
This system ensures that the library can support new or updated features without requiring a new release of the Perl
module.
=head1 METHODS
=head2 C<new( [ host => ..., share => ... ] )>
Constructor.
Expected parameters include:
=over
=item C<host>
Host for the service. Defaults to C<bsky.social>.
=item C<share>
Location of lexicons. Defaults to the C<share> directory under the distribution.
=item C<lexicon_paths>
An optional path string or arrayref of paths to search for Lexicons before checking the default cache locations. Useful
for local development with a checkout of the C<atproto> repository.
=item C<http>
A pre-instantiated L<At::UserAgent> object. By default, this is auto-detected by checking for L<Mojo::UserAgent>,
falling back to L<HTTP::Tiny>.
=back
=head2 C<oauth_start( $handle, $client_id, $redirect_uri, [ $scope ] )>
Initiates the OAuth 2.0 Authorization Code flow. Returns the authorization URL.
=head2 C<oauth_callback( $code, $state )>
Exchanges the authorization code for tokens and completes the OAuth flow.
=head2 C<firehose( $callback, [ $url ] )>
Returns a new L<At::Protocol::Firehose> client. C<$url> defaults to the Bluesky relay firehose.
=head2 C<resolve_handle( $handle )>
Resolves a handle to a DID.
=head2 C<resolve_did_to_handle( $did )>
Reverse resolution: resolves a DID to its primary handle.
=head2 C<atproto_proxy( [ $service_did ] )>
Gets or sets the C<atproto-proxy> header value on the underlying user agent. When set, requests will be sent to the
primary C<host> but include this header, signaling the PDS to proxy the request to the specified service.
Example for Bluesky Chat:
$at->http->at_protocol_proxy("did:web:api.bsky.chat#bsky_chat");
=head2 C<upload_blob( $data, $mime_type )>
Uploads a raw binary blob to the PDS. Returns the blob's metadata (CID, etc).
=head2 C<create_record( $collection, $record, [ $rkey ] )>
Helper to create a new record in a specific collection. Automatically uses the authenticated user's DID.
=head2 C<delete_record( $collection, $rkey )>
Helper to delete a record from a specific collection.
=head2 C<put_record( $collection, $rkey, $record, [ $swapRecord ] )>
Helper to write a record (creating or updating it) at a specific rkey.
=head2 C<apply_writes( $writes, [ $swapCommit ] )>
Atomic multi-record update. C<$writes> should be an arrayref of create/update/delete operations.
=head2 C<collection_scope( $collection, [ $action ] )>
Helper to generate granular OAuth scopes (e.g., C<repo:app.bsky.feed.post?action=create>).
=head2 C<session()>
Returns the current L<At::Protocol::Session> object.
=head2 C<did()>
Returns the DID of the authenticated user.
=head2 C<peer_id_for_did( $did )>
Resolves an AT Protocol DID to a libp2p PeerID. This is used to discover the user's data on the P2P network.
=head2 C<get_repo_head( $did )>
Retrieves the current MST (Merkle Search Tree) root CID for a user's repository via the C<com.atproto.sync.getHead>
endpoint.
=head2 C<get_block( $cid_str, [ $target_peer_id, $did ] )>
Retrieves a raw block by its CID. If an C<ipfs_node> was provided to the constructor, this method will:
=over
=item Check the local blockstore.
=item Attempt to fetch the block via Bitswap from the provided C<$target_peer_id>.
=item Fall back to the centralized PDS via HTTP if the block is not found in the P2P network. This step needs the repository's C<$did>; without it the returned L<Future> resolves to C<undef>.
=back
Returns a L<Future> that resolves to the block data.
=head1 Decentralized Data Synchronization
When an C<ipfs_node> is provided to the L<At> constructor, the library enables peer-to-peer data synchronization
compliant with the AT Protocol Sync specification (L<https://atproto.com/specs/sync>).
=head2 Peer-to-Peer Repository Mirroring
By combining C<peer_id_for_did> and C<get_block>, this library can mirror entire user repositories without relying on a
centralized Relay or PDS. The process involves:
=over
=item Identity bridging: Converting the user's DID to a libp2p PeerID.
=item Root resolution: Getting the latest MST root CID.
=item MST traversal: Recursively walking the Merkle Search Tree.
=item Block exchange: Using Bitswap to fetch missing blocks from peers.
=back
This decentralized approach significantly reduces the load on centralized infrastructure and enables data availability
even during outages of primary service providers.
=head1 ERROR HANDLING
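Most methods report failure by returning an L<At::Error> object, which evaluates to false in boolean context. The OAuth methods (C<oauth_start>, C<oauth_callback>) instead C<die> when a step of the flow fails.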
=head1 See Also
L<Bluesky> - Bluesky client library
L<App::bsky> - Bluesky client on the command line
L<https://docs.bsky.app/docs/api/>
=head1 LICENSE
Copyright (C) Sanko Robinson.
This library is free software; you can redistribute it and/or modify it under the terms found in the Artistic License
2. Other copyrights, terms, and conditions may apply to data transmitted through this module.
( run in 0.755 second using v1.01-cache-2.11-cpan-5837b0d9d2c )