AnyEvent-Twitter-Stream

 view release on metacpan or  search on metacpan

lib/AnyEvent/Twitter/Stream.pm  view on Meta::CPAN

my %methods = (
    filter     => [ POST => sub { "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/filter.json"   } ],
    sample     => [ GET  => sub { "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/sample.json"   } ],
    firehose   => [ GET  => sub { "$PROTOCOL://$STREAMING_SERVER/1.1/statuses/firehose.json" } ],
    userstream => [ GET  => sub { "$US_PROTOCOL://$USERSTREAM_SERVER/1.1/user.json"          } ],
    sitestream => [ GET  => sub { "$PROTOCOL://$SITESTREAM_SERVER/1.1/site.json"             } ],

    # DEPRECATED
    links      => [ GET  => sub { "$PROTOCOL://$STREAMING_SERVER/1/statuses/links.json"   } ],
    retweet    => [ GET  => sub { "$PROTOCOL://$STREAMING_SERVER/1/statuses/retweet.json" } ],
);

sub new {
    my $class = shift;
    my %args  = @_;

    my $username        = delete $args{username};
    my $password        = delete $args{password};
    my $consumer_key    = delete $args{consumer_key};
    my $consumer_secret = delete $args{consumer_secret};
    my $token           = delete $args{token};
    my $token_secret    = delete $args{token_secret};
    my $method          = delete $args{method};
    my $on_connect      = delete $args{on_connect} || sub { };
    my $on_tweet        = delete $args{on_tweet};
    my $on_error        = delete $args{on_error} || sub { die @_ };
    my $on_eof          = delete $args{on_eof} || sub { };
    my $on_keepalive    = delete $args{on_keepalive} || sub { };
    my $on_delete       = delete $args{on_delete};
    my $on_friends      = delete $args{on_friends};
    my $on_direct_message = delete $args{on_direct_message};
    my $on_event        = delete $args{on_event};
    my $timeout         = delete $args{timeout};

    my $decode_json;
    unless (delete $args{no_decode_json}) {
        require JSON;
        $decode_json = 1;
    }

    my ($zlib, $_zstatus);
    if (delete $args{use_compression}){
        ($zlib, $_zstatus)  = Compress::Raw::Zlib::Inflate->new(
            -LimitOutput => 1,
            -AppendOutput => 1,
            -WindowBits => WANT_GZIP_OR_ZLIB,
        );
        die "Can't make inflator: $_zstatus" unless $zlib;
    }

    unless ($methods{$method} || exists $args{api_url} ) {
        $on_error->("Method $method not available.");
        return;
    }

    my $uri = URI->new(delete $args{api_url} || $methods{$method}[1]());

    my $request_body;
    my $request_method = delete $args{request_method} || $methods{$method}[0] || 'GET';
    if ( $request_method eq 'POST' ) {
        $request_body = join '&', map "$_=" . URI::Escape::uri_escape_utf8($args{$_}), keys %args;
    }else{
        $uri->query_form(%args);
    }

    my $auth;
    if ($consumer_key) {
        eval {require Net::OAuth;};
        die $@ if $@;

        my $request = Net::OAuth->request('protected resource')->new(
            version          => '1.0',
            consumer_key     => $consumer_key,
            consumer_secret  => $consumer_secret,
            token            => $token,
            token_secret     => $token_secret,
            request_method   => $request_method,
            signature_method => 'HMAC-SHA1',
            timestamp        => time,
            nonce            => MIME::Base64::encode( time . $$ . rand ),
            request_url      => $uri,
            $request_method eq 'POST' ? (extra_params => \%args) : (),
        );
        $request->sign;
        $auth = $request->to_authorization_header;
    }else{
        $auth = "Basic ".MIME::Base64::encode("$username:$password", '');
    }

    my $self = bless {}, $class;

    {
        Scalar::Util::weaken(my $self = $self);

        my $set_timeout = $timeout
            ? sub { $self->{timeout} = AE::timer($timeout, 0, sub { $on_error->('timeout') }) }
            : sub {};

        my $on_json_message = sub {
            my ($json) = @_;

            # Twitter stream returns "\x0a\x0d\x0a" if there's no matched tweets in ~30s.
            $set_timeout->();
            if ($json !~ /^\s*$/) {
                my $tweet = $decode_json ? JSON::decode_json($json) : $json;
                if ($on_delete && $tweet->{delete} && $tweet->{delete}->{status}) {
                    $on_delete->($tweet->{delete}->{status}->{id}, $tweet->{delete}->{status}->{user_id});
                }elsif($on_friends && $tweet->{friends}) {
                    $on_friends->($tweet->{friends});
                }elsif($on_direct_message && $tweet->{direct_message}) {
                    $on_direct_message->($tweet->{direct_message});
                }elsif($on_event && $tweet->{event}) {
                    $on_event->($tweet);
                }else{
                    $on_tweet->($tweet);
                }
            }
            else {
                $on_keepalive->();
            }
        };



( run in 1.729 second using v1.01-cache-2.11-cpan-483215c6ad5 )