如何在python上模拟单元测试的aws时间流



我在网上找到了一个测试aws时间流服务的例子;当我尝试启动测试时,我会得到一个NoRegionError

#test_timestream.py
import logging
from datetime import datetime
import pandas as pd 
import awswrangler as wr
import logging
from datetime import datetime
import pandas as pd
import awswrangler as wr
logging.getLogger("awswrangler").setLevel(logging.DEBUG)
def test_basic_scenario(timestream_database_and_table):
name = timestream_database_and_table
df = pd.DataFrame(
{
"time": [datetime.now(), datetime.now(), datetime.now()],
"dim0": ["foo", "boo", "bar"],
"dim1": [1, 2, 3],
"measure": [1.0, 1.1, 1.2],
}
)
rejected_records = wr.timestream.write(
df=df,
database=name,
table=name,
time_col="time",
measure_col="measure",
dimensions_cols=["dim0", "dim1"],
)
assert len(rejected_records) == 0
df = wr.timestream.query(
f"""
SELECT
1 as col_int,
try_cast(now() as time) as col_time,
TRUE as col_bool,
current_date as col_date,
'foo' as col_str,
measure_value::double,
measure_name,
time
FROM "{name}"."{name}"
ORDER BY time
DESC LIMIT 10
"""
)
assert df.shape == (3, 8)

错误在此处输入图像描述

如何解决这个问题?因为即使是";moto";图书馆不提供模拟aws服务吗?

如果您查看时间流的AWS DataWrangler示例:

https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.timestream.write.html

当您调用构造函数时,您可以传递boto3_session,或者它将使用默认会话:

boto3_session (boto3.Session(), optional) – Boto3 Session. The default boto3 Session will be used if boto3_session receive None.

boto3配置需要指定默认区域,或者需要在boto3会话中声明。

https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html

import boto3
from botocore.config import Config
my_config = Config(
region_name = 'us-west-2',
signature_version = 'v4',
retries = {
'max_attempts': 10,
'mode': 'standard'
}
)
client = boto3.client('kinesis', config=my_config)

最新更新