Error when using collect and take methods in PySpark



I am using PySpark on Colab (v3.1.2 with JDK 8). When I try to convert a .txt text file into a tuple-based data format, I run into an error. Here is my code:

# reading data in 4 partitions and repartition the data in 6
captain_odi  = sc.textFile("/content/drive/MyDrive/PYspark/ODI data.csv",4,use_unicode=False)
captain_odi.take(10)

The output is:

[b',Player,Span,Mat,Inns,NO,Runs,HS,Ave,BF,SR,100,50,0,Unnamed: 13',
b'0,SR Tendulkar (INDIA),1989-2012,463,452,41,18426,200*,44.83,21367,86.23,49,96,20,',
b'1,KC Sangakkara (Asia/ICC/SL),2000-2015,404,380,41,14234,169,41.98,18048,78.86,25,93,15,',
b'2,RT Ponting (AUS/ICC),1995-2012,375,365,39,13704,164,42.03,17046,80.39,30,82,20,',
b'3,ST Jayasuriya (Asia/SL),1989-2011,445,433,18,13430,189,32.36,14725,91.2,28,68,34,']
Now, when I create tuples from the same data using the code below:
fields = ["name","country","career","matches","won","loss","ties","toss"]
from collections import namedtuple
Captain = namedtuple("Captain", fields)

def ParseReader(line):
    fields = line.split(",")
    return Captain(fields[0], fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7])
captains  = captain_odi.map(lambda x : ParseReader(x))
captains.take(10)

When I execute the .take() action, I get the following error:
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-15-01599f2d7aa3> in <module>()
----> 1 captains.take(10)
3 frames
/content/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326                 raise Py4JJavaError(
327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
329             else:
330                 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 7) (786e4ea62989 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/content/spark-3.1.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
process()
File "/content/spark-3.1.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
serializer.dump_stream(out_iter, outfile)
File "/content/spark-3.1.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/content/spark-3.1.2-bin-hadoop3.2/python/pyspark/rdd.py", line 1560, in takeUpToNumLeft
yield next(iterator)
File "/content/spark-3.1.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
return f(*args, **kwargs)
File "<ipython-input-8-f7a02b0b1d17>", line 10, in <lambda>
File "<ipython-input-8-f7a02b0b1d17>", line 8, in ParseReader
TypeError: a bytes-like object is required, not 'str'

Can someone help me figure out what is causing this error?

The error is:

TypeError: a bytes-like object is required, not 'str'

And the output is:

b',Player,Span,Mat,Inns,NO,Runs,HS,Ave,BF,SR,100,50,0,Unnamed: 13',

Do you see the little b at the start of the line? It means that this "string" is actually not a str at all, but bytes.
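
A quick way to confirm this in the notebook (reusing the captain_odi RDD from the question):

first = captain_odi.first()
print(type(first))   # <class 'bytes'>, because use_unicode=False was passed to textFile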

What is the difference between a string and a byte string?

Your lines are bytes, but split(",") is being called with a str separator, which is exactly the mismatch the TypeError reports. Before splitting, you first need to apply the decode method with the file's correct encoding (hopefully utf8).
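
A minimal sketch of the mismatch, using one of the byte strings from the output above (utf8 is an assumption about the file's encoding):

line = b'0,SR Tendulkar (INDIA),1989-2012,463,452,41,18426,200*,44.83,21367,86.23,49,96,20,'
line.split(",")                     # TypeError: a bytes-like object is required, not 'str'
line.decode("utf8").split(",")[:3]  # decode first -> ['0', 'SR Tendulkar (INDIA)', '1989-2012']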

You need to change this line:

fields = line.split(",")

to this:

fields = line.decode("utf8").split(",")  # choose the proper encoding - here, I put utf8
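
Putting it together, a sketch of the corrected parser (utf8 is again an assumption; adjust to the file's real encoding if needed):

from collections import namedtuple

fields = ["name","country","career","matches","won","loss","ties","toss"]
Captain = namedtuple("Captain", fields)

def ParseReader(line):
    fields = line.decode("utf8").split(",")   # bytes -> str, then split
    return Captain(*fields[:8])

captains = captain_odi.map(ParseReader)
captains.take(10)

Alternatively, drop use_unicode=False from the sc.textFile call (it defaults to True); the RDD elements are then already str and no decode is needed. You could also split bytes with a bytes separator, line.split(b","), but then every field stays a bytes object.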
