AnyEvent-Twitter-Stream

 view release on metacpan or search on metacpan

t/streaming.t  view on Meta::CPAN

                                note(Dumper $tweet);
                                is($tweet->{user}, 'test');
                                is($tweet->{path}, $item->{path});
                                is_deeply($tweet->{param}, $item->{option});

                                if (%{$item->{option}}) {
                                    is($tweet->{request_method}, 'POST');
                                } else {
                                    is($tweet->{request_method}, 'GET');
                                }
                            } else {
                                $done->send, return if $tweet->{count} > $count_max;
                            }

                            $received++;
                        },
                        on_delete => sub {
                            my ($tweet_id, $user_id) = @_;
                            $deleted++;
                            $received++;
                        },
                        on_friends => sub {
                            my $friends = shift;
                            is_deeply($friends, [qw/1 2 3/]);
                        },
                        on_event => sub {
                            $event++;
                            $done->send;
                        },
                        on_error => sub {
                            my $msg = $_[2] || $_[0];
                            fail("on_error: $msg");
                            $done->send;
                        },
                        %{$item->{option}},
                    );
                    $streamer->{_guard_for_testing} = guard { $destroyed = 1 };

                    $done->recv;
                }

                if ($item->{method} eq 'sample') {
                    is $deleted, 1, 'deleted one tweet';
                } else {
                    is $deleted, 0, 'deleted no tweet';
                }

                if ($item->{method} =~ /userstream|sitestream/) {
                    is $event, 1, 'got one event';
                } else {
                    is $event, 0, 'got no event';
                    is($received, $count_max + 1, "received");
                }

                is $destroyed, 1, 'destroyed';
            }
        },
        server => sub {
            my $port = shift;

            run_streaming_server($port, $enable_chunked);
        },
    );
}

# Signal that all tests have run; no test count is passed to done_testing here.
done_testing();

# Build the PSGI application that stands in for the streaming API in this
# test and hand it to Plack::Handler::Twiggy on 127.0.0.1.
#
# Arguments:
#   $port           - TCP port the handler is told to listen on.
#   $enable_chunked - when true, the 'Chunked' middleware is enabled.
#
# Application layout:
#   * All requests pass through 'Auth::Basic'; the authenticator accepts
#     user 'test' with password 's3cr3t'.
#   * '/1.1/statuses/' - writes one greeting line echoing request details,
#     then a numbered message on every timer tick (0.2s interval).  When the
#     request path matches /sample/, a single delete notice is written right
#     after the first numbered message.
#   * '/'              - writes one friends line, then an event message on
#     every timer tick (0.2s interval).
#
# Every message is one JSON document followed by CRLF.  A timer callback
# drops its own watcher the first time a write throws.
#
# Returns whatever the handler's run() call returns.
sub run_streaming_server {
    my ($port, $enable_chunked) = @_;

    my $crlf = "\x0D\x0A";

    # Start a 200 response with the headers shared by both endpoints and
    # give back the writer object returned by the responder.
    my $open_stream = sub {
        my ($responder) = @_;

        return $responder->([200, [
            'Content-Type' => 'application/json',
            'Server'       => 'Jetty(6.1.17)',
        ]]);
    };

    # Serialize one payload as JSON and write it as a CRLF-terminated line.
    # Exceptions from write() propagate to the caller.
    my $send_line = sub {
        my ($out, $payload) = @_;

        $out->write(encode_json($payload) . $crlf);
    };

    my $statuses_app = sub {
        my $env     = shift;
        my $request = Plack::Request->new($env);

        return sub {
            my ($responder) = @_;
            my $out = $open_stream->($responder);

            # First line: echo what the server saw so the client side can
            # verify path, method, user and parameters.
            $send_line->($out, {
                hello          => 1,
                path           => $request->path,
                request_method => $request->method,
                user           => $env->{REMOTE_USER},
                param          => $request->parameters->mixed,
            });

            my $seq = 1;
            my $ticker;
            $ticker = AE::timer(0, 0.2, sub {
                try {
                    $send_line->($out, {
                        body  => 'x' x 500,
                        count => $seq++,
                    });
                }
                catch {
                    undef $ticker;
                };

                # Only the sample endpoint gets a delete notice, and only
                # once: $seq is 2 solely on the first tick.
                return unless $request->path =~ /sample/ && $seq == 2;

                try {
                    $send_line->($out, {
                        delete => {status => {id => 1, user_id => 1}},
                        count  => $seq++,
                    });
                }
                catch {
                    undef $ticker;
                };
            });
        };
    };

    my $user_stream_app = sub {
        my $env     = shift;
        # The request object is constructed but not consulted below.
        my $request = Plack::Request->new($env);

        return sub {
            my ($responder) = @_;
            my $out = $open_stream->($responder);

            $send_line->($out, {
                friends => [qw/1 2 3/],
            });

            my $ticker;
            $ticker = AE::timer(0, 0.2, sub {
                try {
                    $send_line->($out, {
                        event => {foo => 'bar'},
                    });
                }
                catch {
                    undef $ticker;
                };
            });
        };
    };

    my $app = builder {
        enable 'Auth::Basic',
            realm         => 'Firehose',
            authenticator => sub {
                my ($login, $secret) = @_;

                return $login eq 'test' && $secret eq 's3cr3t';
            };

        if ($enable_chunked) {
            enable 'Chunked';
        }

        mount '/1.1/statuses/' => $statuses_app;
        mount '/'              => $user_stream_app;
    };

    my $handler = Plack::Handler::Twiggy->new(
        host => '127.0.0.1',
        port => $port,
    );
    my $run_result = $handler->run($app);

    return $run_result;
}



( run in 0.765 second using v1.01-cache-2.11-cpan-39bf76dae61 )