将用户定义的对象转换为数据帧,然后写入RDBMS-如何使用数据库维护映射



我在mysql中具有下表结构:

创建表用户(
id int not null,
名称varchar(20)不是null,
年龄int不是null,
地址VARCHAR(100)非null);

现在,我想编写一个火花流动作业,该作业读取来自Kafka的数据,进行一些处理和过滤,并将其写入表"用户"中的RDBM。

为此,我首先创建了表 -

的pojo表示形式
@Data
class User implements Serializable {
private int id;
private String name;
private int age;
private String address;
}

下面,我写了将RDD转换为dataFrame的Spark作业 -

JavaDStream<User> userStream = ... // created this stream with some processing
userStream.foreachRDD(rdd -> {
DataFrame df = sqlContext.createDataFrame(rdd,User.class);
df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties());
});

现在,一旦我执行此代码,因为数据框以HAP危险方式形成,并且与数据库架构没有同步。因此,它试图在" ID"列中插入"地址",并以SQL异常退出。

我无法理解如何使数据框架了解数据库的模式并相应地加载数据。有什么办法吗?我认为 javardd 可以映射到 javardd ,但是我无法理解该怎么做。

另外,我相信这种 createAtaframe() api使用反射(必须),因此也存在一个绩效影响的问题。您能告诉我是否有一种方法可以维护POJO和关系数据库之间的映射,并插入数据?

这样做对我有用的。

@Data
class User implements Serializable {
private int id;
private String name;
private int age;
private String address;
private static StructType structType = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("id", DataTypes.IntegerType, false),
        DataTypes.createStructField("name", DataTypes.StringType, false),
        DataTypes.createStructField("age", DataTypes.IntegerType, false),
        DataTypes.createStructField("address", DataTypes.StringType, false)
});
public static StructType getStructType() {
    return structType;
}
public Object[] getAllValues() {
    return new Object[]{id, name, age, address};
}
}

火花工作 -

JavaDStream<User> userStream = ... // created this stream with some processing
userStream.map(e -> {
            Row row = RowFactory.create(e.getAllValues());
            return row;
        }).foreachRDD(rdd -> {
            DataFrame df = sqlContext.createDataFrame(rdd,User.getStructType());
            df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties());
        });

我认为这是比以前更好的方法,因为在上一篇中,DataFrame使用反射将POJO映射到其自己的数据结构中。这是一种更干净的方法,因为我已经是SPARK SQL本身的一种格式,并且我已经在 getAllValues() getsstructType()

中的列映射

如果我错了,请纠正我。

相关内容

  • 没有找到相关文章

最新更新