File problems in a pyspark dataframe



I have been working on a pyspark tool that filters a dataframe based on a search and then sorts the results. The dataframe is a compilation of 1,400+ csv files. When I try to run the code, I get a very long error message that seems to boil down to a Java error for an unexpected EOF:
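For context, the read/filter/sort pipeline looks roughly like this (a sketch, not my exact code; the path, search term, and limit are placeholders, though title and posted_date are real columns in the data):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# The dataframe is built from the whole collection of csvs in one read.
df = spark.read.csv("data/*.csv.bz2", header=True)

# Filter on the search term, sort the hits, and bring them back to the
# driver; the collect step is where the error below gets raised.
results = (df.filter(F.col("title").contains("some search term"))
             .orderBy(F.col("posted_date").desc())
             .limit(100)
             .collect())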

Py4JJavaError: An error occurred while calling o1331.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 202 in stage 56.0 failed 4 times, most recent failure: Lost task 202.3 in stage 56.0 (TID 7632, emr-master-f35-eels.sss.local, executor 31): com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
Auto configuration enabled=true
Autodetect column delimiter=false
Autodetect quotes=false
Column reordering enabled=true
Delimiters for detection=null
Empty value=
Escape unquoted values=false
Header extraction enabled=null
Headers=null
Ignore leading whitespaces=false
Ignore leading whitespaces in quotes=false
Ignore trailing whitespaces=false
Ignore trailing whitespaces in quotes=false
Input buffer size=128
Input reading on separate thread=false
Keep escape sequences=false
Keep quotes=false
Length of content displayed on error=-1
Line separator detection enabled=false
Maximum number of characters per column=-1
Maximum number of columns=20480
Normalize escaped line separators=true
Null value=
Number of records to read=all
Processor=none
Restricting data in exceptions=false
RowProcessor error handler=null
Selected fields=field selection: [2]
Skip bits as whitespace=true
Skip empty lines=true
Unescaped quote handling=STOP_AT_DELIMITER
Format configuration:
CsvFormat:
Comment character=
Field delimiter=,
Line separator (normalized)=\n
Line separator sequence=\n
Quote character="
Quote escape character="
Quote escape escape character=null
Internal state when error was thrown: line=737459, column=3, record=235, charIndex=457399297, headers=[attachment_md5_checksum, attachment_filename, attachment_text, attachment_urlsafe_base64_bytes, notice_id, title, solicitation_number, department_ind_agency, cgac, sub_tier, fpds_code, office, aac_code, posted_date, type, base_type, archive_type, archive_date, set_aside_code, set_aside, response_deadline, naice_code, classification_code, pop_street_address, pop_city, pop_state, pop_zip, pop_country, active, award_number, award_date, award_dollars, awardee, primary_contact_title, primary_contact_full_name, primary_contact_email, primary_contact_phone, primary_contact_fax, secondary_contact_title, secondary_contact_full_name, secondary_contact_email, secondary_contact_phone, secondary_contact_fax, organization_type, state, city, zip_code, country_code, additional_info_link, link, description]
at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:369)
at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:595)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:330)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.lang.IllegalStateException: Error reading from input
at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:80)
at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:192)
at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:269)
at com.univocity.parsers.common.input.NoopCharAppender.appendUntil(NoopCharAppender.java:170)
at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:186)
at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:560)
... 18 more
Caused by: java.io.EOFException: Unexpected end of input stream
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:78)
... 23 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1080)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1062)
at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1484)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1471)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3267)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3264)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3264)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError('An error occurred while calling o1331.collectToPython.\n', JavaObject id=o1338), <traceback object at 0x7ff3f2ff9230>)

I ran the code on each csv individually and narrowed it down to the 6 files that cause this error (roughly via the per-file pass sketched below). Obviously, I could drop those 6 files from the list and the code would run fine, but if there is a way to diagnose and potentially repair these files in code, I would like to try that route first. How would I do that, any suggestions/ideas?
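A sketch of that per-file pass (the path list and the forcing action are simplified):

bad_files = []
for path in csv_paths:  # the 1400+ csv paths, one at a time
    try:
        # Counting forces Spark to parse every record in the file,
        # so a corrupt archive fails here rather than later.
        spark.read.csv(path, header=True).count()
    except Exception as e:
        bad_files.append(path)
        print(path, e)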

EDIT

Based on a suggestion below, I tried opening one of the files and printing its contents with the following code:

with open('file.csv.bz2', 'r', encoding='ISO-8859-1') as f:
    lines = f.readlines()
print(lines)

and it ran without errors. However, when I tried to open the same file in pandas, I got an EOFError.
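Since open() in text mode never runs the decompressor (ISO-8859-1 can decode any byte sequence, so reading the raw compressed bytes "works"), a check that decompresses the archive end to end is probably a better diagnostic; a truncated bz2 file raises EOFError, which matches the "Unexpected end of input stream" above. A sketch, with a placeholder filename:

import bz2

try:
    with bz2.open('file.csv.bz2', 'rb') as f:
        # Read to the end of the stream in 1 MiB chunks; a truncated
        # archive raises EOFError before the end-of-stream marker.
        while f.read(1 << 20):
            pass
    print('archive decompressed cleanly')
except EOFError as e:
    print('truncated archive:', e)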

You can load the files while keeping all of the records:

  1. Load all of the records, minus the 6 problem files, into a dataframe.

  2. Load the 6 files using the schema from #1 in PERMISSIVE mode (see the example after this list), keeping the corrupt-record column.

  3. Optionally, rename the current column names. See "Unable to retain corrupted rows in pyspark using PERMISSIVE mode".

  4. Now you can see what is malformed and decide whether to drop those rows, write them to a log or dead-letter queue, or fix them, whatever you decide.
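A sketch of steps 1 and 2 (the paths are placeholders; _corrupt_record is Spark's default name for the corrupt-record column):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructField, StringType

spark = SparkSession.builder.getOrCreate()

# 1. Load everything except the 6 bad files and capture the schema,
#    extended with a column to hold unparseable rows.
good_df = spark.read.csv("data/good/*.csv.bz2", header=True)
schema = good_df.schema.add(StructField("_corrupt_record", StringType(), True))

# 2. Load the 6 bad files with that schema in PERMISSIVE mode; rows
#    that fail to parse land, whole, in the _corrupt_record column.
bad_df = (spark.read
          .option("header", True)
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "_corrupt_record")
          .schema(schema)
          .csv("data/bad/*.csv.bz2"))

# Cache before filtering: Spark refuses queries that reference only the
# corrupt-record column of an un-cached csv dataframe.
bad_df.cache()
bad_df.filter(F.col("_corrupt_record").isNotNull()).show(truncate=80)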
