我试图在远程mongodb集合中插入spark sql数据框架。之前,我用MongoClient编写了一个java程序来检查远程集合是否可以访问,并且我成功地做到了。
我现在的火花代码如下-
scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@1a8b22b5
scala> val depts = sqlContext.sql("select * from test.user_details")
depts: org.apache.spark.sql.DataFrame = [user_id: string, profile_name: string ... 7 more fields]
scala> depts.write.options(scala.collection.Map("uri" -> "mongodb://<username>:<pwd>@<hostname>:27017/<dbname>.<collection>")).mode(SaveMode.Overwrite).format("com.mongodb.spark.sql").save()
这将给出以下错误-
java.lang.AbstractMethodError: com.mongodb.spark.sql.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation;
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
... 84 elided
我还尝试了下面的错误:
scala> depts.write.options(scala.collection.Map("uri" -> "mongodb://<username>:<pwd>@<host>:27017/<database>.<collection>")).mode(SaveMode.Overwrite).save()
java.lang.IllegalArgumentException: 'path' is not specified
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$17.apply(DataSource.scala:438)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$17.apply(DataSource.scala:438)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.getOrElse(ddl.scala:117)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:437)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
... 58 elided
我已经导入了以下包-
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.mongodb.spark.config._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
depts.show()按预期工作。
数据框创建成功。请问有人能给我一些建议吗?由于假设您使用的是MongoDB Spark Connector v1.0,您可以像下面这样保存DataFrames SQL:
// DataFrames SQL example
df.registerTempTable("temporary")
val depts = sqlContext.sql("select * from test.user_details")
depts.show()
// Save out the filtered DataFrame result
MongoSpark.save(depts.write.option("uri", "mongodb://hostname:27017/database.collection").mode("overwrite"))
更多信息请参见MongoDB Spark Connector: Spark SQL
关于MongoDB和Spark使用docker的简单演示,请参见MongoDB Spark docker: examples。Scala - dataframes看看这个错误,想想可能的解决方法。这是由于MongoDB的Spark连接器和您使用的Spark版本不匹配。
. lang。AbstractMethodError: com.mongodb.spark.sql.DefaultSource.createRelation (Lorg/apache/火花/sql/SQLContext; Lorg/apache/火花/sql/SaveMode; Lscala/收集/不可变/地图;Lorg/apache/火花/sql/数据集;)Lorg/apache/火花/sql/资源/BaseRelation;
引用java.lang.AbstractMethodError的javadoc:
当应用程序试图调用抽象方法时抛出。通常,编译器会捕获此错误;此错误仅可能在运行时发生,如果自当前执行的方法上次编译以来,某些类的定义发生了不兼容的更改。
这很好地解释了您所遇到的情况(注意以"此错误只能在运行时发生"开头的部分)。
我的猜测是堆栈跟踪中DefaultSource.createRelation
方法中的Lorg/apache/spark/sql/Dataset
部分正是罪魁祸首。
换句话说,那行使用data: DataFrame
而不是Dataset
,这在这个方向上是不兼容的,即DataFrame
只是Dataset[Row]
的Scala类型别名,但任何数据集都不是DataFrame
,因此运行时错误。
override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation