让我们假设,我有以下数据集:
+--------+--------+--------+--------+
| field1 | field2 | field3 | field4 |
+--------+--------+--------+--------+
| 0 | 1 | 0.15 | 2.132 |
| 1 | 2 | 0.72 | 0.15 |
| 2 | 12 | error | |
| 3 | 75 | error | |
+--------+--------+--------+--------+
如您所见,field3可能包含double
或string
值。此处唯一可能的string
值是错误。在error
值field4根本不包含任何值的情况下(事实上,field3之后有15个字段,为了可读性,我省略了这些字段,并对它们应用相同的规则(
因此,我正在努力实现以下目标:
- 使用窄模式读取输入(仅包含前三个字段的描述(
- 过滤器错误
- 应用由所有字段组成的新架构
因此,读数如下:
val er_schema =
StructType(
Array(
StructField("field1", IntegerType, true),
StructField("field2", IntegerType, true),
StructField("field3", StringType, true)))
val c_schema =
StructType(
Array(
StructField("field1", IntegerType, true),
StructField("field2", IntegerType, true),
// StringType only for now, DoubleType would be used instead
StructField("field3", StringType, true),
StructField("field4", StringType, true)))
val raw = sc.read.schema(er_schema).csv(PATH)
val correctOnly = filterErr(raw)
ss.createDataframe(
correctOnly,
c_schema))
此代码出现异常:java.lang.ArrayIndexOutOfBoundsException:3
据我所知,这是因为底层RDD只由3个第一字段组成。
所以,问题来了:是否可以使用缩小的(在字段数量减少的意义上(模式,然后将数据帧转换为正常的(包含所有字段(模式?
编辑1:源文件采用CSV
格式,如下所示:
0,1,0.15,2.132
1,2,0.72,0.15
2,12,error
3,75,error
我想到的可能的解决方案是使用RDD并在过滤错误行后应用完整的模式,但我想知道是否可以只使用数据帧来简化
编辑2:我想要的结果:
正确的一个:
+--------+--------+--------+--------+
| field1 | field2 | field3 | field4 |
+--------+--------+--------+--------+
| 0 | 1 | 0.15 | 2.132 |
| 1 | 2 | 0.72 | 0.15 |
+--------+--------+--------+--------+
具有正确的数据类型(字段field3
和field4
为DoubleType(
EDIT 3:这里的主要问题是field3
列,它不仅可以包含double
值,还可以包含strings
值。我想去掉具有string
值的行,只保留具有双值的行。我尝试使用两种不同的模式,但都不起作用。
您可以通过将mode
设置为DROPMALFORMED
来删除不遵循指定模式的行。读取数据时,请使用所需数据帧的模式:
val schema = StructType(Array(
StructField("field1", IntegerType, true),
StructField("field2", IntegerType, true),
StructField("field3", DoubleType, true),
StructField("field4", DoubleType, true)
))
然后读取csv文件:
val df = spark.read.
.option("mode", "DROPMALFORMED")
.schema(schema)
.csv("/path/to/file")
这样,所有没有正确数据类型或错误行数的行都将被丢弃。