Python AWS SQS mocking with MOTO



我正在尝试用moto模拟AWS SQS,下面是我的代码

from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs

@mock_sqs
def test_get_all_msg_from_queue():

#from myClass import get_msg_from_sqs

conn = boto3.client('sqs', region_name='us-east-1')

queue = conn.create_queue(QueueName='Test')

os.environ["SQS_URL"] = queue["QueueUrl"]

queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))


#Tried this as well
#conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
resp = get_msg_from_sqs(queue["QueueUrl"]) 
assert resp is not None

当执行这个时,我得到以下错误

>       queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E       AttributeError: 'dict' object has no attribute 'send_message'

如果我尝试在SQS中发送消息的另一种方式(参见注释掉的代码#也试过了)然后在实际SQS调用我的方法get_msg_from_sqs的时候,我得到下面的错误

E  botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation: 
The address https://queue.amazonaws.com/ is not valid for this endpoint.

我在win10上用PyCharm运行它,moto版本设置为

moto = "^2.2.6"

我的代码如下

sqs = boto3.client('sqs')
def get_msg_from_queue(queue_url: str) -> dict:
return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)

我在这里错过了什么?

您的queue变量是由create_queue返回的字典:

queue = conn.create_queue(QueueName='Test')

它不是一个队列,因此你不能在它上面调用sendMessage

为此,您需要创建一个队列对象:

conn = boto3.client('sqs')
sqs = boto3.resource('sqs')
response = conn.create_queue(QueueName='Test')
queue_url = response["QueueUrl"]
queue = sqs.Queue(queue_url)
queue.send_message()

根据@gshpychka,你需要看看create_queue是如何工作的。具体来说,它返回如下形式的字典:

反应结构(东西)

QueueUrl (string)——

创建的Amazon SQS队列的URL。

使用这个api你可以:

import boto3
from time import sleep
conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
QueueUrl=queue,
MessageBody='string',
...)

我同意这些文档令人困惑。这种混淆可能是因为sqs资源api,它的工作方式不同:

import boto3
from time import sleep
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)

这可以工作,因为这个api返回一个Queue对象,这可能是您所期望的。

请注意@gshpychka已经在评论中给出了答案;我刚写出来了

最新更新