我在Kafka中有数据,无论Kafka是否发送数据,我都想读取数据,并过滤它们并返回JSON。
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink_consumer");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic",
new SimpleStringSchema(), properties);
consumer.setStartFromLatest();
//config.setWriteTimestampToKafka(true);
DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
return "Stream Value: " + value;
}
}).print();
env.execute();
案例1:当Kafka生产者将数据发送给Kafka时,我可以在控制台中看到值打印。-这很好,还可以。案例2:Kafka生产者停止发送数据,Kafka在主题中仍然有价值,但相同的代码没有返回任何数据。——这可能吗?
知道我哪里搞错了吗?
{"firsname":"test", "lastname":"topic", "value":"3.45", "location":"UK"}
我想要过滤firstname
并返回JSON。
我看到在数据流处理过程中有过滤选项。
如果您想从第一条消息开始,您应该设置consumer.setStartFromEarliest();
。它将从第一条未被确认的信息开始阅读。