我可以将POJO标记为Hibernate实体,而这个POJO由Apache Spark使用吗?



我目前使用的项目有一些POJO文件,Spark

使用这些文件的方式如下:
    JavaRDD<MyPojo> = ...
    sqlContext.createDataFrame(rdd, MyPojo.class);

但是,我还需要使用普通的Java代码将MySQL表加载到此POJO中。我可以使用本机SQL通过POJO加载和包装数据。但我也想知道我是否被允许将此 POJO 标记为冬眠实体。

提前谢谢。

无论您使用 RDD 还是 POJO 的数据集/数据帧,Hibernate/JPA 注释都不应干扰。

但是,当您"玩"Spark时,请注意代码的运行位置。

如果您希望执行程序通过 Hibernate 与数据库通信,则它们必须打开 Hibernate 会话。我不知道(或认为(Hibernate会话可以序列化并在驱动程序和执行器之间共享。

如果您在驱动程序中具有休眠会话,并且此时想要保存数据,请记住,您必须将所有数据从执行程序传输到驱动程序(例如,通过类似collect()的方法(。

它可能会稍微改变应用程序的体系结构,但我会考虑调用write()

df.write()
    .mode(SaveMode.Overwrite)
    .jdbc(dbConnectionUrl, "ch02", prop);

并不是说这是用数据帧完成的,数据帧是一个Dataset<Row>,而不是一个Dataset<MyPojo>(也不是RDD(。完整示例是:

package net.jgp.books.spark.ch02.lab100_csv_to_db;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
/**
 * CSV to a relational database.
 * 
 * @author jgp
 */
public class CsvToRelationalDatabaseApp {
  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    CsvToRelationalDatabaseApp app = new CsvToRelationalDatabaseApp();
    app.start();
  }
  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("CSV to DB")
        .master("local")
        .getOrCreate();
    // Step 1: Ingestion
    // ---------
    // Reads a CSV file with header, called authors.csv, stores it in a
    // dataframe
    Dataset<Row> df = spark.read()
        .format("csv")
        .option("header", "true")
        .load("data/authors.csv");
    // Step 2: Transform
    // ---------
    // Creates a new column called "name" as the concatenation of lname, a
    // virtual column containing ", " and the fname column
    df = df.withColumn(
        "name",
        concat(df.col("lname"), lit(", "), df.col("fname")));
    // Step 3: Save
    // ----
    // The connection URL, assuming your PostgreSQL instance runs locally on the
    // default port, and the database we use is "spark_labs"
    String dbConnectionUrl = "jdbc:postgresql://localhost/spark_labs";
    // Properties to connect to the database, the JDBC driver is part of our
    // pom.xml
    Properties prop = new Properties();
    prop.setProperty("driver", "org.postgresql.Driver");
    prop.setProperty("user", "jgp");
    prop.setProperty("password", "Spark<3Java");
    // Write in a table called ch02
    df.write()
        .mode(SaveMode.Overwrite)
        .jdbc(dbConnectionUrl, "ch02", prop);
    System.out.println("Process complete");
  }
}

相关内容

  • 没有找到相关文章

最新更新