groupBy with collect_list of struct failure



这真的看起来像一个bug,但我找不到原因也没有任何信息在互联网上

发生了什么:我有一些java代码,在groupBy之后的agg方法中使用collect_list(struct(...))重构一些输入数据集。起初我有奇怪的错误,说明valueArraySize有一些奇怪的值应该是正的(当值是负的),或者当它太大时,它应该比Integer.MAX_VALUE小。我所理解的是,spark在字节数组中读取长,可能是错误的地方,并且得到奇怪的无效数字,使代码在使用它时失败。

为了调试它,我尝试用简单的代码重现这个错误。下面是一个导致jvm崩溃的简单示例!

// Some definitions
@Data // use lombok to generate boilerplate code - no case class in java :
@AllArgsConstructor
@NoArgsConstructor
public static class Order {
String user;
String item;
Double amount;
}
@Data
public static class Top {
String user;
List<Command> commands;
}
@Data
public static class Command {
String item;
Double amount;
}

和失败的代码:

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
List<Order> orders = Arrays.asList(
new Order("U1", "C1", 13.0),
new Order("U1", "C2", 10.1),
new Order("U2", "C3", 42.0));
Dataset<Row> grouped = spark.createDataset(orders, Encoders.bean(Order.class))
.groupBy("user")
.agg(collect_list(struct( "item", "amount" )).as("commands"));
List<Top> output = grouped.as(Encoders.bean(Top.class)).collectAsList();

根据用例,我得到了不同的错误。例如,如果所有的amount都是整数(没有十进制),或者如果我将amount的类型替换为Integer,则代码可以工作,但输出中的item和amount的值完全是任意的。

有人知道那个bug/奇怪的行为吗?

我使用这个spark版本

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0.2.6.5.108-1</version>
</dependency>

这似乎是一个bug。我认为导致这个问题的原因在JIRA注释中有描述。

应该在2.3.3版本中修复

我真的找到了一种方法使工作而编写的代码的问题:

=比;struct中的列名列表需要按字母顺序排列

(在上面的例子中是struct("amount", "item"))

但我仍然不知道为什么。在网上找不到任何东西。struct的文档清楚地提到它输出一个带有列名的模式。它可以在模式中可视化,所以我希望它们被使用…

相关内容