RDD.map function hangs in Spark

I am trying to concatenate two Spark DataFrames of equal length, for example -

DF1 -

| A | 
| 1 |
| 2 | 
| 3 |
| 4 |

DF2 -

| B |
| a |
| b |
| c | 
| d |

Resultant DF -

| A | B |
| 1 | a |
| 2 | b |
| 3 | c |
| 4 | d |

For this, I am using the following code -

val combinedRow = df1.rdd.zip(df2.select("B").rdd).map {
  case (df1Data, df2Data) => Row.fromSeq(df1Data.toSeq ++ df2Data.toSeq)
}
val combinedSchema = StructType(df1.schema.fields ++ df2.select("B").schema.fields)
val resultDF = spark.sqlContext.createDataFrame(combinedRow, combinedSchema)

But the code makes no progress, and it does not throw any exception either. It simply hangs. Any suggestions as to what might be wrong here? Thanks in advance.

EDIT -

Logs generated after the last successfully executed statement:

[main] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 13.848847 ms
[broadcast-exchange-0] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 14.323824 ms
[broadcast-exchange-0] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_35 stored as values in memory (estimated size 1024.1 KB, free 871.5 MB)
[broadcast-exchange-0] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_35_piece0 stored as bytes in memory (estimated size 417.0 B, free 871.5 MB)
[dispatcher-event-loop-3] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_35_piece0 in memory on 192.168.20.181:38202 (size: 417.0 B, free: 872.9 MB)
[broadcast-exchange-0] INFO org.apache.spark.SparkContext - Created broadcast 35 from run at ThreadPoolExecutor.java:1142
[main] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 27.697751 ms
[main] INFO org.apache.spark.SparkContext - Starting job: show at Train.scala:180
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 19 (show at Train.scala:180) with 1 output partitions
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 31 (show at Train.scala:180)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 31 (MapPartitionsRDD[106] at show at Train.scala:180), which has no missing parents
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_36 stored as values in memory (estimated size 14.3 KB, free 871.5 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_36_piece0 stored as bytes in memory (estimated size 6.4 KB, free 871.5 MB)
[dispatcher-event-loop-2] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_36_piece0 in memory on 192.168.20.181:38202 (size: 6.4 KB, free: 872.9 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 36 from broadcast at DAGScheduler.scala:996
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 31 (MapPartitionsRDD[106] at show at Train.scala:180)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 31.0 with 1 tasks
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 31.0 (TID 1267, localhost, executor driver, partition 0, PROCESS_LOCAL, 5961 bytes)
[Executor task launch worker for task 1267] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 31.0 (TID 1267)
[Executor task launch worker for task 1267] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 32.758147 ms
[Executor task launch worker for task 1267] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 20 (head at Train.scala:161) with 1 output partitions
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 32 (head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 32 (MapPartitionsRDD[110] at head at Train.scala:161), which has no missing parents
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_37 stored as values in memory (estimated size 26.9 KB, free 871.4 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_37_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.4 MB)
[dispatcher-event-loop-3] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_37_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 37 from broadcast at DAGScheduler.scala:996
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 32 (MapPartitionsRDD[110] at head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 32.0 with 1 tasks
[dispatcher-event-loop-2] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 32.0 (TID 1268, localhost, executor driver, partition 0, PROCESS_LOCAL, 5813 bytes)
[Executor task launch worker for task 1268] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 32.0 (TID 1268)
[Executor task launch worker for task 1268] INFO org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD - closed connection
[Executor task launch worker for task 1268] INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 32.0 (TID 1268). 1979 bytes result sent to driver
[task-result-getter-3] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 32.0 (TID 1268) in 132 ms on localhost (executor driver) (1/1)
[task-result-getter-3] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 32.0, whose tasks have all completed, from pool 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ResultStage 32 (head at Train.scala:161) finished in 0.128 s
[Executor task launch worker for task 1267] INFO org.apache.spark.scheduler.DAGScheduler - Job 20 finished: head at Train.scala:161, took 0.140223 s
[Executor task launch worker for task 1267] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 8.366053 ms
[Executor task launch worker for task 1267] INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 31.0 (TID 1267). 1501 bytes result sent to driver
[task-result-getter-0] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 31.0 (TID 1267) in 393 ms on localhost (executor driver) (1/1)
[task-result-getter-0] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 31.0, whose tasks have all completed, from pool 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ResultStage 31 (show at Train.scala:180) finished in 0.393 s
[main] INFO org.apache.spark.scheduler.DAGScheduler - Job 19 finished: show at Train.scala:180, took 0.413534 s
[main] INFO org.apache.spark.SparkContext - Starting job: show at Train.scala:180
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 21 (show at Train.scala:180) with 4 output partitions
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 33 (show at Train.scala:180)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 33 (MapPartitionsRDD[106] at show at Train.scala:180), which has no missing parents
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_38 stored as values in memory (estimated size 14.3 KB, free 871.4 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_38_piece0 stored as bytes in memory (estimated size 6.4 KB, free 871.4 MB)
[dispatcher-event-loop-2] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_38_piece0 in memory on 192.168.20.181:38202 (size: 6.4 KB, free: 872.9 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 38 from broadcast at DAGScheduler.scala:996
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 4 missing tasks from ResultStage 33 (MapPartitionsRDD[106] at show at Train.scala:180)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 33.0 with 4 tasks
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 33.0 (TID 1269, localhost, executor driver, partition 1, PROCESS_LOCAL, 5961 bytes)
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 33.0 (TID 1270, localhost, executor driver, partition 2, PROCESS_LOCAL, 5961 bytes)
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 33.0 (TID 1271, localhost, executor driver, partition 3, PROCESS_LOCAL, 5961 bytes)
[dispatcher-event-loop-0] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 3.0 in stage 33.0 (TID 1272, localhost, executor driver, partition 4, PROCESS_LOCAL, 5961 bytes)
[Executor task launch worker for task 1269] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 33.0 (TID 1269)
[Executor task launch worker for task 1271] INFO org.apache.spark.executor.Executor - Running task 2.0 in stage 33.0 (TID 1271)
[Executor task launch worker for task 1272] INFO org.apache.spark.executor.Executor - Running task 3.0 in stage 33.0 (TID 1272)
[Executor task launch worker for task 1270] INFO org.apache.spark.executor.Executor - Running task 1.0 in stage 33.0 (TID 1270)
[Executor task launch worker for task 1269] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 55.127045 ms
[Executor task launch worker for task 1271] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 22 (head at Train.scala:161) with 1 output partitions
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 34 (head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 34 (MapPartitionsRDD[117] at head at Train.scala:161), which has no missing parents
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_39 stored as values in memory (estimated size 26.9 KB, free 871.4 MB)
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned shuffle 10
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 31267
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 31268
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25303
[dispatcher-event-loop-3] INFO org.apache.spark.storage.BlockManagerInfo - Removed broadcast_34_piece0 on 192.168.20.181:38202 in memory (size: 22.8 KB, free: 872.9 MB)
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25298
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25304
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 31269
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned shuffle 11
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25299
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25301
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25300
[dispatcher-event-loop-2] INFO org.apache.spark.storage.BlockManagerInfo - Removed broadcast_37_piece0 on 192.168.20.181:38202 in memory (size: 12.7 KB, free: 872.9 MB)
[dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo - Removed broadcast_33_piece0 on 192.168.20.181:38202 in memory (size: 22.6 KB, free: 872.9 MB)
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25305
[dispatcher-event-loop-3] INFO org.apache.spark.storage.BlockManagerInfo - Removed broadcast_36_piece0 on 192.168.20.181:38202 in memory (size: 6.4 KB, free: 873.0 MB)
[Spark Context Cleaner] INFO org.apache.spark.ContextCleaner - Cleaned accumulator 25302
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_39_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.6 MB)
[dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_39_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 39 from broadcast at DAGScheduler.scala:996
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 34 (MapPartitionsRDD[117] at head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 34.0 with 1 tasks
[Executor task launch worker for task 1272] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 92.57204 ms
[Executor task launch worker for task 1269] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 23 (head at Train.scala:161) with 1 output partitions
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 35 (head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 35 (MapPartitionsRDD[122] at head at Train.scala:161), which has no missing parents
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_40 stored as values in memory (estimated size 26.9 KB, free 871.6 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_40_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.5 MB)
[dispatcher-event-loop-1] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_40_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 40 from broadcast at DAGScheduler.scala:996
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 35 (MapPartitionsRDD[122] at head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 35.0 with 1 tasks
[Executor task launch worker for task 1270] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 24 (head at Train.scala:161) with 1 output partitions
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 36 (head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 36 (MapPartitionsRDD[124] at head at Train.scala:161), which has no missing parents
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_41 stored as values in memory (estimated size 26.9 KB, free 871.5 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_41_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.5 MB)
[dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_41_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 41 from broadcast at DAGScheduler.scala:996
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 36 (MapPartitionsRDD[124] at head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 36.0 with 1 tasks
[Executor task launch worker for task 1272] INFO org.apache.spark.SparkContext - Starting job: head at Train.scala:161
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 25 (head at Train.scala:161) with 1 output partitions
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 37 (head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 37 (MapPartitionsRDD[126] at head at Train.scala:161), which has no missing parents
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_42 stored as values in memory (estimated size 26.9 KB, free 871.5 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_42_piece0 stored as bytes in memory (estimated size 12.7 KB, free 871.5 MB)
[dispatcher-event-loop-1] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_42_piece0 in memory on 192.168.20.181:38202 (size: 12.7 KB, free: 872.9 MB)
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 42 from broadcast at DAGScheduler.scala:996
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing tasks from ResultStage 37 (MapPartitionsRDD[126] at head at Train.scala:161)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 37.0 with 1 tasks

It gets stuck here. (Note that jobs such as "head at Train.scala:161" are being started from the "Executor task launch worker" threads above, i.e., an action is being triggered from inside another job's running task.)

I could not reproduce any error with your code; it should work fine in your case as well.
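
For reference, here is a minimal self-contained version of your approach (in-memory data, assuming a SparkSession named spark) that runs fine for me. Keep in mind that RDD.zip requires both RDDs to have the same number of partitions and the same number of elements per partition:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import spark.implicits._

// In-memory stand-ins for DF1 and DF2; coalesce(1) guarantees the
// identical partition layout that RDD.zip requires.
val df1 = Seq(1, 2, 3, 4).toDF("A").coalesce(1)
val df2 = Seq("a", "b", "c", "d").toDF("B").coalesce(1)

// Pair up corresponding rows and merge each pair into a single Row.
val combinedRow = df1.rdd.zip(df2.select("B").rdd).map {
  case (left, right) => Row.fromSeq(left.toSeq ++ right.toSeq)
}
val combinedSchema = StructType(df1.schema.fields ++ df2.select("B").schema.fields)
spark.createDataFrame(combinedRow, combinedSchema).show()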

Alternatively, you can simply assign an ID to each DataFrame and join them on it:

df1.withColumn("id", monotonically_increasing_id())
  .join(df2.withColumn("id", monotonically_increasing_id()), "id")
  .drop("id")
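
For completeness, a self-contained sketch of this (hypothetical in-memory data mirroring the question, assuming a SparkSession named spark). One caveat: monotonically_increasing_id encodes the partition ID in the upper bits of the generated ID, so the IDs only line up row-for-row when both DataFrames have the same partition layout.

import org.apache.spark.sql.functions.monotonically_increasing_id
import spark.implicits._

val df1 = Seq(1, 2, 3, 4).toDF("A")
val df2 = Seq("a", "b", "c", "d").toDF("B")

// Tag each row with a synthetic id, join on it, then drop the helper column.
val resultDF = df1.withColumn("id", monotonically_increasing_id())
  .join(df2.withColumn("id", monotonically_increasing_id()), "id")
  .drop("id")
resultDF.show()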

Hope this helps!

You can also use zipWithIndex on the two RDDs:

// In-memory DataFrames mirroring the question's data
// (spark.implicits._ must be in scope for toDF).
val df1 = sc.parallelize(Seq("1", "2", "3", "4")).toDF("A")
val df2 = sc.parallelize(Seq("a", "b", "c", "d")).toDF("B")
// Key each row by its position so the two RDDs can be joined pairwise.
val zip1 = df1.rdd.zipWithIndex.map { case (row, idx) => (idx, row.mkString) }
val zip2 = df2.rdd.zipWithIndex.map { case (row, idx) => (idx, row.mkString) }
zip1.join(zip2).map { case (_, pair) => pair }.collect()
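
Since join does not guarantee output order, you can sort by the index before dropping it. To get back a DataFrame rather than an array, something like this works (a sketch, assuming spark.implicits._ is in scope for toDF):

val resultDF = zip1.join(zip2)
  .sortByKey() // restore the original row order
  .map { case (_, (a, b)) => (a, b) }
  .toDF("A", "B")
resultDF.show()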
