I want to reduce a DataFrame by key. The reduce logic is quite complex and requires updating about 10-15 fields. That is why I want to convert the DataFrame to a Dataset and reduce Java POJOs instead.
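One advantage of that approach is that the reducer is an ordinary Java function, so arbitrarily complex merge logic can be written and tested outside Spark. A minimal sketch (the `Entity` stand-in and the merge rule are hypothetical, since the actual 10-15 field update rules are not shown):

```java
import java.io.Serializable;
import java.util.function.BinaryOperator;

public class ReduceSketch {
    // Minimal stand-in for the Entity POJO described in the question.
    static class Entity implements Serializable {
        String broker_name;
        Integer order;

        Entity(String brokerName, Integer order) {
            this.broker_name = brokerName;
            this.order = order;
        }
    }

    // Hypothetical merge rule: the real reducer would update 10-15
    // fields; here we simply keep the entity with the larger `order`.
    static final BinaryOperator<Entity> MERGE =
            (a, b) -> a.order >= b.order ? a : b;

    public static void main(String[] args) {
        Entity merged = MERGE.apply(new Entity("A1", 1), new Entity("A1", 2));
        System.out.println(merged.order); // prints 2
    }
}
```

A function like `MERGE` can then be passed directly to `reduceGroups`, since it is just a binary operator over the POJO type.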
Problem
The problem is that after groupByKey / reduceGroups I get some very strange values, even though Encoders.bean(Entity.class) reads the correct data. See the Code samples section.
Workarounds
Replacing Encoders.bean with Encoders.kryo does not work; it fails with the exception:
Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.
I have also seen this workaround, but Encoders.product requires a TypeTag, and I have no idea how to create a TypeTag in Java code.
Code samples
Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
.as(Encoders.bean(Entity.class));
// shows correct numbers
ds.show(10, false);
// correct data, please pay attention to `storages` column values
+-----------+-----------+-----+-------------------------------+
|broker_name|server_name|order|storages |
+-----------+-----------+-----+-------------------------------+
|A1 |S1 |1 |[[2018-10-29 23:11:44, 12.5]] |
|A2 |S1 |1 |[[2018-10-30 14:43:05, 13.2]] |
|A3 |S1 |2 |[[2019-11-02 10:00:03, 1001.0]]|
+-----------+-----------+-----+-------------------------------+
//after reduce shows wrong numbers
ds
.groupByKey(o -> new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()), Encoders.bean(RowKey.class))
.reduceGroups((e1, e2) -> e1)
.map(tuple -> tuple._2, Encoders.bean(Entity.class))
.show(10, false);
// wrong values, please pay attention to `storages` column
+-----------+-----+-----------+---------------------------------------------------------+
|broker_name|order|server_name|storages |
+-----------+-----+-----------+---------------------------------------------------------+
|A1 |2 |S1 |[[7.77011509161492E-309, 149386-07-09 23:48:5454.211584]]|
|A1 |1 |S1 |[[7.61283374479283E-309, 148474-03-19 21:14:3232.5248]] |
+-----------+-----+-----------+---------------------------------------------------------+
Entity.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
private String broker_name;
private String server_name;
private Integer order;
private Storage[] storages;
}
Storage.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
private Timestamp timestamp;
private Double storage;
}
testData.json:
[
{
"broker_name": "A1",
"server_name": "S1",
"order": 1,
"storages": [
{
"timestamp": "2018-10-29 23:11:44.000",
"storage": 12.5
}
]
},
{
"broker_name": "A1",
"server_name": "S1",
"order": 1,
"storages": [
{
"timestamp": "2018-10-30 14:43:05.000",
"storage": 13.2
}
]
},
{
"broker_name": "A1",
"server_name": "S1",
"order": 2,
"storages": [
{
"timestamp": "2019-11-02 10:00:03.000",
"storage": 1001.0
}
]
}
]
testSchema.json:
{
"type": "struct",
"fields": [
{
"name": "broker_name",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "server_name",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "order",
"type": "integer",
"nullable": true,
"metadata": {}
},
{
"name": "storages",
"type": {
"type": "array",
"elementType": {
"type": "struct",
"fields": [
{
"name": "timestamp",
"type": "timestamp",
"nullable": true,
"metadata": {}
},
{
"name": "storage",
"type": "double",
"nullable": true,
"metadata": {}
}
]
},
"containsNull": true
},
"nullable": true,
"metadata": {}
}
]
}
This happens because deserialization uses structural matching against the schema inferred by the Encoder, and since bean classes have no natural field order, the fields of the inferred schema are sorted alphabetically by name.
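The alphabetical ordering is not specific to Spark: it comes from JavaBeans introspection, which the bean Encoder relies on. A minimal, Spark-free sketch (the nested Storage class below is a hand-written stand-in for the question's Lombok-annotated POJO):

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

public class BeanOrderDemo {
    // Stand-in for the question's Storage POJO: `timestamp` is declared
    // before `storage`, exactly as in the original class.
    public static class Storage implements Serializable {
        private Timestamp timestamp;
        private Double storage;

        public Timestamp getTimestamp() { return timestamp; }
        public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
        public Double getStorage() { return storage; }
        public void setStorage(Double storage) { this.storage = storage; }
    }

    public static void main(String[] args) throws IntrospectionException {
        // JavaBeans introspection reports properties sorted by name,
        // not in declaration order; the bean Encoder inherits this order.
        BeanInfo info = Introspector.getBeanInfo(Storage.class, Object.class);
        List<String> names = new ArrayList<>();
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            names.add(pd.getName());
        }
        System.out.println(names); // prints [storage, timestamp]
    }
}
```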
So if you define a bean class like Entity, the schema inferred from the bean Encoder will be
Encoders.bean(Storage.class).schema().printTreeString();
root
|-- storage: double (nullable = true)
|-- timestamp: timestamp (nullable = true)
not
root
|-- timestamp: timestamp (nullable = true)
|-- storage: double (nullable = true)
and that is the schema that should be used with the Dataset. In other words, a schema defined as:
StructType schema = Encoders.bean(Entity.class).schema();
or
StructType schema = StructType.fromDDL(
"broker_name string, order integer, server_name string, " +
"storages array<struct<storage: double, timestamp: timestamp>>"
);
will be valid and can be used to load testData directly:
Dataset<Entity> ds = spark.read()
.option("multiline", "true")
.schema(schema)
.json("testData.json")
.as(Encoders.bean(Entity.class));
Your current schema, which is equivalent to:
StructType valid = StructType.fromDDL(
"broker_name string, order integer, server_name string, " +
"storages array<struct<timestamp: timestamp, storage: double>>"
);
is not valid, despite the fact that it works with the JSON reader, which (in contrast to Encoders) matches data by name.
Arguably this behavior should be reported as a bug: intuitively, there should be no case where an Encoder dumps data that is incompatible with its own loading logic.
Related JIRA ticket: SPARK-27050