Context
I am using Spark 1.5.
I have a file, records.txt, which is Ctrl-A (\u0001) delimited. In this file, index 31 is the subscriber_id. For some records the subscriber_id is empty, and for some it is not.
Record where subscriber_id is not empty (here subscriber_id, UK8jikahasjp23, is the second-to-last attribute):
99^A2013-12-11^A23421421412^qweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^AUK8jikahasjp23^A
Record where subscriber_id is empty:
23^A2013-12-11^A23421421412^qweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^A^A
Problem
I am getting java.lang.ArrayIndexOutOfBoundsException for the records with an empty subscriber_id.
Why does Spark throw java.lang.ArrayIndexOutOfBoundsException for empty values of the subscriber_id field?
16/08/20 10:22:18 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 8.0:
Code
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import org.slf4j.LoggerFactory

case class CustomerCard(accountNumber: String, subscriber_id: String, subscriptionStatus: String)

object CustomerCardProcess {

  val log = LoggerFactory.getLogger(this.getClass.getName)

  def doPerform(sc: SparkContext, sqlContext: HiveContext, custCardRDD: RDD[String]): DataFrame = {
    import sqlContext.implicits._

    log.info("doCustomerCardProcess method started")

    // Split each Ctrl-A delimited record into its fields.
    val splitRDD = custCardRDD.map(elem => elem.split("\u0001"))
    // Field 3 = accountNumber, field 31 = subscriber_id, field 8 = subscriptionStatus.
    val schemaRDD = splitRDD.map(arr => new CustomerCard(arr(3).trim, arr(31).trim, arr(8).trim))

    schemaRDD.toDF().registerTempTable("customer_card")

    val custCardDF = sqlContext.sql(
      """
        |SELECT
        |  accountNumber,
        |  subscriber_id
        |FROM
        |  customer_card
        |WHERE
        |  subscriptionStatus IN ('AB', 'AC', 'PC')
        |  AND accountNumber IS NOT NULL AND LENGTH(accountNumber) > 0
      """.stripMargin)

    log.info("doCustomerCardProcess method ended")
    custCardDF
  }
}
Error
13/09/12 23:22:18 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 8.0 (TID 595): java.lang.ArrayIndexOutOfBoundsException: 31
    at com.org.CustomerCardProcess$$anonfun$2.apply(CustomerCardProcess.scala:23)
    at com.org.CustomerCardProcess$$anonfun$2.apply(CustomerCardProcess.scala:23)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:118)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Can anyone help me fix this issue?
The split function discards all trailing empty fields at the end of the line being split. So for records with an empty subscriber_id, the resulting array has fewer than 32 elements and arr(31) is out of bounds. Change the following line

val splitRDD = custCardRDD.map(elem => elem.split("\u0001"))

to

val splitRDD = custCardRDD.map(elem => elem.split("\u0001", -1))

The limit of -1 tells split to keep all trailing empty fields.
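To see the difference, here is a minimal sketch you can run in a plain Scala REPL, no Spark needed (the sample string and values are mine, for illustration only):

// One Ctrl-A delimited record whose last two fields are empty.
val record = "acct123\u0001ACTIVE\u0001\u0001"

// Default limit of 0: Java's String.split removes trailing empty strings.
record.split("\u0001").length        // 2 -> accessing index 2 or 3 throws ArrayIndexOutOfBoundsException

// Limit of -1: the pattern is applied as many times as possible,
// and trailing empty strings are preserved.
record.split("\u0001", -1).length    // 4 -> fixed field indexes stay valid

If some input lines may be genuinely malformed (too few fields even with -1), you could additionally guard the mapping with something like .filter(_.length > 31) before accessing arr(31); that is a defensive suggestion on top of the fix above, not part of it.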