使用Spark/java在数据帧中加载查询的ResultSet



我想在数据帧Spark中加载选择查询的结果集。

我使用以下代码:

public static void func (Dataset <Row> df){
df.repartition(20); //one connection per partition, see below
df.foreachPartition((Iterator<Row> t) -> {
Connection conn = DriverManager.getConnection("url",
"root", "");
conn.setAutoCommit(true);
Statement statement = conn.createStatement();
final int batchSize = 100000;
int i = 0;
while (t.hasNext()) {
Row row = t.next();
try {
ResultSet query =   statement.executeQuery("SELECT * FROM zones WHERE zones.id IN ("
+"'"  + row.getAs("idZones")
+ "'"+ ")  ");

}  catch (SQLException e) {
e.printStackTrace();
} finally {

}
}
statement.close();
conn.close();

});
}

是否有可能在数据帧中加载结果集?

我需要你的帮助

谢谢。

如果我正确理解您的问题,您希望在数据框架中加载SQL表。要做到这一点,你需要做以下事情:

  1. 创建一个sparkSession对象
  2. 将JDBC连接放入Properties对象中
  3. 通过读取方法加载SQL表
  4. 您可以根据加载的数据帧应用相关的筛选器

请在下面找到代码作为示例。

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ReadFromSQLTable {
public static void main(String[] args) {
String applicationName = ReadFromSQLTable.class.getName();
SparkConf sparkConf = new SparkConf().setAppName(applicationName).setMaster("local[2]");
// using Dataset<Row>
SparkSession sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "root"); // user name of your SQL database
connectionProperties.put("password", "password"); // password of SQL
connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
// Name of the database that i am interacting with is `test`. You will find this as part of URL.
// Name of table that I want to load is the `employee`
Dataset<Row> employeeDetail = sparkSession.read().jdbc("jdbc:mysql://127.0.0.1:3306/test",
"employee", connectionProperties);
log.error("Printing table detail");
employeeDetail.show(); // to show the dataset loaded on the console
long count = employeeDetail.count();
System.out.println("The count is = " + count);
Dataset<Row> employeeDetail2 = employeeDetail.filter("employee_number < 2");
employeeDetail2.show();
}

您可以对这些数据帧应用任何类型的操作,如筛选、选择或任何其他SQL操作。

我正在本地系统中运行此代码。我在代码中添加了注释,这样你就可以很容易地理解它。如果你有任何疑问,请告诉我。

我希望这能让您对如何开始将SQL表加载为数据帧有一个合理的想法。

最新更新