Skip to content

Commit

Permalink
feat: add lambda streaming support for remote invoke (aws#5307)
Browse files Browse the repository at this point in the history
* feat: support response streaming with remote invoke

* add invoker and mappers

* Update output formatting of stream response

* add unit tests

* fix formatting

* Add docs

* address comments

* formatting

* move is_function_invoke_mode_response_stream into lambda invoke executors and add/update string constants
  • Loading branch information
mndeveci authored and lucashuy committed Jun 22, 2023
1 parent 41709ee commit 4ca07e3
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 85 deletions.
4 changes: 2 additions & 2 deletions requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
chevron~=0.12
click~=8.0
Flask<2.3
#Need to add Schemas latest SDK.
boto3>=1.19.5,==1.*
#Need to add latest lambda changes which will return invoke mode details
boto3>=1.26.109,==1.*
jmespath~=1.0.1
ruamel_yaml~=0.17.21
PyYAML>=5.4.1,==5.*
Expand Down
132 changes: 118 additions & 14 deletions samcli/lib/remote_invoke/lambda_invoke_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import base64
import json
import logging
from abc import ABC, abstractmethod
from json import JSONDecodeError
from typing import Any, Dict, cast
from typing import Any, cast

from botocore.eventstream import EventStream
from botocore.exceptions import ClientError, ParamValidationError
from botocore.response import StreamingBody

Expand All @@ -26,12 +28,19 @@
LOG = logging.getLogger(__name__)
FUNCTION_NAME = "FunctionName"
PAYLOAD = "Payload"
EVENT_STREAM = "EventStream"
PAYLOAD_CHUNK = "PayloadChunk"
INVOKE_COMPLETE = "InvokeComplete"
LOG_RESULT = "LogResult"

INVOKE_MODE = "InvokeMode"
RESPONSE_STREAM = "RESPONSE_STREAM"

class LambdaInvokeExecutor(BotoActionExecutor):

class AbstractLambdaInvokeExecutor(BotoActionExecutor, ABC):
"""
Calls "invoke" method of "lambda" service with given input.
If a file location provided, the file handle will be passed as Payload object
Abstract class for different lambda invocation executors, see implementation for details.
For Payload parameter, if a file location provided, the file handle will be passed as Payload object
"""

_lambda_client: Any
Expand Down Expand Up @@ -59,14 +68,9 @@ def validate_action_parameters(self, parameters: dict) -> None:
def _execute_action(self, payload: str):
self.request_parameters[FUNCTION_NAME] = self._function_name
self.request_parameters[PAYLOAD] = payload
LOG.debug(
"Calling lambda_client.invoke with FunctionName:%s, Payload:%s, parameters:%s",
self._function_name,
payload,
self.request_parameters,
)

try:
response = self._lambda_client.invoke(**self.request_parameters)
return self._execute_lambda_invoke(payload)
except ParamValidationError as param_val_ex:
raise InvalidResourceBotoParameterException(
f"Invalid parameter key provided."
Expand All @@ -80,7 +84,40 @@ def _execute_action(self, payload: str):
elif boto_utils.get_client_error_code(client_ex) == "InvalidRequestContentException":
raise InvalidResourceBotoParameterException(client_ex) from client_ex
raise ErrorBotoApiCallException(client_ex) from client_ex
return response

@abstractmethod
def _execute_lambda_invoke(self, payload: str):
pass


class LambdaInvokeExecutor(AbstractLambdaInvokeExecutor):
"""
Calls "invoke" method of "lambda" service with given input.
"""

def _execute_lambda_invoke(self, payload: str) -> dict:
LOG.debug(
"Calling lambda_client.invoke with FunctionName:%s, Payload:%s, parameters:%s",
self._function_name,
payload,
self.request_parameters,
)
return cast(dict, self._lambda_client.invoke(**self.request_parameters))


class LambdaInvokeWithResponseStreamExecutor(AbstractLambdaInvokeExecutor):
"""
Calls "invoke_with_response_stream" method of "lambda" service with given input.
"""

def _execute_lambda_invoke(self, payload: str) -> dict:
LOG.debug(
"Calling lambda_client.invoke_with_response_stream with FunctionName:%s, Payload:%s, parameters:%s",
self._function_name,
payload,
self.request_parameters,
)
return cast(dict, self._lambda_client.invoke_with_response_stream(**self.request_parameters))


class DefaultConvertToJSON(RemoteInvokeRequestResponseMapper):
Expand Down Expand Up @@ -124,6 +161,31 @@ def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExe
return remote_invoke_input


class LambdaStreamResponseConverter(RemoteInvokeRequestResponseMapper):
"""
This class helps to convert response from lambda invoke_with_response_stream API call.
That API call returns 'EventStream' which yields 'PayloadChunk's and 'InvokeComplete' as they become available.
This mapper, gets all 'PayloadChunk's and 'InvokeComplete' events and decodes them for next mapper.
"""

def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExecutionInfo:
LOG.debug("Mapping Lambda response to string object")
if not isinstance(remote_invoke_input.response, dict):
raise InvalideBotoResponseException("Invalid response type received from Lambda service, expecting dict")

event_stream: EventStream = remote_invoke_input.response.get(EVENT_STREAM, [])
decoded_event_stream = []
for event in event_stream:
if PAYLOAD_CHUNK in event:
decoded_payload_chunk = event.get(PAYLOAD_CHUNK).get(PAYLOAD).decode("utf-8")
decoded_event_stream.append({PAYLOAD_CHUNK: {PAYLOAD: decoded_payload_chunk}})
if INVOKE_COMPLETE in event:
log_output = event.get(INVOKE_COMPLETE).get(LOG_RESULT, b"")
decoded_event_stream.append({INVOKE_COMPLETE: {LOG_RESULT: log_output}})
remote_invoke_input.response[EVENT_STREAM] = decoded_event_stream
return remote_invoke_input


class LambdaResponseOutputFormatter(RemoteInvokeRequestResponseMapper):
"""
This class helps to format output response for lambda service that will be printed on the CLI.
Expand All @@ -139,8 +201,8 @@ def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExe
"""
if remote_invoke_input.output_format == RemoteInvokeOutputFormat.DEFAULT:
LOG.debug("Formatting Lambda output response")
boto_response = cast(Dict, remote_invoke_input.response)
log_field = boto_response.get("LogResult")
boto_response = cast(dict, remote_invoke_input.response)
log_field = boto_response.get(LOG_RESULT)
if log_field:
log_result = base64.b64decode(log_field).decode("utf-8")
remote_invoke_input.log_output = log_result
Expand All @@ -152,3 +214,45 @@ def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExe
remote_invoke_input.response = boto_response.get(PAYLOAD)

return remote_invoke_input


class LambdaStreamResponseOutputFormatter(RemoteInvokeRequestResponseMapper):
"""
This class helps to format streaming output response for lambda service that will be printed on the CLI.
It loops through EventStream elements and adds them to response, and once InvokeComplete is reached, it updates
log_output and response objects in remote_invoke_input.
"""

def map(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExecutionInfo:
"""
Maps the lambda response output to the type of output format specified as user input.
If output_format is original-boto-response, write the original boto API response
to stdout.
"""
if remote_invoke_input.output_format == RemoteInvokeOutputFormat.DEFAULT:
LOG.debug("Formatting Lambda output response")
boto_response = cast(dict, remote_invoke_input.response)
combined_response = ""
for event in boto_response.get(EVENT_STREAM, []):
if PAYLOAD_CHUNK in event:
payload_chunk = event.get(PAYLOAD_CHUNK).get(PAYLOAD)
combined_response = f"{combined_response}{payload_chunk}"
if INVOKE_COMPLETE in event:
log_result = base64.b64decode(event.get(INVOKE_COMPLETE).get(LOG_RESULT)).decode("utf-8")
remote_invoke_input.log_output = log_result
remote_invoke_input.response = combined_response
return remote_invoke_input


def _is_function_invoke_mode_response_stream(lambda_client: Any, function_name: str):
"""
Returns True if given function has RESPONSE_STREAM as InvokeMode, False otherwise
"""
try:
function_url_config = lambda_client.get_function_url_config(FunctionName=function_name)
function_invoke_mode = function_url_config.get(INVOKE_MODE)
LOG.debug("InvokeMode of function %s: %s", function_name, function_invoke_mode)
return function_invoke_mode == RESPONSE_STREAM
except ClientError as ex:
LOG.debug("Function %s, doesn't have Function URL configured, using regular invoke", function_name, exc_info=ex)
return False
22 changes: 21 additions & 1 deletion samcli/lib/remote_invoke/remote_invoke_executor_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
from samcli.lib.remote_invoke.lambda_invoke_executors import (
DefaultConvertToJSON,
LambdaInvokeExecutor,
LambdaInvokeWithResponseStreamExecutor,
LambdaResponseConverter,
LambdaResponseOutputFormatter,
LambdaStreamResponseConverter,
LambdaStreamResponseOutputFormatter,
_is_function_invoke_mode_response_stream,
)
from samcli.lib.remote_invoke.remote_invoke_executors import RemoteInvokeExecutor, ResponseObjectToJsonStringMapper
from samcli.lib.utils.cloudformation import CloudFormationResourceSummary
Expand Down Expand Up @@ -64,6 +68,22 @@ def _create_lambda_boto_executor(self, cfn_resource_summary: CloudFormationResou
:return: Returns the created remote invoke Executor
"""
lambda_client = self._boto_client_provider("lambda")
if _is_function_invoke_mode_response_stream(lambda_client, cfn_resource_summary.physical_resource_id):
LOG.debug("Creating response stream invocator for function %s", cfn_resource_summary.physical_resource_id)
return RemoteInvokeExecutor(
request_mappers=[DefaultConvertToJSON()],
response_mappers=[
LambdaStreamResponseConverter(),
LambdaStreamResponseOutputFormatter(),
ResponseObjectToJsonStringMapper(),
],
boto_action_executor=LambdaInvokeWithResponseStreamExecutor(
lambda_client,
cfn_resource_summary.physical_resource_id,
),
)

return RemoteInvokeExecutor(
request_mappers=[DefaultConvertToJSON()],
response_mappers=[
Expand All @@ -72,7 +92,7 @@ def _create_lambda_boto_executor(self, cfn_resource_summary: CloudFormationResou
ResponseObjectToJsonStringMapper(),
],
boto_action_executor=LambdaInvokeExecutor(
self._boto_client_provider("lambda"),
lambda_client,
cfn_resource_summary.physical_resource_id,
),
)
Expand Down
Loading

0 comments on commit 4ca07e3

Please sign in to comment.