Apache Flink Kafka消费者问题



我在Kafka中有数据,无论Kafka是否发送数据,我都想读取数据,并过滤它们并返回JSON。

// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

properties.setProperty("group.id", "flink_consumer");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic",
new SimpleStringSchema(), properties);
consumer.setStartFromLatest();
//config.setWriteTimestampToKafka(true);
DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {

return "Stream Value: " + value;
}
}).print();
env.execute();

案例1:当Kafka生产者将数据发送给Kafka时,我可以在控制台中看到值打印。-这很好,还可以。案例2:Kafka生产者停止发送数据,Kafka在主题中仍然有价值,但相同的代码没有返回任何数据。——这可能吗?

知道我哪里搞错了吗?

{"firsname":"test", "lastname":"topic", "value":"3.45", "location":"UK"}

我想要过滤firstname并返回JSON。

我看到在数据流处理过程中有过滤选项。

如果您想从第一条消息开始,您应该设置consumer.setStartFromEarliest();。它将从第一条未被确认的信息开始阅读。

相关内容

  • 没有找到相关文章

最新更新