从PySpark读取Elasticsearch索引



谁能告诉我为什么这个测试脚本PySpark错误出来?(python 3.6.8, hadoop 3.3.1, spark 3.2.1, elasticsearch-hadoop 7.14)

from pyspark.sql import SparkSession, SQLContext
myspark = SparkSession.builder 
.appName("My test.") 
.master("spark://xx.xx.xx:7077") 
.config("es.nodes", "xx.xx.xx.xx") 
.config("es.port", "9200") 
.config("es.net.http.auth.user", "xxxx") 
.config("es.net.http.auth.pass", "xxxx") 
.getOrCreate()
mycontext = SQLContext(myspark)
myquery = '{ "query": { "match_all": {} }}'
myreader = mycontext.read.format("org.elasticsearch.spark.sql") 
.option("es.nodes", "xx.xx.xx.xx") 
.option("es.port", "9200") 
.option("es.net.http.auth.user", "xxxx") 
.option("es.net.http.auth.pass", "xxxx") 
.option("es.query", myquery)
myframe = myreader.load("myindex")

.load()得到的错误是:

py4j.protocol.Py4JJavaError: An error occurred while calling 039.load.
: java.lang.NoClassDefFoundError: scala/Product$class
at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:220)
at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:97)
...

我也有一个使用旧的SparkConf(),SparkContext().newAPIHadoopRDD()的测试片段,它可以很好地连接到相同的spark主机和弹性集群。因此,这就排除了我的类路径、防火墙或身份验证的许多潜在问题。

为了使用spark 3.2.1您需要elasticsearch-hadoop 8.2.0版本。

你可以在发行说明上看到

最新更新