如何使用moto(Python AWS模拟库)配置AWS Firehose



我正在像这样创建我的消防水管资源,以及一个名为 self.problem_reporter_bucket_name 的 s3 存储桶。但是,在打电话给put_record之后,我的桶里什么都没有。也就是说,当我在存储桶上调用list_objects时,没有项目。

self.firehose.create_delivery_stream(
DeliveryStreamName=self.problem_reporter_delivery_stream_name,
S3DestinationConfiguration={
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'BucketARN': 'arn:aws:s3:::' + self.problem_reporter_bucket_name,
'Prefix': 'myPrefix',
'BufferingHints': {
'SizeInMBs': 1,
'IntervalInSeconds': 60
},
'CompressionFormat': 'UNCOMPRESSED',
})
)

甚至moto是否支持我的用例?

moto似乎不支持Firehose。 我做了这样的事情来测试我的 Firehose 相关代码;

如果在测试的模块中定义了消防水带资源:

from unittest.mock import patch, MagicMock
@patch('mymodule.boto3')
def test_put_record(boto3):
record = {'ID": "123'}
my_put_firehose_record(record)
all_args = {
'DeliveryStreamName': 'my-test-firehose-stream',
'Record': record
}
boto3.client.assert_called_with('firehose')
boto3.client().put_record.assert_called_with(**all_args)

如果在测试的模块外部定义了消防站资源:

from unittest.mock import patch, MagicMock
def test_put_record():
firehose = MagicMock()
record = {'ID': '123'}
my_put_firehose_record_external(firehose, record)
all_args = {
'DeliveryStreamName': 'my-test-firehose-stream',
'Record': record
}
firehose.put_record.assert_called_with(**all_args)

最新更新