从步骤函数向lambda任务传递并使用输入(参数)



我有一个简单的步骤函数来启动lambda,我正在寻找一种方法来将参数(事件/上下文)传递给几个后续任务中的每一个。我的步骤函数如下:

{
"Comment": "A Hello World example of the Amazon States Language using an AWS Lambda function",
"StartAt": "HelloWorld",
"States": {
"HelloWorld": {
"Type": "Task",
"Parameters": {
"TableName": "table_example"
},
"Resource": "arn:aws:lambda:ap-southeast-2:XXXXXXX:function:fields_sync",
"End": true
}
}
}

在用Python编写的lambda中,我使用了一个简单的处理程序,它是:

def lambda_handler(event, context):
#...

事件和上下文如下(检查日志):

START RequestId:f58140b8-9f04-47d7-9285-510b0357b4c2版本:$LATEST

我找不到将参数传递到此lambda并在脚本中使用它们的方法。本质上,我试图做的是运行相同的lambda,将几个不同的值作为参数传递。

有人能告诉我正确的方向吗?

基于您所说的:"寻找传递参数的方法(事件/上下文)到若干后续任务中的每一个">我以为您希望将非静态值传递给lambdas。

有两种方法通过状态机传递参数。经由CCD_ 1和CCD_。如有差异,请查看此处。

如果没有任何要传递给lambda的静态值,我将执行以下操作。以json格式将所有参数传递给step函数。

为状态机输入JSON

{
"foo": 123,
"bar": ["a", "b", "c"],
"car": {
"cdr": true
}
"TableName": "table_example"
}

在步骤函数中,使用"InputPath": "$"将整个JSON显式传递给lambda,但第一步是隐式传递的。有关$路径语法的更多信息,请查看此处。您还需要注意任务结果,使用ResultPath的多种方法之一。在大多数情况下,最安全的解决方案是将任务结果保持在特殊变量"ResultPath": "$.taskresult"

{
"Comment": "A Hello World example of the Amazon States Language using an AWS Lambda function",
"StartAt": "HelloWorld",
"States": {
"HelloWorld": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-southeast-2:XXXXXXX:function:fields_sync",
"Next": "HelloWorld2"
},
"HelloWorld2": {
"Type": "Task",
"InputPath": "$",
"ResultPath": "$.taskresult"
"Resource": "arn:aws:lambda:ap-southeast-2:XXXXXXX:function:fields_sync_2",
"End": true
}
}
}

lambda中的哪个成为事件变量,可以作为python字典访问

def lambda_handler(event, context):
table_example = event["TableName"]
a = event["bar"][0]
cdr_value = event["car"]["cdr"]
# taskresult will not exist as event key 
# only on lambda triggered by first state
# in the rest of subsequent states
# it will hold a task result of last executed state
taskresult = event["taskresult"]

使用这种方法,您可以使用多个步骤函数和不同的lambda,并且通过移动lambdas中的所有逻辑,仍然保持它们干净且小。此外,它更容易调试,因为所有Lambda中的所有事件变量都是相同的,所以通过简单的print(event),您可以看到整个状态机所需的所有参数以及可能出现的问题。

很明显,当Resource设置为lambda ARN(例如"arn:aws:lambda:ap-southeast-2:XXXXXXX:function:fields_sync")时,您不能使用InputPath0来指定输入,而是传递步骤函数的状态(如果之前没有步骤,则可能是状态函数的输入)。

要通过Parameters传递函数输入,您可以将Resource指定为"arn:aws:states:::lambda:invoke",并在Parameters部分提供您的FunctionName

{
"StartAt": "HelloWorld",
"States": {
"HelloWorld": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "YOUR_FUNCTION_NAME",
"Payload": {
"SOMEPAYLOAD": "YOUR PAYLOAD"
}
},
"End": true
}
}
}

您可以在这里找到调用Lambda函数的文档:https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html

您也可以潜在地使用inputPath,或者也可以使用步骤函数状态函数中的元素:https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html

由于某些原因,在Resource中直接指定lambda function ARN不起作用。

以下解决方法纯粹是ASL定义,您只需在之前使用参数创建一个Pass步骤,其输出将用作下一步骤的输入(使用lambda调用的步骤HelloWorld):

{
"Comment": "A Hello World example of the Amazon States Language using an AWS Lambda function",
"StartAt": "HelloParams",
"States": {
"HelloParams": {
"Type": "Pass",
"Parameters": {
"TableName": "table_example"
},
"Next": "HelloWorld"
},
"HelloWorld": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-southeast-2:XXXXXXX:function:fields_sync",
"End": true
}
}
}

在另一个响应中有一个解决方法,告诉您使用lambda函数执行上一步,但对于简单的情况来说不需要。上下文值也可以映射,例如当前时间戳:

"HelloParams": {
"Type": "Pass",
"Parameters": {
"TableName": "table_example",
"Now": "$$.State.EnteredTime"
},
"Next": "HelloWorld"
}, 

此外,InputPathResultPath可用于防止覆盖先前步骤中的值。例如:

{
"Comment": "A Hello World example of the Amazon States Language using an AWS Lambda function",
"StartAt": "HelloParams",
"States": {
"HelloParams": {
"Type": "Pass",
"Parameters": {
"TableName": "table_example"
},
"ResultPath": "$.hello_prms",
"Next": "HelloWorld"
},
"HelloWorld": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-southeast-2:XXXXXXX:function:fields_sync",
"InputPath": "$.hello_prms",
"ResultPath": "$.hello_result",
"End": true
}
}
}

这将在hello_prms中保存参数(以便您可以在其他步骤中重用它们),并在Parameters0中保存执行结果,而不使用以前步骤中的值(以防添加它们)。

就像Milan在评论中提到的那样,您可以将数据从Step function State传递给Lambda函数。

在Lambda函数中,您需要读取event的内容。

import json
def lambda_handler(event, context):
TableName = event['TableName']

最新更新