在 Spark 数据集中使用 custome UDF withColumn<Row>;java.lang.String 不能转换为 org.apache.spark.sql.Row



我有一个包含许多字段的JSON文件。我在java中使用spark的数据集读取文件。

  • 火花版本 2.2.0

  • Java JDK 1.8.0_121

下面是代码。

SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.master("local")
.getOrCreate();
Dataset<Row> df = spark.read().json("jsonfile.json");

我想将 withColumn 函数与自定义 UDF 一起使用来添加新列。

UDF1 someudf = new UDF1<Row,String>(){
public String call(Row fin) throws Exception{
String some_str = fin.getAs("String");
return some_str;
}
};
spark.udf().register( "some_udf", someudf, DataTypes.StringType );
df.withColumn( "procs", callUDF( "some_udf", col("columnx") ) ).show();

运行上述代码时出现转换错误。 java.lang.String 不能强制转换为 org.apache.spark.sql.Row

问题:

1 - 读取行数据集是唯一的选择吗?我可以将 df 转换为字符串的 df。但我将无法选择字段。

2 - 已尝试定义用户定义的数据类型,但失败。我无法用这个自定义的UDDatatype注册UDF。我在这里需要用户定义的数据类型吗?

3 - 主要问题是,如何从字符串转换为行?

部分日志复制如下:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at Risks.readcsv$1.call(readcsv.java:1)
at org.apache.spark.sql.UDFRegistration$$anonfun$27.apply(UDFRegistration.scala:512)
... 16 more
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (string) => string)

您的帮助将不胜感激。

您会收到该异常,因为UDF将在列的数据类型上执行,该数据类型不是Row。考虑我们有Dataset<Row> ds它有两列col1col2都是字符串类型。现在,如果我们想使用col2的值转换为大写UDF.

我们可以像下面这样注册并致电UDF

spark.udf().register("toUpper", toUpper, DataTypes.StringType);
ds.select(col("*"),callUDF("toUpper", col("col2"))).show();

或使用withColumn

ds.withColumn("Upper",callUDF("toUpper", col("col2"))).show();

UDF应该是这样的。

private static UDF1 toUpper = new UDF1<String, String>() {
public String call(final String str) throws Exception {
return str.toUpperCase();
}
};

改进@abaghel写的内容。 如果使用以下导入

import org.apache.spark.sql.functions;

使用withColumn,代码应如下所示:

ds.withColumn("Upper",functions.callUDF("toUpper", ds.col("col2"))).show();

最新更新