sqs中的一些芹菜任务永远挂起,我想在删除之前阅读这些消息(任务(。在去sqs控制台时,我能够看到我尝试用解码的编码消息
value = base64.b64decode(value.encode('utf-8')).decode('utf-8')
这给了我dict转储与密钥
['body', 'headers', 'content-type', 'properties', 'content-encoding']
在这个dict中,身体看起来像编码的我试着用同样的解码
value = base64.b64decode(value.encode('utf-8')).decode('utf-8')
但它给出了错误的说法UnicodeDecodeError:"utf8"编解码器无法解码位置1中的字节0x87:无效的起始字节
我是不是错过了什么?如何解码此消息?有什么办法解码它吗?
似乎"Celery"使用"pickle.dump"将任务的有效负载转换为字节,然后编码为base64。通过反向操作,我们再次获得有效载荷。
import base64
import boto3
import pickle
queue_name = 'your-queue-name'
sqsr = boto3.resource('sqs')
queue = sqsr.get_queue_by_name(QueueName=queue_name)
for message in queue.receive_messages(MaxNumberOfMessages=10):
print(f'{message.message_id} >>> {message.receipt_handle}'
f' >>> {message.body} >>> {message.message_attributes}')
body_dict = json.loads(base64.b64decode(message.body))
celery_payload = pickle.loads(base64.b64decode(body_dict.get('body')))
print(celery_payload)