API-Eulerian
view release on metacpan or search on metacpan
lib/API/Eulerian/EDW/Peer/Thin.pm view on Meta::CPAN
# @param $command - Eulerian Data Warehouse Command.
#
# @return Reply context.
#
use Data::Dumper;
sub create
{
  my ( $self, $command ) = @_;

  # Request headers come first; without them nothing can be posted, so a
  # failing status is handed straight back to the caller.
  my $prepared = $self->headers();
  return $prepared if $prepared->error();

  # Post the command as a new JOB, sent as a plain text body.
  my $posted = API::Eulerian::EDW::Request->post(
    $self->url(), $prepared->{ headers }, $command, 'text/plain'
  );
  return $posted if $posted->error();

  # The post itself went through. The reply may still carry a status pair
  # ( message, code ) in its JSON body; a non-zero code means the JOB was
  # refused. A reply that yields no JSON is returned untouched.
  my $reply = API::Eulerian::EDW::Request->json( $posted->{ response } );
  return $posted unless defined( $reply ) && $reply->{ status }->[ 1 ] != 0;

  # Report the refusal through a fresh status flagged as an error.
  my $failure = API::Eulerian::EDW::Status->new();
  $failure->error( 1 );
  $failure->msg( $reply->{ status }->[ 0 ] );
  $failure->code( $reply->{ status }->[ 1 ] );
  return $failure;
}
#
# @brief Dispatch Eulerian Data Warehouse Analytics Analysis result messages to
#        their matching callback hook.
#
# The received buffer is encoded to UTF-8, parsed as JSON and routed on its
# message field. The handled types are add, replace, headers, progress and
# status; anything else, including a message without a type, is ignored.
#
# @param $ws - Eulerian WebSocket, holding the owning peer in its _THIN slot.
# @param $buf - Received buffer.
#
sub dispatcher
{
  my ( $ws, $buf ) = @_;

  # Encode the buffer to UTF-8 before handing it to the JSON decoder.
  my $json = decode_json( encode( 'utf-8', $buf ) );
  my $type = $json->{ message };
  my $uuid = $json->{ uuid };
  my $self = $ws->{ _THIN };
  my $hook = $self->hook();

  # A message without a type must simply match none of the handlers below.
  $type = '' if( ! defined( $type ) );

  # Plain string comparisons are used here rather than the Switch source
  # filter, which is no longer shipped with Perl.
  if( $type eq 'add' ) {
    $hook->on_add( $uuid, $json->{ rows } );
  }
  elsif( $type eq 'replace' ) {
    $hook->on_replace( $uuid, $json->{ rows } );
  }
  elsif( $type eq 'headers' ) {
    # Keep the UUID announced by the headers message on the peer itself.
    $self->{ uuid } = $uuid;
    $hook->on_headers(
      $uuid, $json->{ timerange }->[ 0 ],
      $json->{ timerange }->[ 1 ], $json->{ columns }
    );
  }
  elsif( $type eq 'progress' ) {
    $hook->on_progress( $uuid, $json->{ progress } );
  }
  elsif( $type eq 'status' ) {
    # The status triple is passed to the hook as ( code, message, extra ),
    # that is elements 1, 0 and 2 of the received array.
    $hook->on_status(
      $uuid, $json->{ aes }, $json->{ status }->[ 1 ],
      $json->{ status }->[ 0 ], $json->{ status }->[ 2 ]
    );
  }

  # No meaningful value is returned.
  return;
}
#
# @brief Join the WebSocket stream of a created JOB; received messages are
#        handed to the dispatcher, which raises the matching callback hook.
#
# @param $self - API::Eulerian::EDW::Peer::Thin instance.
# @param $status - Reply context of JOB creation.
#
# @return Reply context returned by the WebSocket join.
#
sub join
{
  my ( $self, $status ) = @_;

  # The JOB creation reply provides the aes value used to build the URL.
  my $reply = API::Eulerian::EDW::Request->json( $status->{ response } );

  # The port is picked from the peer port list, indexed by its secure value.
  my $host = $self->host();
  my $ports = $self->ports();
  my $socket = API::Eulerian::EDW::WebSocket->new(
    $host, $ports->[ $self->secure() ]
  );

  my $stream_url = $self->url( $reply->{ aes } );

  # Attach the peer to the socket so the dispatcher can reach it again.
  $socket->{ _THIN } = $self;

  return $socket->join( $stream_url, \&dispatcher );
}
#
# @brief Do Request on Eulerian Data Warehouse Platform.
#
# @param $self - Eulerian Data Warehouse Peer.
# @param $command - Eulerian Data Warehouse Command.
#
sub request
{
my ( $self, $command ) = @_;
my $bench = new API::Eulerian::EDW::Bench();
my $response;
my $status;
my $json;
# Create Job on Eulerian Data Warehouse Platform
$bench->start();
$status = $self->create( $command );
$bench->stage( 'create' );
if( ! $status->error() ) {
# Join Websocket call user specific callback hook
$bench->start();
$status = $self->join( $status );
( run in 0.836 second using v1.01-cache-2.11-cpan-39bf76dae61 )