如何使用 Python SDK 处理来自 EventHub 的 Json 消息?



我正在按照此示例处理从事件中心接收的消息:

https://github.com/Azure/azure-event-hubs-python/blob/master/examples/eph.py

但是,代码确实显示了有关如何检索每条消息并获取 Json 有效负载的任何代码?

我尝试了以下代码(使用适用于事件中心的 Python SDK(:

async def process_events_async(self, context, messages):
"""
Called by the processor host when a batch of events has arrived.
This is where the real work of the event processor is done.
:param context: Information about the partition
:type context: ~azure.eventprocessorhost.PartitionContext
:param messages: The events to be processed.
:type messages: list[~azure.eventhub.common.EventData]
"""
for message in messages:
message_content = json.loads(message.body)
logger.info("Events processed {}".format(context.sequence_number))
await context.checkpoint_async()

但是,我在 json.load 语句上收到以下错误消息:

"2018-09-20 23:13:12,484 azure 错误事件处理器错误 TypeError("JSON 对象必须是 str、bytes 或 bytearray,而不是 '生成器'",(">

你能帮我输入代码来获取消息的 Json 有效负载吗?

这是获取事件中心消息的 JSON 有效负载的工作代码:

for message in messages:
message_body = list(message.body)
message_content = json.loads(message_body[0])

最新更新