AI-MXNet
view release on metacpan or search on metacpan
lib/AI/MXNet/KVStore.pm view on Meta::CPAN
package AI::MXNet::KVStore;
use strict;
use warnings;
use AI::MXNet::Base;
use AI::MXNet::NDArray;
use AI::MXNet::Optimizer;
use MIME::Base64;
use Storable;
use Mouse;
use AI::MXNet::Function::Parameters;
=head1 NAME
AI::MXNet::KVStore - Key value store interface of MXNet.
=head1 DESCRIPTION
Key value store interface of MXNet for parameter synchronization, over multiple devices.
=cut
has 'handle' => (is => 'ro', isa => 'KVStoreHandle', required => 1);
has '_updater' => (is => 'rw', isa => 'AI::MXNet::Updater');
has '_updater_func' => (is => 'rw', isa => 'CodeRef');
sub DEMOLISH
{
check_call(AI::MXNetCAPI::KVStoreFree(shift->handle));
}
=head2 init
Initialize a single or a sequence of key-value pairs into the store.
For each key, one must init it before push and pull.
Only worker 0's (rank == 0) data are used.
This function returns after data have been initialized successfully
Parameters
----------
key : str or an array ref of str
The keys.
value : NDArray or an array ref of NDArray objects
The values.
Examples
--------
>>> # init a single key-value pair
>>> $shape = [2,3]
>>> $kv = mx->kv->create('local')
>>> $kv->init(3, mx->nd->ones($shape)*2)
>>> $a = mx->nd->zeros($shape)
>>> $kv->pull(3, out=>$a)
>>> print $a->aspdl
[[ 2 2 2]
[ 2 2 2]]
>>> # init a list of key-value pairs
>>> $keys = [5, 7, 9]
>>> $kv->init(keys, [map { mx->nd->ones($shape) } 0..@$keys-1])
=cut
method init(
Str|ArrayRef[Str] $key,
AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]] $value
)
{
my ($keys, $vals) = _key_value($key, $value);
check_call(
AI::MXNetCAPI::KVStoreInitEx(
$self->handle, scalar(@{ $keys }), $keys, $vals
)
);
}
=head2 push
Push a single or a sequence of key-value pairs into the store.
Data consistency:
1. this function returns after adding an operator to the engine.
2. push is always called after all previous push and pull on the same
key are finished.
3. there is no synchronization between workers. One can use _barrier()
to sync all workers.
Parameters
----------
key : str or array ref of str
value : NDArray or array ref of NDArray or array ref of array refs of NDArray
priority : int, optional
The priority of the push operation.
The higher the priority, the faster this action is likely
to be executed before other push actions.
Examples
--------
>>> # push a single key-value pair
>>> $kv->push(3, mx->nd->ones($shape)*8)
>>> $kv->pull(3, out=>$a) # pull out the value
>>> print $a->aspdl()
[[ 8. 8. 8.]
[ 8. 8. 8.]]
>>> # aggregate the value and the push
>>> $gpus = [map { mx->gpu($_) } 0..3]
>>> $b = [map { mx->nd->ones($shape, ctx => $_) } @$gpus]
>>> $kv->push(3, $b)
>>> $kv->pull(3, out=>$a)
>>> print $a->aspdl
[[ 4. 4. 4.]
[ 4. 4. 4.]]
>>> # push a list of keys.
>>> # single device
>>> $kv->push($keys, [map { mx->nd->ones($shape) } 0..@$keys-1)
>>> $b = [map { mx->nd->zeros(shape) } 0..@$keys-1]
>>> $kv->pull($keys, out=>$b)
>>> print $b->[1]->aspdl
[[ 1. 1. 1.]
[ 1. 1. 1.]]
>>> # multiple devices:
>>> $b = [map { [map { mx->nd->ones($shape, ctx => $_) } @$gpus] } @$keys-1]
>>> $kv->push($keys, $b)
>>> $kv->pull($keys, out=>$b)
>>> print $b->[1][1]->aspdl()
[[ 4. 4. 4.]
[ 4. 4. 4.]]
=cut
method push(
Str|ArrayRef[Str] $key,
AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]] $value,
Int :$priority=0
)
{
my ($keys, $vals) = _key_value($key, $value);
check_call(
AI::MXNetCAPI::KVStorePushEx(
$self->handle, scalar(@{ $keys }), $keys, $vals, $priority
)
);
}
=head2 pull
Pull a single value or a sequence of values from the store.
Data consistency:
1. this function returns after adding an operator to the engine. But any
further read on out will be blocked until it is finished.
2. pull is always called after all previous push and pull on the same
key are finished.
3. It pulls the newest value from the store.
Parameters
----------
key : str or array ref of str
Keys
out: NDArray or array ref of NDArray or array ref of array refs of NDArray
According values
priority : int, optional
The priority of the push operation.
The higher the priority, the faster this action is likely
to be executed before other push actions.
Examples
--------
>>> # pull a single key-value pair
>>> $a = mx->nd->zeros($shape)
>>> $kv->pull(3, out=>$a)
>>> print $a->aspdl
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # pull into multiple devices
>>> $b = [map { mx->nd->ones($shape, $_) } @$gpus]
>>> $kv->pull(3, out=>$b)
>>> print $b->[1]->aspdl()
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # pull a list of key-value pairs.
>>> # On single device
>>> $keys = [5, 7, 9]
>>> $b = [map { mx->nd->zeros($shape) } 0..@$keys-1]
>>> $kv->pull($keys, out=>$b)
>>> print $b->[1]->aspdl()
[[ 2. 2. 2.]
[ 2. 2. 2.]]
>>> # On multiple devices
>>> $b = [map { [map { mx->nd->ones($shape, ctx => $_) } @$gpus ] } 0..@$keys-1]
>>> $kv->pull($keys, out=>$b)
>>> print $b->[1][1]->aspdl()
[[ 2. 2. 2.]
[ 2. 2. 2.]]
=cut
method pull(
Str|ArrayRef[Str] $key,
AI::MXNet::NDArray|ArrayRef[AI::MXNet::NDArray]|ArrayRef[ArrayRef[AI::MXNet::NDArray]] :$out,
Int :$priority=0
)
{
my ($keys, $vals) = _key_value($key, $out);
check_call(
AI::MXNetCAPI::KVStorePullEx(
$self->handle, scalar(@{ $keys }), $keys, $vals, $priority
)
);
}
=head2 set_optimizer
Register an optimizer to the store
If there are multiple machines, this process (should be a worker node)
will pack this optimizer and send it to all servers. It returns after
this action is done.
Parameters
----------
optimizer : Optimizer
the optimizer
=cut
method set_optimizer(AI::MXNet::Optimizer $optimizer)
{
my $is_worker = check_call(AI::MXNetCAPI::KVStoreIsWorkerNode());
if($self->type eq 'dist' and $is_worker)
{
my $optim_str = MIME::Base64::encode_base64(Storable::freeze($optimizer), "");
$self->_send_command_to_servers(0, $optim_str);
}
else
{
$self->_updater(AI::MXNet::Optimizer->get_updater($optimizer));
$self->_set_updater(sub { &{$self->_updater}(@_) });
}
}
=head2 type
Get the type of this kvstore
Returns
-------
type : str
the string type
=cut
method type()
lib/AI/MXNet/KVStore.pm view on Meta::CPAN
Send a command to all server nodes, which will make each server node run
KVStoreServer.controller
This function returns after the command has been executed in all server
nodes.
Parameters
----------
head : int
the head of the command
body : str
the body of the command
=cut
method _send_command_to_servers(Int $head, Str $body)
{
check_call(
AI::MXNetCAPI::KVStoreSendCommmandToServers(
$self->handle,
$head,
$body
)
);
}
=head2 create
Create a new KVStore.
Parameters
----------
name : {'local'}
The type of KVStore
- local works for multiple devices on a single machine (single process)
- dist works for multi-machines (multiple processes)
Returns
-------
kv : KVStore
The created AI::MXNet::KVStore
=cut
method create(Str $name='local')
{
my $handle = check_call(AI::MXNetCAPI::KVStoreCreate($name));
return __PACKAGE__->new(handle => $handle);
}
sub _key_value
{
my ($keys, $vals) = @_;
if(not ref $keys)
{
if(blessed $vals)
{
return ([$keys], [$vals->handle]);
}
else
{
for my $value (@{ $vals })
{
assert(blessed($value) and $value->isa('AI::MXNet::NDArray'));
return ([($keys)x@$vals], [map { $_->handle } @$vals]);
}
}
}
else
{
assert(not blessed($vals) and @$keys == @$vals);
my @c_keys;
my @c_vals;
zip(sub {
my ($key, $val) = @_;
my ($c_key, $c_val) = _key_value($key, $val);
push @c_keys, @$c_key;
push @c_vals, @$c_val;
}, $keys, $vals);
return (\@c_keys, \@c_vals);
}
}
1;
( run in 1.180 second using v1.01-cache-2.11-cpan-39bf76dae61 )