如何将blob触发的启动函数的输入传递到Azure持久函数的编排器内的活动函数?



正如标题所说,我有一个触发blob的启动器函数,我希望将触发启动器的blob发送到活动函数作为输入。我可以选择使用.start_new()方法传入输入,但是这些输入必须是JSON可序列化的,而blob文件(func_inputstream)不是。

我试图解码InputStream对象(myblob),它允许我将其作为输入传递,但我失去了一些基本功能,当它是InputStream对象时,我将拥有它,并且还需要确保我能够将其传递到将在活动函数中调用的后续认知服务中。下面是starter函数的代码。

启动函数

functions.json'

{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "myblob",
"type": "blobTrigger",
"direction": "in",
"path": "container/{name}",
"connection": "conn str"
},
{
"name": "$return",
"type": "blob",
"path": "container/{name}",
"connection": "AzureWebJobsStorage",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
]
}

'init.py

async def main(myblob: func.InputStream, starter: str) -> func.InputStream:
logging.info(f"Python blob trigger function processed blob n"
f"Name: {myblob.name}n"
f"Blob Size: {myblob.length} bytesnn")        
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new('Orchestrator', client_input=myblob) ##Client_input must be json serializable
logging.info(f"Started orchestration with ID = '{instance_id}'.")
return myblob

返回:异常:TypeError: class 'azure.functions.blob.InputStream'>不公开to_json函数

Activity Functions也可以有其他常规的输入和输出绑定。因此,您可以将blob的详细信息作为输入传递给活动函数,并使用这些详细信息使用blob输入绑定来获取blob。

使用blobtrigger有更多的问题,所以改为使用事件网格触发器。将触发编排的blob的名称传递给活动函数(从事件数据中检索),并使用azure sdk将文件下载到内存,而不是工作得很好。

移除blob触发器的原因:

  1. 高延迟,标准blob触发器实际上不是事件驱动的,而是依赖于轮询
  2. 出于类似的原因,日志不被保证
  3. 当处理大文件时(我们的用例需要2.5gb的上限),blob触发器可能是计算密集型的,并且有时触发器会由于blob
  4. 的大小/长度而失败。

下面链接的文档中提供了这些信息:

https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-storage-blob-trigger?pivots=programming-language-python&标签= python-v2 % 2 cin-process

相关内容

  • 没有找到相关文章

最新更新