AnyEvent-STOMP-Client

 view release on metacpan or  search on metacpan

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN


sub header_hash2string {
    my $header_hashref = shift;
    return join($EOL, map { "$_:$header_hashref->{$_}" } keys %$header_hashref);
}

sub header_string2hash {
    my $header_string = shift;
    my $result_hashref = {};

    foreach (split /\n/, $header_string) {
        if (m/([^\r\n:]+):([^\r\n:]*)/) {
            # Repeated Header Entries: Do not replace if it already exists
            $result_hashref->{$1} = $2 unless defined $result_hashref->{$1};
        }
    }

    return $result_hashref;
}

sub encode_header {
    my $header_hashref = shift;
    my $result_hashref = {};

    my $ENCODE_KEYS = '['.join('', map(sprintf('\\x%02x', ord($_)), keys(%ENCODE_MAP))).']';

    while (my ($k, $v) = each(%$header_hashref)) {
        $v =~ s/($ENCODE_KEYS)/$ENCODE_MAP{$1}/ego;
        $k =~ s/($ENCODE_KEYS)/$ENCODE_MAP{$1}/ego;
        $result_hashref->{$k} = $v;
    }

    return $result_hashref;
}

sub decode_header {
    my $header_hashref = shift;
    my $result_hashref = {};

    while (my ($k, $v) = each(%$header_hashref)) {
        if ($v =~ m/(\\.)/) {
            $v =~ s/(\\.)/$DECODE_MAP{$1}/eg || croak "Invalid header value.";
        }
        if ($k =~ m/(\\.)/) {
            $k =~ s/(\\.)/$DECODE_MAP{$1}/eg || croak "Invalid header key.";
        }
        $result_hashref->{$k} = $v;
    }

    return $result_hashref;
}

sub send_frame {
    my ($self, $command, $header_hashref, $body) = @_;

    unless ($self->is_connected or $command eq 'CONNECT') {
        croak "Have you considered connecting to a STOMP broker first before "
            ."trying to send something?";
    }

    utf8::encode($command);

    my $header;
    if ($command eq 'CONNECT') {
        $header = header_hash2string($header_hashref);
    }
    else {
        $header = header_hash2string(encode_header($header_hashref));
    }
    utf8::encode($header);

    my $frame;
    if ($command eq 'SEND') {
        $body = '' unless defined $body;
        $frame = $command.$EOL.$header.$EOL.$EOL.$body.$NULL;
    }
    else {
        $frame = $command.$EOL.$header.$EOL.$EOL.$NULL;
    }

    $self->event('SEND_FRAME', $frame);
    $self->event($command, $frame) if ($command =~ m/SEND|ACK|NACK|/);
    $self->{handle}->push_write($frame);
    $self->reset_client_heartbeat_timer;
}

sub send {
    my ($self, $destination, $headers, $body) = @_;

    if (defined $destination) {
        $headers->{destination} = $destination;
    }
    else {
        croak "Would you mind supplying me with a destination?";
    }

    unless (defined $headers->{'content-length'}) {
        $headers->{'content-length'} = length $body || 0;
    }

    unless (defined $headers->{'content-type'}) {
        carp "It is strongly recommended to set the 'content-type' header.";
    }

    $self->send_frame('SEND', $headers, $body);
}

sub ack {
    my ($self, $ack_id, $transaction) = @_;

    unless ($ack_id) {
        croak "I do really need the message's ack header to ACK it.";
    }

    my $header = {id => $ack_id,};
    $header->{transaction} = $transaction if (defined $transaction);

    $self->send_frame('ACK', $header);
}

sub nack {
    my ($self, $ack_id, $transaction) = @_;

    unless ($ack_id) {
        croak "I do really need the message's ack header to NACK it.";
    }

    my $header = {id => $ack_id,};
    $header->{transaction} = $transaction if (defined $transaction);



( run in 1.168 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )