我在Java Spark应用程序中使用lambda函数的lambda函数和映射有一个问题。
我正在获取此运行时错误
ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"
我正在使用以下课程和Spark 2.2.0。https://gitlab.com/opencell/test-bigdata
中提供了带有示例数据的完整示例Dataset<CDR> cdr = spark
.read()
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ";")
.csv("CDR_SAMPLE.csv")
.as(Encoders.bean(CDR.class));
long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count();
System.out.println("validated entries :" + v);
cdr文件定义是gitlab链接
编辑
val cdrCSVSchema = StructType(Array(
StructField("timestamp", DataTypes.TimestampType),
StructField("quantity", DataTypes.DoubleType),
StructField("access", DataTypes.StringType),
StructField("param1", DataTypes.StringType),
StructField("param2", DataTypes.StringType),
StructField("param3", DataTypes.StringType),
StructField("param4", DataTypes.StringType),
StructField("param5", DataTypes.StringType),
StructField("param6", DataTypes.StringType),
StructField("param7", DataTypes.StringType),
StructField("param8", DataTypes.StringType),
StructField("param9", DataTypes.StringType),
StructField("dateParam1", DataTypes.TimestampType),
StructField("dateParam2", DataTypes.TimestampType),
StructField("dateParam3", DataTypes.TimestampType),
StructField("dateParam4", DataTypes.TimestampType),
StructField("dateParam5", DataTypes.TimestampType),
StructField("decimalParam1", DataTypes.DoubleType),
StructField("decimalParam2", DataTypes.DoubleType),
StructField("decimalParam3", DataTypes.DoubleType),
StructField("decimalParam4", DataTypes.DoubleType),
StructField("decimalParam5", DataTypes.DoubleType),
StructField("extraParam", DataTypes.StringType)))
我使用此命令加载CSV文档
val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv")
然后尝试此命令来编码和运行lambda函数,但是我仍然会遇到错误
cdr.as[CDR].filter(c => c.timestamp != null).show
tl; dr 明确定义模式,因为输入数据集没有从(对于java.sql.Date
字段)推断类型的值。
对于您的情况,使用未遵循的数据集API可能是一个解决方案(也许是解决方法,老实说,我建议它避免从内部行格式避免不必要的挑选):
cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count
(这是Scala,我将其翻译成Java作为家庭练习)。
问题在于,您将inferSchema
选项与输入CDR_SAMPLE.csv
文件中不可用的大多数字段一起使用,该文件使大多数字段类型字符串(这是默认类型时,当无需可用值可推断出更具体的类型时)。
使java.sql.Date
类型的字段,即dateParam1
到 CC_5 to type String。
import org.opencell.spark.model.CDR
import org.apache.spark.sql.Encoders
implicit val cdrEnc = Encoders.bean(classOf[CDR])
val cdrs = spark.read.
option("inferSchema", "true").
option("delimiter", ";").
option("header", true).
csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv")
scala> cdrs.printSchema
root
|-- timestamp: timestamp (nullable = true)
|-- quantity: integer (nullable = true)
|-- access: string (nullable = true)
|-- param1: string (nullable = true)
|-- param2: string (nullable = true)
|-- param3: string (nullable = true)
|-- param4: string (nullable = true)
|-- param5: string (nullable = true)
|-- param6: string (nullable = true)
|-- param7: string (nullable = true)
|-- param8: string (nullable = true)
|-- param9: string (nullable = true)
|-- dateParam1: string (nullable = true)
|-- dateParam2: string (nullable = true)
|-- dateParam3: string (nullable = true)
|-- dateParam4: string (nullable = true)
|-- dateParam5: string (nullable = true)
|-- decimalParam1: string (nullable = true)
|-- decimalParam2: string (nullable = true)
|-- decimalParam3: string (nullable = true)
|-- decimalParam4: string (nullable = true)
|-- decimalParam5: string (nullable = true)
|-- extraParam: string (nullable = true)
请注意,感兴趣的字段,即dateParam1
到dateParam5
,都是字符串。
|-- dateParam1: string (nullable = true)
|-- dateParam2: string (nullable = true)
|-- dateParam3: string (nullable = true)
|-- dateParam4: string (nullable = true)
|-- dateParam5: string (nullable = true)
当您使用CDR
类中定义的编码器"假装"字段类型时,问题会浮出水面。
private Date dateParam1;
private Date dateParam2;
private Date dateParam3;
private Date dateParam4;
private Date dateParam5;
这是问题的根本原因。Spark可以从班级中推断出的东西有所不同。没有转换,代码就可以了,但是由于您坚持...
cdrs.as[CDR]. // <-- HERE is the issue = types don't match
filter(cdr => cdr.timestamp != null).
show // <-- trigger conversion
您在filter
操作员中访问哪个字段并不重要。问题是进行转换会导致执行不正确(和整个阶段Java代码生成)。
我怀疑Spark可以做很多事情,因为您请求使用数据集的inferSchema
,而无需用于类型推理的值。最好的选择是明确定义模式并使用schema(...)
操作员将其设置。