我目前使用的项目有一些POJO文件,Spark
使用这些文件的方式如下: JavaRDD<MyPojo> = ...
sqlContext.createDataFrame(rdd, MyPojo.class);
但是,我还需要使用普通的Java代码将MySQL表加载到此POJO中。我可以使用本机SQL通过POJO加载和包装数据。但我也想知道我是否被允许将此 POJO 标记为冬眠实体。
提前谢谢。
无论您使用 RDD 还是 POJO 的数据集/数据帧,Hibernate/JPA 注释都不应干扰。
但是,当您"玩"Spark时,请注意代码的运行位置。
如果您希望执行程序通过 Hibernate 与数据库通信,则它们必须打开 Hibernate 会话。我不知道(或认为(Hibernate会话可以序列化并在驱动程序和执行器之间共享。
如果您在驱动程序中具有休眠会话,并且此时想要保存数据,请记住,您必须将所有数据从执行程序传输到驱动程序(例如,通过类似collect()
的方法(。
它可能会稍微改变应用程序的体系结构,但我会考虑调用write()
:
df.write()
.mode(SaveMode.Overwrite)
.jdbc(dbConnectionUrl, "ch02", prop);
并不是说这是用数据帧完成的,数据帧是一个Dataset<Row>
,而不是一个Dataset<MyPojo>
(也不是RDD(。完整示例是:
package net.jgp.books.spark.ch02.lab100_csv_to_db;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
/**
* CSV to a relational database.
*
* @author jgp
*/
public class CsvToRelationalDatabaseApp {
/**
* main() is your entry point to the application.
*
* @param args
*/
public static void main(String[] args) {
CsvToRelationalDatabaseApp app = new CsvToRelationalDatabaseApp();
app.start();
}
/**
* The processing code.
*/
private void start() {
// Creates a session on a local master
SparkSession spark = SparkSession.builder()
.appName("CSV to DB")
.master("local")
.getOrCreate();
// Step 1: Ingestion
// ---------
// Reads a CSV file with header, called authors.csv, stores it in a
// dataframe
Dataset<Row> df = spark.read()
.format("csv")
.option("header", "true")
.load("data/authors.csv");
// Step 2: Transform
// ---------
// Creates a new column called "name" as the concatenation of lname, a
// virtual column containing ", " and the fname column
df = df.withColumn(
"name",
concat(df.col("lname"), lit(", "), df.col("fname")));
// Step 3: Save
// ----
// The connection URL, assuming your PostgreSQL instance runs locally on the
// default port, and the database we use is "spark_labs"
String dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs";
// Properties to connect to the database, the JDBC driver is part of our
// pom.xml
Properties prop = new Properties();
prop.setProperty("driver", "org.postgresql.Driver");
prop.setProperty("user", "jgp");
prop.setProperty("password", "Spark<3Java");
// Write in a table called ch02
df.write()
.mode(SaveMode.Overwrite)
.jdbc(dbConnectionUrl, "ch02", prop);
System.out.println("Process complete");
}
}