How to parse a nested EventGrid message



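I'm just learning about Azure Functions triggered by a Queue Storage event handler. In this case, Queue Storage is handling Event Grid messages.

Question: how do I use Python to access the various values nested inside "body" below?

  • For example, the value of body/data/blobUrl

The Queue Storage message looks like this (indented for readability):

  • "body" is the nested EventGrid message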
{
  "id": "<big-long-guid>",
  "body": "{
    "topic":"/subscriptions/<big-long-guid>/resourceGroups/azureStorage/providers/Microsoft.Storage/storageAccounts/stgcool",
    "subject":"/blobServices/default/containers/cont-pics/blobs/profile_pic.jpg",
    "eventType":"Microsoft.Storage.BlobCreated",
    "id":"<big-long-guid>",
    "data":{
      "api":"PutBlob",
      "clientRequestId":"<big-long-guid>",
      "requestId":"<big-long-guid>",
      "eTag":"0x8D94CE0B2F5CD71",
      "contentType":"image/jpeg",
      "contentLength":35799,
      "blobType":"BlockBlob",
      "blobUrl":"https://stgcool.blob.core.windows.net/cont-pics/profile_pic.jpg",
      "url":"https://stgcool.blob.core.windows.net/cont-pics/profile_pic.jpg",
      "sequencer":"00000000000000000000000000003730000000000000312a",
      "storageDiagnostics":{
        "batchId":"<big-long-guid>"
      }
    },
    "dataVersion":"",
    "metadataVersion":"1",
    "eventTime":"2021-07-22T07:17:00.8479184Z"
  }",
  "expiration_time": "2021-07-30T05:10:37+00:00",
  "insertion_time": "2021-07-23T05:10:37+00:00",
  "time_next_visible": "2021-07-23T05:20:37+00:00",
  "pop_receipt": "cOQ8m5lN2QgBAAAA",
  "dequeue_count": 1
}

Here is the sample function code that produced the log above:

import logging
import json

import azure.functions as func


def main(msg: func.QueueMessage):
    logging.info('Python queue trigger function processed a queue item.')

    # Serialize the message metadata and body into one JSON string for logging.
    result = json.dumps({
        'id': msg.id,
        'body': msg.get_body().decode('utf-8'),
        'expiration_time': (msg.expiration_time.isoformat()
                            if msg.expiration_time else None),
        'insertion_time': (msg.insertion_time.isoformat()
                           if msg.insertion_time else None),
        'time_next_visible': (msg.time_next_visible.isoformat()
                              if msg.time_next_visible else None),
        'pop_receipt': msg.pop_receipt,
        'dequeue_count': msg.dequeue_count
    })

    logging.info(result)

Tried:

  • Wrapping msg.get_body() in various combinations of get_json() and json.dumps(), but I received errors

Edit 1:

  • Changing 'body': msg.get_body().decode('utf-8'), to 'body': json.loads(msg.get_body().decode('utf-8')), turns the body into actual JSON, which is good.

  • But how do I access ['body']['data']['blobUrl'] in result?

  • type(result) is str

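I'm writing this on the understanding that you are trying to change how result is handed to logging. If what you actually need is to parse it out of a badly constructed Queue Storage message, let me know.

Taking the example you gave: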

d = {
  "id": "<big-long-guid>",
  "body": "{
    "topic":"/subscriptions/<big-long-guid>/resourceGroups/azureStorage/providers/Microsoft.Storage/storageAccounts/stgcool",
    "subject":"/blobServices/default/containers/cont-pics/blobs/profile_pic.jpg",
    "eventType":"Microsoft.Storage.BlobCreated",
    "id":"<big-long-guid>",
    "data":{
      "api":"PutBlob",
      "clientRequestId":"<big-long-guid>",
      "requestId":"<big-long-guid>",
      "eTag":"0x8D94CE0B2F5CD71",
      "contentType":"image/jpeg",
      "contentLength":35799,
      "blobType":"BlockBlob",
      "blobUrl":"https://stgcool.blob.core.windows.net/cont-pics/profile_pic.jpg",
      "url":"https://stgcool.blob.core.windows.net/cont-pics/profile_pic.jpg",
      "sequencer":"00000000000000000000000000003730000000000000312a",
      "storageDiagnostics":{
        "batchId":"<big-long-guid>"
      }
    },
    "dataVersion":"",
    "metadataVersion":"1",
    "eventTime":"2021-07-22T07:17:00.8479184Z"
  }",
  "expiration_time": "2021-07-30T05:10:37+00:00",
  "insertion_time": "2021-07-23T05:10:37+00:00",
  "time_next_visible": "2021-07-23T05:20:37+00:00",
  "pop_receipt": "cOQ8m5lN2QgBAAAA",
  "dequeue_count": 1
}

We would like to be able to read the values in "body" like this:

d['body']['data']['api']
PutBlob

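Unfortunately, if you try this you will be hit with a TypeError for trying to use a string to index into another string. String objects work as keys for a dict, but not as indices for a string. The reason you get this error is that the value of "body" is actually a str, not a dict (note the " on either side of the curly braces).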
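
The failure described above can be reproduced with the standard library alone. This is a minimal sketch, assuming a heavily shortened stand-in for the queue message; the sample dict and the variable names are illustrative and not taken from the original function.

```python
import json

# Shortened stand-in for the queue message (assumption: trimmed for brevity).
# Note that "body" holds JSON *text*, not a dict.
d = {
    "id": "<big-long-guid>",
    "body": '{"eventType": "Microsoft.Storage.BlobCreated", "data": {"api": "PutBlob"}}',
}

try:
    d["body"]["data"]  # indexing a str with a str key
except TypeError as exc:
    error = str(exc)  # keep the message for inspection

# Parsing the text first turns it into a dict that can be indexed by key.
body = json.loads(d["body"])
api = body["data"]["api"]

print(type(d["body"]).__name__, type(body).__name__, api)  # str dict PutBlob
```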

Fix this by updating the 'body': entry in your json.dumps call:

result = json.dumps({
    'id': msg.id,
    'body': json.loads(msg.get_body().decode('utf-8')),
    ...
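
As for the follow-up in the question's edit: result is a str because json.dumps always returns a string, so one option is to index the parsed body before serializing anything. The sketch below assumes the body is a single Event Grid event object shaped like the one in the question; blob_url_from_body is a hypothetical helper, and sample_body stands in for what msg.get_body() would return.

```python
import json


def blob_url_from_body(raw_body: bytes) -> str:
    """Hypothetical helper: parse the event JSON and return data.blobUrl."""
    event = json.loads(raw_body.decode("utf-8"))
    return event["data"]["blobUrl"]


# Stand-in for msg.get_body() (assumption: trimmed copy of the event above).
sample_body = json.dumps({
    "eventType": "Microsoft.Storage.BlobCreated",
    "data": {
        "api": "PutBlob",
        "blobUrl": "https://stgcool.blob.core.windows.net/cont-pics/profile_pic.jpg",
    },
}).encode("utf-8")

url = blob_url_from_body(sample_body)
print(url)
```

Inside the function this would be called as blob_url_from_body(msg.get_body()), and the dict passed to json.dumps can still be built afterwards for logging.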

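Edit:

Re-reading your question: in your example, what I called a dict may still be a string by the time you receive it. In that case you may run into more trouble, because "body" is badly formatted.

If that is the case, you can strip those pesky " marks from around the "body" value by running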

message = message.replace('"{', '{').replace('}"', '}')

before reading it:

d = json.loads(message)
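
Put together, the two steps can be tried on a small sample. This is only a sketch, assuming the text really contains unescaped quotes around the body exactly as shown in the question; the message string below is a shortened stand-in. The replace calls are a blunt instrument: they would also alter any string value that happens to contain "{ or }", and they do nothing for text in which the inner quotes are escaped.

```python
import json

# Shortened stand-in for the logged text, with stray quotes wrapping "body".
message = (
    '{"id": "<big-long-guid>", '
    '"body": "{"data":{"blobUrl":'
    '"https://stgcool.blob.core.windows.net/cont-pics/profile_pic.jpg"}}", '
    '"dequeue_count": 1}'
)

# Drop the quote before the opening brace and after the closing brace.
cleaned = message.replace('"{', '{').replace('}"', '}')

d = json.loads(cleaned)
blob_url = d["body"]["data"]["blobUrl"]
print(blob_url)
```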
