我想在具有kafka源代码的数据砖中创建结构化流。 我按照此处描述的说明进行操作。我的脚本似乎启动了,但是它因流的第一个元素而失败。当我使用confluent_kafka
时,流 itsellf 工作正常并产生结果并工作(在数据砖中),因此似乎缺少一个不同的问题:
处理初始流后,脚本超时:
java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 80afdeed-9266-4db4-85fa-66ccf261aee4,
runId = b564c626-9c74-42a8-8066-f1f16c7ab53d] failed to stop within 36000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.`
我尝试过什么:查看SO并找到这个答案,我包括spark.conf.set("spark.sql.streaming.stopTimeout", 36000)
进入我的设置 - 这没有任何改变。
任何意见都非常感谢!
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Define a data schema
schema = StructType()
.add('PARAMETERS_TEXTVALUES_070_VALUES', StringType())
.add('ID', StringType())
.add('PARAMETERS_TEXTVALUES_001_VALUES', StringType())
.add('TIMESTAMP', TimestampType())
df = spark
.readStream
.format("kafka")
.option("host", "stream.xxx.com")
.option("port", 12345)
.option('kafka.bootstrap.servers', 'stream.xxx.com:12345')
.option('subscribe', 'stream_test.json')
.option("startingOffset", "earliest")
.load()
df_word = df.select(F.col('key').cast('string'),
F.from_json(F.col('value').cast('string'), schema).alias("parsed_value"))
df_word
.writeStream
.format("parquet")
.option("path", "dbfs:/mnt/streamfolder/stream/")
.option("checkpointLocation", "dbfs:/mnt/streamfolder/check/")
.outputMode("append")
.start()
我的流输出数据如下所示:
"PARAMETERS_TEXTVALUES_070_VALUES":'something'
"ID":"47575963333908"
"PARAMETERS_TEXTVALUES_001_VALUES":12345
"TIMESTAMP": "2020-10-22T15:06:42.507+02:00"
此外,stream
和check
文件夹都填充了 0-b 文件,除了metadata
,其中包括上述错误的 ìd。
谢谢,注意安全。
我遇到了同样的问题。 我检查了驱动程序日志,并在堆栈跟踪中发现了此异常:
org.apache.spark.SparkException: Failed to store executor broadcast spark_join_relation_3540_1455983219 (size = Some(67371008)) in BlockManager with storageLevel=StorageLevel(memory, deserialized, 1 replicas)
根据此建议,我提高了驱动程序内存(在我的情况下为 16gb 到 32gb),它解决了这个问题。
StackOverflow上的这个答案解释了它为什么有效。