如何在 Spark 构建流中的 MySQL 中将两个流 df 写入两个不同的表中?



我使用的是spark 2.3.2版本。

我已经在spark结构化流中编写了代码,将流数据帧数据插入到两个不同的MySQL表中。

假设有两个流式df:DF1、DF2。

我使用foreachWriter API编写了两个查询(query1,query2(,分别写入不同流的MySQL表。I.E.将DF1转换为MYSQL表A,将DF2转换为MYSQL表B。

当我运行spark作业时,它首先运行query1,然后运行query2,所以它写入表A,但不写入表B。

如果我把代码改为先运行query2,然后运行query1,它会写入表B,但不会写入表A。

所以我知道它执行第一个查询只是为了写入表。

注意:我尝试过将不同的MySQL用户/数据库分别分配给两个表。但运气不好。

有人能提出建议吗?如何使其发挥作用。

我的代码如下:

import java.sql._
class  JDBCSink1(url:String, user:String, pwd:String) extends ForeachWriter[org.apache.spark.sql.Row] {
val driver = "com.mysql.jdbc.Driver"
var connection:Connection = _
var statement:Statement = _

def open(partitionId: Long,version: Long): Boolean = {
Class.forName(driver)
connection = DriverManager.getConnection(url, user, pwd)
statement = connection.createStatement
true
}
def process(value: (org.apache.spark.sql.Row)): Unit = {
val insertSql = """ INSERT INTO tableA(col1,col2,col3) VALUES(?,?,?); """
val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
preparedStmt.setString (1, value(0).toString)
preparedStmt.setString (2, value(1).toString)
preparedStmt.setString (3, value(2).toString)
preparedStmt.execute
}
def close(errorOrNull: Throwable): Unit = {
connection.close
}
}

class  JDBCSink2(url:String, user:String, pwd:String) extends ForeachWriter[org.apache.spark.sql.Row] {
val driver = "com.mysql.jdbc.Driver"
var connection:Connection = _
var statement:Statement = _

def open(partitionId: Long,version: Long): Boolean = {
Class.forName(driver)
connection = DriverManager.getConnection(url, user, pwd)
statement = connection.createStatement
true
}
def process(value: (org.apache.spark.sql.Row)): Unit = {
val insertSql = """ INSERT INTO tableB(col1,col2) VALUES(?,?); """
val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
preparedStmt.setString (1, value(0).toString)
preparedStmt.setString (2, value(1).toString)
preparedStmt.execute
}
def close(errorOrNull: Throwable): Unit = {
connection.close
}
}

val url1="jdbc:mysql://hostname:3306/db1"
val url2="jdbc:mysql://hostname:3306/db2"
val user1 ="usr1"
val user2="usr2"
val pwd = "password"
val Writer1 = new JDBCSink1(url1,user1, pwd)
val Writer2 = new JDBCSink2(url2,user2, pwd)

val query2 =
streamDF2
.writeStream
.foreach(Writer2)
.outputMode("append")
.trigger(ProcessingTime("35 seconds"))
.start().awaitTermination()

val query1 =
streamDF1
.writeStream
.foreach(Writer1)
.outputMode("append")
.trigger(ProcessingTime("30 seconds"))
.start().awaitTermination()

由于awaitTermination,您正在阻止第二个查询。如果你想有两个输出流,你需要在等待它们的终止之前启动这两个:

val query2 =
streamDF2
.writeStream
.foreach(Writer2)
.outputMode("append")
.trigger(ProcessingTime("35 seconds"))
.start()
val query1 =
streamDF1
.writeStream
.foreach(Writer1)
.outputMode("append")
.trigger(ProcessingTime("30 seconds"))
.start()
query1.awaitTermination()
query2.awaitTermination()

编辑:

Spark还允许您在应用程序中调度和分配资源给不同的流式查询,如调度中所述。您可以根据配置您的池

  • 调度模式:可以是FIFOFAIR
  • 重量:";这控制了池相对于其他池在集群中的份额。默认情况下,所有池的权重都为1。例如,如果你给一个特定的池赋予2的权重,它将获得比其他活动池多2倍的资源">
  • minShare:";除了整体权重之外,每个池还可以获得管理员希望它拥有的最小份额(作为CPU核心的数量(">

可以通过创建一个类似于conf/fairscheduler.xml.template的XML文件,并在类路径上放置一个名为fairscheduler.XML的文件,或者在SparkConf中设置spark.scheduler.allocation.file属性来设置池配置。

conf.set("spark.scheduler.allocation.file", "/path/to/file")

应用不同的池可以如下所示:

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
// In the above example you could then tell Spark to make use of the pools
val query1 = streamDF1.writeStream.[...].start(pool1)
val query2 = streamDF2.writeStream.[...].start(pool2)

最新更新