Spark cannot mkdir when using groupBy



I'm using Spark inside Docker to do some processing. We have a Kafka container, a Spark master container, two Spark worker containers, and a Python container that orchestrates the whole flow. We bring everything up with docker-compose:

version: '3.4'

volumes:
  zookeeper-persistence:
  kafka-store:
  spark-store:

services:
  zookeeper-server:
    image: 'bitnami/zookeeper:3.6.1'
    expose:
      - '2181'
    environment:
      ...
    volumes:
      - zookeeper-persistence:/bitnami/zookeeper

  kafka-server:
    image: 'bitnami/kafka:2.6.0'
    expose:
      - '29092'
      - '9092'
    environment:
      ...
    volumes:
      - kafka-store:/bitnami/kafka
    depends_on:
      - zookeeper-server

  spark-master:
    image: bitnami/spark:3.0.1
    environment:
      SPARK_MODE: 'master'
      SPARK_MASTER_HOST: 'spark-master'
    ports:
      - '8080:8080'
    expose:
      - '7077'
    depends_on:
      - kafka-server

  spark-worker1:
    image: bitnami/spark:3.0.1
    environment:
      SPARK_MODE: 'worker'
      SPARK_WORKER_MEMORY: '4G'
      SPARK_WORKER_CORES: '2'
    depends_on:
      - spark-master

  spark-worker2:
    # same as spark-worker1

  compute:
    build: ./app
    image: compute
    environment:
      KAFKA_HOST: kafka-server:29092
      COMPUTE_TOPIC: DataFrames
      PYSPARK_SUBMIT_ARGS: "--master spark://spark-master:7077 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"
    depends_on:
      - spark-master
      - kafka-server
    volumes:
      - spark-store:/app/checkpoints
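
For reference, run.py in the compute container relies on the PYSPARK_SUBMIT_ARGS value set above: pyspark reads it from the environment when the JVM gateway is launched, which is how the driver reaches spark-master and pulls in the Kafka connector. A minimal sketch of the session setup (our actual run.py is not shown here):

from pyspark.sql import SparkSession

# PYSPARK_SUBMIT_ARGS (set in docker-compose above) is picked up by pyspark when
# the session is created, so no extra --master/--packages arguments are needed here.
spark = SparkSession.builder.appName("compute").getOrCreate()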

Data is sent in by another Python application, and the compute container reacts to the changes. We create a ComputeDeployment and call its start function to launch our Spark job:

def start(self):
    if self.compute_config.group_by:
        group_by_columns = ["value." + x for x in self.compute_config.group_by]
        agg_columns = [first(col("value." + x)).alias(x) for x in self.schema.names if x not in self.compute_config.group_by]
        select_columns = [x for x in self.schema.names]
        self.data_frame = self.data_frame \
            .select("key", "timestamp", from_json(col("value").cast("string"), self.schema).alias("value")) \
            .withWatermark("timestamp", "30 seconds") \
            .groupBy(*group_by_columns, window("timestamp", "30 seconds", "10 seconds")) \
            .agg(*agg_columns) \
            .select(struct(*select_columns).alias("value")) \
            .withColumn("value", col("value").cast(self.rename_schema)) \
            .select(col(f"value.{self.compute_config.identifier}").alias("key"), col("value").alias("value")) \
            .withColumn("key", col("key").cast("binary"))
    else:
        self.data_frame = self.data_frame \
            .select("key", from_json(col("value").cast("string"), self.schema).alias("value")) \
            .withColumn("value", col("value").cast(self.rename_schema))
    if self.build_columns:
        for column in self.build_columns:
            self.data_frame = self.data_frame.withColumn("value", struct("value.*", (function_mapping[column['build']['function']](*column['builder'])).alias(column['name'])))
    self.ds = self.data_frame.withColumn("value", to_json("value")) \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_HOST) \
        .option("topic", self.compute_config.topic_target) \
        .option("checkpointLocation", f"/app/checkpoints/{self.compute_config.topic_target + str(self.compute_config.id)}") \
        .outputMode("update") \
        .start()
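
For context, self.data_frame is the streaming DataFrame we read from Kafka before start() is called; that part is not shown above. Roughly, using the environment variables from the compose file, it is built like this (a sketch, not our exact code):

import os
from pyspark.sql import SparkSession

# getOrCreate() returns the session created at startup (see the sketch above)
spark = SparkSession.builder.getOrCreate()

# subscribe to the topic the other Python application publishes to
data_frame = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", os.environ["KAFKA_HOST"]) \
    .option("subscribe", os.environ["COMPUTE_TOPIC"]) \
    .load()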

The else branch of if self.compute_config.group_by: works without any issue. The groupBy branch, however, causes the following exception:

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:382)
... 37 more
Caused by: java.io.IOException: mkdir of file:/app/checkpoints/driver10/state/0/0 failed
at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1280)
at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:183)
at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:212)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:804)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:800)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:807)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.mkdirs(CheckpointFileManager.scala:303)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.init(HDFSBackedStateStoreProvider.scala:224)
at org.apache.spark.sql.execution.streaming.state.StateStoreProvider$.createAndInit(StateStore.scala:230)
at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$get$1(StateStore.scala:366)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:365)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I don't understand why using groupBy causes the exception while the other branch doesn't. It's driving me crazy that the two somehow behave differently, yet I can't find any documentation about the difference.

Streaming to the console does work (no checkpoint location is used there, of course):

self.data_frame \
    .writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()

and produces this:

+-------------------------+--------------------------------------------+
|key                      |value                                       |
+-------------------------+--------------------------------------------+
|[00 00 00 00 07 59 C7 6E]|[name, 116, 123324270]                      |
|[00 00 00 00 0B 2E E6 68]|[name, 116, 187623016]                      |
...

which is exactly what we would expect in our specific case.

Can anyone help me out?

Thanks in advance!

It turned out to be a permissions issue. Thanks to OneCricketer, I found out that the user inside the Spark containers could not write to the checkpoint directory. Only the groupBy branch trips over this, because the stateful aggregation has to create its state store directories under the checkpoint location (the mkdir of /app/checkpoints/driver10/state/0/0 in the stack trace above), while the stateless branch never needs a state store. This entrypoint file fixed it:

#!/bin/bash
# make the mounted checkpoint volume writable before starting the app
chmod -R a+rwX /app/checkpoints/
python -u run.py
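
If you prefer to keep the fix inside the application, a close equivalent of the chmod can be done in Python at the top of run.py before any query is started (just a sketch; it assumes the process still has enough privileges to change permissions on the volume):

import os

CHECKPOINT_ROOT = "/app/checkpoints"

# rough equivalent of `chmod -R a+rwX`: make every directory under the checkpoint
# volume readable, writable and traversable by all users, and every file readable
# and writable, before Spark tries to mkdir its state directories in there
for dirpath, dirnames, filenames in os.walk(CHECKPOINT_ROOT):
    os.chmod(dirpath, 0o777)
    for name in filenames:
        os.chmod(os.path.join(dirpath, name), 0o666)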
