spark 2.2结构流foreach编写器jdbc接收器滞后



我在一个项目中使用spark 2.2 struct streaming将kafka msg读取到oracle数据库中。进入kafka的消息流大约是每秒4000-6000条消息。


当使用hdfs文件系统作为接收器目标时,它工作得很好。当使用foreach jdbc编写器时,随着时间的推移,它将有巨大的延迟。我认为滞后是由foreach循环引起的。

jdbc接收器类(独立类文件):

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
val driver = "oracle.jdbc.driver.OracleDriver"
var connection: java.sql.Connection = _
var statement: java.sql.PreparedStatement = _
val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"
def open(partitionId: Long, version: Long): Boolean = {
Class.forName(driver)
connection = java.sql.DriverManager.getConnection(url, user, pwd)
connection.setAutoCommit(false)
statement = connection.prepareStatement(v_sql)
true
}
def process(value: org.apache.spark.sql.Row): Unit = {
statement.setString(1, value(0).toString)
statement.setString(2, value(1).toString)
statement.setString(3, value(2).toString)
statement.setString(4, value(3).toString)
statement.executeUpdate()        
}
def close(errorOrNull: Throwable): Unit = {
connection.commit()
connection.close
}
}

水槽部分:

val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "namenode:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000")
.option("subscribe", "rawdb.raw_data")
.option("startingOffsets", "latest")
.load()
.select($"value".as[Array[Byte]])
.map(avroDeserialize(_))
.filter(some logic).select(some logic) 
.writeStream.format("csv").option("checkpointLocation", "/user/root/chk").option("path", "/user/root/testdir").start()

如果我更改最后一行

.writeStream.format("csv")...

到jdbcforeach接收器中,如下所示:

val url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.x)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=fastdb)))"
val user = "user";
val pwd = "password";
val writer = new JDBCSink(url, user, pwd)
.writeStream.foreach(writer).outputMode("append").start()

滞后显示。

我想这个问题很可能是由foreach循环机制引起的,它不是在批处理模式下处理的,就像批处理中的几千行一样,作为一名oracle DBA,我对oracle数据库端进行了微调,大部分数据库都在等待空闲事件。通过设置connection.setAutoCommit(false)来避免过度提交,任何建议都将不胜感激。

虽然我没有关于应用程序中耗时最长的内容的实际配置文件,但我认为这是因为每次运行时使用ForeachWriter都会有效地关闭和重新打开JDBC连接,因为ForeachWriter就是这样工作的。

我建议您不要使用它,而是为JDBC编写一个自定义的Sink,用于控制连接的打开或关闭方式。

有一个向Spark添加JDBC驱动程序的openpull请求,您可以查看该请求以了解实现的可能方法。

问题通过将结果注入另一个Kafka主题来解决,然后编写另一个从新主题读取的程序将它们批量写入数据库。

我认为在下一个spark版本中,他们可能会提供jdbc接收器,并具有一些参数设置批量大小。

主要代码如下:

写信给另一个主题:

.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "x.x.x.x:9092")
.option("topic", "fastdbtest")
.option("checkpointLocation", "/user/root/chk")
.start()

阅读主题并写入数据库,我使用的是c3p0连接池

lines.foreachRDD(rdd => {
if (!rdd.isEmpty) {
rdd.foreachPartition(partitionRecords => {
//get a connection from connection pool
val conn = ConnManager.getManager.getConnection
val ps = conn.prepareStatement("insert into sparkdb.t_cf(ENTITYID,CLIENTMAC,STIME,FLAG) values(?,?,?,?)")
try {
conn.setAutoCommit(false)
partitionRecords.foreach(record => {
insertIntoDB(ps, record)
}
)
ps.executeBatch()
conn.commit()
} catch {
case e: Exception =>{}
// do some log
} finally {
ps.close()
conn.close()
}
})
}
})

您尝试过使用触发器吗?

我注意到,当我没有使用触发器时,我的Foreach Sink打开和关闭了好几次与数据库的连接。

writeStream.foreach(writer).start()

但当我使用触发器时,Foreach只打开和关闭了一次连接,例如处理了200个查询,当微批结束时,它关闭了连接,直到收到新的微批。

writeStream.trigger(Trigger.ProcessingTime("3 seconds")).foreach(writer).start()

我的用例是阅读一个只有一个分区的Kafka主题,所以我认为Spark使用的是一个分区。我不知道这个解决方案是否适用于多个Spark分区,但我的结论是Foreach在process方法中一次(逐行)处理所有的微批处理,并不像很多人认为的那样对每一行都调用open()和close()。

最新更新