我在docker中使用spark来进行一些处理。我们有一个Kafka容器、一个Spark主容器、两个Spark worker容器和一个Python容器来协调整个流程。我们使用docker-compose
来启动所有服务:
---
# docker-compose stack: Zookeeper + Kafka + Spark master/workers + Python driver.
version: '3.4'

volumes:
  zookeeper-persistence:
  kafka-store:
  # Shared checkpoint storage, mounted into the compute container.
  spark-store:

services:
  zookeeper-server:
    image: 'bitnami/zookeeper:3.6.1'
    expose:
      - '2181'  # client port, internal network only
    environment:
      ...  # environment variables omitted in the original post
    volumes:
      - 'zookeeper-persistence:/bitnami/zookeeper'

  kafka-server:
    image: 'bitnami/kafka:2.6.0'
    expose:
      - '29092'  # internal listener used by the compute container
      - '9092'
    environment:
      ...  # environment variables omitted in the original post
    volumes:
      - 'kafka-store:/bitnami/kafka'
    depends_on:
      - zookeeper-server

  spark-master:
    image: 'bitnami/spark:3.0.1'
    environment:
      SPARK_MODE: 'master'
      SPARK_MASTER_HOST: 'spark-master'
    ports:
      - '8080:8080'  # web UI exposed to the host
    expose:
      - '7077'  # master RPC port, internal only
    depends_on:
      - kafka-server

  # Anchor lets spark-worker2 reuse the identical definition below.
  spark-worker1: &spark-worker
    image: 'bitnami/spark:3.0.1'
    environment:
      SPARK_MODE: 'worker'
      SPARK_WORKER_MEMORY: '4G'
      SPARK_WORKER_CORES: '2'
    depends_on:
      - spark-master

  spark-worker2: *spark-worker  # same as spark-worker1

  compute:
    build: ./app
    image: compute
    environment:
      KAFKA_HOST: 'kafka-server:29092'
      COMPUTE_TOPIC: 'DataFrames'
      PYSPARK_SUBMIT_ARGS: >-
        --master spark://spark-master:7077
        --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
        pyspark-shell
    depends_on:
      - spark-master
      - kafka-server
    volumes:
      # NOTE(review): the container user must be able to write here — the
      # stateful (groupBy) query creates state/ directories under this path.
      # See the entrypoint chmod fix at the end of the post.
      - 'spark-store:/app/checkpoints'
数据通过另一个Python应用程序发送,计算容器对更改做出响应。我们创建一个ComputeDeployment并调用start函数来启动我们的Spark作业:
def start(self):
    """Build and start the streaming query for this deployment.

    Takes ``self.data_frame`` (a Kafka-sourced streaming DataFrame), applies
    either a windowed groupBy aggregation (when ``compute_config.group_by``
    is set) or a plain per-record JSON transform, optionally appends derived
    columns from ``self.build_columns``, and writes the JSON-encoded result
    back to Kafka with a per-deployment checkpoint directory.

    Side effects: assigns the transformed frame to ``self.data_frame`` and
    the started StreamingQuery to ``self.ds``.

    NOTE(review): multi-line method chains must be wrapped in parentheses —
    the original snippet used bare line breaks, which is a SyntaxError.
    """
    if self.compute_config.group_by:
        # Columns to group on, addressed inside the parsed JSON struct.
        group_by_columns = ["value." + x for x in self.compute_config.group_by]
        # Every non-grouped schema column is aggregated with first().
        agg_columns = [
            first(col("value." + x)).alias(x)
            for x in self.schema.names
            if x not in self.compute_config.group_by
        ]
        select_columns = list(self.schema.names)
        self.data_frame = (
            self.data_frame
            .select("key", "timestamp",
                    from_json(col("value").cast("string"), self.schema).alias("value"))
            # 30s watermark bounds how long state is kept for late records.
            .withWatermark("timestamp", "30 seconds")
            .groupBy(*group_by_columns,
                     window("timestamp", "30 seconds", "10 seconds"))
            .agg(*agg_columns)
            .select(struct(*select_columns).alias("value"))
            .withColumn("value", col("value").cast(self.rename_schema))
            .select(col(f"value.{self.compute_config.identifier}").alias("key"),
                    col("value").alias("value"))
            # Kafka sink requires a binary (or string) key column.
            .withColumn("key", col("key").cast("binary"))
        )
    else:
        self.data_frame = (
            self.data_frame
            .select("key",
                    from_json(col("value").cast("string"), self.schema).alias("value"))
            .withColumn("value", col("value").cast(self.rename_schema))
        )
    if self.build_columns:
        for column in self.build_columns:
            # Append each derived column into the value struct.
            self.data_frame = self.data_frame.withColumn(
                "value",
                struct(
                    "value.*",
                    (function_mapping[column['build']['function']](*column['builder']))
                    .alias(column['name'])))
    self.ds = (
        self.data_frame.withColumn("value", to_json("value"))
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_HOST)
        .option("topic", self.compute_config.topic_target)
        # Checkpoint dir must be writable by the container user — the
        # stateful groupBy path creates state/ subdirectories here.
        .option("checkpointLocation",
                f"/app/checkpoints/"
                f"{self.compute_config.topic_target + str(self.compute_config.id)}")
        .outputMode("update")
        .start()
    )
不走 `if self.compute_config.group_by:` 分支(即不使用 groupBy)的部分工作起来没有任何问题。然而,groupBy 聚合部分导致以下异常:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:382)
... 37 more
Caused by: java.io.IOException: mkdir of file:/app/checkpoints/driver10/state/0/0 failed
at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1280)
at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:183)
at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:212)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:804)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:800)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:807)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.mkdirs(CheckpointFileManager.scala:303)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.init(HDFSBackedStateStoreProvider.scala:224)
at org.apache.spark.sql.execution.streaming.state.StateStoreProvider$.createAndInit(StateStore.scala:230)
at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$get$1(StateStore.scala:366)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:365)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我不明白为什么使用groupby会导致异常,而其他部分则不会。它让我抓狂,因为不知何故,它的行为有所不同,却没有找到任何关于差异的文档。
流式传输到控制台确实有效(因为控制台输出不需要使用检查点):
# Debug sink: stream to the console instead of Kafka. No checkpointLocation
# is set here, which is why this path never hits the mkdir failure.
# NOTE(review): parentheses added — bare multi-line chains are a SyntaxError.
(
    self.data_frame
    .writeStream
    .format("console")
    .option("truncate", "false")
    .start()
)
产生这个:
+-------------------------+--------------------------------------------+
|key |value |
+-------------------------+--------------------------------------------+
|[00 00 00 00 07 59 C7 6E]|[name, 116, 123324270] |
|[00 00 00 00 0B 2E E6 68]|[name, 116, 187623016] |
...
正如在我们的具体案例中所预期的那样。
有人能帮我吗?
提前谢谢!
这与权限有关。多亏了OneCricketer,我发现Spark容器中的用户无法写入目录。用这个入口点文件修复了它:
#!/bin/bash
# Entrypoint: make the shared checkpoint volume writable for the (non-root)
# container user before handing off to the Spark driver. Fixes the
# "mkdir of file:/app/checkpoints/... failed" IOException from the post.
set -e
chmod -R a+rwX /app/checkpoints/
# exec replaces the shell so Python becomes PID 1 and receives container
# stop signals directly (clean shutdown of the streaming query).
exec python -u run.py