AWS-Step函数,使用TuningStep中的执行输入



我用一个步骤编写了一个简单的AWS步骤函数工作流:

from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import Chain, TuningStep
from stepfunctions.workflow import Workflow
import train_utils

def main():
workflow_execution_role = 'arn:aws:iam::MY ARN'
execution_input = ExecutionInput(schema={
'app_id': str
})
estimator = train_utils.get_estimator()
tuner = train_utils.get_tuner(estimator)
tuning_step = TuningStep(state_id="HP Tuning", tuner=tuner, data={
'train': f's3://my-bucket/{execution_input["app_id"]}/data/'},
wait_for_completion=True,
job_name='HP-Tuning')
workflow_definition = Chain([
tuning_step
])
workflow = Workflow(
name='HP-Tuning',
definition=workflow_definition,
role=workflow_execution_role,
execution_input=execution_input
)
workflow.create()

if __name__ == '__main__':
main()

我的目标是从运行时提供的执行JSON中提取train输入。当我执行工作流(从步骤函数控制台(时,提供JSON{"app_id": "My App ID"},调优步骤不会获得正确的数据,而是获得stepfunctions.inputs.placeholders.ExecutionInput的to_string表示。此外,当查看生成的ASL时,我可以看到执行输入被呈现为字符串:

... 
"DataSource": {
"S3DataSource": {
"S3DataType": "S3Prefix",
"S3Uri": "s3://my-bucket/<stepfunctions.inputs.placeholders.ExecutionInput object at 0x12261f7d0>/data/",
"S3DataDistributionType": "FullyReplicated"
}
},
...

我做错了什么?

更新:正如@yoodan所说,SDK可能落后了,所以我必须在调用create之前编辑定义。我可以看到有一种方法可以在调用create之前查看定义,但我可以修改图形定义吗?怎样

步骤函数的python SDK会生成相应的代码,我们需要在Amazon States Language中内置一个字符串串联/格式来实现您的愿望。

最近在2020年8月,亚马逊国家语言在其语言规范中引入了字符串格式等内置功能。https://states-language.net/#appendix-b

不幸的是,python SDK不是最新的,不支持新的更改。

作为一种解决方案,可能在调用工作流创建之前手动修改定义?

SDK似乎不支持您所寻找的内容。https://github.com/aws/aws-step-functions-data-science-sdk-python/issues/79

但是,您可以在创建状态机之前更改定义。这里有一个函数,它将迭代定义dict,并用正确的内部函数语法替换用{{PH}}包围的占位符,例如:

s3://my-bucket/{{app_id}}/data/

def get_updated_definition(data):
if isinstance(data, dict):
for k, v in data.copy().items():
if isinstance(v, dict):  # For DICT
data[k] = get_updated_definition(v)
elif isinstance(v, list):  # For LIST
data[k] = [get_updated_definition(i) for i in v]
elif isinstance(v, str) and re.search(r'{{([a-z_]+)}}', v):  # Update Key-Value
# data.pop(k)
# OR
del data[k]
keys = re.findall(r'{{([a-z_]+)}}', v)
data[f"{k}.$"] = f"States.Format('{re.sub(r'{{[a-z_]+}}', '{}', v)}',{','.join(['$.'+k for k in keys])})"
return data

用法:

workflow_definition = get_updated_definition(workflow.definition.to_dict())
create_state_machine(json.dumps(workflow_definition)) #<-- implement using boto3 (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html#SFN.Client.create_state_machine)

您对step函数的定义很好,看起来应该可以工作
从您的代码示例中不清楚实际执行的位置(您直接从控制台中说明(,这可能会导致几个选项导致问题,我相信这就是问题的根源
请提供更多相关信息。

您是否以某种方式导出创建的Workflow对象以便执行它
好像什么东西不见了。。。作为健全性检查,将以下内容附加到main函数中:

workflow.execute(inputs={"app_id": "My App ID"})

并再次检查日志

最新更新