Add deferrable support to LambdaInvokeFunctionOperator #40307

Closed
1 task done
eladkal opened this issue Jun 18, 2024 · 10 comments

@eladkal
Contributor

eladkal commented Jun 18, 2024

Body

#33327 added a deferrable option to LambdaCreateFunctionOperator. We should have similar functionality in LambdaInvokeFunctionOperator.

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.
@eladkal added the provider:amazon (AWS/Amazon-related issues), area:providers, kind:feature (Feature Requests), and good first issue labels on Jun 18, 2024
@eladkal changed the title from "Add defferable support to LambdaInvokeFunctionOperator" to "Add deferrable support to LambdaInvokeFunctionOperator" on Jun 18, 2024
@vincbeck
Contributor

vincbeck commented Jun 18, 2024

Indeed, that'd be a cool add-on!

@gopidesupavan
Member

Happy to look into this! 😀

@eladkal
Contributor Author

eladkal commented Jun 19, 2024

> Happy to look into this! 😀

Assigned

@vincbeck
Contributor

@gopidesupavan, have you started working on it? Do you have any leads? I am asking because I now recall that there is no Lambda API to fetch the status of a given execution, which makes this task impossible. Any polling mechanism on a Lambda execution is impossible because there is no API to retrieve its status.

@eladkal
Contributor Author

eladkal commented Jun 25, 2024

> I recall now there is no Lambda API to fetch status of a given execution.

But we do have LambdaFunctionStateSensor. What am I missing?

@vincbeck
Contributor

vincbeck commented Jun 25, 2024

> But we do have LambdaFunctionStateSensor. What am I missing?

LambdaFunctionStateSensor polls the deployment state of the AWS Lambda function until it reaches a target state. It does not poll the status of an execution.
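To make the distinction concrete, here is a simplified sketch of the kind of polling a function-state sensor does. It assumes the shape of boto3's Lambda get_function response (Configuration.State) and is not the sensor's actual code; wait_for_function_state and FakeClient are hypothetical names. Note that the only input is the function name: nothing in the request or response identifies an individual invocation.

```python
import time
from typing import Any, Callable, Iterable, Protocol


class LambdaClient(Protocol):
    """The single boto3 Lambda client method this sketch relies on."""

    def get_function(self, **kwargs: Any) -> dict: ...


def wait_for_function_state(
    client: LambdaClient,
    function_name: str,
    target_states: set,
    poke_interval: float = 5.0,
    max_attempts: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll the function's deployment state (not an invocation) until it hits a target."""
    for _ in range(max_attempts):
        # get_function describes the function itself; nothing here refers to a run.
        state = client.get_function(FunctionName=function_name)["Configuration"]["State"]
        if state in target_states:
            return state
        sleep(poke_interval)
    raise TimeoutError(f"{function_name} did not reach any of {sorted(target_states)}")


class FakeClient:
    """Hypothetical stand-in for boto3.client("lambda") that replays canned states."""

    def __init__(self, states: Iterable[str]) -> None:
        self._states = iter(states)

    def get_function(self, **kwargs: Any) -> dict:
        return {"Configuration": {"State": next(self._states)}}


print(wait_for_function_state(FakeClient(["Pending", "Active"]), "my-fn", {"Active"}, sleep=lambda s: None))
# prints: Active
```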

@vincbeck
Contributor

But if @gopidesupavan is interested in a similar task, there is #40207: it seems EcsRunTaskOperator has a bug in deferrable mode. It is a good first issue.

@gopidesupavan
Member

gopidesupavan commented Jun 25, 2024

Hi @eladkal @vincbeck, I actually started this task yesterday, and I agree with what you said: there is no way to get the status of a Lambda execution. What I have in mind is invoking the Lambda function asynchronously from a trigger, in deferrable mode. I think this could be useful in cases where the Lambda execution takes a long time (say, more than 5 minutes) and InvocationType=RequestResponse: the trigger could then wait for the response from the Lambda function. Would that work?
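The idea rests on one property: a RequestResponse invocation only returns once the function has finished, so an awaitable version of that call can sit in the triggerer's event loop without blocking other triggers. The simulation below sketches that assumption with asyncio.sleep in place of AWS. fake_request_response_invoke and run_triggers are hypothetical names, and no real Lambda client is involved.

```python
import asyncio
import time


async def fake_request_response_invoke(function_name: str, duration: float) -> dict:
    """Hypothetical stand-in for an awaitable RequestResponse invocation.

    A real RequestResponse call returns only once the function has finished, so
    the wait is simulated with asyncio.sleep instead of calling AWS.
    """
    await asyncio.sleep(duration)
    return {"FunctionName": function_name, "StatusCode": 200}


async def run_triggers(durations: dict) -> list:
    # Each coroutine plays one trigger's run(); gathering them mimics a single
    # triggerer event loop waiting on several long invocations at once.
    return await asyncio.gather(
        *(fake_request_response_invoke(name, d) for name, d in durations.items())
    )


start = time.monotonic()
results = asyncio.run(run_triggers({"fn-a": 0.3, "fn-b": 0.3, "fn-c": 0.3}))
elapsed = time.monotonic() - start
# Roughly 0.3s in total rather than 0.9s, because the waits overlap.
print([r["FunctionName"] for r in results], round(elapsed, 1))
```

Two caveats apply to the real version. botocore's default read timeout is 60 seconds, so a RequestResponse call lasting minutes would need a larger read_timeout in the botocore config. Lambda caps function runtime at 15 minutes. In addition, Airflow may run a trigger more than once (for example, after a triggerer restart), and here each run would invoke the function again.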

This is the trigger logic I am planning:

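    from __future__ import annotations

    from functools import cached_property
    from typing import Any, AsyncIterator

    from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
    from airflow.triggers.base import BaseTrigger, TriggerEvent


    class LambdaInvokeFunctionTrigger(BaseTrigger):
        """Invoke a Lambda function from the triggerer and wait for its response."""

        def __init__(self, *, function_name: str, payload: str | None = None,
                     log_type: str | None = None, qualifier: str | None = None,
                     invocation_type: str | None = None, client_context: str | None = None,
                     aws_conn_id: str | None = "aws_default", region_name: str | None = None,
                     verify: bool | str | None = None, botocore_config: dict | None = None):
            super().__init__()
            self.function_name = function_name
            self.payload = payload
            self.log_type = log_type
            self.qualifier = qualifier
            self.invocation_type = invocation_type
            self.client_context = client_context
            self.aws_conn_id = aws_conn_id
            self.region_name = region_name
            self.verify = verify
            self.botocore_config = botocore_config

        def serialize(self) -> tuple[str, dict[str, Any]]:
            # Everything the triggerer process needs to rebuild this trigger.
            return (
                self.__class__.__module__ + "." + self.__class__.__qualname__,
                {
                    "function_name": self.function_name,
                    "payload": self.payload,
                    "log_type": self.log_type,
                    "qualifier": self.qualifier,
                    "invocation_type": self.invocation_type,
                    "client_context": self.client_context,
                    "aws_conn_id": self.aws_conn_id,
                    "region_name": self.region_name,
                    "verify": self.verify,
                    "botocore_config": self.botocore_config,
                },
            )

        @cached_property
        def hook(self) -> LambdaHook:
            return LambdaHook(
                aws_conn_id=self.aws_conn_id,
                region_name=self.region_name,
                verify=self.verify,
                config=self.botocore_config,
            )

        async def run(self) -> AsyncIterator[TriggerEvent]:
            # invoke_lambda_async is a proposed async counterpart of LambdaHook.invoke_lambda.
            response = await self.hook.invoke_lambda_async(
                function_name=self.function_name,
                invocation_type=self.invocation_type,
                log_type=self.log_type,
                client_context=self.client_context,
                payload=self.payload,
                qualifier=self.qualifier,
            )
            # With an async (aiobotocore) client, the streaming body is read with await.
            payload = await response["Payload"].read()
            # Drop the StreamingBody itself so the event payload stays serializable.
            metadata = {k: v for k, v in response.items() if k != "Payload"}
            yield TriggerEvent({"status": "success", "response": metadata, "payload": payload.decode()})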

Do you think something like this would be useful, or is this approach not needed?
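One detail a trigger like this would likely need to handle: a Lambda Invoke response whose function raised an error still carries a 200-range StatusCode. The failure shows up as a FunctionError key, with the error details in the payload. Yielding "success" unconditionally would therefore hide function errors. The helper below, build_event_payload (a hypothetical name), sketches one way to derive the status and keep the event JSON-serializable. It assumes the Payload body has already been read into bytes.

```python
import json


def build_event_payload(response: dict, payload_bytes: bytes) -> dict:
    """Turn a Lambda Invoke response into a JSON-serializable trigger event payload.

    Assumes the streaming "Payload" body has already been read into payload_bytes.
    """
    # Drop the StreamingBody; the remaining keys are plain data.
    metadata = {k: v for k, v in response.items() if k != "Payload"}
    # A function that raised still comes back with StatusCode 200; the failure
    # is signalled by the FunctionError key and described in the payload.
    status = "error" if "FunctionError" in response else "success"
    return {"status": status, "response": metadata, "payload": payload_bytes.decode()}


ok = build_event_payload({"StatusCode": 200, "Payload": object()}, b'{"result": 42}')
failed = build_event_payload(
    {"StatusCode": 200, "FunctionError": "Unhandled", "Payload": object()},
    b'{"errorMessage": "boom"}',
)
print(ok["status"], json.loads(ok["payload"])["result"])  # success 42
print(failed["status"])  # error
print(json.dumps(ok))  # serializes because the StreamingBody was removed
```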

@gopidesupavan
Member

> But if @gopidesupavan is interested in a similar issue, there is this issue: #40207.
Yes, I can take a look. It looks like Ellis is already working on it.

@vincbeck
Contributor

As mentioned in #40425, this is not possible because AWS Lambda does not provide an API to retrieve the status of an execution. Without that capability, this operator cannot be made deferrable.
