有人能分享将回收流写入kafka接收器的工作示例吗?
我试了以下方法,但没有效果。
DataStream<Tuple2<Boolean, User>> resultStream =
tEnv.toRetractStream(result, User.class);
resultStream.addsink(new FlinkKafkaProducer(OutputTopic, new ObjSerializationSchema(OutputTopic),
props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
通常,最简单的解决方案是执行类似的smth
resultStream.map(elem -> elem.f1)
这将允许您将User
对象写入Kafka。
但从业务角度来看,这并不是那么简单,或者至少这取决于用例。Kafka是一个仅追加的日志,回收流表示ADD、UPDATE和DELETE操作。因此,虽然上面的解决方案允许您将数据写入Kafka,但Kafka中的结果可能不能正确地表示实际的计算结果,因为它们不会表示更新和删除操作。
为了能够将实际正确的计算结果写入Kafka,您可以尝试做以下事情之一:
- 如果您知道您的用例永远不会导致任何DELETE或UPDATE操作,那么您可以安全地使用上面的解决方案
- 如果重复项可能只在某些固定的时间间隔内产生(例如,记录可能只在产生后1小时后更新/删除(,则您可能希望使用窗口聚合所有更新,并将一个最终结果写入Kafka
- 最后,您可以扩展
User
类来添加一个字段,该字段标记该记录是否是回收操作,并在向Kafka主题写入数据时保留该信息。这意味着您必须在下游处理所有可能的UPDATE或DELETE操作(在该数据的使用者中(
最简单的解决方案是使用upsert-kafka连接器作为表接收器。这是为了使用回收流并将其写入kafka。
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/upsert-kafka.html