Pyspark套接字超时错误.回归自我_sock.recv_into(b)socket.timeout:超时



我为协作过滤推荐系统编写了一个spark程序(Python 3.6和spark 2.3.2(,适用于两种情况:

  1. 案例1:基于项目的CF推荐系统
  2. 案例2:基于用户的最小哈希LSH CF推荐系统

我已经编写了具有这两种情况的训练和预测程序。我的代码适用于基于用户的推荐,但当我试图为基于项目的CF训练我的模型时,我得到了以下错误:

2020-10-18 20:12:33 ERROR Executor:91 - Exception in task 0.0 in stage 23.0 (TID 196)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:sparkspark-2.3.2-bin-hadoop2.6pythonlibpyspark.zippysparkworker.py", line 238, in main
File "C:sparkspark-2.3.2-bin-hadoop2.6pythonlibpyspark.zippysparkserializers.py", line 690, in read_int
length = stream.read(4)
File "C:Users17372AppDataLocalProgramsPythonPython36libsocket.py", line 586, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out

我尝试使用此链接上的解决方案来解决此问题:应用程序运行一段时间后出现Pyspark套接字超时异常

它不起作用。

我找到了添加"--spark.worker.timeout=120〃;执行情况如下:

binspark-submit task3train.py train_review.json task3item.model item_based --spark.worker.timeout=120

我仍然看到同样的错误。尝试过Try-Catch盖帽,但我不确定如何正确地完成。

我该怎么办?

我的基于项目的CF:代码

if model_type == ITEM_BASED_MODEL:
# group original data by bidx, and remove those unpopular business (rated time < 3)
# tuple(bidx, (uidx, score))
# [(5306, [(3662, 5.0), (3218, 5.0), (300, 5.0),..]), ()
shrunk_bid_uids_rdd = input_lines 
.map(lambda kv: (bus_index_dict[kv[1]], (user_index_dict[kv[0]], kv[2]))) 
.groupByKey().mapValues(lambda uid_score: list(uid_score)) 
.filter(lambda bid_uid_score: len(bid_uid_score[1]) >= CO_RATED_THRESHOLD) 
.mapValues(lambda vals: [{uid_score[0]: uid_score[1]} for uid_score in vals]) 
.mapValues(lambda val: flatMixedList(val))
candidate_bids = shrunk_bid_uids_rdd.map(lambda bid_uids: bid_uids[0]).coalesce(2)
# convert shrunk_bid_uids_rdd into dict form
# dict(bidx: dict(uidx: score))
# => e.g. {5306: defaultdict(<class 'list'>, {3662: 5.0, 3218: 5.0, 300: 5.0...}),
bid_uid_dict = shrunk_bid_uids_rdd 
.map(lambda bid_uid_score: {bid_uid_score[0]: bid_uid_score[1]}) 
.flatMap(lambda kv_items: kv_items.items()).collectAsMap()
# generate all possible pair between candidate bidx
# and compute the pearson similarity
candidate_pair = candidate_bids.cartesian(candidate_bids) 
.filter(lambda id_pair: id_pair[0] < id_pair[1]) 
.filter(lambda id_pair: existNRecords(bid_uid_dict[id_pair[0]],
bid_uid_dict[id_pair[1]])) 
.map(lambda id_pair: (id_pair,
computeSimilarity(bid_uid_dict[id_pair[0]],
bid_uid_dict[id_pair[1]]))) 
.filter(lambda kv: kv[1] > 0) 
.map(lambda kv: {"b1": reversed_index_bus_dict[kv[0][0]],
"b2": reversed_index_bus_dict[kv[0][1]],
"sim": kv[1]})

我在本地运行Python 3.7和Spark 2.4.4时遇到了同样的错误。火花选项的组合毫无帮助。

我正在阅读拼花地板文件中严重倾斜的一排排。它们包含一个二进制列,其值在几个字节到超过10MB之间。尽管为spark.default.parallelism设置了较高的数量,但得到的数据帧包含相对较少的分区数量。分区的数量与我正在读取的拼花地板文件的数量相似,并且我不断得到套接字超时

我试图将spark.sql.files.maxPartitionBytes设置为一个足够小的值,但错误仍然存在。唯一有帮助的是读取数据后的repartition,以增加分区数量并更均匀地分布行。请注意,这只是一个观察结果,我仍然无法解释为什么错误会消失。

如果数据偏斜也是这里的一个主题,那么可以通过将代码更改为:来减轻它

input_lines 
.repartition(n) 
.map(...)

n取决于您的集群和工作特点,并且有一个最佳点。如果n太低,您将获得套接字超时。如果n过大,将对性能产生负面影响。

做一个重分区(1(对我来说很有效。我面临着同样的问题,只有很少的行(伪数据(。我尝试过的另一种替代方案是

.config('spark.default.pallelism',1(
.config('park.sql.shuffle.dartitions',1(

但这个1的值是存在的,因为我正在尝试用于伪数据

最新更新