AnyEvent-Lingr
view release on metacpan or search on metacpan
lib/AnyEvent/Lingr.pm view on Meta::CPAN
has 'counter' => (
is => 'rw',
isa => 'Int',
);
has '_polling_guard' => (
is => 'rw',
clearer => '_clear_polling_guard',
);
no Mouse;
sub request {
my ($self, $http_method, $method, $params, $cb) = @_;
my $uri = URI->new($self->endpoint . $method);
$uri->query_form($params);
my $cb_wrap = sub {
my ($body, $hdr) = @_;
my $json = try { decode_json $body };
$cb->($json, $hdr);
};
if ($http_method eq 'GET') {
http_get $uri, $cb_wrap;
} elsif ($http_method eq 'POST') {
my $body = $uri->query;
$uri->query(undef);
http_post $uri, $body, $cb_wrap;
} else {
croak "unsupported http method: $http_method"
}
1;
}
sub get {
shift->request('GET', @_);
}
sub post {
shift->request('POST', @_);
}
sub _on_error {
my ($self, $res, $hdr) = @_;
$self->_clear_polling_guard;
if (my $cb = $self->on_error) {
if ($res) {
$cb->($res->{detail});
}
else {
$cb->($hdr->{Status} . ': ' . $hdr->{Reason});
}
}
else {
debugf 'on_error callback does not set';
critf "res:%s hdr:%s", ddf($res), ddf($hdr);
}
}
sub start_session {
my ($self) = @_;
debugf "starting session...";
if ($self->session) {
debugf "found old session:%s reusing...", $self->session;
$self->get('session/verify', { session => $self->session }, sub {
my ($res, $hdr) = @_;
return unless $self;
if ($res and $res->{status} eq 'ok') {
infof "session verified: %s", $res->{session};
$self->_get_channels;
}
else {
debugf "session verify failed: %s", ddf($res || $hdr);
$self->session(undef);
$self->_on_error($res, $hdr);
}
});
}
else {
debugf "create new session...";
$self->post('session/create', {
user => $self->user,
password => $self->password,
$self->api_key ? (api_key => $self->api_key) : (),
}, sub {
my ($res, $hdr) = @_;
return unless $self;
if ($res and $res->{status} eq 'ok') {
debugf "session created: %s", $res->{session};
$self->session( $res->{session} );
$self->_get_channels;
}
else {
debugf "session create failed: %s", ddf($res || $hdr);
$self->_on_error($res, $hdr);
}
});
}
Scalar::Util::weaken($self);
}
sub update_room_info {
my ($self) = @_;
$self->_get_channels;
}
sub _get_channels {
my ($self) = @_;
debugf "getting joined channels";
$self->get('user/get_rooms', { session => $self->session }, sub {
my ($res, $hdr) = @_;
return unless $self;
if ($res and $res->{status} eq 'ok') {
debugf "got rooms: %s", ddf($res->{rooms});
$self->_update_room_info( $res->{rooms} );
}
else {
$self->_on_error($res, $hdr);
}
});
Scalar::Util::weaken($self);
}
sub _update_room_info {
my ($self, $rooms) = @_;
$self->get('room/show', { session => $self->session, room => join ',', @{ $rooms } }, sub {
my ($res, $hdr) = @_;
return unless $self;
if ($res and $res->{status} eq 'ok') {
debugf "got room infos";
if ($self->on_room_info) {
$self->on_room_info->($res->{rooms});
}
else {
debugf "no room info callback";
}
$self->_start_observe($rooms);
}
else {
$self->_on_error($res, $hdr);
}
});
Scalar::Util::weaken($self);
}
sub _start_observe {
my ($self, $rooms) = @_;
$self->post('room/subscribe', {
session => $self->session,
rooms => join(',', @$rooms),
reset => 1,
}, sub {
my ($res, $hdr) = @_;
return unless $self;
if ($res and $res->{status} eq 'ok') {
$self->counter( $res->{counter} );
$self->_polling;
}
else {
$self->_on_error($res, $hdr);
}
});
Scalar::Util::weaken($self);
}
sub _polling {
my ($self) = @_;
if ($self->_polling_guard) {
debugf 'polling session is still active, ignoring this request';
return;
}
my $uri = URI->new( $self->endpoint . 'event/observe' );
$uri->port(8080);
$uri->query_form({ session => $self->session, counter => $self->counter });
my $guard = http_get $uri, timeout => 60, sub {
my ($body, $hdr) = @_;
return unless $self;
my $res = try { decode_json $body };
if ($res and $res->{status} eq 'ok') {
if ($res->{counter}) {
$self->counter( $res->{counter} );
}
if ($res->{events}) {
if (my $cb = $self->on_event) {
$cb->($_) for @{ $res->{events} };
}
else {
debugf "no on_event callback";
}
}
$self->_clear_polling_guard;
$self->_polling;
}
else {
$self->_on_error($res, $hdr);
}
};
Scalar::Util::weaken($self);
$self->_polling_guard( $guard );
}
sub say {
my ($self, $room, $msg, $cb) = @_;
$self->post('room/say', { session => $self->session, room => $room, text => $msg }, sub {
my ($res, $hdr) = @_;
return unless $self;
if ($res and $res->{status} eq 'ok') {
$cb->($res) if $cb;
}
else {
$self->_on_error($res, $hdr);
}
});
Scalar::Util::weaken($self);
}
1;
__END__
=head1 NAME
AnyEvent::Lingr - Asynchronous Lingr client.
=head1 SYNOPSIS
use AnyEvent;
use AnyEvent::Lingr;
my $lingr = AnyEvent::Lingr->new(
user => 'your lingr username',
password => 'your lingr password',
api_key => 'your lingr api_key', # optional
);
# error handler
$lingr->on_error(sub {
my ($msg) = @_;
warn 'Lingr error: ', $msg;
# reconnect after 5 seconds,
my $t; $t = AnyEvent->timer(
after => 5,
( run in 0.903 second using v1.01-cache-2.11-cpan-39bf76dae61 )