Finding the message count of each RDD in a JavaDStream



Hi, I'm trying to integrate Kafka with Spark Streaming.

I want to find the message count of each RDD in a JavaDStream.

Please see my code below and give me some suggestions.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    import kafka.serializer.StringDecoder;
    import scala.Tuple2;

    public class App {
        @SuppressWarnings("serial")
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf()
                    .setAppName("Streamingkafka")
                    .setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));

            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", "localhost:9092");
            Set<String> topics = Collections.singleton("data_one");

            JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(
                    ssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    kafkaParams, topics);

            JavaDStream<String> msgDataStream = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });

            msgDataStream.print();
            msgDataStream.count();

            ssc.start();
            ssc.awaitTermination();
        }
    }

Thanks in advance.

The problem is that `count()` on a DStream is a transformation, not an action: it returns a new `JavaDStream<Long>` holding the per-batch count, and since you never register an output operation on that stream, it is never computed. To print the number of messages in each RDD, use `foreachRDD`:

    JavaDStream<String> msgDataStream = directKafkaStream.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });
    msgDataStream.foreachRDD(x -> System.out.println(x.count()));
    ssc.start();
    ssc.awaitTermination();

Here I'm writing the `foreachRDD` as a lambda. If you are on an older version of Java (pre-Java 8), use the anonymous-class version of `foreachRDD` below.

    msgDataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> arg0) throws Exception {
            System.out.println(arg0.count());
        }
    });
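
As a side note, if all you need is the per-batch count, you could also keep your original `count()` call and simply attach an output operation to the DStream it returns. A minimal sketch (my own addition, not part of the answer above):

    // count() is a transformation returning a JavaDStream<Long> with one
    // element per batch; print() registers an output operation, so the
    // count is actually computed and printed each batch interval.
    msgDataStream.count().print();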
