AnyEvent-STOMP-Client
view release on metacpan or search on metacpan
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
# Serialize a header hash into the textual header section of a frame.
#
# Takes a reference to a hash of header names and values and returns a single
# string in which every entry is rendered as "name:value" and the entries are
# joined with $EOL. The order of the entries follows Perl's hash ordering.
sub header_hash2string {
    my ($header_hashref) = @_;

    my @header_lines;
    for my $name (keys %$header_hashref) {
        push @header_lines, "$name:$header_hashref->{$name}";
    }

    return join($EOL, @header_lines);
}
# Parse the textual header section of a frame into a hash reference.
#
# The input is split on "\n"; every line containing a "name:value" pair
# contributes one entry. The name is everything up to the FIRST colon, the
# value is everything after it up to a carriage return or the end of the line,
# so values that themselves contain colons (e.g. "ID:host:1") are kept intact
# instead of being cut off at the second colon.
#
# Returns a hash reference (empty when no line contains a header).
sub header_string2hash {
    my $header_string = shift;
    my $result_hashref = {};

    foreach my $line (split /\n/, $header_string) {
        next unless $line =~ m/([^\r\n:]+):([^\r\n]*)/;
        my ($name, $value) = ($1, $2);

        # Repeated Header Entries: Do not replace if it already exists
        $result_hashref->{$name} = $value
            unless exists $result_hashref->{$name};
    }

    return $result_hashref;
}
# Escape header names and values using %ENCODE_MAP.
#
# Every character that is a key of %ENCODE_MAP is replaced by the mapped
# value, in both the header names and the header values. The input hash is
# left untouched; a new hash reference holding the encoded pairs is returned.
sub encode_header {
    my $header_hashref = shift;
    my $result_hashref = {};

    # Character class matching exactly the characters that have an entry in
    # %ENCODE_MAP, written as \xNN escapes so no character is special inside
    # the class. Compiled once per call rather than frozen with /o.
    my $ENCODE_KEYS = '['.join('', map { sprintf('\\x%02x', ord($_)) } keys(%ENCODE_MAP)).']';
    my $encode_re = qr/($ENCODE_KEYS)/;

    # Iterate with keys() instead of each(): each() continues from wherever
    # the hash's internal iterator was left, so a half-finished each() loop
    # elsewhere on the same hash would make headers go missing here.
    for my $name (keys %$header_hashref) {
        my $value = $header_hashref->{$name};
        my $encoded_name = $name;

        $value        =~ s/$encode_re/$ENCODE_MAP{$1}/g;
        $encoded_name =~ s/$encode_re/$ENCODE_MAP{$1}/g;

        $result_hashref->{$encoded_name} = $value;
    }

    return $result_hashref;
}
# Reverse the escaping of header names and values using %DECODE_MAP.
#
# Every two-character sequence consisting of a backslash followed by another
# character is looked up in %DECODE_MAP and replaced by the mapped value.
# The input hash is left untouched; a new hash reference is returned.
#
# Croaks with "Invalid header value." / "Invalid header key." when an escape
# sequence has no entry in %DECODE_MAP. (Previously the croak could never
# fire: it was attached to a substitution that only ran after the same
# pattern had already matched, so unknown escapes were silently replaced by
# an empty string.)
sub decode_header {
    my $header_hashref = shift;
    my $result_hashref = {};

    # keys() rather than each(): croaking in the middle of an each() loop
    # would leave the hash's iterator half-advanced for the next caller.
    for my $name (keys %$header_hashref) {
        my $value = $header_hashref->{$name};
        my $decoded_name = $name;

        $value =~ s{(\\.)}{
            exists $DECODE_MAP{$1}
                ? $DECODE_MAP{$1}
                : croak("Invalid header value.")
        }eg;

        $decoded_name =~ s{(\\.)}{
            exists $DECODE_MAP{$1}
                ? $DECODE_MAP{$1}
                : croak("Invalid header key.")
        }eg;

        $result_hashref->{$decoded_name} = $value;
    }

    return $result_hashref;
}
# Build a frame from its parts and queue it for writing on the handle.
#
# Parameters:
#   $command        - frame command (e.g. 'CONNECT', 'SEND', 'ACK')
#   $header_hashref - reference to a hash of header names and values
#   $body           - frame body; only included for SEND frames, an
#                     undefined body is sent as an empty one
#
# Croaks unless the client is connected or the command is CONNECT.
#
# Emits the 'SEND_FRAME' event for every frame, and additionally an event
# named after the command for SEND, ACK and NACK frames. The command test is
# anchored and has no empty alternative: the previous pattern
# m/SEND|ACK|NACK|/ matched every string, so the per-command event fired for
# all commands.
sub send_frame {
    my ($self, $command, $header_hashref, $body) = @_;

    unless ($self->is_connected or $command eq 'CONNECT') {
        croak "Have you considered connecting to a STOMP broker first before "
            ."trying to send something?";
    }

    utf8::encode($command);

    # CONNECT headers are serialized as given; the headers of every other
    # command are passed through encode_header first.
    my $header = $command eq 'CONNECT'
        ? header_hash2string($header_hashref)
        : header_hash2string(encode_header($header_hashref));
    utf8::encode($header);

    # Only SEND frames carry a body; all other frames end right after the
    # blank line that terminates the header section.
    my $payload = '';
    if ($command eq 'SEND' and defined $body) {
        $payload = $body;
    }
    my $frame = $command.$EOL.$header.$EOL.$EOL.$payload.$NULL;

    $self->event('SEND_FRAME', $frame);
    $self->event($command, $frame) if $command =~ m/\A(?:SEND|ACK|NACK)\z/;
    $self->{handle}->push_write($frame);
    $self->reset_client_heartbeat_timer;
}
# Send a message to a destination via a SEND frame.
#
# Parameters:
#   $destination - value for the 'destination' header (required)
#   $headers     - optional reference to a hash of additional headers
#   $body        - optional message body
#
# A 'content-length' header is added when the caller did not supply one; it
# is taken from length($body), or 0 for a missing/empty body. A warning is
# issued via carp when no 'content-type' header is present.
#
# The caller's header hash is not modified: the headers are copied first, so
# a hash that is reused for several messages does not carry the destination
# or the computed content-length of an earlier message into a later one.
#
# Croaks when no destination is given.
sub send {
    my ($self, $destination, $headers, $body) = @_;

    croak "Would you mind supplying me with a destination?"
        unless defined $destination;

    # Shallow copy so the additions below stay local to this frame.
    my %frame_headers = %{ $headers || {} };
    $frame_headers{destination} = $destination;

    unless (defined $frame_headers{'content-length'}) {
        $frame_headers{'content-length'} = defined $body ? length($body) : 0;
    }

    unless (defined $frame_headers{'content-type'}) {
        carp "It is strongly recommended to set the 'content-type' header.";
    }

    $self->send_frame('SEND', \%frame_headers, $body);
}
# Acknowledge a message by sending an ACK frame.
#
# Parameters:
#   $ack_id      - value for the frame's 'id' header (required)
#   $transaction - optional value for the 'transaction' header
#
# Croaks when $ack_id is undefined or an empty string. The check is on
# definedness and length rather than truthiness, so the id "0" is accepted.
sub ack {
    my ($self, $ack_id, $transaction) = @_;

    unless (defined $ack_id and length $ack_id) {
        croak "I do really need the message's ack header to ACK it.";
    }

    my $header = { id => $ack_id };
    $header->{transaction} = $transaction if defined $transaction;

    $self->send_frame('ACK', $header);
}
sub nack {
my ($self, $ack_id, $transaction) = @_;
unless ($ack_id) {
croak "I do really need the message's ack header to NACK it.";
}
my $header = {id => $ack_id,};
$header->{transaction} = $transaction if (defined $transaction);
( run in 1.168 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )