所以我有一个用PyFlink SQL API编写的简单聚合作业。该作业从AWS kinesis读取数据并将结果输出到kinesis。
我很好奇我是否可以用say-pytest对我的管道进行单元测试?我想我需要用文件系统连接器模拟源和接收器?但是如何创建本地Flink会话来在pytest中运行作业?我们这里有最佳实践建议吗?
谢谢!
您应该看看PyFlink本身的测试是如何实现的。它为实现表测试用例建立了各种基类;PyFlinkStreamTableTestCase
可能是一个很好的起点。使用这个可以编写像我从这里复制的这样的测试:
def test_sql_query(self):
t_env = self.t_env
source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
field_names = ["a", "b", "c"]
field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
t_env.register_table_sink(
"sinks",
source_sink_utils.TestAppendSink(field_names, field_types))
result = t_env.sql_query("select a + 1, b, c from %s" % source)
result.execute_insert("sinks").wait()
actual = source_sink_utils.results()
expected = ['+I[2, Hi, Hello]', '+I[3, Hello, Hello]']
self.assert_equals(actual, expected)
还有更多的测试是从哪里来的。