如何在Apache Spark中并行运行两个SparkSql查询



首先,让我在 Spark 上的 .scala 文件中编写我要执行的代码部分。

这是我的源文件。它具有四个字段的结构化数据

val inputFile = sc.textFile("hdfs://Hadoop1:9000/user/hduser/test.csv")

我已经声明了一个 case 类,用于将文件中的数据存储到具有四列的表中

case class Table1(srcIp: String, destIp: String, srcPrt: Int, destPrt: Int)
val inputValue = inputFile.map(_.split(",")).map(p => Table1(p(0),p(1),p(2).trim.toInt,p(3).trim.toInt)).toDF()
inputValue.registerTempTable("inputValue")

现在,假设我想运行以下两个查询。如何并行运行这些查询,因为它们是相互独立的。我觉得,如果我能并行运行它们,它可以减少执行时间。现在,它们是串行执行的。

val primaryDestValues = sqlContext.sql("SELECT distinct destIp FROM inputValue")
primaryDestValues.registerTempTable("primaryDestValues")
val primarySrcValues = sqlContext.sql("SELECT distinct srcIp FROM inputValue")
primarySrcValues.registerTempTable("primarySrcValues")
primaryDestValues.join(primarySrcValues, $"destIp" === $"srcIp").select($"destIp",$"srcIp").show(

也许你可以朝期货/承诺的方向看。SparkContext submitJob中有一种方法可以让你的未来得到结果。因此,您可以解雇两份工作,然后从期货中收集结果。

我还没有尝试过这种方法。只是一个假设。

不知道你为什么要首先使用sqlContext,不要让事情变得简单。

val inputValue = inputFile.map(_.split(",")).map(p => (p(0),p(1),p(2).trim.toInt,p(3).trim.toInt))

假设 p(0) = destIp,p(1)=srcIp

val joinedValue = inputValue.map{case(destIp, srcIp, x, y) => (destIp, (x, y))}
                  .join(inputFile.map{case(destIp, srcIp, x, y) => (srcIp, (x, y))})
                  .map{case(ip, (x1, y1), (x2, y2)) => (ip, destX, destY, srcX, srcY)}

现在它将被paralleziation,你甚至可以使用colasce控制你想要的分区数量。

您可以跳过两个DISTINCT并在最后执行一个:

inputValue.select($"srcIp").join(
  inputValue.select($"destIp"), 
  $"srcIp" === $"destIp"
).distinct().show

这是一个很好的问题。这可以使用数组中的par并行执行。为此,您必须相应地自定义代码。

声明一个包含两个项目的数组(您可以根据需要命名)。在需要并行执行的每个 case 语句中编写代码。

Array("destIp","srcIp").par.foreach { i => 
{
    i match {
      case "destIp" => {
        val primaryDestValues = sqlContext.sql("SELECT distinct destIp FROM inputValue")
        primaryDestValues.registerTempTable("primaryDestValues")
      }
      case "srcIp" => {
        val primarySrcValues = sqlContext.sql("SELECT distinct srcIp FROM inputValue")
        primarySrcValues.registerTempTable("primarySrcValues")
      }}}
}

一旦两个 case 语句的执行完成,下面的代码将被执行。

primaryDestValues.join(primarySrcValues, $"destIp" === $"srcIp").select($"destIp",$"srcIp").show()

注意:如果从代码中删除par,它将按顺序运行

另一种选择是在代码中创建另一个 Sparksession 并使用该 sparksession 变量执行 sql。但这风险很小,并且使用起来非常小心

相关内容

  • 没有找到相关文章

最新更新