使用Apache Spark高效地将数据推送到弹性搜索



我在一个xml文件中有2700万条记录,我想把它推送到弹性搜索索引中下面是用spark-scala编写的代码片段,我将创建一个spark-job jar并在AWS EMR 上运行

我如何才能有效地利用火花来完成这项练习?请引导。

我有一个12.5gb的gzipped xml,我正在将其加载到spark数据帧中。我是Spark的新手。。(我应该拆分这个gzip文件吗?还是由spark执行器来处理?)

class ReadFromXML {
def createXMLDF(): DataFrame = {
val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
import spark.implicits._
val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)
var new_df: DataFrame = null

new_df = m_df.select($"CountryCode"(0).as("countryCode"),
$"PostalCode"(0).as("postalCode"),
$"state"(0).as("state"),
$"county"(0).as("county"),
$"city"(0).as("city"),
$"district"(0).as("district"),
$"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
$"FullStreetName"(0).as("street"),
functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal"))
.where($"LocationList.Location._primary" === "true")
.where("(array_contains(_languageCode, 'en'))")
.where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))

new_df.drop("name")
}
}
object PushToES extends App {
val spark = SparkSession
.builder()
.appName("PushToES")
.master("local[*]")
.config("spark.es.nodes", "awsurl")
.config("spark.es.port", "port")
.config("spark.es.nodes.wan.only", "true")
.config("spark.es.net.ssl", "true")
.getOrCreate()
val extractor = new ReadFromXML()
val df = extractor.createXMLDF()
df.saveToEs("myindex/_doc")
}

更新1:我已将每个文件拆分为68M,读取单个文件需要3.7分钟我没有试着用snappy代替gzip压缩编解码器因此,将gz文件转换为snappy文件,并在下面的配置中添加

.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")

但它返回空的数据帧

df.printschema只返回";根";

更新2:我已经设法使用lzo格式运行。。解压缩和加载数据帧所花费的时间非常少。

迭代每个大小为140MB的lzo压缩文件并创建数据帧是个好主意吗?或

我应该在一个数据帧中加载一组10个文件吗?或

我应该在一个数据帧中加载所有200个lzo压缩文件吗?。如果是,那么应该给master分配多少内存,因为我认为这将加载到master上?

当从s3桶读取文件时;s3a";uri可以提高性能吗?或";s3";uri可以用于EMR吗?

更新3:测试一小组10个lzo文件。。我使用了以下配置。EMR集群总共花了56分钟,从这个步骤(Spark应用程序)花了48分钟处理10个文件

1 Master-m5.xlarge4 vCore,16 GiB内存,仅EBS存储EBS存储:32 GiB

2芯-m5.xlarge4 vCore,16 GiB内存,仅EBS存储EBS存储:32 GiB

从中学习以下Spark调谐参数https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

[
{
"Classification": "yarn-site",
"Properties": {
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "false"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.network.timeout": "800s",
"spark.executor.heartbeatInterval": "60s",
"spark.dynamicAllocation.enabled": "false",
"spark.driver.memory": "10800M",
"spark.executor.memory": "10800M",
"spark.executor.cores": "2",
"spark.executor.memoryOverhead": "1200M",
"spark.driver.memoryOverhead": "1200M",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.yarn.scheduler.reporterThread.maxFailures": "5",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.default.parallelism": "4"
}
},
{
"Classification": "mapred-site",
"Properties": {
"mapreduce.map.output.compress": "true"
}
}
]

以下是我的一些技巧。

读取镶木地板格式或任何格式的数据。根据需要重新分区。数据转换可能会消耗时间,所以请在spark中读取数据,然后进行处理。在开始加载之前,请尝试创建映射和格式化数据。这将有助于在复杂映射的情况下轻松调试。

val spark = SparkSession
.builder()
.appName("PushToES")
.enableHiveSupport()
.getOrCreate()

