AWS-Lambda

 view release on metacpan or  search on metacpan

lib/AWS/Lambda/PSGI.pm  view on Meta::CPAN

    $env->{'psgi.nonblocking'}  = Plack::Util::FALSE;
    $env->{'psgix.harakiri'}    = Plack::Util::TRUE;
    $env->{'psgix.input.buffered'} = Plack::Util::TRUE;

    # inject the request id that compatible with Plack::Middleware::RequestId
    if ($ctx) {
        $env->{'psgix.request_id'} = $ctx->aws_request_id;
        $env->{'HTTP_X_REQUEST_ID'} = $ctx->aws_request_id;
    }

    my $body = encode_utf8($payload->{body} // '');
    if ($payload->{isBase64Encoded}) {
        $body = decode_base64 $body;
    }
    open my $input, "<", \$body;
    $env->{'psgi.input'} = $input;
    $env->{CONTENT_LENGTH} //= bytes::length($body);
    my $requestContext = $payload->{requestContext};
    $env->{REQUEST_METHOD} = $requestContext->{http}{method};
    $env->{REQUEST_URI} = $payload->{rawPath};
    if ($env->{QUERY_STRING}) {
        $env->{REQUEST_URI} .= '?' . $env->{QUERY_STRING};
    }
    $env->{PATH_INFO} = $requestContext->{http}{path};
    $env->{SCRIPT_NAME} = '';
    return $env;
}

sub format_output {
    my ($self, $response) = @_;
    my ($status, $headers, $body) = @$response;

    my $singleValueHeaders = {};
    my $multiValueHeaders = {};
    Plack::Util::header_iter($headers, sub {
        my ($k, $v) = @_;
        $singleValueHeaders->{lc $k} = string $v;
        push @{$multiValueHeaders->{lc $k} //= []}, string $v;
    });

    my $content = '';
    if (reftype($body) eq 'ARRAY') {
        $content = join '', grep defined, @$body;
    } else {
        local $/ = \4096;
        while (defined(my $buf = $body->getline)) {
            $content .= $buf;
        }
        $body->close;
    }

    my $type = $singleValueHeaders->{'content-type'} // 'application/octet-stream';
    my $isBase64Encoded = $type !~ m(^text/.*|application/(:?json|javascript|xml))i;
    if ($isBase64Encoded) {
        $content = encode_base64 $content, '';
    } else {
        $content = try {
            # is valid utf-8 string? try to decode as utf-8.
            decode_utf8($content, Encode::FB_CROAK | Encode::LEAVE_SRC);
        } catch {
            # it looks not utf-8 encoding. fallback to base64 encoding.
            $isBase64Encoded = 1;
            encode_base64 $content, '';
        };
    }

    return +{
        isBase64Encoded => bool $isBase64Encoded,
        headers => $singleValueHeaders,
        multiValueHeaders => $multiValueHeaders,
        statusCode => number $status,
        body => string $content,
    }
}

sub _handle_response_stream {
    my ($self, $response) = @_;
    if (reftype($response) ne "CODE") {
        my $orig = $response;
        $response = sub {
            my $responder = shift;
            $responder->($orig);
        };
    }

    return sub {
        my $lambda_responder = shift;
        my $psgi_responder = sub {
            my $response = shift;
            my ($status, $headers, $body) = @$response;

            # write the prelude.
            my $writer = $lambda_responder->("application/vnd.awslambda.http-integration-response");
            my $prelude = encode_json($self->_format_response_stream($status, $headers));
            $prelude .= "\x00\x00\x00\x00\x00\x00\x00\x00";
            $writer->write($prelude) or die "failed to write prelude: $!";

            # write the body.
            if (!defined $body) {
                # the caller will write the body.
                return $writer;
            }
            if (reftype($body) eq 'ARRAY') {
                # array-ref
                for my $chunk (@$body) {
                    $writer->write($chunk) or die "failed to write chunk: $!";
                }
            } else {
                # IO::Handle-like object
                local $/ = \4096;
                while (defined(my $chunk = $body->getline)) {
                    $writer->write($chunk) or die "failed to write chunk: $!";
                }
            }
            $writer->close or die "failed to close writer: $!";
            return;
        };
        $response->($psgi_responder);
    };
}

sub _format_response_stream {
    my ($self, $status, $headers) = @_;
    my $headers_hash = {};
    my $cookies = [];

    Plack::Util::header_iter($headers, sub {
        my ($k, $v) = @_;
        $k = lc $k;
        if ($k eq 'set-cookie') {
            push @$cookies, string $v;
        } elsif (exists $headers_hash->{$k}) {
            $headers_hash->{$k} = ", $v";
        } else {
            $headers_hash->{$k} = string $v;
        }
    });

    return +{
        statusCode => number $status,
        headers    => $headers_hash,
        cookies    => $cookies,
    };
}

1;
__END__

=encoding utf-8

=head1 NAME

AWS::Lambda::PSGI - It translates event of Lambda Proxy Integrations in API Gateway and 
Application Load Balancer into L<PSGI>.

=head1 SYNOPSIS

Add the following script into your Lambda code archive.

    use utf8;
    use warnings;
    use strict;
    use AWS::Lambda::PSGI;

    my $app = require "$ENV{'LAMBDA_TASK_ROOT'}/app.psgi";
    my $func = AWS::Lambda::PSGI->wrap($app);

    sub handle {
        return $func->(@_);
    }

    1;

And then, L<Set up Lambda Proxy Integrations in API Gateway|https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html> or
L<Lambda Functions as ALB Targets|https://docs.aws.amazon.com/elasticloadbalancing/latest/application/lambda-functions.html>

=head1 DESCRIPTION

=head2 Streaming Response

L<AWS::Lambda::PSGI> supports L<response streaming|https://docs.aws.amazon.com/lambda/latest/dg/configuration-response-streaming.html>.
The function urls's invoke mode is configured as C<"RESPONSE_STREAM">, and Lambda environment variable "PERL5_LAMBDA_PSGI_INVOKE_MODE" is set to C<"RESPONSE_STREAM">.

    ExampleApi:
        Type: AWS::Serverless::Function
        Properties:
            FunctionUrlConfig:
                AuthType: NONE
                InvokeMode: RESPONSE_STREAM
            Environment:
                Variables:
                PERL5_LAMBDA_PSGI_INVOKE_MODE: RESPONSE_STREAM
            # (snip)

In this mode, the PSGI server accepts L<Delayed Response and Streaming Body|https://metacpan.org/pod/PSGI#Delayed-Response-and-Streaming-Body>.

    my $app = sub {
        my $env = shift;
    
        return sub {
            my $responder = shift;
            $responder->([ 200, ['Content-Type' => 'text/plain'], [ "Hello World" ] ]);
        };
    };

An application MAY omit the third element (the body) when calling the responder.

    my $app = sub {
        my $env = shift;



( run in 2.001 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )