Spark 2.2 sort fails with huge dataset



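I am facing an issue when sorting a huge dataset (1.2 TB) by 4 columns. After the sort, I also need to partition this dataset by one of the columns used in the sort when writing the final dataset to HDFS.

Here is a Stack Overflow post I published a few days ago describing another issue I had with the same code, that time about joining two datasets:

Previous issue

I used the answer to that post to improve my code. The join now works fine.

I tested the code without the sort and it works fine. To perform the sort, I thought about partitioning the data on the four columns.

One partition is 500 MB in size, which gives me about 2600 partitions (1.2 TB / 500 MB).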
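The partition count above is simply the dataset size divided by the target partition size. A minimal sketch of that arithmetic, assuming binary units (1 TB = 1024 × 1024 MB); `partitionsFor` is a hypothetical helper name, not a Spark API:

```scala
object PartitionCount {
  /** Number of partitions needed so that each one holds at most targetMb (rounded up). */
  def partitionsFor(datasetMb: Double, targetMb: Double): Int =
    math.ceil(datasetMb / targetMb).toInt

  def main(args: Array[String]): Unit = {
    val datasetMb = 1.2 * 1024 * 1024 // 1.2 TB expressed in MB (binary units, an assumption)
    val targetMb  = 500.0             // target partition size from the question
    println(partitionsFor(datasetMb, targetMb))
  }
}
```

Under these assumptions the result is roughly 2500 partitions, so the 2600 used here leaves a little headroom.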

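When running the Spark job, I get a shuffle.RetryingBlockFetcher error (see the error logs below).

My questions are:

  • What is the best way to sort data in Spark so as to avoid the shuffle, or at least reduce it?
  • Can I correct or improve my code to perform the sort?
  • Do I really have to sort this way? Could I use another technique instead, such as group by?

My code snippet:

EDIT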

val uh = uh_months
  .withColumn("UHDIN", datediff(
    to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
    to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
  //      .withColumn("DVA_1", to_date((unix_timestamp(col("DVA"), "ddMMMyyyy")).cast(TimestampType)))
  .withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
  .drop("UHDIN_YYYYMMDD")
  .drop("january")
  .drop("DVA")
  .repartition(1300, col("MMED"), col("DEBCRED"), col("NMTGP")) //.repartition(1300,col("NO_NUM"))
val uh_flag_comment = new TransactionType().transform(uh)
val uh_repartitioned = uh_flag_comment.repartition(1300, col("NO_NUM"))
val uh_joined = uh_repartitioned.join(broadcast(smallDF), "NO_NUM")
  .select(
    uh.col("*"),
    smallDF.col("PSP"),
    smallDF.col("minrel"),
    smallDF.col("Label"),
    smallDF.col("StartDate"))
  .withColumnRenamed("DVA_1", "DVA")
val uh_final = uh_joined
  .repartition(1300, col("PSP"))
  .sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))
return uh_final

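TransactionType is a class in which I use regular expressions to add a new column to my uh dataframe, based on the values of 3 columns (MMED, DEBCRED, NMTGP).

Without the sort, and using the full capacity of the cluster, the code runs in about 1 hour.

Execution plan: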

== Physical Plan ==
Exchange hashpartitioning(PSP#82, 2600)
+- *Sort [PSP#82 ASC NULLS FIRST, NO_NUM#252 ASC NULLS FIRST, UHDIN#547 ASC NULLS FIRST, HOURMV#175 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(PSP#82 ASC NULLS FIRST, NO_NUM#252 ASC NULLS FIRST, UHDIN#547 ASC NULLS FIRST, HOURMV#175 ASC NULLS FIRST, 200)
+- Exchange hashpartitioning(PSP#82, NO_NUM#252, UHDIN#547, HOURMV#175, 2600)
+- *Project [NO_NUM#252, DEV#153, DEBCRED#154, BDGRORI#155, BDGREUR#156, BEWC#157, MSG30_NL#158, SCAPMV#159, USERID#160, MMED#161, TNUM#162, NMTGP#163, BKA#164, CATEXT#165, SEQETAT#166, ACCTYPE#167, BRAND#168, FAMILY#169, SUBFAMILY#170, FORCED_DVA#172, BYBANK#173, CPTE_PROTEGE#174, HOURMV#175, RDFB#176, ... 30 more fields]
+- *BroadcastHashJoin [NO_NUM#252], [NO_NUM#13], Inner, BuildRight
:- Exchange hashpartitioning(NO_NUM#252, 1300)
:  +- *Project [NUM#152 AS NO_NUM#252, DEV#153, DEBCRED#154, BDGRORI#155, BDGREUR#156, BEWC#157, MSG30_NL#158, SCAPMV#159, USERID#160, MMED#161, TNUM#162, NMTGP#163, BKA#164, CATEXT#165, SEQETAT#166, ACCTYPE#167, BRAND#168, FAMILY#169, SUBFAMILY#170, FORCED_DVA#172, BYBANK#173, CPTE_PROTEGE#174, HOURMV#175, RDFB#176, ... 26 more fields]
:     +- *Filter (BEWC#157 INSET (25003,25302,25114,20113,12017,20108,25046,12018,15379,15358,11011,20114,10118,12003,25097,20106,20133,10133,10142,15402,25026,25345,28023,15376,25019,28004,21701,25001,11008,15310,15003,2SOMEPORT,22048,15470,25300,25514,25381,25339,15099,25301,28005,28026,25098,25018,15323,25376,15804,15414,25344,25102,15458,15313,28002,25385,22051,25214,15031,12005,15425,20145,22011,15304,25027,14020,11007,25901,15343,22049,20112,12031,20127,15339,25421,15432,28025,25340,25325,20150,28011,25368,25304,22501,25369,28022,15098,12032,15375,25002,25008,10116,10101,22502,25090,15004,20105,12030,22503,15095,22007,15809,15342,15311,25216,10103,20122,11019,20142,15097,20147,20149,25005,25205,25380,15380,10120,25015,15384,11003,10110,25016,15090,25307,15001,25390,15312,10115,25219,15806,15459,12016,15359,15395,15302,12021,11701,10111,10148,25379,15807,10102,25352,25355,12010,25095,25394,20101,25413,15385,25322,28027,11026,15533,25201,25371,10128,11028,12020,15819,10143,28028,10123,10125,11020,25029,10122,25343,15015,12033,25014,12012,25024,25375,11023,25501,25402,22001,15317,12014,16114,20501,15046,12001,12022,10104,10117,12002,25499,10145,10153,12011,15350,15300,10119,25305,15345,25374,11027,25430,28021,25202,10121,28024,25101,28001,15321,11025,25358,15333,15501,25533,15372,12008,11015,10114,10113,10112,15303,15320,28006,22002,25359,10132,15497,25353,11029,25425,15374,12019,25437,11022,15357,20148,20111,26114,25099,25354,10124,25303,11010,20120,20135,15820,15331,28029) && isnotnull(NUM#152))
:        +- *FileScan csv [UHDIN_YYYYMMDD#151,NUM#152,DEV#153,DEBCRED#154,BDGRORI#155,BDGREUR#156,BEWC#157,MSG30_NL#158,SCAPMV#159,USERID#160,MMED#161,TNUM#162,NMTGP#163,BKA#164,CATEXT#165,SEQETAT#166,ACCTYPE#167,BRAND#168,FAMILY#169,SUBFAMILY#170,DVA#171,FORCED_DVA#172,BYBANK#173,CPTE_PROTEGE#174,... 26 more fields] Batched: false, Format: CSV, Location: InMemoryFileIndex[SOMEHOST:SOMEPORT/SOMEPATH, PartitionFilters: [], PushedFilters: [In(BEWC, [25003,25302,25114,20113,12017,20108,25046,12018,15379,15358,11011,20114,10118,12003,25..., ReadSchema: struct<UHDIN_YYYYMMDD:string,NUM:string,DEV:string,DEBCRED:string,BDGRORI:string,BDGREUR:string,B...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [NO_NUM#13, PSP#82, minrel#370, Label#105, StartDate#106]
+- *SortMergeJoin [PSP#381], [PSP#82], Inner
:- *Sort [PSP#381 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(PSP#381, 200)
:     +- *Project [PSP#381, NO_NUM#13, minrel#370]
:        +- SortMergeJoin [PSP#381, C_SNUM#14, minrel#370, NO_NUM#13], [NO_PSP#47, C_SNUM_1#387, C_NRELPR#50, NO_NUM_1#400], LeftOuter
:           :- *Sort [PSP#381 ASC NULLS FIRST, C_SNUM#14 ASC NULLS FIRST, minrel#370 ASC NULLS FIRST, NO_NUM#13 ASC NULLS FIRST], false, 0
:           :  +- Exchange hashpartitioning(PSP#381, C_SNUM#14, minrel#370, NO_NUM#13, 200)
:           :     +- SortAggregate(key=[NO_PSP#12, C_SNUM#14, NO_NUM#13], functions=[min(C_NRELPR#15)])
:           :        +- *Sort [NO_PSP#12 ASC NULLS FIRST, C_SNUM#14 ASC NULLS FIRST, NO_NUM#13 ASC NULLS FIRST], false, 0
:           :           +- Exchange hashpartitioning(NO_PSP#12, C_SNUM#14, NO_NUM#13, 200)
:           :              +- SortAggregate(key=[NO_PSP#12, C_SNUM#14, NO_NUM#13], functions=[partial_min(C_NRELPR#15)])
:           :                 +- *Sort [NO_PSP#12 ASC NULLS FIRST, C_SNUM#14 ASC NULLS FIRST, NO_NUM#13 ASC NULLS FIRST], false, 0
:           :                    +- *Project [NO_PSP#12, C_SNUM#14, NO_NUM#13, C_NRELPR#15]
:           :                       +- *Filter (((C_NRELPR#15 IN (001,006) && C_SNUM#14 IN (030,033)) && isnotnull(NO_NUM#13)) && isnotnull(NO_PSP#12))
:           :                          +- *FileScan csv [NO_PSP#12,NO_NUM#13,C_SNUM#14,c_nrelpr#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[SOMEHOST:SOMEPORT/SOMEPATH, PartitionFilters: [], PushedFilters: [In(c_nrelpr, [001,006]), In(C_SNUM, [030,033]), IsNotNull(NO_NUM), IsNotNull(NO_PSP)], ReadSchema: struct<NO_PSP:string,NO_NUM:string,C_SNUM:string,c_nrelpr:string>
:           +- *Sort [NO_PSP#47 ASC NULLS FIRST, C_SNUM_1#387 ASC NULLS FIRST, C_NRELPR#50 ASC NULLS FIRST, NO_NUM_1#400 ASC NULLS FIRST], false, 0
:              +- Exchange hashpartitioning(NO_PSP#47, C_SNUM_1#387, C_NRELPR#50, NO_NUM_1#400, 200)
:                 +- *Project [NO_PSP#47, NO_NUM#48 AS NO_NUM_1#400, C_SNUM#49 AS C_SNUM_1#387, c_nrelpr#50]
:                    +- *FileScan csv [NO_PSP#47,NO_NUM#48,C_SNUM#49,c_nrelpr#50] Batched: false, Format: CSV, Location: InMemoryFileIndex[SOMEHOST:SOMEPORT/SOMEPATH, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<NO_PSP:string,NO_NUM:string,C_SNUM:string,c_nrelpr:string>
+- *Sort [PSP#82 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(PSP#82, 200)
+- *Project [PSP#82, Label#105, StartDate#106]
+- *Filter isnotnull(PSP#82)
+- *FileScan csv [PSP#82,Label#105,StartDate#106] Batched: false, Format: CSV, Location: InMemoryFileIndex[SOMEHOST:SOMEPORT/SOMEPATH, PartitionFilters: [], PushedFilters: [IsNotNull(PSP)], ReadSchema: struct<PSP:string,Label:string,StartDate:string>

Here is the main error I get when launching the job with the sort:

19/05/06 18:02:25 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 214 outstanding blocks 
java.io.IOException: Failed to connect to SOMEHOST/SOMEADDRESS:SOMEPORT
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:98)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:108)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:228)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:435)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:323)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:140)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: SOMEHOST/SOMEADDRESS:SOMEPORT
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
19/05/06 18:02:25 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 214 outstanding blocks after 5000 ms
19/05/06 18:02:25 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 13 ms
19/05/06 18:02:28 INFO executor.Executor: Finished task 408.0 in stage 14.0 (TID 6696). 1733 bytes result sent to driver
19/05/06 18:02:28 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 6816
19/05/06 18:02:28 INFO executor.Executor: Running task 466.1 in stage 14.0 (TID 6816)
19/05/06 18:02:28 INFO storage.ShuffleBlockFetcherIterator: Getting 5073 non-empty blocks out of 5089 blocks
19/05/06 18:02:28 INFO client.TransportClientFactory: Found inactive connection to SOMEHOST/SOMEADDRESS:SOMEPORT, creating a new one.
19/05/06 18:02:28 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 82 outstanding blocks 
java.io.IOException: Failed to connect to SOMEHOST/SOMEADDRESS:SOMEPORT
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:98)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:108)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:228)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:435)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:323)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:140)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: SOMEHOST/SOMEADDRESS:SOMEPORT
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more

EDIT

Other types of errors:

19/05/06 18:06:16 ERROR executor.Executor: Exception in task 309.1 in stage 13.1 (TID 7592)
java.io.FileNotFoundException: /applis/hadoop/yarn/local/usercache/MYUSER/appcache/application_1555263602441_0123/blockmgr-aa586b76-ff58-4f88-b168-288c3e1b9f61/3c/temp_shuffle_ea967624-f633-4481-9a05-249b561e3c38 (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.spark_project.guava.io.Files$FileByteSource.openStream(Files.java:124)
at org.spark_project.guava.io.Files$FileByteSource.openStream(Files.java:114)
at org.spark_project.guava.io.ByteSource.copyTo(ByteSource.java:202)
at org.spark_project.guava.io.Files.copy(Files.java:436)
at org.spark_project.guava.io.Files.move(Files.java:651)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:277)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:216)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/05/06 18:06:16 ERROR executor.Executor: Exception in task 502.1 in stage 13.1 (TID 7599)
java.io.FileNotFoundException: /applis/hadoop/yarn/local/usercache/MYUSER/appcache/application_1555263602441_0123/blockmgr-aa586b76-ff58-4f88-b168-288c3e1b9f61/34/temp_shuffle_dd202cd1-ad8f-41c4-b4d1-d79621cd169e (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:102)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:115)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:247)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:201)
at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:405)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:209)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/05/06 18:06:16 INFO executor.Executor: Finished task 200.2 in stage 13.1 (TID 7568). 2826 bytes result sent to driver
19/05/06 18:06:16 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/05/06 18:06:16 INFO util.ShutdownHookManager: Shutdown hook called

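Some information/context:

I am working in a production environment (see the cluster configuration below). I cannot upgrade my Spark version. I have neither the Spark UI nor the YARN UI to monitor my jobs. All I can retrieve are the YARN logs.

Spark version: 2.2

Cluster configuration:

  • 21 compute nodes (workers)
  • 8 cores each
  • 64 GB RAM per node

Current Spark configuration:

-master: yarn

-executor-memory: 42G

-executor-cores: 5

-driver-memory: 42G

-num-executors: 32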

-spark.sql.broadcastTimeout=3600

-spark.kryoserializer.buffer.max=512

-spark.yarn.executor.memoryOverhead=2400

-spark.driver.maxResultSize=500m

-spark.memory.storageFraction=0.3

-spark.memory.fraction=0.9

-spark.hadoop.fs.permissions.umask-mode=007
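
As a rough sanity check of these settings against the cluster described above, the sketch below estimates how many executors can fit on one node. It assumes YARN could hand out a node's entire 64 GB and 8 cores to containers (in practice part of both is reserved), and `executorsPerNode` is a hypothetical helper, not a Spark or YARN API:

```scala
object ExecutorFit {
  /** How many executors fit on one node, limited by both memory and cores. */
  def executorsPerNode(nodeMemMb: Int, nodeCores: Int,
                       executorMemMb: Int, overheadMb: Int, executorCores: Int): Int = {
    val byMemory = nodeMemMb / (executorMemMb + overheadMb) // integer division rounds down
    val byCores  = nodeCores / executorCores
    math.min(byMemory, byCores)
  }

  def main(args: Array[String]): Unit = {
    val perNode = executorsPerNode(
      nodeMemMb = 64 * 1024, nodeCores = 8,
      executorMemMb = 42 * 1024, overheadMb = 2400, executorCores = 5)
    val clusterWide = perNode * 21 // 21 worker nodes
    println(s"executors per node: $perNode, cluster-wide: $clusterWide")
  }
}
```

Under these assumptions only one executor of this size fits per node, about 21 across the cluster, so a request for 32 executors may not be granted in full.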

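How the job is executed:

We build an artifact (jar) with IntelliJ and then send it to a server. Then a bash script is executed. This script:

  • exports some environment variables (SPARK_HOME, HADOOP_CONF_DIR, PATH and SPARK_LOCAL_DIRS)

  • launches the spark-submit command with all the parameters defined in the Spark configuration above

  • retrieves the YARN logs of the application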

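Here are some suggestions for your case:

  • Change 1: repartition based on the larger generated dataset (1.2 TB). I also removed repartition(col("NO_NUM"), col("UHDIN"), col("HOURMV")) at this point, since it would be overridden by the next repartition on NO_NUM and is therefore redundant.

  • Change 2: use persist to save the data we just partitioned, to avoid repartitioning the same dataframe again and again (please check the links in the previous post on how this works).

  • Change 3: removed uh_flag_comment.repartition(1300,col("NO_NUM")), since it seems redundant to me. It would be useful only if TransactionType().transform(uh) causes a reshuffle, for example by executing a join or groupBy internally. Such an operation would modify the partition key that we set in the previous step with repartition(2600, col("NO_NUM")).

  • Change 4: repartition with col("NO_NUM"), col("UHDIN"), col("HOURMV"), since this will be the partition key used by the orderBy; the two should therefore be identical.

  • Change 5: order by col("NO_NUM"), col("UHDIN"), col("HOURMV").

  • Change 6: increase the number of executors to 40.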

val uh = uh_months
  .withColumn("UHDIN", datediff(
    to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
    to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
  //      .withColumn("DVA_1", to_date((unix_timestamp(col("DVA"), "ddMMMyyyy")).cast(TimestampType)))
  .withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
  .drop("UHDIN_YYYYMMDD")
  .drop("january")
  .drop("DVA")
  // change 1: repartition based on the larger generated dataset; also removed
  // repartition(col("NO_NUM"), col("UHDIN"), col("HOURMV")) since it would be overwritten by the next repartition()
  .repartition(2600, col("NO_NUM"))
  .persist() // change 2: save your keys (please check the links from the previous post on how this works)
val uh_flag_comment = new TransactionType().transform(uh)
// change 3: the previous repartition was redundant
val uh_joined = uh_flag_comment.join(broadcast(smallDF), "NO_NUM")
  .select(
    uh.col("*"),
    smallDF.col("PSP"),
    smallDF.col("minrel"),
    smallDF.col("Label"),
    smallDF.col("StartDate"))
  .withColumnRenamed("DVA_1", "DVA")
  // change 4: this is the partition key that will be used by the orderBy, therefore these two should be identical
  .repartition(2600, col("PSP"), col("NO_NUM"), col("UHDIN"), col("HOURMV"))
  .persist() // optional, try to remove it as well
// change 5: removed the redundant repartition and added the same partition information as above
val uh_final = uh_joined.orderBy(col("PSP"), col("NO_NUM"), col("UHDIN"), col("HOURMV"))
return uh_final
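
Whether a given repartition is redundant can be checked from the physical plan, where each `Exchange` line is one shuffle. In the plan posted in the question, for example, the sort appears as its own `Exchange rangepartitioning(...)` directly above an `Exchange hashpartitioning(...)` on the same columns. The sketch below counts the shuffles in a plan's text; `exchanges` is a hypothetical helper, and the plan string is assumed to come from something like `df.queryExecution.executedPlan.toString`:

```scala
object PlanShuffles {
  /** Returns the shuffle (Exchange) operators found in a physical plan's text. */
  def exchanges(plan: String): Seq[String] =
    plan.split("\n").toSeq
      .map(_.dropWhile(c => c.isWhitespace || ":+-*".indexOf(c) >= 0)) // strip tree-drawing prefix
      .filter(_.startsWith("Exchange "))                               // skips BroadcastExchange

  def main(args: Array[String]): Unit = {
    // Top of the plan from the question, with attribute lists shortened for readability.
    val plan =
      """Exchange hashpartitioning(PSP#82, 2600)
        |+- *Sort [PSP#82 ASC NULLS FIRST, NO_NUM#252 ASC NULLS FIRST], true, 0
        |   +- Exchange rangepartitioning(PSP#82 ASC NULLS FIRST, NO_NUM#252 ASC NULLS FIRST, 200)
        |      +- Exchange hashpartitioning(PSP#82, NO_NUM#252, 2600)
        |         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))""".stripMargin
    exchanges(plan).foreach(println)
  }
}
```

Three shuffles in a row around a single sort suggest that an explicit repartition placed directly before orderBy does not save the range-partitioning shuffle that the global sort performs anyway, so it is worth comparing the plan with and without it.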

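Good luck, and let me know if you have any questions.

I managed to sort the data successfully (and then run the whole code) using significant parts of the answer posted by @Alexandros.

However, I made some changes to the configuration of my cluster:

  • I increased the executor memory from 42 G to 45 G (Spark parameter --executor-memory)
  • I increased the number of executors to 40
  • I also increased the disk space of /applis/hadoop/yarn/local/usercache/MYUSER/ on each node by 20-25 G (the free space for this folder on each node was a little under 50 G). This is the YARN usercache where Spark writes the intermediate shuffle blocks. Since I have a 1.2 T dataset and 21 nodes, about 60-65 G of disk space is needed on each node once the data is spread across the nodes.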
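
The per-node disk estimate can be reproduced with a small calculation. This is only a sketch: it assumes the shuffle data is as large as the dataset itself (no compression), is spread evenly over the nodes, and that 1 T = 1024 G; `perNodeGb` is a hypothetical helper name:

```scala
object ShuffleDisk {
  /** Average shuffle data per node in GB, assuming an even spread and no compression. */
  def perNodeGb(datasetGb: Double, nodes: Int): Double = datasetGb / nodes

  def main(args: Array[String]): Unit = {
    val datasetGb = 1.2 * 1024 // 1.2 T in GB (binary units, an assumption)
    val perNode   = perNodeGb(datasetGb, nodes = 21)
    println(f"average per node: $perNode%.1f GB")
  }
}
```

That comes to roughly 58 GB per node on average, before any skew between nodes, which is in line with the 60-65 G estimate and more than the space that was free before the increase.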

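I also used the sortWithinPartitions function (the job works fine with this function but fails with the classic sort function). Besides, I only need each partition to be sorted, since I partition by PSP (it is fine if the dataset is not globally sorted by PSP).

Here is the code: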

val uh = uh_months
  .withColumn("UHDIN", datediff(
    to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
    to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
  //      .withColumn("DVA_1", to_date((unix_timestamp(col("DVA"), "ddMMMyyyy")).cast(TimestampType)))
  .withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
  .drop("UHDIN_YYYYMMDD")
  .drop("january")
  .drop("DVA")
  .repartition(3000, col("NO_NUM"))
  .persist()
val uh_flag_comment = new TransactionType().transform(uh)
val uh_joined = uh_flag_comment.join(broadcast(smallDF), "NO_NUM")
  .select(
    uh_flag_comment.col("*"),
    kl_holdmand_pruned.col("PSP"),
    kl_holdmand_pruned.col("minrel"),
    kl_holdmand_pruned.col("TerroLabel"),
    kl_holdmand_pruned.col("TerroStartDate"))
  .withColumnRenamed("DVA_1", "DVA")
smallDF.unpersist()
uh.unpersist()
val uh_to_be_sorted = uh_joined.repartition(3000, col("PSP"))
val uh_final = uh_to_be_sorted.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))
uh_final
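
The question also mentions writing the result to HDFS partitioned by one of the sort columns. A minimal sketch of that last step follows, assuming Parquet output and overwrite mode; the helper name `writeByPsp`, the output path and the format are hypothetical and do not appear in the original post. PSP is put first in the sort keys so that rows with the same PSP stay contiguous inside each task:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.col

object SortedWrite {
  /** Shuffles once by PSP, sorts inside each partition, then writes one directory per PSP value. */
  def writeByPsp(df: DataFrame, outputPath: String, numPartitions: Int): Unit =
    df.repartition(numPartitions, col("PSP"))
      .sortWithinPartitions(col("PSP"), col("NO_NUM"), col("UHDIN"), col("HOURMV"))
      .write
      .mode(SaveMode.Overwrite) // assumption: overwriting the target is acceptable
      .partitionBy("PSP")
      .parquet(outputPath)      // assumption: Parquet; the post does not name a format
}
```

With the dataframe from the code above, a call would look like SortedWrite.writeByPsp(uh_joined, outputPath, 3000), where outputPath is whatever HDFS location is used.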
