Spark insert into Hive - JavaPairDStream



Inserting into Hive from Spark:

  finalRDDStream.transform(new Function<JavaPairRDD<OutcomeKey,Tuple2<String,String>>, JavaRDD<Joinkey>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public JavaRDD<Joinkey> call(JavaPairRDD<OutcomeKey, Tuple2<String, String>> arg0)  throws Exception {

                        JavaRDD<Joinkey> a = arg0.map(x -> {
                            OutcomeKey key = x._1;
                            Joinkey joinkey= new Joinkey();     
                            joinkey.setDate(key.getDate().toString());
                            joinkey.setPublicIP(key.getPublicIP());
                            joinkey.setPrivateIP(key.getPrivateIP());
                            joinkey.setMsisdn(x._2._1);
                            joinkey.setUrlVisited(x._2._2);
                            return joinkey;
                            });

                        DataFrame fileDF = sqlContext.createDataFrame(a, Joinkey.class);
                        fileDF .insertInto("test.fileDF ");
                        fileDF .show();
                        return null;
                    }
                });  

I am trying to insert Joinkey class objects into Hive. When I do it this way, I get the following error:

16/06/03 03:31:42 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
        at scala.Predef$.require(Predef.scala:233)
        at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)

Is there any other way to insert into Hive?

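The error occurs because transform is a lazy transformation: the stream has no output operation (such as print() or foreachRDD()) registered, so the context has nothing to execute. Try it this way; the print() call on the transformed stream registers an output operation, so the insert into Hive runs for every batch: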

JavaDStream<Joinkey> p = finalRDDStream.transform(new Function<JavaPairRDD<OutcomeKey,Tuple2<String,String>>, JavaRDD<Joinkey>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public JavaRDD<Joinkey> call(JavaPairRDD<OutcomeKey, Tuple2<String, String>> arg0)  throws Exception {

                        // Convert each (OutcomeKey, (msisdn, url)) pair into a Joinkey bean.
                        JavaRDD<Joinkey> a = arg0.map(x -> {
                            OutcomeKey key = x._1;
                            Joinkey joinkey = new Joinkey();
                            joinkey.setDate(key.getDate().toString());
                            joinkey.setPublicIP(key.getPublicIP());
                            joinkey.setPrivateIP(key.getPrivateIP());
                            joinkey.setMsisdn(x._2._1);
                            joinkey.setUrlVisited(x._2._2);
                            return joinkey;
                            });
                        // Optional debug copy of the batch as text files.
                        a.saveAsTextFile("file:////home/aman/temp");

                        // Build a DataFrame from the beans and append it to the Hive table.
                        DataFrame fileDF = sqlContext.createDataFrame(a, Joinkey.class);
                        fileDF.insertInto("test.raduisfile");
                        fileDF.show();
                        // Return the mapped RDD rather than null: transform must yield an RDD
                        // for the downstream print() to consume.
                        return a;
                    }
                });
             System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
             // print() is the output operation that makes the stream graph executable.
             p.print();
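
Why the first version fails and the second runs can be illustrated with a small toy model. The following is a plain-Java sketch, not Spark code: ToyContext, ToyStream and foreachBatch are hypothetical names invented for this illustration, and the model only assumes that transformations are recorded lazily while output operations are what the context actually executes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Toy model (NOT Spark): lazy transformations versus registered output operations. */
public class LazyStreamSketch {

    /** Hypothetical context that refuses to start when no output operation exists. */
    static class ToyContext {
        private final List<Runnable> outputOps = new ArrayList<>();

        void register(Runnable op) {
            outputOps.add(op);
        }

        /** Runs one "batch" of every registered output operation. */
        void start() {
            if (outputOps.isEmpty()) {
                throw new IllegalArgumentException(
                    "requirement failed: No output operations registered, so nothing to execute");
            }
            for (Runnable op : outputOps) {
                op.run();
            }
        }
    }

    /** Hypothetical lazy stream: it only describes how a batch would be computed. */
    static class ToyStream<T> {
        private final ToyContext ctx;
        private final Supplier<List<T>> batch;

        ToyStream(ToyContext ctx, Supplier<List<T>> batch) {
            this.ctx = ctx;
            this.batch = batch;
        }

        /** Lazy: wraps the function, registers nothing, runs nothing. */
        <U> ToyStream<U> transform(Function<List<T>, List<U>> f) {
            return new ToyStream<>(ctx, () -> f.apply(batch.get()));
        }

        /** Output operation: registered with the context and executed on start(). */
        void foreachBatch(Consumer<List<T>> action) {
            ctx.register(() -> action.accept(batch.get()));
        }
    }

    public static void main(String[] args) {
        ToyContext ctx = new ToyContext();
        ToyStream<String> source = new ToyStream<>(ctx, () -> Arrays.asList("a", "b"));

        // Side effect placed inside a transform, as in the question's code.
        ToyStream<String> mapped = source.transform(b -> {
            System.out.println("inserting " + b);
            return b;
        });

        try {
            ctx.start(); // only a transform exists, so this throws
        } catch (IllegalArgumentException e) {
            System.out.println("start failed: " + e.getMessage());
        }

        // Registering an output operation makes the transform (and its side effect) run.
        mapped.foreachBatch(b -> System.out.println("batch size " + b.size()));
        ctx.start();
    }
}
```

In real Spark Streaming code the same idea suggests doing the Hive insert inside an output operation such as foreachRDD instead of inside transform, so that the write does not depend on a later print() call.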
