AnyEvent-Twitter-Stream
view release on metacpan or search on metacpan
lib/AnyEvent/Twitter/Stream.pm view on Meta::CPAN
# Dispatch table of the known streaming API methods.
# Each entry maps a method name to a two-element array ref:
#   [0] the HTTP verb used for the request,
#   [1] a code ref that builds the endpoint URL when called, so the
#       protocol/server variables are read at call time rather than when
#       this table is built.
my %methods = (
    'filter' => [
        'POST',
        sub { return "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/filter.json" },
    ],
    'sample' => [
        'GET',
        sub { return "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/sample.json" },
    ],
    'firehose' => [
        'GET',
        sub { return "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/firehose.json" },
    ],
    # The user stream builds its URL from $US_PROTOCOL rather than $PROTOCOL.
    'userstream' => [
        'GET',
        sub { return "$US_PROTOCOL://$USERSTREAM_SERVER/1.1/user.json" },
    ],
    'sitestream' => [
        'GET',
        sub { return "$PROTOCOL://$SITESTREAM_SERVER/1.1/site.json" },
    ],

    # DEPRECATED
    'links' => [
        'GET',
        sub { return "$PROTOCOL://$STREAMING_SERVER/1/statuses/links.json" },
    ],
    'retweet' => [
        'GET',
        sub { return "$PROTOCOL://$STREAMING_SERVER/1/statuses/retweet.json" },
    ],
);
sub new {
my $class = shift;
my %args = @_;
my $username = delete $args{username};
my $password = delete $args{password};
my $consumer_key = delete $args{consumer_key};
my $consumer_secret = delete $args{consumer_secret};
my $token = delete $args{token};
my $token_secret = delete $args{token_secret};
my $method = delete $args{method};
my $on_connect = delete $args{on_connect} || sub { };
my $on_tweet = delete $args{on_tweet};
my $on_error = delete $args{on_error} || sub { die @_ };
my $on_eof = delete $args{on_eof} || sub { };
my $on_keepalive = delete $args{on_keepalive} || sub { };
my $on_delete = delete $args{on_delete};
my $on_friends = delete $args{on_friends};
my $on_direct_message = delete $args{on_direct_message};
my $on_event = delete $args{on_event};
my $timeout = delete $args{timeout};
my $decode_json;
unless (delete $args{no_decode_json}) {
require JSON;
$decode_json = 1;
}
my ($zlib, $_zstatus);
if (delete $args{use_compression}){
($zlib, $_zstatus) = Compress::Raw::Zlib::Inflate->new(
-LimitOutput => 1,
-AppendOutput => 1,
-WindowBits => WANT_GZIP_OR_ZLIB,
);
die "Can't make inflator: $_zstatus" unless $zlib;
}
unless ($methods{$method} || exists $args{api_url} ) {
$on_error->("Method $method not available.");
return;
}
my $uri = URI->new(delete $args{api_url} || $methods{$method}[1]());
my $request_body;
my $request_method = delete $args{request_method} || $methods{$method}[0] || 'GET';
if ( $request_method eq 'POST' ) {
$request_body = join '&', map "$_=" . URI::Escape::uri_escape_utf8($args{$_}), keys %args;
}else{
$uri->query_form(%args);
}
my $auth;
if ($consumer_key) {
eval {require Net::OAuth;};
die $@ if $@;
my $request = Net::OAuth->request('protected resource')->new(
version => '1.0',
consumer_key => $consumer_key,
consumer_secret => $consumer_secret,
token => $token,
token_secret => $token_secret,
request_method => $request_method,
signature_method => 'HMAC-SHA1',
timestamp => time,
nonce => MIME::Base64::encode( time . $$ . rand ),
request_url => $uri,
$request_method eq 'POST' ? (extra_params => \%args) : (),
);
$request->sign;
$auth = $request->to_authorization_header;
}else{
$auth = "Basic ".MIME::Base64::encode("$username:$password", '');
}
my $self = bless {}, $class;
{
Scalar::Util::weaken(my $self = $self);
my $set_timeout = $timeout
? sub { $self->{timeout} = AE::timer($timeout, 0, sub { $on_error->('timeout') }) }
: sub {};
# Handle one message received from the stream and route it to a callback.
#
#   $json - text of a single stream message; whitespace-only text is treated
#           as a keep-alive.
#
# Every message, keep-alives included, re-arms the inactivity timer through
# $set_timeout.  A non-blank message is decoded with JSON::decode_json unless
# decoding was disabled, then handed to the first registered handler that
# matches (on_delete, on_friends, on_direct_message, on_event); anything else
# goes to on_tweet.  JSON::decode_json exceptions are not caught here.
my $on_json_message = sub {
    my ($json) = @_;
    # Twitter stream returns "\x0a\x0d\x0a" if there are no matched tweets in ~30s.
    $set_timeout->();
    if ($json !~ /^\s*$/) {
        my $tweet = $decode_json ? JSON::decode_json($json) : $json;

        # Only a decoded JSON object can be inspected for its message type.
        # With decoding disabled $tweet is the raw string; dereferencing it
        # as a hash is a symbolic reference (fatal when strict refs is in
        # effect), so such payloads are passed to on_tweet untouched.
        my $msg = ref($tweet) eq 'HASH' ? $tweet : {};

        # A deletion notice carries its details under {delete}{status};
        # require both levels to be hashes before reading id/user_id.
        my $deleted = ref($msg->{delete}) eq 'HASH' ? $msg->{delete}{status} : undef;

        if ($on_delete && ref($deleted) eq 'HASH') {
            $on_delete->($deleted->{id}, $deleted->{user_id});
        }
        elsif ($on_friends && $msg->{friends}) {
            $on_friends->($msg->{friends});
        }
        elsif ($on_direct_message && $msg->{direct_message}) {
            $on_direct_message->($msg->{direct_message});
        }
        elsif ($on_event && $msg->{event}) {
            # Event handlers get the whole message, not just the event field.
            $on_event->($tweet);
        }
        else {
            $on_tweet->($tweet);
        }
    }
    else {
        $on_keepalive->();
    }
};
( run in 1.729 second using v1.01-cache-2.11-cpan-483215c6ad5 )