Spark SQL in Google Colab fails on large data



I am using Google Colab to teach Spark (including Spark SQL) to my students, and I use the following set of commands to install and configure Spark:

!pip install -q pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
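
A quick way to confirm which Spark version pip actually installed (this check is not part of my original notebook) is:

import pyspark
print(pyspark.__version__)   # at the time of writing, pip pulls Spark 3.2.1
print(spark.version)         # version reported by the running session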

Load data
!wget 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
!mv kddcup.data_10_percent.gz kdd10.gz
data_file = "./kdd10.gz"
raw_data = sc.textFile(data_file)                    # approx 490K records
raw_data_sample = raw_data.sample(False, 0.1, 1234)  # 10% sample, 49K records
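
As a sanity check (my addition, not part of the lesson), counting the records confirms the approximate sizes in the comments above:

print(raw_data.count())          # should be roughly 490K records
print(raw_data_sample.count())   # should be roughly 49K records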

Prepare data

from pyspark.sql import Row
csv_data = raw_data.map(lambda l: l.split(","))              # using full data
#csv_data = raw_data_sample.map(lambda l: l.split(","))      # using 10% sample data
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]),
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
))
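
As a quick debugging step (again my addition), materialising a single record verifies that the split and the int() conversions work before the DataFrame is built:

print(row_data.take(1))   # forces evaluation of one record; a parse error would surface here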

Create table
#interactions_df = sqlContext.createDataFrame(row_data) -- deprecated
interactions_df = spark.createDataFrame(row_data)
#interactions_df.registerTempTable("interactions") -- deprecated
interactions_df.createOrReplaceTempView("interactions")
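
It can also help to confirm the inferred schema before querying (not part of the original flow):

interactions_df.printSchema()   # duration, src_bytes and dst_bytes should be numeric (long), the rest string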

Run SQL Query

#tcp_interactions = sqlContext.sql("""               --- deprecated
tcp_interactions = spark.sql("""
SELECT duration, protocol_type, dst_bytes FROM interactions WHERE protocol_type = 'udp' 
""")
tcp_interactions.show()
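
As a debugging sketch (my own idea, not from the course material), the lazy query can be separated from the actions that evaluate it, to see which action stalls:

tcp_interactions.limit(5).show()   # needs only a handful of matching rows
print(tcp_interactions.count())    # forces a full pass over the data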

My problem is as follows.

With the 10% sample data the query runs perfectly and returns results, but with the full ~490K-record data file the query hangs indefinitely. There is no error as such; the traceback below appears only after I abort the command.
/usr/local/lib/python3.7/dist-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
492 
493         if isinstance(truncate, bool) and truncate:
--> 494             print(self._jdf.showString(n, 20, vertical))
495         else:
496             try:
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
1318             proto.END_COMMAND_PART
1319 
-> 1320         answer = self.gateway_client.send_command(command)
1321         return_value = get_return_value(
1322             answer, self.gateway_client, self.target_id, self.name)
/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in send_command(self, command, retry, binary)
1036         connection = self._get_connection()
1037         try:
-> 1038             response = connection.send_command(command)
1039             if binary:
1040                 return response, self._create_connection_guard(connection)
/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py in send_command(self, command)
473         try:
474             while True:
--> 475                 answer = smart_decode(self.stream.readline()[:-1])
476                 logger.debug("Answer received: {0}".format(answer))
477                 # Happens when a the other end is dead. There might be an empty
/usr/lib/python3.7/socket.py in readinto(self, b)
587         while True:
588             try:
--> 589                 return self._sock.recv_into(b)
590             except timeout:
591                 self._timeout_occurred = True
KeyboardInterrupt: 

I have observed the same problem on another dataset of similar size. Interestingly, six months ago, when teaching the previous batch of students, this code worked perfectly on the full 490K records. So what has gone wrong, and how can I fix it? Thanks for any help on this.

Puzzlingly, the query below works on both the full data and the sample data. But nothing else does!

#tcp_interactions = sqlContext.sql("""  --- deprecated
tcp_interactions = spark.sql("""
SELECT distinct(protocol_type) FROM interactions 
""")
tcp_interactions.show()

The problem was fixed by falling back to an earlier version of Spark. Downloading and installing spark-3.0.3 manually, instead of installing via pip (which pulls Spark 3.2.1), resolved it. To see the working solution, visit https://github.com/Praxis-QR/BDSN/blob/main/SQL_Spark_with_OLD_version_JoseDianes_Intro.ipynb
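
For completeness, here is a minimal sketch of that manual-install approach, assuming the Apache archive URL for Spark 3.0.3 built against Hadoop 2.7 and the usual Colab Java 8 setup (paths and package names are assumptions and may differ slightly from the actual notebook):

# Install Java 8 and a specific Spark release instead of relying on pip
!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

import findspark
findspark.init()   # makes the manually installed Spark importable

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
print(spark.version)   # should now report 3.0.3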