My setup
I am using the following components:
- spark-core_2.10
- spark-sql_2.10
My problem
This is basically my code:
Dataset<Row> rowsSource = spark.read()
.option("header", "true")
.option("delimiter", ";")
.csv("source.csv");
Dataset<Row> rowsTarget = spark.read()
.option("header", "true")
.option("delimiter", ";")
.csv("target.csv");
rowsSource.createOrReplaceTempView("source");
rowsTarget.createOrReplaceTempView("target");
Dataset<Row> result = spark.sql("SELECT source.Id FROM source" +
" LEFT OUTER JOIN target USING(Id)" +
" WHERE target.Id IS NULL");
result.show();
Here is some test data:
Source:
"Id";"Status"
"1";"Error"
"2";"OK"
Target:
"Id";"Status"
"2";"OK"
I expect the SQL statement to find exactly one Id, namely "1".
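For reference, this is roughly what I would expect result.show() to print (a sketch of the expected console output, given the anti-join logic above):

+---+
| Id|
+---+
|  1|
+---+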
But when I run it, an exception is thrown on the line that executes the SQL statement:
2017-03-21 17:00:09,693 INFO [main] com.materna.mobility.smart.selenium.Aaaa: starting
Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column `Detail` cannot be resolved on the left side of the join. The left-side columns: ["Detail", Detailp, Detaild, Detailb, Amount - 2016 48 +0100/1, Amount - 2016 49 +0100/1, Amount - 2016 50 +0100/1, Amount - 2016 51 +0100/1, Amount - 2016 52 +0100/1];
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1976)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1975)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$commonNaturalJoinProcessing(Analyzer.scala:1975)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1961)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1958)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1958)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1957)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
at MyClass.main(MyClass.java:48)
If I insert an extra semicolon (;) in front of the Id, everything works as expected. Here is an example:
;"Id";"Status"
I think Spark then parses three columns, but since the first column is invalid, it gets ignored.
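One way to see what is actually going on is to print the column names Spark parsed from the header; with a UTF-8 BOM in the file, the first name typically comes back with an invisible U+FEFF prefix. A minimal diagnostic sketch (assuming the same SparkSession and source.csv as above):

// Print the raw column names Spark parsed from the CSV header.
// With a BOM present, the first name prints as "\uFEFFId" (length 3
// instead of 2), which is why "Id" cannot be resolved in the join.
Dataset<Row> rows = spark.read()
    .option("header", "true")
    .option("delimiter", ";")
    .csv("source.csv");
for (String col : rows.columns()) {
    System.out.println("[" + col + "] length=" + col.length());
}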
Problem
My CSV files contained a BOM (byte order mark), as I only just found out.
The byte order mark (BOM) is a Unicode character, U+FEFF BYTE ORDER MARK (BOM), whose appearance as a magic number at the start of a text stream can signal several things to a program consuming the text.
After some more searching, I found this issue: https://github.com/databricks/spark-csv/issues/142
Apparently this has been a known problem since 2015.
Fix
The easiest fix is to remove the BOM from the files.
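If you want to do that in code rather than in an editor, a small utility along these lines should work (a sketch, assuming the files are small enough to read into memory; the class and file names are just examples):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class BomStripper {

    // Rewrites the file without a leading UTF-8 BOM (the bytes EF BB BF).
    static void stripUtf8Bom(Path file) throws IOException {
        byte[] bytes = Files.readAllBytes(file);
        if (bytes.length >= 3
                && (bytes[0] & 0xFF) == 0xEF
                && (bytes[1] & 0xFF) == 0xBB
                && (bytes[2] & 0xFF) == 0xBF) {
            Files.write(file, Arrays.copyOfRange(bytes, 3, bytes.length));
        }
    }

    public static void main(String[] args) throws IOException {
        stripUtf8Bom(Paths.get("source.csv"));
        stripUtf8Bom(Paths.get("target.csv"));
    }
}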
Another fix I found (see the issue linked above) is to add an extra semicolon in front of the first column name. Spark then apparently parses one more column, but the first one is invalid and gets ignored. However, I strongly advise against this workaround: it may break once the bug is fixed, and the solution above is more reliable.
Visualization
Wikipedia states that with UTF-8 (which I use), the file should start with the following bytes (in hex): "EF BB BF".
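You can verify this yourself by dumping the first bytes of the file in hex; if they are EF BB BF, the file starts with a UTF-8 BOM. A minimal check (the file name is just an example):

import java.io.FileInputStream;
import java.io.IOException;

public class BomCheck {
    public static void main(String[] args) throws IOException {
        try (FileInputStream in = new FileInputStream("source.csv")) {
            // Print the first three bytes in hex; "EF BB BF" means a UTF-8 BOM.
            for (int i = 0; i < 3; i++) {
                int b = in.read();
                if (b < 0) break;
                System.out.printf("%02X ", b);
            }
            System.out.println();
        }
    }
}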
Here you can see what I expected my CSV files to look like (since I did not yet know they had a BOM) versus what they actually look like.
Because I lack the reputation to post the images inline, here is the link:
- Expected vs. actual content: https://i.stack.imgur.com/oggPb.jpg