如何使用 Scala Spark 快速处理数百万个小型 JSON 文件



我必须处理来自 Azure Blob 存储的数百万个 JSON 文件,每个文件代表一行,并且需要将它们加载到 Azure SQL 数据库中,中间需要进行一些最小的转换。这些文件随机出现,但遵循相同的架构。

我的第一个解决方案基本上只是为每个文件创建一个数据帧并将其推送到 SQL 中。当我们收到数百个文件时,这很有效,但现在我们收到了数百万个文件,它没有扩展,需要一天多的时间来处理。

我们还尝试在没有Spark的情况下处理Scala中的文件(见下面的代码(,但这也太慢了;8分钟内处理了500个文件。

var sql_statement = ""
allFiles.par.map(file_name => {
      //processing
      val json = scala.io.Source.fromFile(file_name).mkString
      val mapData1 = mapper.readValue(json, classOf[Map[String, Any]])
      val account=  mapData1("Contact").asInstanceOf[Map[String, Any]]
      val common = account.keys.toList.intersect(srcDestMap .keys.toList)
      val trMap=common.map(rec=>Map(srcDestMap(rec)->account(rec))).flatten.toMap
      val vals=trMap.keys.toList.sorted.map(trMap(_).toString.replace("'", "''")).map("'"+_+"'")
      //end processing
      val cols="insert into dbo.Contact_VS(" + trMap.keys.toList.sorted.mkString(",") + ")" + " values (" + vals.mkString(",") + ")"
      sql_statement = sql_statement + cols
    })
      val updated=statement.executeUpdate(sql_statement)
      connection.close()

如果有人知道如何优化这段代码,或者我们可以用来预处理JSON的任何开箱即用的想法,我们将不胜感激!JSON是嵌套的,因此将所有内容合并到一个大型JSON中以读取到Spark中会更加复杂,但是如果没有人有更好的想法,我们可能不得不这样做。

你很接近 Spark 包含一些帮助程序函数,用于跨群集并行执行任务。 请注意,您需要将"spark.default.parallelism"设置为一个合理的数字,这样您就不会创建太多与数据库的连接。

  def loadFileAndUploadToRDS(filepath: String): Unit = ???
  @Test
  def parallelUpload(): Unit ={
    val files = List("s3://bucket/path" /** more files **/)
    spark.sparkContext.parallelize(files).foreach(filepath => loadFileAndUploadToRDS(filepath))
  }

既然你已经得到了答案,让我指出原始scala实现的一些问题:

1( 手动创建 SQL 请求容易出错且效率低下

2(循环更新sql_statement效率非常低

3(allFiles.par的并行度。 .par不应用于阻止任务,原因有两个:

  • 它在后台使用全局共享线程池,因此一批任务将阻止其他任务。

  • 并行级别针对 CPU 密集型任务(CPU 线程数(进行了优化。您需要更高的并行度。

最新更新