KeyError: '1' 后 zip 方法 - 遵循学习 pyspark 教程



我正在学习"学习PySpark"教程(在此链接(。 当我跑步时

selector = ft.ChiSqSelector(4).fit(births_train)
s1 = births_train.map(lambda row: row.label)
s2 = selector.transform(births_train.map(lambda row: row.features))
print(s1.take(1))
print(s2.take(1))
print(type(s1))
print(type(s2))

我有这个输出:

[0.0]

[密集矢量([0.0, 99.0, 99.0, 999.0](]

<类>

<类>

当我尝试将结果与zip合并时,如教程建议的那样:

s3=s1.zip(s2)
print(type(s3))
print(s3.collect())

我收到此错误:

<类>

--------------------------------------------------------------------------- Py4JJavaError 回溯(最近一次调用( 最后( 在 (( 中 1 s3=s1.zip(s2( 2 打印(类型(S3(( ----> 3 打印(s3.collect(((

/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/rdd.py in 收集(自我( 832 """ 833 使用 SCCallSiteSync(self.context( 作为 css: --> 834 sock_info = self.ctx._jvm。PythonRDD.collectAndServe(self._jrdd.rdd((( 835返回列表(_load_from_socket(sock_info,self._jrdd_deserializer(( 836

/content/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py 在呼叫中(自我,*参数( 1255 答案 = self.gateway_client.send_command(command( 1256 return_value = get_return_value( -> 1257 回答, self.gateway_client, self.target_id, self.name( 1258 1259 对于temp_args中的temp_arg:

/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/sql/utils.py in 装饰(*a, **kw( 61 def deco(*a, **kw(: 62 尝试: ---> 63 返回 f(*a, **kw( 64 除了 py4j.protocol.Py4JJavaError 作为 e: 65 s = e.java_exception.toString((

/content/spark-2.3.1-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py 在get_return_value(答案、gateway_client、target_id、姓名( 326 提高 Py4JJava错误( 327 "呼叫 {0}{1}{2} 时出错。"。 --> 328格式(target_id,".",名称(,值( 329 其他: 330 提高 Py4JError(

Py4JJava错误:调用时出错 z:org.apache.spark.api.python.PythonRDD.collectAndServe.: org.apache.spark.SparkException:作业由于阶段故障而中止: 阶段 308.0 中的任务 0 失败 1 次,最近一次失败:任务丢失 阶段 308.0 中的 0.0(TID 8596,本地主机,执行器驱动程序(:org.apache.spark.api.python.PythonException:回溯(最新( 最后呼叫(:文件 "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", 230 号线,在主干线 process(( 文件 "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", 225行,正在进行中 serializer.dump_stream(func(split_index, iterator(, outfile( 文件 "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", 324路,dump_stream self.serializer.dump_stream(self._batched(迭代器(、流(文件 "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", 139号线,dump_stream 对于迭代器中的obj:文件"/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", 313路,_batched 对于迭代器中的项:文件 ",第 1 行,在文件中 "/content/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", 第 75 行,在 返回 lambda *a: f(*a( 文件 "/content/spark-2.3.1-bin-hadoop2.7/python/pyspark/util.py",第 55 行, 在包装器中 返回 f(*args, **kwargs( 文件 ", 第 9 行, 在重新编码中 KeyError: '1'

为什么?

以下代码对我有用,尽管我不知道为什么:

truth = sc.parallelize(births_test.map(lambda row: row.label).collect())
prediction = sc.parallelize(LR_Model.predict(births_test.map(lambda row: row.features)).map(lambda x: x * 1.0).collect())
LR_results = truth.zip(prediction)

最新更新