Mysql数据在Spark中的处理



我有一个需求,我需要每5分钟从多个源系统(Mysql实例)获取数据,并加入和丰富它们与一些其他数据(存在于S3让说)。

我想在Spark中处理这个过程,将我的执行分布在多个执行器上。

主要问题是每次我在Mysql做一个查找,我只想获取最新的记录(让我们说lastmodifieldon>时间戳)。如何有效地选择性获取MySql行?这是我尝试过的:

val filmDf = sqlContext.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/sakila")
  .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "film").option("user", "root").option("password", "")
  .load()

您应该使用spark sql与jdbc数据源。我给你看一个例子。

val res = spark.read.jdbc(
      url = "jdbc:mysql://localhost/test?user=minty&password=greatsqldb",
      table = "TEST.table",
      columnName = "lastModifiedOn",
      lowerBound = lowerTimestamp,
      upperBound = upperTimestamp,
      numPartitions = 20,
      connectionProperties = new Properties()
    )

Apache Spark测试套件中有更多示例:https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

相关内容

  • 没有找到相关文章

最新更新