Py4JJavaError when writing a PySpark DataFrame to a Parquet file



In short, I have 100k rows of data in a CSV file. Here is a sample of it:

ID,Name,Surname,Birthday,Details
0,Agjqyru,Qtlzi,1923-02-23,";{City=Neftchala,Gender=Male,Education=University}">
1,Zkaczi,Gvuvwle,2002-02-28,";{City=Mingachevir,Gender=Female,Education=Doctor}">
2,Hekbros,Lemfk,1948-02-29,";{City=Ujar,Gender=Male,Education=University}">
3,Dddtulkeo,Fdnccbp,1903-07-01,";{City=Dashkasan,Gender=Female,Education=Bachelor}">
4,Wssqm,Kzekihhqjmrd,1935-05-10,";{City=Baku,Gender=Female,Education=Uneducated}">
5,Iurg,Nglzxwu,1915-04-02,";{City=Khojavend,Gender=Male,Education=School}">

My task is to split the Details column into 3 columns and save the data as a Parquet file. Here is my attempt so far:

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
sparkContext = SparkContext.getOrCreate()
sqlContext = SQLContext(sparkContext).getOrCreate(sparkContext)
sparkSession = SparkSession.builder.master("local").appName("parquet_example").getOrCreate()
# read csv file 
__DATA = sparkSession.read.csv('data.csv', header = True)
# divide column details into 3
__DATA = (__DATA
    .withColumn("Details", split(regexp_replace(regexp_replace(regexp_replace("Details", '{|}', ""), ':', ','), '"|"', ""), ','))
    .withColumn("City", element_at("Details", 1))
    .withColumn("Gender", element_at("Details", 2))
    .withColumn("Education", element_at("Details", 3))
    .drop("Details")
    .rdd)
# clean data
__DATA = __DATA.map(lambda x : (x[0], x[1], x[2], x[3][x[3].index("=")+1:], x[4][x[4].index("=")+1:], x[5][x[5].index("=")+1:]))
dfSchema = StructType([
    StructField('Name', StringType(), True),
    StructField('Surname', StringType(), True),
    StructField('Birthdate', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Gender', StringType(), True),
    StructField('Education', StringType(), True)
])
__DATA = __DATA.toDF(dfSchema)

However, whenever I try to save it to a Parquet file with the following code, I get a Py4JJavaError:

__DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet

So, what am I missing?

Here is the full error stack:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-55-67be3fc6fc33> in <module>
----> 1 __DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py in parquet(self, path, mode, partitionBy, compression)
934             self.partitionBy(partitionBy)
935         self._set_opts(compression=compression)
--> 936         self._jwrite.parquet(path)
937 
938     @since(1.6)
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
1302 
1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
1305             answer, self.gateway_client, self.target_id, self.name)
1306 
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
129     def deco(*a, **kw):
130         try:
--> 131             return f(*a, **kw)
132         except py4j.protocol.Py4JJavaError as e:
133             converted = convert_exception(e.java_exception)
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
327                     "An error occurred while calling {0}{1}{2}.\n".
328                     format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o1290.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
at sun.reflect.GeneratedMethodAccessor131.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 35.0 failed 1 times, most recent failure: Lost task 1.0 in stage 35.0 (TID 60, bkhddev01, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:<user>spark-3.0.0-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py", line 605, in main
File "C:<user>spark-3.0.0-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py", line 597, in process
File "C:<user>spark-3.0.0-bin-hadoop2.7pythonlibpyspark.zippysparkserializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:<user>spark-3.0.0-bin-hadoop2.7pythonlibpyspark.zippysparkutil.py", line 107, in wrapper
return f(*args, **kwargs)
File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
... 32 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:<user>spark-3.0.0-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py", line 605, in main
File "C:<user>spark-3.0.0-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py", line 597, in process
File "C:<user>spark-3.0.0-bin-hadoop2.7pythonlibpyspark.zippysparkserializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:<user>spark-3.0.0-bin-hadoop2.7pythonlibpyspark.zippysparkutil.py", line 107, in wrapper
return f(*args, **kwargs)
File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

The error is caused by this piece of code:

x[3][x[3].index("=")+1:]

At index 3 you have the Birthdate field, which contains no "=" character, so .index("=") fails with ValueError: substring not found. Start the slicing only after index 3, which is where the fields that came from the Details column begin.
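
If you want to keep the RDD-based approach, a minimal sketch of a corrected map could look like this (assuming the column order after drop("Details") is ID, Name, Surname, Birthday, City, Gender, Education, and that ID is dropped so the tuple matches the six-field schema declared above):

# strip the "key=" prefixes only from the Details-derived fields (indices 4 to 6)
__DATA = __DATA.map(lambda x: (
    x[1], x[2], x[3],              # Name, Surname, Birthday stay untouched
    x[4][x[4].index("=") + 1:],    # City
    x[5][x[5].index("=") + 1:],    # Gender
    x[6][x[6].index("=") + 1:]))   # Education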

Instead of converting the DataFrame to an RDD, you can do the whole cleanup at the DataFrame level:

__DATA = (__DATA
    .withColumn("Details", split(regexp_replace("Details", '{|}', ""), ','))
    .withColumn("City", split(col("Details")[0], '=')[1])
    .withColumn("Gender", split(col("Details")[1], '=')[1])
    .withColumn("Education", split(col("Details")[2], '=')[1])
    .drop("Details"))
