在我的Scala/Spark应用程序中,我正在尝试正确使用多处理。从下面的代码中可以看出,线程数等于storage
数组中的元素数。我测试了当前代码,它可以工作。但如您所见,storage
数组中只有 2 个元素。在我看来,如果数组中有大量元素,就会出现问题。就我而言,我不知道将来数组中会有多少元素。也许我应该限制线程的数量,只有在处理以前的线程时才启动新线程。
问:如何确定最佳线程数?
Main.app:
import org.apache.spark.sql.DataFrame
import utils.CustomThread
object MainApp {
def main(args: Array[String]): Unit = {
// Create the main DataFrame with all information.
var baseDF: DataFrame = spark.read.option("delimiter", "|").csv("/path_to_the_files/")
// Cache the main DataFrame.
baseDF.persist(StorageLevel.MEMORY_AND_DISK)
// The first time DataFrame is computed in an action, it will be kept in memory on the nodes.
baseDF.count()
// Create arrays with the different identifiers
var array1 = Array("6fefc487-bd57-4fa2-808a-3845703b83d0", "9baba76b-07c2-48ec-a153-6cfb8b138ecf")
var array2 = Array("ab654369-77f5-478c-94e5-ee2755ae8571", "3b43e0a6-deba-4919-a2cc-9d450e28e0fe")
var storage = Array(array1, array2)
// Check if the main DataFrame is empty or not.
if (baseDF.head(1).nonEmpty) {
for (item <- storage) {
val thread = new Thread(new CustomThread(baseDF, item))
thread.start()
}
}
}
}
CustomThread.scala:
package utils
import org.apache.spark.sql.DataFrame
class CustomThread(baseDF: DataFrame, item: Array[String]) extends Runnable {
override def run(): Unit = {
val df = baseDF.filter(col("col1").isin(item:_*))
println("Count: " + df.count())
}
}
我使用这样的配置:
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max.mb: 1024
spark.executor.memory: 2g
spark.sql.autoBroadcastJoinThreshold: -1
spark.sql.files.ignoreCorruptFiles: true
spark.driver.memory: 30g
spark.driver.maxResultSize: 20g
spark.executor.cores: 1
spark.cores.max: 48
spark.scheduler.mode: FAIR
你想用你的多线程实现什么?在 Spark 应用程序中,您不必担心线程数等。您的代码所做的是启动并行作业(您的多线程仅在驱动程序上(,对于执行器,没有区别。
根据我的经验,仅当我有多个作业较小或倾斜(例如群集资源未充分利用(时,我才使用并行作业启动。如果我这样做,我会使用 scalas 并行集合。
回答您的问题:如果线程可能是 1 的最佳数字
编辑:我建议完全重写您的代码,目标是将所有结果放在一个新的数据帧中,这比实现复杂的多线程更好:
// testcase
val baseDf = Seq(
"6fefc487-bd57-4fa2-808a-3845703b83d0",
"9baba76b-07c2-48ec-a153-6cfb8b138ecf",
"ab654369-77f5-478c-94e5-ee2755ae8571",
"dummy"
).toDF("col1")
var array1 = Seq("6fefc487-bd57-4fa2-808a-3845703b83d0", "9baba76b-07c2-48ec-a153-6cfb8b138ecf")
var array2 = Seq("ab654369-77f5-478c-94e5-ee2755ae8571", "3b43e0a6-deba-4919-a2cc-9d450e28e0fe")
var storage = Seq(array1, array2)
broadcast(storage.toDF("storage"))
.join(baseDf,array_contains($"storage",$"col1"),"left")
.groupBy($"storage").agg(count($"col1").as("count"))
.show()
给:
+--------------------+-----+
| storage|count|
+--------------------+-----+
|[ab654369-77f5-47...| 1|
|[6fefc487-bd57-4f...| 2|
+--------------------+-----+