Apache Flink: window function on an AllWindowedStream - joining Kafka topics



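I am trying to consume two Kafka topics with a single Kafka consumer by passing it a list of topics, and then convert the JSON strings in the stream to POJOs. Next, I want to join the records with keyBy (on the event-time field) and merge them into a single fat JSON. For that I plan to use a windowed stream and apply a window function to it. The assumption is that Topic-A and Topic-B can be joined on event time, and that only one pair (Topic-A JSON, Topic-B JSON) will exist for any given event time. I therefore plan to use countWindow(2) after keyBy on the event time.

I have a couple of questions: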

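  1. Is this a reasonable approach for merging the topics and creating a single JSON?
  2. The window function on the all-windowed stream does not seem to work properly; any pointers would be greatly appreciated.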

Code snippet:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
logger.info("Flink Stream Window Charger has started");
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
properties.setProperty("group.id", "group-0011");
properties.setProperty("auto.offset.reset", "smallest");

List<String> names = new ArrayList<>();

names.add("Topic-A");
names.add("Topic-B");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
List<String> where = new ArrayList<String>();
AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
data_charging.addSink(new SinkFunction<String>() {

public void invoke(String value) throws Exception {

  // Yet to be implemented - Merge two POJO into one 
 }
});

try
{
 env.execute();
} catch (Exception e)
{
 return;
}
}
}
class Tokenizer implements FlatMapFunction<TopicPojo, String> {
 private static final long serialVersionUID = 1L;
 @Override
 public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
  ObjectMapper mapper = new ObjectMapper();
  out.collect(mapper.writeValueAsString(value));
 }
}
class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
 @Override
 public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
 throws Exception {
  int count = 0;
  for (TopicPojo in : arg2) {
   count++;
  }
  // Test Result - TO be modified
  out.collect("Window: " + window + "count: " + count);

 }
}
class Deserializer implements MapFunction<String, TopicPojo> {
 private static final long serialVersionUID = 1L;
 @Override
 public TopicPojo map(String value) throws IOException {
  // TODO Auto-generated method stub
  ObjectMapper mapper = new ObjectMapper();
  TopicPojo obj = null;
  try {

   System.out.println(value);

   obj = mapper.readValue(value, TopicPojo.class);

  } catch (JsonParseException e) {

   // TODO Auto-generated catch block

   throw new IOException("Failed to deserialize JSON object.");

  } catch (JsonMappingException e) {

   // TODO Auto-generated catch block

   throw new IOException("Failed to deserialize JSON object.");
  } catch (IOException e) {

   // TODO Auto-generated catch block

   throw new IOException("Failed to deserialize JSON object.");
  }
  return obj;
 }
} 

I am getting this error:

The method apply(AllWindowFunction) in the type AllWindowedStream is not applicable for the arguments (MyWindowFunction)

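An AllWindowedStream is a non-keyed stream, so the apply method of AllWindowedStream takes a function without a key parameter. Since you are windowing a keyed stream, your data_window should be a keyed WindowedStream instead, created with countWindow(2) directly on the result of keyBy rather than with countWindowAll(2) after a flatMap.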
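To make the difference concrete, here is a minimal plain-Java sketch of what a keyed count window of size 2 does with the records. It deliberately does not use the Flink API: Event, CountWindow, merge and run are hypothetical names invented for this illustration, and the shape of the merged JSON is an assumption. The point is that each event time gets its own buffer and a window fires only when that key has collected two records, which is the behavior keyBy(...).countWindow(2) is meant to provide and countWindowAll(2) is not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedCountWindowSketch {

    /** Hypothetical stand-in for TopicPojo: source topic, event time (the key), raw JSON body. */
    static final class Event {
        final String topic;
        final String eventTime;
        final String json;

        Event(String topic, String eventTime, String json) {
            this.topic = topic;
            this.eventTime = eventTime;
            this.json = json;
        }
    }

    /** Keeps one buffer per key and fires once a key has collected `size` events. */
    static final class CountWindow {
        private final int size;
        private final Map<String, List<Event>> buffers = new HashMap<>();

        CountWindow(int size) {
            this.size = size;
        }

        /** Returns the merged JSON when the window for this key fires, otherwise null. */
        String add(Event e) {
            List<Event> buffer = buffers.computeIfAbsent(e.eventTime, k -> new ArrayList<>());
            buffer.add(e);
            if (buffer.size() < size) {
                return null; // still waiting for the partner record
            }
            buffers.remove(e.eventTime); // window fired, clear its state
            return merge(e.eventTime, buffer);
        }
    }

    /** Assumed output shape: one object with the key plus one field per source topic. */
    static String merge(String key, List<Event> events) {
        StringBuilder sb = new StringBuilder("{\"eventTime\":\"").append(key).append("\"");
        for (Event e : events) {
            sb.append(",\"").append(e.topic).append("\":").append(e.json);
        }
        return sb.append("}").toString();
    }

    /** Feeds all events through a keyed count window of size 2 and collects the fired results. */
    static List<String> run(List<Event> input) {
        CountWindow window = new CountWindow(2);
        List<String> out = new ArrayList<>();
        for (Event e : input) {
            String merged = window.add(e);
            if (merged != null) {
                out.add(merged);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> input = new ArrayList<>();
        input.add(new Event("Topic-A", "t1", "{\"a\":1}"));
        input.add(new Event("Topic-A", "t2", "{\"a\":2}"));
        input.add(new Event("Topic-B", "t1", "{\"b\":10}"));
        for (String merged : run(input)) {
            System.out.println(merged);
        }
    }
}
```

With this input only the pair for t1 is emitted; the t2 record stays buffered until its partner arrives. A non-keyed window of size 2 would instead have paired the first two records regardless of their event time. In the Flink job, the equivalent change would be to call countWindow(2) on the keyed stream, so that data_window has type WindowedStream<TopicPojo, String, GlobalWindow> and MyWindowFunction matches the apply signature.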
