Finance-Bitcoin-Feed

 view release on metacpan or  search on metacpan

lib/Finance/Bitcoin/Feed/Pusher.pm  view on Meta::CPAN

# within these subroutines you will have access to the json response in a hash format.
sub trade {
    my $self = shift;
    my $data = shift;
    warn Data::Dumper->Dump([$data]);
    warn "*** I am the default trade()... you should overwride this method in your own package\n";
    return;
}

sub order_book {
    my $self = shift;
    my $data = shift;
    warn Data::Dumper->Dump([$data]);
    warn "** I am the default order_book()... you should overwride this method in your own package\n";
    return;
}

# end the methods you should definately override.

# This module is meant to be used as a base for your own module.
# Your own module will decide what to do with the incoming message through the
# trade() and order_book() routines.
#
# You should look at "test.pl" to see a basic example.

sub new { return (bless {} => shift)->init(@_) }

sub init {
    my ($self, %args) = @_;
    foreach my $attribute ($self->attributes) {
        $self->$attribute($args{$attribute}) if exists $args{$attribute};
    }
    return $self;
}

sub setup {
    my $self = shift;
    $self->channels([CHANNELS]) unless $self->channels;
    $self->protocol(PROTOCOL)   unless $self->protocol;
    $self->app_key(APP_KEY)     unless $self->app_key;
    $self->ssl(SSL)             unless $self->ssl;
    return;
}

sub go {
    my $self = shift;
    $self->setup;
    $self->handle;
    $self->just_wait;
    return;
}

sub handle {
    my $self = shift;
    $self->client(Protocol::WebSocket::Handshake::Client->new(url => $self->uri->as_string));
    $self->frame(Protocol::WebSocket::Frame->new);
    $self->{handle} = AnyEvent::Handle->new(
        connect => [$self->host, $self->port],
        tls     => $self->tls,
        tls_ctx     => {verify => 0},
        keepalive   => 1,
        wtimeout    => 50,
        on_connect  => $self->on_connect,
        on_read     => $self->on_read,
        on_wtimeout => $self->on_wtimeout,
        on_error    => $self->on_error,
        on_eof      => $self->on_eof,
    );
    return;
}

sub on_read {
    my $self = shift;
    return sub {
        my $handle = shift;
        my $chunk  = $handle->{rbuf};
        $handle->{rbuf} = undef;
        if (!$self->client->is_done) {
            $self->client->parse($chunk);
        }

        $self->frame->append($chunk);
        if ($self->frame->is_ping()) {
            $handle->push_write(
                $self->frame->new(
                    buffer => '',
                    type   => 'pong'
                )->to_bytes
            );
        }
        while (my $msg = $self->frame->next) {
            my $d;
            eval { $d = $self->json->decode($msg); } or do {
                my $e = $@;
                warn $self->now . ' - error: ' . $e;
                next;
            };

            if ($d->{event} eq 'pusher:connection_established') {
                say $self->now . ' - subscribing to events' if VERBOSE;
                foreach my $channel (@{$self->channels}) {
                    say $self->now . ' - requesting channel: ' . $channel
                        if VERBOSE;
                    $handle->push_write(
                        $self->frame->new(
                            $self->json->encode({
                                    event => 'pusher:subscribe',
                                    data  => {
                                        channel => $channel,
                                    },
                                })
                        )->to_bytes
                    );
                }
            } elsif ($d->{event} eq 'pusher_internal:subscription_succeeded') {
                printf("%s - subscribed to channel: %s\n", $self->now, $d->{channel})
                    if VERBOSE;
            }

            elsif ($d->{event} eq 'trade') {
                printf("%s - got %s request on channel: %s\n", $self->now, @{$d}{qw(event channel)})



( run in 2.445 seconds using v1.01-cache-2.11-cpan-ceb78f64989 )