如何使用 Apache Flink 在此 avro 模式中应用过滤器



大家好,我正在像这样反序列化 Avro Kafka 消息:

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer("conekta.public.person",
new KafkaGenericAvroDeserializationSchema("http://localhost:8081"), kafkaProps);

我收到的消息是这样的:

{"before": null, "after": {"id": 257, "status": "c4ca4238a0"}, "source": {"version": "0.9.4.Final", "connector": "postgresql", "name": "conekta", "db": "testdb", "ts_usec": 1579909929965704, "txId": 5847, "lsn": 294339488688, "schema": "public", "table": "person", "snapshot": false, "last_snapshot_record": null, "xmin": null}, "op": "c", "ts_ms": 1579909930004}
{"before": null, "after": {"id": 258, "status": "c4ca4238a0"}, "source": {"version": "0.9.4.Final", "connector": "postgresql", "name": "conekta", "db": "testdb", "ts_usec": 1579910374459669, "txId": 5849, "lsn": 294473695272, "schema": "public", "table": "person", "snapshot": false, "last_snapshot_record": null, "xmin": null}, "op": "c", "ts_ms": 1579910374518}

我如何应用过滤器来获取 flink 消费者上 id <258 的所有记录。

我对Apache Flink很陌生。

任何帮助将不胜感激。

谢谢!

您可以在希望流结束时覆盖 KafkaGenericAvroDeserializationSchema 上的boolean isEndOfStream(GenericRecord nextElement),以返回 true。

相关内容

  • 没有找到相关文章

最新更新