Spark:循环通过JavaRDD Tuple - Java



我正在尝试使用元组遍历JavaRDD,但是我在如何正确循环JavaRDD并将我的rdd_value设置为等于rdd_array中包含的元组时遇到了一些问题,无论b迭代器计数器值是多少。

这是我的代码

//JavaRDD that contains Tuples
JavaRDD<Tuple5<Long, String, Float, Float, String>> rdd_array
//For loop to loop through rdd_array
for (int b=0;b<rdd_array.count();b++){
        //Need help on this line, how do I set rdd_row equal to the tuple5 in rdd_array
        Tuple5<Long, String, Float, Float, String> Value = rdd_array.;
        String id=Value._1().toString();
        String text=Value._2().toString();
        String negative_tweets=Value._3().toString();
        String positive_tweets=Value._4().toString();
        String score_tweets=Value._5().toString();
System.out.println(id+text+negative_tweets+positive_tweets+score_tweets)
}

编辑:伙计们,我真的在这里尝试,在 JavaRDD 上接收错误无法解决方法的 foreachloop 采取了 zero323 建议。

rdd_array.foreach(new Function<Tuple5<Long, String, Float, Float, String>, Void>(){
            @Override
            public Void call(Tuple5<Long, String, Float, Float, String> rdd){
                String id=rdd._1().toString();
                String text=rdd._2().toString();
                String negative_tweets=rdd._3().toString();
                String positive_tweets=rdd._4().toString();
                String score_tweets=rdd._5().toString();
                System.out.println(id+text+negative_tweets+positive_tweets+score_tweets);
                return null;
            }
        });

请注意,在你自己尝试答案时,你实际上是在处理看起来像<K,V>的东西,K(键(是长,V(值(是元组7。这与您在原始问题中提出的内容大不相同。最有可能的是,所有这些都可以通过JavaPairRDD更有效地实现。

从RDD开始,您可以使用以下方法将JavaRDD转换为JavaPairRDD

JavaPairRDD<Long,Tuple7<...>> prdd = rdd.mapToPair(...)

这将包括基于密钥的重新分区。

此外,使用 .foreach 进行最终处理会将 rdd 结果序列化到驱动程序并执行包含的逻辑串行。您可能需要考虑使用filter,reduce和其他范例将大部分逻辑推向RDD的上游。还可以考虑使用 .foreachPartition 来实现某种级别的并行性,并在任务节点上而不是在驱动程序上进行计算。

请注意,使用 Java 8 lambda 语法,您可以编写更紧凑的大部分逻辑:

prdd.foreach((k,v)->{
  System.out.println("Printing: " + k + ", " + v._1() ...);
});

现在,要注意另一件事...使用更专用的类而不是通用Tuple7<>不是更容易吗?至少它可能看起来像这样:

public class UserLocation {
  public long id;
  public String text;
  public String createdat;
  public String userlocation;
  public String name;
  public String username;
  public String lat;
  public String lon;
  @Override
  public String toString() {
    return Long.toString(id)+text+createdat+userlocation+name+username+lat+lon;
  }
}

然后,您的处理可能如下所示:

JavaRDD<UserLocation> jrdd;
JavaPairRDD<Long,UserLocation> jprdd = jrdd.mapToPair((v)->new Tupple2<>(v.id,v));
...
jprdd
  .foreach((k,v)->{
     System.out.println(v.toString());
  });

通常,在此示例中,将数据映射到JavaPairRDD<Long,UserLocation>没有实际用途。但是,您的数据将根据 Long 键重新分区,并且可以具有更好的并行性。

使用以下

函数解决

 rdd_array.foreach(new VoidFunction<Tuple2<Long, Tuple7<String, String, String, String, String, String, String>>>() {
        @Override
        public void call(Tuple2<Long, Tuple7<String, String, String, String, String, String, String>> rdd_val) throws Exception {
            //new Tuple7<String, String, String, String, String, String, String>(text,created_at,userlocation,name,username,lat,lon);
            String id = rdd_val._1().toString();
            String text = rdd_val._2()._1().toString();
            String createdat = rdd_val._2()._2().toString();
            String userlocation = rdd_val._2()._3().toString();
            String name = rdd_val._2()._4().toString();
            String username = rdd_val._2()._5().toString();
            String lat = rdd_val._2()._6().toString();
            String lon = rdd_val._2()._7().toString();

            System.out.println("Printing Values EXTRA: "+id+text+createdat+userlocation+name+username+lat+lon);
        }
    });

相关内容

  • 没有找到相关文章

最新更新