如何从SQS json MSG嵌套值中提取python?



my aws chain: lambda->sns->sqs->python script on ec2

python脚本给我错误,由于无法提取我需要的值,错误的结构,我认为,但我看不到错误。

我无法从"vote"SQS消息中的字段。怎么做呢?

测试事件结构(启用原始msg传递):

{
"body": {
"MessageAttributes": {
"vote": {
"Type": "Number",
"Value": "90"
},
"voter": {
"Type": "String",
"Value": "default_voter"
}
}
}
}

处理SQS msg的python脚本:

#!/usr/bin/env python3
import boto3
import json
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
queue = boto3.resource('sqs', region_name='us-east-1').get_queue_by_name(QueueName="erjan")
table = boto3.resource('dynamodb', region_name='us-east-1').Table('Votes')
def process_message(message):
try:
payload = json.loads(message.body) #unable to parse sqs json msg here
#payload = message.body
#payload = message['body']
voter = payload['MessageAttributes']['voter']['Value'] #here the exception raised!
vote  = payload['MessageAttributes']['vote']['Value']
logging.info("Voter: %s, Vote: %s", voter, vote)
store_vote(voter, vote)
update_count(vote)
message.delete()
except Exception as e:
print('x = msg.body')
x = (message.body)
print(x)
print('-------')
print('message.body')
print(message.body)
try:
vote = x['MessageAttributes']['vote']['Value']
logging.error("Failed to process message")
logging.error('------- here: ' + str(e))
logging.error('vote %d' % vote)
except TypeError:
logging.error("error catched")
def store_vote(voter, vote):
try:
logging.info('table put item.......')
print('table put item......')
response = table.put_item(
Item={'voter': voter, 'vote': vote}
)
except:
logging.error("Failed to store message")
raise
def update_count(vote):
logging.info('update count....')
print('update count....')
table.update_item(
Key={'voter': 'count'},
UpdateExpression="set #vote = #vote + :incr",
ExpressionAttributeNames={'#vote': vote},
ExpressionAttributeValues={':incr': 1}
)
if __name__ == "__main__":
while True:
try:
messages = queue.receive_messages()
except KeyboardInterrupt:
logging.info("Stopping...")
break
except:
logging.error(sys.exc_info()[0])
logging.info('here error - we continue')
continue
for message in messages:
process_message(message)

我得到的错误信息:

payload = json.loads(message)
File "/usr/lib64/python3.7/json/__init__.py", line 341, in loads
raise TypeError(f'the JSON object must be str, bytes or bytearray, '
TypeError: the JSON object must be str, bytes or bytearray, not sqs.Message
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "./processor.py", line 90, in <module>
process_message(message)
File "./processor.py", line 40, in process_message
vote = x['MessageAttributes']['vote']['Value']
TypeError: string indices must be integers

我理解它的sqs消息,它比json有不同的结构。我试着

json.loads(message.body)

,但是SQS MSG是用空MSG体生成的。即使上面的测试事件有"body"这是不一样的。当我只是调试和打印(消息)

它给出如下输出:

sqs.Message(queue_url='https://sqs.us-east-1.amazonaws.com/025416187662/erjan', receipt_handle='AQEBAz3JiGRwss1ROOr2R8GkpBWwr7tJ1tUDUa7JurX7H6SxoF6gyj7YOxoLuU1/KcHpBIowon12Vle97mJ/cZFUIjzJon78zIIcVSVLrZbKPBABztUeE/Db0ALCMncVXpHWXk76hZVLCC+LHMsi8E5TveZ7+DbTdyDX
U6djTI1VcKpUjEoKLV9seN6JIEZV35r3fgbipHsX897IqTVvjhb0YADt6vTxYQTM1kVMEPBo5oNdTWqn6PfmoYJfZbT1GHMqphTluEwVuqBzux2kPSMtluFk3yk4XXwPJS304URJ7srMksUdoVTemA56OsksVZzXT4AcS8sm8Y3SO2PLLjZSV+7Vdc6JZlX7gslvVSADBlXw5BJCP/Rb9mA2xI9FOyW4')

我认为自动生成的SQS MSG隐藏在某个地方

这个解决方案来自于这个例子-在最后它展示了如何接收和处理消息- https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html

在main中,我认为队列是空的,因为当从队列中检索它时,我没有指定var 'messages'属性名称。

if __name__ == "__main__":
while True:
try:
messages = queue.receive_messages() #this was empty
messages = queue.receive_messages(MessageAttributeNames=['vote','voter']) #this is the solution - we have to specify the msg attr names
except KeyboardInterrupt:
logging.info("Stopping...")
break
except:
logging.error(sys.exc_info()[0])
continue
for message in messages:
process_message(message)

因为现在在调试中它确实显示了MSG属性:

def process_message(message):
try:
payload = json.loads(message.body)
print(type(payload))
print(payload)
print('-------------------MSG ATTR----------------------')
print(message.message_attributes) #shows actual values!

最新更新