SparkSQL Java:使用数据集时,POJO到表格格式



我很新来Spark SQL。在执行一项培训任务时,我面临以下问题,找不到答案(以下所有示例有点愚蠢,但对于演示目的而言仍然可以。

我的应用程序读取一个镶木quet文件并在其内容上创建数据集:

DataFrame input = sqlContext.read().parquet("src/test/resources/integration/input/source.gz.parquet");
Dataset<Row> dataset = input.as(RowEncoder$.MODULE$.apply(input.schema()));

dataset.show()调用结果:

+------------+----------------+--------+
+    Names   +       Gender   +   Age  +
+------------+----------------+--------+
| Jack, Jill |  Male, Female  | 30, 25 |

然后,我将数据集转换为带有人类型的新数据集:

public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
    return rawData
            .flatMap((Row sourceRow) -> {
                // code to parse an input row and split person data goes here
                Person person1 = new Person(name1, gender1, age1);
                Person person2 = new Person(name2, gender2, age2);
                return Arrays.asList(person1, person2);
            }, Encoders.bean(Person.class));
}

其中

public abstract class Human implements Serializable {
   protected String name;
   protected String gender;
   // getters/setters go here
   // default constructor + constructor with the name and gender params
 }
 public class Person extends Human {
   private String age;
   // getters/setters for the age param go here
   // default constructor + constructor with the age, name and gender params
   // overriden toString() method which returns the string: (<name>, <gender>, <age>)
 }

最后,当我显示数据集的内容时,我希望看到

 +------------+----------------+--------+
 +    name    +       gender   +   age  +
 +------------+----------------+--------+
 |     Jack   |     Male       |   30   |
 |     Jill   |     Femail     |   25   |

但是,我看到

+-------------------+----------------+--------+
+      name         +       gender   +   age  +
+-------------------+----------------+--------+
|(Jack, Male, 30)   |                |        |
|(Jill, Femail, 25) |                |        |

这是ToString()方法的结果,而标头正确。我相信编码器有问题,只要我使用encoders.javaserizlization(t)或encoders.kryo(t)它显示

+------------------+
+        value     +
+------------------+
|(Jack, Male, 30)  |
|(Jill, Femail, 25)|

最让我担心的是编码器的错误用法可能会导致不正确的Serde和/或性能处罚。在所有Spark Java示例中,我都看不到我能找到的...

你能建议我做错了什么吗?

更新1

这是我项目的依赖性:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>

解决方案

正如Abaghel建议我将版本升级为2.0.2(请注意,在2.0.0版上有Windows的错误),使用的数据集在我的代码中无处不在(似乎是DataFrames,好像是DataFrames不是一部分)Apache Spark从2.0.0开始),并使用基于迭代器的FlatMap函数从行之间转换。

只是为了分享,使用基于traversableonce的flatmap版本为1.6.2的方法对我不起作用,因为它抛出了'mypersonConversion $ function1 notializable oferializable'exception。

现在一切都按预期工作。

您使用的火花的版本是什么?您提供的FlatMap方法不是使用2.2.0版编译。所需的返回类型是Iterator<Person>。请在下面使用FlatMapfunction,您将获得所需的输出。

public static Dataset<Person> transformToPerson(Dataset<Row> rawData) {
    return rawData.flatMap(row -> {
        String[] nameArr = row.getString(0).split(",");
        String[] genArr = row.getString(1).split(",");
        String[] ageArr = row.getString(2).split(",");
        Person person1 = new Person(nameArr[0], genArr[0], ageArr[0]);
        Person person2 = new Person(nameArr[1], genArr[1], ageArr[1]);
        return Arrays.asList(person1, person2).iterator();
    }, Encoders.bean(Person.class));
}
//Call function
Dataset<Person> dataset1 = transformToPerson(dataset);
dataset1.show();

相关内容

  • 没有找到相关文章

最新更新