AnyEvent-Twitter-Stream

 view release on metacpan or  search on metacpan

t/timeout.t  view on Meta::CPAN

use AnyEvent::Twitter::Stream;
use AnyEvent::Util qw(guard);
use Data::Dumper;
use JSON;
use Test::More;
use Test::TCP;
use Test::Requires qw(Plack::Builder Plack::Handler::Twiggy Try::Tiny);
use Test::Requires { 'Plack::Request' => '0.99' };

my %pattern = (
    wait_0_0_0 => 3,
    wait_5_0_0 => 0,
    wait_0_5_0 => 0,
    wait_0_0_5 => 1,
);

test_tcp(
    client => sub {
        my $port = shift;

        $AnyEvent::Twitter::Stream::STREAMING_SERVER = "localhost:$port";
        $AnyEvent::Twitter::Stream::PROTOCOL = 'http'; # real world API uses https

        foreach my $w (keys %pattern) {
            my $destroyed;
            my $received = 0;

            {
                my $done = AE::cv;
                my $streamer = AnyEvent::Twitter::Stream->new(
                    username => 'test',
                    password => 's3cr3t',
                    method => 'filter',
                    track => $w,
                    timeout => 2,
                    on_tweet => sub {
                        my $tweet = shift;
                        $done->send, return if $tweet->{count} > 2;

                        note(Dumper $tweet);
                        $received++;
                    },
                    on_error => sub {
                        my $msg = $_[2] || $_[0];
                        note("on_error: $msg");
                        $done->send;
                    }
                );
                $streamer->{_guard_for_testing} = guard { $destroyed = 1 };

                $done->recv;
            }

            is($destroyed, 1, "$w: destroyed");
            is($received, $pattern{$w}, "received");
        }
    },
    server => sub {
        my $port = shift;

        run_streaming_server($port);
    },
);

done_testing();


sub run_streaming_server {
    my $port = shift;

    my $streaming = sub {
        my $env = shift;
        my $req = Plack::Request->new($env);

        my $track = $req->param('track');
        my (undef, $wait_a, $wait_b, $wait_c) = split(/_/, $track);

        return sub {
            my $respond = shift;

            my $count = 0;
            my $writer;

            my $send_tweet = sub {
                my ($tweet) = @_;

                try {
                    $writer->write(encode_json({count => $count++, rand => rand}) . "\x0D\x0A");
                } catch {
                    note($_);
                };
            };

            my $t1; $t1 = AE::timer $wait_a, 0, sub {
                $writer = $respond->([200, [
                    'Content-Type' => 'application/json'
                ]]);

                undef $t1;
            };

            my $t2; $t2 = AE::timer $wait_a + $wait_b, 0, sub {
                $send_tweet->();

                undef $t2;
            };

            my $t3; $t3 = AE::timer $wait_a + $wait_b + $wait_c, 0.5, sub {
                $send_tweet->();

                $t3;
            };
        };
    };


    my $app = builder {
        enable 'Auth::Basic', realm => 'Firehose', authenticator => sub {
            my ($user, $pass) = @_;

            return $user eq 'test' && $pass eq 's3cr3t';
        };
        mount '/1.1/' => $streaming;
    };

    my $server = Plack::Handler::Twiggy->new(
        host => '127.0.0.1',
        port => $port,
    )->run($app);
}



( run in 0.873 second using v1.01-cache-2.11-cpan-39bf76dae61 )