val batchSizeInMB=4; // change it as you need
val batchRetryCount= 3
val batchWriteRetryWait = 10
val batchEntries= 10
val enableSSL = true
val wanOnly = true
val enableIdempotentInserts = true
val esNodes = [yourNode1, yourNode2, yourNode3]
var esConfig = Map[String, String]()
esConfig = esConfig + ("es.node"-> esNodes.mkString)(","))
esConfig = esConfig + ("es.port"->port.toString())
esConfig = esConfig + ("es.batch.size.bytes"->(batchSizeInMB*1024*1024).toString())
esConfig = esConfig + ("es.batch.size.entries"->batchEntries.toString())
esConfig = esConfig + ("es.batch.write.retry.count"->batchRetryCount.toString())
esConfig = esConfig + ("es.batch.write.retry.wait"->batchWriteRetryWait.toString())
esConfig = esConfig + ("es.batch.write.refresh"->"false")
if(enableSSL){
esConfig = esConfig + ("es.net.ssl"->"true")
esConfig = esConfig + ("es.net.ssl.keystore.location"->"identity.jks")
esConfig = esConfig + ("es.net.ssl.cert.allow.self.signed"->"true")
}
if (wanOnly){
esConfig = esConfig + ("es.nodes.wan.only"->"true")
}
// This helps if some task fails , so data won't be dublicate
if(enableIdempotentInserts){
esConfig = esConfig + ("es.mapping.id" ->"your_primary_key_column")
}
val df = "suppose you created it using parquet format or any format"

实际上,数据是在执行器级别插入的,而不是在驱动程序级别插入的试着只给每个执行器2-4个核心,这样就不会有那么多连接同时打开。您可以根据自己的方便程度更改文档大小或条目。请阅读有关它们的内容。

将数据分块写入,这将有助于将来加载大型数据集并尝试在加载数据之前创建索引映射。并且更喜欢小嵌套数据,因为ES中有这种功能我的意思是尽量在你的数据中保留一些主键。

val dfToInsert = df.withColumn("salt", ceil(rand())*10).cast("Int").persist()
for (i<-0 to 10){
val start = System.currentTimeMillis
val finalDF = dfToInsert.filter($"salt"===i)
val counts = finalDF.count()
println(s"count of record in chunk $i -> $counts")
finalDF.drop("salt").saveToES("indexName",esConfig)
val totalTime = System.currentTimeMillis - start
println(s"ended Loading data for chunk $i. Total time taken in Seconds : ${totalTime/1000}")
}

试着给你的最终DF提供一些别名,并在每次运行中更新。因为您不想打扰您的生产服务器加载时

内存

这不能是通用的。但只是为了给你一个启动的机会

根据您的数据大小或预算保留10-40个执行器。保留每个执行器8-16gb大小和5gb开销。(这可能因您的文档的大小可以是大的或小的)。如果需要,请保持maxResultSize 8gb。驱动器可以有5个核心和30克内存

重要事项

  • 您需要将配置保存在变量中,因为您可以根据索引进行更改

  • 插入发生在执行器而不是驱动程序上,所以尽量保持较小写入时连接。每个核心将打开一个连接。

  • 文档插入可以是批量输入大小或文档大小。在进行多次跑步时,根据您的学习情况进行更改。

  • 尝试使您的解决方案稳健。它应该能够处理所有大小的数据。阅读和写作都可以调整,但请尝试将数据格式化为在开始加载之前按文档映射。这将有助于调试,如果数据文档有点复杂和嵌套。

  • spark submit的记忆也可以根据你在跑步时的学习情况进行调整工作。试着通过改变内存和批处理来查看插入时间大小

  • 最重要的是设计。如果您正在使用ES,则创建您的地图,同时牢记最终查询和需求。

这不是一个完整的答案,但仍有点长。我有几个建议。

目前还不清楚,但我想你担心的是处决时间。正如注释中所建议的,您可以通过向集群中添加更多的节点/执行器来提高性能。如果加载gzip文件时没有在spark中进行分区,那么应该将其拆分为合理的大小。(不要太小-这会使处理速度变慢。不要太大-执行器将运行OOM)。

使用Spark时,parquet是一种很好的文件格式。如果您可以将XML转换为镶木地板。它具有超强的压缩性和轻质性。

阅读您的评论,coalesce不会完全洗牌。合并算法通过将数据从一些分区移动到现有分区来更改节点数。这种算法显然不能增加分区的数量。请改用repartition。该操作成本高昂,但可能会增加分区数量。查看此项了解更多事实:https://medium.com/@mrpowers/manageing-spark-partitions-with-coless-and-repartition-4050c57ad5c4

最新更新