早些时候,我问Flink是否可以从无到有地创建一些东西,答案是肯定的。现在,我将详细研究Flink SQL的功能。
在SQL中,这种类型的挑战有时很容易(例如SELECT 1
在MySQL等常规引擎中工作(,但有时也不可能,例如Apache Pig无法从无到有地创建东西。
我不确定Flink SQL,当然,它的想法是,当你想运行快速测试或构建一个可移植的示例时,它可以从无到有地创建一些东西,从而消除对其他解决方案的任何硬依赖。
为了简单起见:假设我想每秒至少生成一条消息,并且不介意里面有什么。
我的第一个想法:
- 一个简单的select语句不会起作用,因为当没有任何可选择的内容时,您不会得到任何输出
- 从概念上讲,一个时间窗口内的计数(*(可以工作,但我还没有让它工作。也许这是一个特性,如果没有什么可计数的,输出将为零(而不是0(
除了窗口化,我在Flink SQL中没有看到任何具有时间概念的东西,所以在外部,我怀疑这是不可能的。
我不想找什么:
- 从一条消息开始,用它做更多的消息。假设我有一个kafka主题,一条消息进入,那么继续循环它并创建无限的消息可能是微不足道的。但我的问题更多的是关于如何在卡夫卡空着的时候开始
- Flink代码不是SQL,甚至不是其他工具
甚至比datagen连接器更好的是flink faker,flink SQL Cookbook中的许多例子都使用了它。我想你会发现这些例子特别有趣。
Flink SQL对处理事件时间和系统时间都有强大的支持,包括水印。例如,在处理流式事件时间窗口和间隔联接时,Flink SQL使用水印来确定哪些记录可以从Flink状态过期。
有关利用时间的其他Flink SQL操作,请参阅时间版本表、模式检测、时间联接和基于时间的查找联接。
您可能会发现"datagen"连接器对此很有用。
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/datagen.html
例如:
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen'
)