等待活动完成的持久协调器函数的同步调用(Python Azure函数)



我正在尝试构建一个持久的函数,其中编排器等待活动完成,然后返回活动的结果。我能够从Azure文档中获得"标准"异步函数链接Python示例。

现在我想同步调用协调器客户端函数,等待活动函数完成并直接返回活动函数的输出,而不是使用create_check_status_response()方法并返回statusQueryGetUri端点。

我已经看到存在一个waitForCompletionOrCreateCheckStatusResponse()方法(Azure docs),它也在这个代码示例中使用。这应该正是我要找的,但我不能让它工作。

我目前的设置如下:E1_HelloSequence协调器和E1_SayHello活动函数与相应的function.json文件完全相同,与第一个链接和协调器客户端是与持久函数实例管理文档链接相同的精确副本:

import logging
import azure.functions as func
import azure.durable_functions as df
timeout = "timeout"
retry_interval = "retryInterval"
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new(req.route_params['functionName'], None, req.get_body())
logging.log(f"Started orchestration with ID = '${instance_id}'.")
timeout_in_milliseconds = get_time_in_seconds(req, timeout)
timeout_in_milliseconds = timeout_in_milliseconds if timeout_in_milliseconds != None else 30000
retry_interval_in_milliseconds = get_time_in_seconds(req, retry_interval)
retry_interval_in_milliseconds = retry_interval_in_milliseconds if retry_interval_in_milliseconds != None else 1000
return client.wait_for_completion_or_create_check_status_response(
req,
instance_id,
timeout_in_milliseconds,
retry_interval_in_milliseconds
)
def get_time_in_seconds(req: func.HttpRequest, query_parameter_name: str):
query_value = req.params.get(query_parameter_name)
return query_value if query_value != None else 1000

这会产生以下错误:

Exception: TypeError: class <class 'bytes'> does not expose a `to_json` function"

我是否可能需要将http输出绑定类型更改为function.json中的其他内容,以使其工作?

{
"scriptFile": "__init__.py",
"bindings": [
{
"authLevel": "anonymous",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"route": "orchestrators/{functionName}",
"methods": [
"post"
]
},
{
"name": "$return",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
],
"disabled": false
}

有人能指出我需要改变什么,以使这个工作像预期的?谢谢!

这里你传递的是一个bytes对象,它来自于req.get_body():

instance_id = await client.start_new(req.route_params['functionName'], None, req.get_body())

在不知道你选择调用的函数的情况下,这个函数很可能期望一个HttpRequest对象,而不是请求体的bytes表示(因为它试图调用to_json()方法)。将req.get_body()更改为req应该可以解决这个问题。

我还注意到您在呼叫client.wait_for_completion_or_create_check_status_response之前缺少await

最新更新