Spark RDD 中的多个分区



所以我正在尝试在Play/Scala项目中使用Spark从MySQL数据库中获取数据。由于我尝试接收的行数很大,我的目标是从 spark rdd 中获取一个迭代器。下面是 Spark 上下文和配置...

  private val configuration = new SparkConf()
    .setAppName("Reporting")
    .setMaster("local[*]")
    .set("spark.executor.memory", "2g")
    .set("spark.akka.timeout", "5")
    .set("spark.driver.allowMultipleContexts", "true")
  val sparkContext = new SparkContext(configuration)

JDBCRDD 与 sql 查询如下所示

val query =
  """
    |SELECT id, date
    |FROM itembid
    |WHERE date BETWEEN ? AND ?
  """.stripMargin

val rdd = new JdbcRDD[ItemLeadReportOutput](SparkProcessor.sparkContext,
      driverFactory,
      query,
      rangeMinValue.get,
      rangeMaxValue.get,
      partitionCount,
      rowMapper)
      .persist(StorageLevel.MEMORY_AND_DISK)

数据太多,无法一次获得。一开始,使用较小的数据集,可以从rdd.toLocalIterator获取迭代器。但是,在这种特定情况下,它无法计算迭代器。所以我的目标是拥有多个分区并逐部分接收数据。我不断收到错误。正确的方法是什么?

我相信您正面临读取MySQL表的堆问题。

在您的情况下,我要做的是将数据从MySQL提取到存储系统(HDFS,本地)文件中,然后我将使用spark的上下文textFile来获取它!

例:

object JDBCExample {
  def main(args: Array[String]) {
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost/database"
    val username = "user"
    val password = "pass"
    var connection: Connection = null
    try {
      Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)
      // This is the tricky part of reading a huge MySQL table you'll need to set your sql statement as following :
      val statement = connection.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY)
      statement.setMaxRows(0)
      statement.setFetchSize(Integer.MIN_VALUE)
      val resultSet = statement.executeQuery("select * from ex_table")
      val fileWriter = new FileWriter("output.csv")
      val writer = new CSVWriter(fileWriter, 't');
      while (resultSet.next()) {
        val entries = List(... // process result here //...)
        writer.writeNext(entries.toArray)
      }
      writer.close();
    } catch {
      case e: Throwable => e.printStackTrace
    }
    connection.close()
  }
}

存储数据后,您可以读取它:

val data = sc.textFile("output.csv")

PS:我在代码中使用了一些快捷方式(每个示例的CSVWriter),但您可以将其用作您打算做的事情的骨架!

相关内容

  • 没有找到相关文章

最新更新