如何强制azure eventhub checkpointstoreblob在xx秒后终止



嗨,我有一个Azure函数(http触发器),从物联网中心读取数据,基于这里的代码。我要做的是在58秒后终止azure函数,我尝试了经典的方法是存储当前时间,当我启动azure函数和on_event()函数时,测试时间是否超过58秒,但它不起作用,它不是python !当我在文档中查看时,如果在调用on_event()函数以终止进程时要传递参数并且找不到任何参数,我如何实现它?

我是这样做的:

  • 在我的__init__.py文件:
from . import eventHubHelper
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a request.')
if not name:
try:
req_body = req.get_json()
except ValueError:
pass
else:
name = req_body.get('name')
if name:
# start consumer
logging.info('************* Starting consumer ***********')
# store current time
os.environ["TIME"] = str(time.time()) 
eventHubHelper.consumer()
  • 和在eventHubHelper.py文件中,我有从物联网中心读取事件的基本代码:
def on_event(partition_context, event):
# get the time
start = float(os.environ["TIME"]) 
print("################## current exec time: "+ str(time.time() - start) + " seconds")
if time.time() - start > 58:
start = 0
print("#################### execution time " + str(time.time() - start ))
# return http response but doesn't work
return func.HttpResponse("############ execution time: "+ str(time.time() - start) + " seconds")
print("Received event from partition: {}.".format(partition_context.partition_id))
print("Telemetry received: ", event.body_as_str())
print("Properties (set by device): ", event.properties)
print("System properties (set by IoT Hub): ", event.system_properties)
# store events in table storage
store_to_ts(partition_context.partition_id, event.body_as_str(), event.properties)
partition_context.update_checkpoint(event)
def on_error(partition_context, error):
# Put your code here. partition_context can be None in the on_error callback.
if partition_context:
print("An exception: {} occurred during receiving from Partition: {}.".format(
partition_context.partition_id,
error
))
else:
print("An exception: {} occurred during the load balance process.".format(error))
def consumer():
checkpoint_store = BlobCheckpointStore.from_connection_string(
STORAGE_CONNECTION_STR, 
BLOB_CONTAINER_NAME
)
client = EventHubConsumerClient.from_connection_string(
conn_str=CONNECTION_STR,
consumer_group="$default",
eventhub_name=EVENTHUB_NAME,
checkpoint_store=checkpoint_store

)
try:
with client:
client.receive(
on_event=on_event,
on_error=on_error
)
except KeyboardInterrupt:
print("Receiving has stopped.")

我认为AF没有对ENV变量的写访问权限os.environ["TIME"] = str(time.time())。

将事件第一次调用的开始时间存储在redis缓存(任何外部缓存)中,并与缓存值进行比较。不要忘记清除缓存值。

或者看一下持久函数来管理状态(在您的例子中是开始时间)。https://learn.microsoft.com/en-us/azure/azure-functions/durable/quickstart-python-vscode

相关内容

  • 没有找到相关文章

最新更新