我有一个Spark SQL的任务,原始数据是:
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/10 8:26| 2.55| 17850|United Kingdom|
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/10 8:26| 2.75| 17850|United Kingdom|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/10 8:26| 3.39| 17850|United Kingdom|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|12/1/10 8:26| 7.65| 17850|United Kingdom|
| 536365| 21730|GLASS STAR FROSTE...| 6|12/1/10 8:26| 4.25| 17850|United Kingdom|
| 536366| 22633|HAND WARMER UNION...| 6|12/1/10 8:28| 1.85| 17850|United Kingdom|
| 536366| 22632|HAND WARMER RED P...| 6|12/1/10 8:28| 1.85| 17850|United Kingdom|
| 536367| 84879|ASSORTED COLOUR B...| 32|12/1/10 8:34| 1.69| 13047|United Kingdom|
| 536367| 22745|POPPY'S PLAYHOUSE...| 6|12/1/10 8:34| 2.1| 13047|United Kingdom|
| 536367| 22748|POPPY'S PLAYHOUSE...| 6|12/1/10 8:34| 2.1| 13047|United Kingdom|
| 536367| 22749|FELTCRAFT PRINCES...| 8|12/1/10 8:34| 3.75| 13047|United Kingdom|
| 536367| 22310|IVORY KNITTED MUG...| 6|12/1/10 8:34| 1.65| 13047|United Kingdom|
| 536367| 84969|BOX OF 6 ASSORTED...| 6|12/1/10 8:34| 4.25| 13047|United Kingdom|
| 536367| 22623|BOX OF VINTAGE JI...| 3|12/1/10 8:34| 4.95| 13047|United Kingdom|
| 536367| 22622|BOX OF VINTAGE AL...| 2|12/1/10 8:34| 9.95| 13047|United Kingdom|
| 536367| 21754|HOME BUILDING BLO...| 3|12/1/10 8:34| 5.95| 13047|United Kingdom|
| 536367| 21755|LOVE BUILDING BLO...| 3|12/1/10 8:34| 5.95| 13047|United Kingdom|
| 536367| 21777|RECIPE BOX WITH M...| 4|12/1/10 8:34| 7.95| 13047|United Kingdom|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
在我的任务中,我想统计哪个单词是"描述"字段中出现最多的单词。所以我做了以下操作:使用flatMap通过用空格分隔Description字段,从原始DataFrame创建一个新的DataFrame,然后构建一个新表,下面是一个新表格:
+------+-------+---+
|number| word|lit|
+------+-------+---+
| 0| WHITE| 1|
| 1|HANGING| 1|
| 2| HEART| 1|
| 3|T-LIGHT| 1|
| 4| HOLDER| 1|
| 5| WHITE| 1|
| 6| METAL| 1|
| 7|LANTERN| 1|
| 8| CREAM| 1|
| 9| CUPID| 1|
| 10| HEARTS| 1|
| 11| COAT| 1|
| 12| HANGER| 1|
| 13|KNITTED| 1|
| 14| UNION| 1|
| 15| FLAG| 1|
| 16| HOT| 1|
| 17| WATER| 1|
| 18| BOTTLE| 1|
| 19| RED| 1|
+------+-------+---+
这是我的代码:
SparkSession spark = SparkSession.builder().appName("Part-4").master("local").getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://localhost:9000/retails.csv");
data.flatMap(new FlatMapFunction<Row, Row>() {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@Override
public Iterator<Row> call(Row r) throws Exception {
List<String> listItem = Arrays.asList(r.getString(2).split(" "));
List<Row> listItemRow = new ArrayList<Row>();
for (String item : listItem) {
listItemRow.add(RowFactory.create(cnt, item, 1));
cnt++;
}
return listItemRow.iterator();
}
}, RowEncoder.apply(new StructType().add("number", "integer").add("word", "string").add("lit", "integer"))).createOrReplaceTempView("data");
spark.sql("select * from data").show();
我有一个问题,如果我按分组或执行其他复杂的SQL操作,程序会出错。
这是我分组时的代码:spark.sql("select word, count(lit) from data group by word").show();
这是我的错误:
java.lang.NullPointerException
at com.spark.part_4.Main$1.call(Main.java:33)
at com.spark.part_4.Main$1.call(Main.java:27)
at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
21/12/03 00:08:39 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
at com.spark.part_4.Main$1.call(Main.java:33)
at com.spark.part_4.Main$1.call(Main.java:27)
at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
21/12/03 00:08:39 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
21/12/03 00:08:39 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
21/12/03 00:08:39 INFO TaskSchedulerImpl: Cancelling stage 2
21/12/03 00:08:39 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage cancelled
21/12/03 00:08:39 INFO DAGScheduler: ShuffleMapStage 2 (show at Main.java:45) failed in 0.298 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
at com.spark.part_4.Main$1.call(Main.java:33)
at com.spark.part_4.Main$1.call(Main.java:27)
at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
21/12/03 00:08:39 INFO DAGScheduler: Job 2 failed: show at Main.java:45, took 0.312624 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.10 executor driver): java.lang.NullPointerException
at com.spark.part_4.Main$1.call(Main.java:33)
at com.spark.part_4.Main$1.call(Main.java:27)
at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
at org.apache.spark.sql.Dataset.show(Dataset.scala:793)
at com.spark.part_4.Main.main(Main.java:45)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at com.spark.part_4.Main$1.call(Main.java:33)
at com.spark.part_4.Main$1.call(Main.java:27)
at org.apache.spark.sql.Dataset.$anonfun$flatMap$2(Dataset.scala:2876)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
我希望得到大家的帮助,谢谢。。。
应用FlatMapFunction
时会得到java.lang.NullPointerException
,因为数据集中可能有空值。在本例中,您似乎使用了Description
列。
如果值为空,则该列可通过火花和以下行读取为null
List<String> listItem = Arrays.asList(r.getString(2).split(" "));
当r.getString(2)
返回null
并且您试图对null引用调用函数split
时,可能会引发此异常。
您可以尝试通过在拆分前检查是否存在null
值来解决此问题,例如
data.flatMap(new FlatMapFunction<Row, Row>() {
private static final long serialVersionUID = 1L;
private int cnt = 0;
public Iterator<Row> call(Row r) throws Exception {
List<Row> listItemRow = new ArrayList<Row>();
//check if null before splitting here
if(r.getString(2) != null) {
List<String> listItem = Arrays.asList(r.getString(2).split(" "));
for (String item : listItem) {
listItemRow.add(RowFactory.create(cnt, item, 1));
cnt++;
}
}
return listItemRow.iterator();
}
}, RowEncoder.apply(
new StructType().add("number", "integer")
.add("word", "string")
.add("lit", "integer")
)).createOrReplaceTempView("data");
您可以使用查看具有null
值的这些行
data.where("Description is null").show();
并在应用flatMap
(例如(之前类似地过滤这些行
data.where("Description is not null")
.flatMap(new FlatMapFunction<Row, Row>() {
//continue the rest of your code