我有一个需求,我需要每5分钟从多个源系统(Mysql实例)获取数据,并加入和丰富它们与一些其他数据(存在于S3让说)。
我想在Spark中处理这个过程,将我的执行分布在多个执行器上。
主要问题是每次我在Mysql做一个查找,我只想获取最新的记录(让我们说lastmodifieldon>时间戳)。如何有效地选择性获取MySql行?这是我尝试过的:
val filmDf = sqlContext.read.format("jdbc")
.option("url", "jdbc:mysql://localhost/sakila")
.option("driver", "com.mysql.jdbc.Driver").option("dbtable", "film").option("user", "root").option("password", "")
.load()
您应该使用spark sql与jdbc数据源。我给你看一个例子。
val res = spark.read.jdbc(
url = "jdbc:mysql://localhost/test?user=minty&password=greatsqldb",
table = "TEST.table",
columnName = "lastModifiedOn",
lowerBound = lowerTimestamp,
upperBound = upperTimestamp,
numPartitions = 20,
connectionProperties = new Properties()
)
Apache Spark测试套件中有更多示例:https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala