我在mysql中具有下表结构:
创建表用户(
id int not null,
名称varchar(20)不是null,
年龄int不是null,
地址VARCHAR(100)非null);
现在,我想编写一个火花流动作业,该作业读取来自Kafka的数据,进行一些处理和过滤,并将其写入表"用户"中的RDBM。
为此,我首先创建了表 -
的pojo表示形式@Data
class User implements Serializable {
private int id;
private String name;
private int age;
private String address;
}
下面,我写了将RDD转换为dataFrame的Spark作业 -
JavaDStream<User> userStream = ... // created this stream with some processing
userStream.foreachRDD(rdd -> {
DataFrame df = sqlContext.createDataFrame(rdd,User.class);
df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties());
});
现在,一旦我执行此代码,因为数据框以HAP危险方式形成,并且与数据库架构没有同步。因此,它试图在" ID"列中插入"地址",并以SQL异常退出。
我无法理解如何使数据框架了解数据库的模式并相应地加载数据。有什么办法吗?我认为 javardd 可以映射到 javardd ,但是我无法理解该怎么做。
另外,我相信这种 createAtaframe() api使用反射(必须),因此也存在一个绩效影响的问题。您能告诉我是否有一种方法可以维护POJO和关系数据库之间的映射,并插入数据?
这样做对我有用的。
@Data
class User implements Serializable {
private int id;
private String name;
private int age;
private String address;
private static StructType structType = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("age", DataTypes.IntegerType, false),
DataTypes.createStructField("address", DataTypes.StringType, false)
});
public static StructType getStructType() {
return structType;
}
public Object[] getAllValues() {
return new Object[]{id, name, age, address};
}
}
火花工作 -
JavaDStream<User> userStream = ... // created this stream with some processing
userStream.map(e -> {
Row row = RowFactory.create(e.getAllValues());
return row;
}).foreachRDD(rdd -> {
DataFrame df = sqlContext.createDataFrame(rdd,User.getStructType());
df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties());
});
我认为这是比以前更好的方法,因为在上一篇中,DataFrame使用反射将POJO映射到其自己的数据结构中。这是一种更干净的方法,因为我已经是SPARK SQL本身的一种格式,并且我已经在 getAllValues()和 getsstructType()
中的列映射如果我错了,请纠正我。