如何在Spark SQL中向现有的Dataframe添加新列?



我使用DataFrame API

我有现有的DataFrame和一个列表对象(也可以使用Array)。如何将此列表作为新列添加到现有的DataFrame中?我应该为此使用类Column吗?

您可能应该将您的List转换为单列RDD,并对您选择的标准应用join。简单的数据帧转换:

 val df1 = sparkContext.makeRDD(yourList).toDF("newColumn")

如果您需要创建额外的列来执行连接,您可以添加更多的列,映射您的列表:

val df1 = sparkContext.makeRDD(yourList).map(i => (i, fun(i)).toDF("newColumn", "joinOnThisColumn")

我不熟悉Java版本,但您应该尝试使用JavaSparkContext.parallelize(yourList)并基于此文档应用类似的映射操作。

对不起,是我的错,我已经找到了可以解决我问题的withColumn(String colName, Column col)函数

这里有一个例子,我们有一个列日期,想要添加另一个列月。

Dataset<Row> newData = data.withColumn("month", month((unix_timestamp(col("date"), "MM/dd/yyyy")).cast("timestamp")));

希望有帮助!

干杯!

这个线程有点老了,但是我在使用Java时遇到了类似的情况。我认为最重要的是,对于我应该如何处理这个问题,存在着观念上的误解。

为了解决我的问题,我创建了一个简单的POJO来帮助数据集的新列(而不是试图在现有的基础上构建)。我认为从概念上讲,我不理解在需要添加额外列的初始读取期间生成Dataset是最好的。我希望这对将来的人有所帮助。

考虑以下内容:

        JavaRDD<MyPojo> myRdd = dao.getSession().read().jdbc("jdbcurl","mytable",someObject.getProperties()).javaRDD().map( new Function<Row,MyPojo>() {
                       private static final long serialVersionUID = 1L;
                       @Override
                       public MyPojo call(Row row) throws Exception {
                       Integer curDos = calculateStuff(row);   //manipulate my data
                       MyPojo pojoInst = new MyPojo();
                       pojoInst.setBaseValue(row.getAs("BASE_VALUE_COLUMN"));
                       pojoInst.setKey(row.getAs("KEY_COLUMN"));
                       pojoInst.setCalculatedValue(curDos);
                       return pojoInst;
                      }
                    });
         Dataset<Row> myRddRFF = dao.getSession().createDataFrame(myRdd, MyPojo.class);
//continue load or other operation here... 

相关内容

  • 没有找到相关文章

最新更新