Processing a txt file with Spark



I need to read a text file into a Dataset[T] in Spark. The file is badly formatted: it has some blank fields, and it is hard to define the parameters for splitting the string. I have been trying to read the data into an RDD and then convert it to a case class type, but not all of the fields are parsed correctly and I get this error:

java.lang.NumberFormatException: empty String
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at scala.collection.immutable.StringLike.toDouble(StringLike.scala:321)
at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:321)
at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
at captify.test.spark.Stats$$anonfun$2.apply(Stats.scala:53)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:255)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:836)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

What can I do to process this file correctly? My .txt file looks like this (anonymized random data, but the format is identical):

NEW50752085  84.0485 -76.3851  85.1   THE NAME OF AN OBJECT                       
DEM00752631  51.9581 -85.3315  98.5   THE NAME OF AN OBJECT                                  
KI004867205  40.8518  15.9351 276.5   THE NAME OF AN OBJECT           FHG   41196

I tried to process it like this:

import spark.implicits._   // needed for the .toDF() call below

val dataRdd = spark.sparkContext
  .textFile("file.txt")
val dataArray = dataRdd
  .map(_.split(" "))

case class caseClass(
  c1: String,
  c2: Double,
  c3: Double,
  c4: Double,
  c5: String,
  c6: String,
  c7: String
)

val df = dataArray
  .map(record => (record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4), record(5), record(6)))
  .map { case (c1, c2, c3, c4, c5, c6, c7) => caseClass(c1, c2, c3, c4, c5, c6, c7) }
  .toDF()

I am going to make a few assumptions in this answer that may not be correct, but based on the data you have given and the error you provided, I believe they hold.

  • Assumption 1: your data is separated by spaces. I am basing this on the NumberFormatException for an empty string that you provided; if your file were tab-delimited we would not run into it.
  • Assumption 2 (this one is my own, and it may not be true): each data element is separated by the same number of spaces. For the rest of this answer I will assume that number is four. If this assumption does not hold, the problem becomes much harder.
  • Assumption 3: of the 7 data elements, only the last 2 are optional and sometimes absent.

Your NumberFormatException is caused by splitting on a single space. Suppose the following line is space-separated:

NEW50752085    84.0485    -76.3851    85.1    THE NAME OF AN OBJECT 

When you split on a single space, this line turns into the following array:

Array(NEW50752085, "", "", "", 84.0485, "", "", "", -76.3851, "", "", "", 85.1, "", "", "", THE, NAME, OF, AN, OBJECT)

The second element of this array, the empty string, is what you are trying to convert to a Double. That is what gives you the NumberFormatException on an empty string.
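You can see this with plain Scala, outside Spark (a quick illustrative check, not part of your code):

// Splitting on a single space keeps the empty strings that sit between
// consecutive space characters.
val sample = "NEW50752085    84.0485    -76.3851    85.1"
sample.split(" ").take(5).foreach(s => println(s"'$s'"))
// 'NEW50752085'
// ''
// ''
// ''
// '84.0485'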

When you change this to split on four spaces (based on my assumption, which may or may not hold):

.map(_.split("    "))

you get the following:

Array(NEW50752085, 84.0485, -76.3851, 85.1, THE NAME OF AN OBJECT)

But now we run into another problem: this array only has five elements! We want seven.

We can fix that by modifying your later code:

val df = dataArray.map(record => {
  (record(0), record(1).toDouble, record(2).toDouble, record(3).toDouble, record(4),
   if (record.size > 5) record(5) else "",
   if (record.size > 6) record(6) else "")
}).map { case (c1, c2, c3, c4, c5, c6, c7) => caseClass(c1, c2, c3, c4, c5, c6, c7) }.toDF
df.show
+-----------+-------+--------+-----+--------------------+---+-----+
|         c1|     c2|      c3|   c4|                  c5| c6|   c7|
+-----------+-------+--------+-----+--------------------+---+-----+
|NEW50752085|84.0485|-76.3851| 85.1|THE NAME OF AN OB...|   |     |
|DEM00752631|51.9581|-85.3315| 98.5|THE NAME OF AN OB...|   |     |
|KI004867205|40.8518| 15.9351|276.5|THE NAME OF AN OB...|FHG|41196|
+-----------+-------+--------+-----+--------------------+---+-----+

Again, this only works if every element is separated by the same number of spaces.

If your data is not in a well-formed format that Spark can read out of the box, your only option is to write your own custom reader using FileInputFormat.

That way you can define the parsing flow for each line of data, deciding how to split it and how to handle the edge cases.

The best way to get into it is through an example. This one is quite solid: https://www.ae.be/blog-en/ingesting-data-spark-using-custom-hadoop-fileinputformat/
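For orientation, here is a minimal sketch of what plugging a custom input format into Spark can look like. The class name and the pass-through LineRecordReader are my own illustration (not taken from your question or the linked post); the real work would go into a custom RecordReader that understands your fixed layout and its empty fields:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, LineRecordReader}

// Hypothetical custom format: for now it simply delegates to the stock
// LineRecordReader; this is where you would return a custom RecordReader
// once the line-parsing rules are implemented.
class ObjectRecordInputFormat extends FileInputFormat[LongWritable, Text] {
  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[LongWritable, Text] =
    new LineRecordReader()
}

// Reading the file through the custom format instead of textFile.
// The Text value is converted to String right away because Hadoop reuses
// the same Text instance across records.
val rawLines = spark.sparkContext
  .newAPIHadoopFile(
    "file.txt",
    classOf[ObjectRecordInputFormat],
    classOf[LongWritable],
    classOf[Text])
  .map { case (_, line) => line.toString }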
