Bot-ChatBots-Telegram

 view release on metacpan or  search on metacpan

lib/Bot/ChatBots/Telegram/LongPoll.pm  view on Meta::CPAN

);

has _start => (
   is       => 'ro',
   default  => sub { return 1 },
   init_arg => 'start',
);

has update_timeout => (
   is      => 'ro',
   default => sub { return 300 },
);

sub BUILD {
   my $self = shift;
   $self->start if $self->_start;
}

sub class_custom_pairs {
   my $self = shift;
   return (token => $self->token);
}

sub parse_response {
   my ($self, $res, $threshold_id) = @_;
   my $data = $res->json // {};
   return grep { $_->{update_id} >= $threshold_id } @{$data->{result}//[]}
      if $data->{ok}; # boolean flag from Telegram API

   my $error = $data->{description} // 'unknown error';
   $log->error('getUpdates error: ' . $error);
   if ($log->is_trace) {
      local $Data::Dumper::Indent = 1;
      for ([json => $data], [res => $res]) {
         (my $d = Dumper $_->[1]) =~ s{\A.*?=}{$_->[0] =>}mxs;
         $log->trace($d);
      }
   }
   return;
}

sub poller {
   my $self = shift;
   my $args = (@_ && ref($_[0])) ? $_[0] : {@_};

   my $update_timeout = $self->update_timeout;
   my %query = (
      offset => 0,
      telegram_method => 'getUpdates',
      timeout => $update_timeout,
   );

   my $sender = $self->sender;
   $sender->telegram->agent->connect_timeout($self->connect_timeout)
     ->inactivity_timeout($update_timeout + 5)
     ->max_redirects($self->max_redirects);

   # this flag tells us whether we're in a call already, avoiding
   # duplicates. It is set before sending a request, and reset when the
   # response is managed
   my $is_busy;

   my $on_data = sub {
      my ($ua, $tx) = @_;

      my @updates;
      try {
         @updates = $self->parse_response($tx->res, $query{offset});
      }
      catch {
         $log->error(bleep $_);
         die $_ if $self->should_rethrow($args);
      };

      my @retval = $self->process_updates(
         refs => {
            sender => $sender,
            tx     => $tx,
            ua     => $ua,
         },
         source_pairs => {
            query => \%query,
         },
         updates => \@updates,
         %$args, # may override it all!
      );

      for my $item (@retval) {
         next unless defined $item;
         defined(my $record = $item->{record})            or next;
         defined(my $outcome = $item->{outcome})          or next;
         defined(my $message = $outcome->{send_response}) or next;
         $sender->send_message($message, record => $record);
      }

      # if we get here, somehow me managed to get past this call... Get
      # ready for the next one. Just to be on the safe side, we will
      # advance $query{offset} anyway
      $query{offset} = 1 + max map { $_->{update_id} } @updates
         if @updates;
      $is_busy = 0;
   };

   return sub {
      return if $is_busy;
      $is_busy = 1; # $on_data below will reset $is_busy when ready
      $sender->send_message(\%query, callback => $on_data);
   };
} ## end sub callback

around process => sub {
   my ($orig, $self, $record) = @_;
   my $outcome = $orig->($self, $record);
   $record->{source}{query}{offset} = $record->{update}{update_id} + 1;
   return $outcome;
};

sub start {
   my $self = shift;
   Mojo::IOLoop->recurring($self->interval, $self->poller(@_));
   Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
   return $self;
}

1;



( run in 2.076 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )