我想用apachekafka创建一个简单的基于传感器数据的应用程序。我的问题很简单,参考了apachekafka的基本概念。我是apache kafka的初学者。
这是我的要求:
我通过一个包含不同数据的字节数组获取传感器数据。例如,该数组包含三个条目(温度1、温度2和电压(。这里有一个4个数组和值数据的例子。每个数组都有一个定义的时间戳。
阵列1:[1,2,3]
阵列2:[4,5,6]
阵列3:[7,8,9]
阵列4:[10,11,12]
现在,我想阅读这些数组,并为以下三个主题生成消息:
-
主题-temp1
-
topic-temp2
-
主题电压
生产顺序为:
- 读取数组1
- 生成主题-temp1的消息(值=1(
- 向主题temp2生成消息(值=2(
-
生成消息到主题电压(值=3(
-
读取阵列2
- 向主题temp1生成消息(值=4(
- 向主题temp2生成消息(值=5(
-
生成消息到主题电压(值=6(
-
读取阵列3
- 向主题temp1生成消息(值=7(
- 向主题temp2生成消息(值=8(
- 生成消息到主题电压(值=9(
。。。读取数组n。。。
在那之后,我有3个主题,里面有不同的数据:
-
主题时间1:1、4、7、10
-
主题时间2:2,5,8,11
-
主题电压:3、6、9、12
现在我的问题是:我想创建一个使用这3个主题的软件应用程序。我想在一张图中显示3个图形(温度1,温度2,电压(。y-axe是信号值,x-axe是时间戳。
我如何保证在同一时间戳获得消耗的值?只有I可以覆盖图形。
-
1,2,3
-
4,5,6
-
7,8,9
-
10,11,12
我应该使用Kafka-Stream API吗?一个输入流主题(字节数组(和三个输出流主题?如何确保这三种价值观共同产生并将共同消费?
或者我应该使用一个简单的消费者api并通过偏移值访问数据。因为条目(1,2,3((4,5,6(…的偏移量应该相同。。。,因为我按这个顺序生产?
提前谢谢!
我建议您使用一个传感器重新编码主题,有效载荷为传感器名称(最好是UUID(,这样您就可以知道是哪个传感器发送了数据,以及它生成的数据,作为一个完整的消息。
否则,纯粹通过时间戳连接数据似乎并不能防止失败。
您的消息密钥可以是UUID/名称,您可以将其扩展到数百个分区
您可以对发送的数据进行二进制编码,但我将使用JSON字符串来说明
{
"sensor_id" : "some unique name",
"temperatures" [1,2],
"voltage": 3
}
如果你想要三个主题,你可以很容易地使用Kafka Streams或KSQL创建三个输出主题
否则,继续创建单独的主题,但添加ID/名称,以便您可以加入,使用时间窗口(以秒或分钟为单位(,而不是试图调整滞后,因为一个事件只有微秒的时间,您无法加入消息