Incorrect serialization of nested structures when reducing by key

I want to reduce a DataFrame by key. The reduce logic is quite complex and needs to update about 10-15 fields, which is why I want to convert the DataFrame to a Dataset and reduce Java POJOs.

Problem

The problem is that after groupByKey-reduceGroups I get some very strange values, even though Encoders.bean(Entity.class) reads the data correctly. See the code examples section.

Workarounds

Replacing Encoders.bean with Encoders.kryo does not work; it fails with the exception:

Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.
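
For reference, a minimal sketch of what that attempt presumably looked like (the exact call site is an assumption); a kryo encoder serializes the whole object into a single binary column, which cannot be lined up with the four-column struct above:

    // Hypothetical reproduction of the failed workaround: Encoders.kryo maps
    // Entity to a single binary "value" column, so casting a multi-column
    // DataFrame to it fails with the "fields does not line up" error above.
    Dataset<Entity> kryoDs = createDataFrame("testData.json", "testSchema.json")
        .as(Encoders.kryo(Entity.class));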

I also saw this factory, but Encoders.product requires a TypeTag, and I don't know how to create a TypeTag in Java code.

Code examples

    // createDataFrame is a helper that loads testData.json using the schema
    // defined in testSchema.json (both files are listed below)
    Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
        .as(Encoders.bean(Entity.class));
    // shows correct numbers
    ds.show(10, false);
    // correct data, please pay attention to `storages` column values
+-----------+-----------+-----+-------------------------------+
|broker_name|server_name|order|storages                       |
+-----------+-----------+-----+-------------------------------+
|A1         |S1         |1    |[[2018-10-29 23:11:44, 12.5]]  |
|A2         |S1         |1    |[[2018-10-30 14:43:05, 13.2]]  |
|A3         |S1         |2    |[[2019-11-02 10:00:03, 1001.0]]|
+-----------+-----------+-----+-------------------------------+

    // after the reduce, shows wrong numbers
    ds
        .groupByKey(
            // casts are needed in Java to disambiguate the Scala/Java overloads
            (MapFunction<Entity, RowKey>) o ->
                new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()),
            Encoders.bean(RowKey.class))
        .reduceGroups((ReduceFunction<Entity>) (e1, e2) -> e1)
        .map((MapFunction<Tuple2<RowKey, Entity>, Entity>) tuple -> tuple._2,
            Encoders.bean(Entity.class))
        .show(10, false);
    // wrong values, please pay attention to `storages` column
+-----------+-----+-----------+---------------------------------------------------------+
|broker_name|order|server_name|storages                                                 |
+-----------+-----+-----------+---------------------------------------------------------+
|A1         |2    |S1         |[[7.77011509161492E-309, 149386-07-09 23:48:5454.211584]]|
|A1         |1    |S1         |[[7.61283374479283E-309, 148474-03-19 21:14:3232.5248]]  |
+-----------+-----+-----------+---------------------------------------------------------+

Entity.java

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Lombok generates getters/setters and the two constructors used by Spark
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
    private String broker_name;
    private String server_name;
    private Integer order;
    private Storage[] storages;
}

Storage.java

import java.io.Serializable;
import java.sql.Timestamp;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
    private Timestamp timestamp;
    private Double storage;
}

testData.json:

[
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-29 23:11:44.000",
        "storage": 12.5
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-30 14:43:05.000",
        "storage": 13.2
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 2,
    "storages": [
      {
        "timestamp": "2019-11-02 10:00:03.000",
        "storage": 1001.0
      }
    ]
  }
]

testSchema.json:

{
  "type": "struct",
  "fields": [
    {
      "name": "broker_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "server_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "order",
      "type": "integer",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "storages",
      "type": {
        "type": "array",
        "elementType": {
          "type": "struct",
          "fields": [
            {
              "name": "timestamp",
              "type": "timestamp",
              "nullable": true,
              "metadata": {}
            },
            {
              "name": "storage",
              "type": "double",
              "nullable": true,
              "metadata": {}
            }
          ]
        },
        "containsNull": true
      },
      "nullable": true,
      "metadata": {}
    }
  ]
}

This happens because deserialization uses positional matching against the schema inferred by the Encoder, and since bean classes have no natural field order, the fields of the inferred schema are sorted by name.

So, with a bean class defined like Storage above, the schema inferred by the bean Encoder will be

Encoders.bean(Storage.class).schema().printTreeString();
root
 |-- storage: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

not

root
 |-- timestamp: timestamp (nullable = true)
 |-- storage: double (nullable = true)

and that is the schema that should be used with the Dataset. In other words, a schema defined as:

StructType schema = Encoders.bean(Entity.class).schema();

or

StructType schema = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " + 
  "storages array<struct<storage: double, timestamp: timestamp>>" 
);

would be valid and could be used to load testData directly:

Dataset<Entity> ds = spark.read()
  .option("multiline", "true")
  .schema(schema)
  .json("testData.json")
  .as(Encoders.bean(Entity.class));
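
With the data loaded through an Encoder-compatible schema, the reduce pipeline from the question should produce the expected values. A minimal sketch, reusing the RowKey bean from the question:

// Same pipeline as in the question; the only change is that `ds` above was
// loaded with the schema derived from Encoders.bean(Entity.class).
ds.groupByKey(
        (MapFunction<Entity, RowKey>) o ->
            new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()),
        Encoders.bean(RowKey.class))
    .reduceGroups((ReduceFunction<Entity>) (e1, e2) -> e1)
    .map((MapFunction<Tuple2<RowKey, Entity>, Entity>) tuple -> tuple._2,
        Encoders.bean(Entity.class))
    .show(10, false);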

while your current schema is equivalent to:

StructType valid = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " + 
  "storages array<struct<timestamp: timestamp, storage: double>>" 
);

Although the latter works with the JSON reader, which (in contrast to Encoders) matches data by name, it does not line up with the Encoder's positional matching.
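
The mismatch is easy to inspect by printing both schemas side by side; the field names agree, but the nested positions do not:

// Encoder-derived schema: nested fields sorted by name (storage, timestamp)
Encoders.bean(Entity.class).schema().printTreeString();
// Schema equivalent to the file's: declaration order (timestamp, storage)
valid.printTreeString();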

Arguably, this behavior should be reported as a bug: intuitively, an Encoder should never dump data that is incompatible with its own loading logic.

Related JIRA ticket: SPARK-27050
