将数据帧模式从窄模式更改为完整模式



让我们假设,我有以下数据集:

+--------+--------+--------+--------+
| field1 | field2 | field3 | field4 |
+--------+--------+--------+--------+
|      0 |      1 | 0.15   |  2.132 |
|      1 |      2 | 0.72   |   0.15 |
|      2 |     12 | error  |        |
|      3 |     75 | error  |        |
+--------+--------+--------+--------+

如您所见,field3可能包含doublestring值。此处唯一可能的string值是错误。在errorfield4根本不包含任何值的情况下(事实上,field3之后有15个字段,为了可读性,我省略了这些字段,并对它们应用相同的规则(

因此,我正在努力实现以下目标:

  1. 使用窄模式读取输入(仅包含前三个字段的描述(
  2. 过滤器错误
  3. 应用由所有字段组成的新架构

因此,读数如下:

val er_schema = 
StructType(
Array(
StructField("field1", IntegerType, true),
StructField("field2", IntegerType, true),
StructField("field3", StringType, true)))
val c_schema = 
StructType(
Array(
StructField("field1", IntegerType, true),
StructField("field2", IntegerType, true),
// StringType only for now, DoubleType would be used instead
StructField("field3", StringType, true),
StructField("field4", StringType, true)))
val raw = sc.read.schema(er_schema).csv(PATH)
val correctOnly = filterErr(raw)
ss.createDataframe(
correctOnly,
c_schema))

此代码出现异常:java.lang.ArrayIndexOutOfBoundsException:3

据我所知,这是因为底层RDD只由3个第一字段组成。

所以,问题来了:是否可以使用缩小的(在字段数量减少的意义上(模式,然后将数据帧转换为正常的(包含所有字段(模式?

编辑1:源文件采用CSV格式,如下所示:

0,1,0.15,2.132
1,2,0.72,0.15
2,12,error
3,75,error

我想到的可能的解决方案是使用RDD并在过滤错误行后应用完整的模式,但我想知道是否可以只使用数据帧来简化

编辑2:我想要的结果:

正确的一个:

+--------+--------+--------+--------+
| field1 | field2 | field3 | field4 |
+--------+--------+--------+--------+
|      0 |      1 | 0.15   |  2.132 |
|      1 |      2 | 0.72   |   0.15 |
+--------+--------+--------+--------+

具有正确的数据类型(字段field3field4为DoubleType(

EDIT 3:这里的主要问题是field3列,它不仅可以包含double值,还可以包含strings值。我想去掉具有string值的行,只保留具有双值的行。我尝试使用两种不同的模式,但都不起作用。

您可以通过将mode设置为DROPMALFORMED来删除不遵循指定模式的行。读取数据时,请使用所需数据帧的模式:

val schema = StructType(Array(
StructField("field1", IntegerType, true),
StructField("field2", IntegerType, true),
StructField("field3", DoubleType, true),
StructField("field4", DoubleType, true)
))

然后读取csv文件:

val df = spark.read.
.option("mode", "DROPMALFORMED")
.schema(schema)
.csv("/path/to/file")

这样,所有没有正确数据类型或错误行数的行都将被丢弃。

最新更新