Stopping an Azure Event Hubs receiver from inside on_event



I am using the Python Azure Event Hubs package and have the following code:

import json

from azure.eventhub import EventHubConsumerClient

def on_event(partition_context, event):
    print("Received event from partition: {}.".format(partition_context.partition_id))
    # Decode the event body; str(event) is not guaranteed to be valid JSON.
    event_json = json.loads(event.body_as_str())
    if event_json["command_status"]=="successull":
        pass  # STOP RECEIVER HERE!

def on_partition_initialize(partition_context):
    print("Partition: {} has been initialized.".format(partition_context.partition_id))

def on_partition_close(partition_context, reason):
    print("Partition: {} has been closed, reason for closing: {}.".format(
        partition_context.partition_id,
        reason
    ))

def on_error(partition_context, error):
    # partition_context is None when the error comes from load balancing.
    if partition_context:
        print("An exception: {} occurred during receiving from Partition: {}.".format(
            error,
            partition_context.partition_id
        ))
    else:
        print("An exception: {} occurred during the load balance process.".format(error))

# Assumes this code runs inside a method where self holds the connection settings.
consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=self.conn_string_event_hub,
    consumer_group=self.consumer_group,
    eventhub_name=self.event_hub_name,
)
with consumer_client:
    consumer_client.receive(
        on_event=on_event,
        on_partition_initialize=on_partition_initialize,
        on_partition_close=on_partition_close,
        on_error=on_error,
    )

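I know you can use asyncio to stop the receiver after a timeout.

What I want to do, however, is stop the receiver when I receive an event with a particular characteristic.

Alternatively, I would like to stop the receiver as soon as the first event arrives.

Is this possible?

Thanks!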

You can stop receiving by closing the consumer client inside the on_event callback, as shown below.

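def on_event(partition_context, event):
    print("Received event from partition: {}.".format(partition_context.partition_id))
    # Decode the event body; str(event) is not guaranteed to be valid JSON.
    event_json = json.loads(event.body_as_str())
    if event_json["command_status"]=="successull":
        # Closing the client stops receiving, so the blocking receive() call returns.
        consumer_client.close()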
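
For reference, here is a minimal end-to-end sketch of the same idea, covering both cases from the question (stop on a matching event, or stop after the first event). The helper names should_stop and receive_until are hypothetical and not part of the azure-eventhub package; the sketch assumes azure-eventhub v5 is installed and that event bodies are JSON text. The "successull" literal is copied unchanged from the question.

```python
import json


def should_stop(body: str) -> bool:
    """Hypothetical helper: True when the event body asks us to stop receiving."""
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        # Bodies that are not JSON never trigger a stop.
        return False
    # Literal value taken as-is from the question.
    return isinstance(payload, dict) and payload.get("command_status") == "successull"


def receive_until(conn_str, consumer_group, eventhub_name, predicate=should_stop):
    """Hypothetical helper: block until predicate(body) is True for some event."""
    # Imported lazily so the helper above can be used without the SDK installed.
    from azure.eventhub import EventHubConsumerClient

    client = EventHubConsumerClient.from_connection_string(
        conn_str,
        consumer_group=consumer_group,
        eventhub_name=eventhub_name,
    )

    def on_event(partition_context, event):
        if event is None:
            # Can happen when max_wait_time is set and no event arrived in time.
            return
        if predicate(event.body_as_str()):
            # Closing the client from inside the callback ends receive().
            client.close()

    with client:
        client.receive(on_event=on_event)
```

To stop on the first event regardless of its content, pass predicate=lambda body: True. Keeping the stop condition in a separate function also makes it testable without a live Event Hub.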
