SPARK CSV-未找到用于实际参数的适用构造函数/方法



我在Java Spark应用程序中使用lambda函数的lambda函数和映射有一个问题。

我正在获取此运行时错误

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)"

我正在使用以下课程和Spark 2.2.0。https://gitlab.com/opencell/test-bigdata

中提供了带有示例数据的完整示例
Dataset<CDR> cdr = spark
            .read()
            .format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("delimiter", ";")
            .csv("CDR_SAMPLE.csv")
            .as(Encoders.bean(CDR.class));
    long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count();
    System.out.println("validated entries :" + v);

cdr文件定义是gitlab链接

编辑

val cdrCSVSchema = StructType(Array(
  StructField("timestamp", DataTypes.TimestampType),
  StructField("quantity", DataTypes.DoubleType),
  StructField("access", DataTypes.StringType),
  StructField("param1", DataTypes.StringType),
  StructField("param2", DataTypes.StringType),
  StructField("param3", DataTypes.StringType),
  StructField("param4", DataTypes.StringType),
  StructField("param5", DataTypes.StringType),
  StructField("param6", DataTypes.StringType),
  StructField("param7", DataTypes.StringType),
  StructField("param8", DataTypes.StringType),
  StructField("param9", DataTypes.StringType),
  StructField("dateParam1", DataTypes.TimestampType),
  StructField("dateParam2", DataTypes.TimestampType),
  StructField("dateParam3", DataTypes.TimestampType),
  StructField("dateParam4", DataTypes.TimestampType),
  StructField("dateParam5", DataTypes.TimestampType),
  StructField("decimalParam1", DataTypes.DoubleType),
  StructField("decimalParam2", DataTypes.DoubleType),
  StructField("decimalParam3", DataTypes.DoubleType),
  StructField("decimalParam4", DataTypes.DoubleType),
  StructField("decimalParam5", DataTypes.DoubleType),
  StructField("extraParam", DataTypes.StringType)))

我使用此命令加载CSV文档

val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv")

然后尝试此命令来编码和运行lambda函数,但是我仍然会遇到错误

cdr.as[CDR].filter(c => c.timestamp != null).show

tl; dr 明确定义模式,因为输入数据集没有从(对于java.sql.Date字段)推断类型的值。

对于您的情况,使用未遵循的数据集API可能是一个解决方案(也许是解决方法,老实说,我建议它避免从内部行格式避免不必要的挑选):

cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count

(这是Scala,我将其翻译成Java作为家庭练习)。

问题在于,您将inferSchema选项与输入CDR_SAMPLE.csv文件中不可用的大多数字段一起使用,该文件使大多数字段类型字符串(这是默认类型时,当无需可用值可推断出更具体的类型时)。

>

使java.sql.Date类型的字段,即dateParam1到 CC_5 to type String。

import org.opencell.spark.model.CDR
import org.apache.spark.sql.Encoders
implicit val cdrEnc = Encoders.bean(classOf[CDR])
val cdrs = spark.read.
  option("inferSchema", "true").
  option("delimiter", ";").
  option("header", true).
  csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv")
scala> cdrs.printSchema
root
 |-- timestamp: timestamp (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- access: string (nullable = true)
 |-- param1: string (nullable = true)
 |-- param2: string (nullable = true)
 |-- param3: string (nullable = true)
 |-- param4: string (nullable = true)
 |-- param5: string (nullable = true)
 |-- param6: string (nullable = true)
 |-- param7: string (nullable = true)
 |-- param8: string (nullable = true)
 |-- param9: string (nullable = true)
 |-- dateParam1: string (nullable = true)
 |-- dateParam2: string (nullable = true)
 |-- dateParam3: string (nullable = true)
 |-- dateParam4: string (nullable = true)
 |-- dateParam5: string (nullable = true)
 |-- decimalParam1: string (nullable = true)
 |-- decimalParam2: string (nullable = true)
 |-- decimalParam3: string (nullable = true)
 |-- decimalParam4: string (nullable = true)
 |-- decimalParam5: string (nullable = true)
 |-- extraParam: string (nullable = true)

请注意,感兴趣的字段,即dateParam1dateParam5,都是字符串。

 |-- dateParam1: string (nullable = true)
 |-- dateParam2: string (nullable = true)
 |-- dateParam3: string (nullable = true)
 |-- dateParam4: string (nullable = true)
 |-- dateParam5: string (nullable = true)

当您使用CDR类中定义的编码器"假装"字段类型时,问题会浮出水面。

private Date dateParam1;
private Date dateParam2;
private Date dateParam3; 
private Date dateParam4; 
private Date dateParam5; 

这是问题的根本原因。Spark可以从班级中推断出的东西有所不同。没有转换,代码就可以了,但是由于您坚持...

cdrs.as[CDR]. // <-- HERE is the issue = types don't match
  filter(cdr => cdr.timestamp != null).
  show // <-- trigger conversion

您在filter操作员中访问哪个字段并不重要。问题是进行转换会导致执行不正确(和整个阶段Java代码生成)。

我怀疑Spark可以做很多事情,因为您请求使用数据集的inferSchema,而无需用于类型推理的值。最好的选择是明确定义模式并使用schema(...)操作员将其设置。

相关内容

  • 没有找到相关文章

最新更新