我正在尝试使用 Kafka 消息(作为 Flink 1.10 API StreamSource(在 Elasticsearch 中聚合数据。数据以动态的JSON格式接收,下面给出了示例。我想通过唯一 ID 将多个记录合并到单个文档中。 数据按顺序排列,它是时间序列数据。
源汇卡夫卡和目标汇松紧每个 7.6.1 6
我没有找到任何可以在下面的问题陈述中使用的好例子。
Record : 1
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.325",
"Data" :
{
"Field1" : "ABC",
"Field2" : "DEF"
}
}
Record : 2
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.725",
"Data" :
{
"Field3" : "GHY"
}
}
Result :
{
"ID" : "1",
"Start_timestamp" : "2020-05-07 14:34:51.325",
"End_timestamp" : "2020-05-07 14:34:51.725",
"Data" :
{
"Field1" : "ABC",
"Field2" : "DEF",
"Field3" : "GHY"
}
}
以下是版本详细信息:
- 狐狸 1.10
- Flink-kafka-connector 2.11
- Flink-Elasticsearch-connector 7.x 卡
- 夫卡 2.11
- JDK 1.8
你所要求的可以被描述为某种连接,有很多方法可以用 Flink 来实现这一点。在 Apache Flink 训练中有一个有状态扩充的示例,它展示了如何使用可以帮助您入门的RichFlatMapFunction
实现类似的联接。您需要先通读相关的培训材料 - 至少是关于数据管道和ETL的部分。
使用此方法最终要做的是按 ID 对流进行分区(通过keyBy
(,然后使用键分区状态(在这种情况下可能MapState
,假设每个 ID 要存储多个属性/值对(来存储来自记录(如记录 1(的信息,直到您准备好发出结果。
顺便说一句,如果密钥集是无限的,您需要注意不要永远保持这种状态。在不再需要状态时清除状态(如此示例所示(,或使用状态 TTL 安排其最终删除。
有关 Flink 中其他类型的连接的更多信息,请参阅本答案中的链接。