Integrating Spark with Elasticsearch



I am trying to send a Spark DataFrame to an Elasticsearch cluster. I have a Spark DataFrame (df).

I created the index "spark", but when I run the following command:

df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes.wan.only", "true") \
    .option("es.port", "9092") \
    .option("es.net.ssl", "true") \
    .option("es.nodes", "localhost") \
    .save("spark/docs")

I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o144.save.
: java.lang.NoClassDefFoundError: scala/Product$class

Spark version: spark-3.0.0-bin-hadoop2.7

Elasticsearch version: elasticsearch-7.7.0

Dependency added: elasticsearch-hadoop-7.7.0.jar

I believe you should specify es.resource when writing, and the format can be given as es. The following works for me on Spark 2.4.5 (running on Docker) and ES version 7.5.1. First, make sure you launch pyspark with the following package:

export PYSPARK_SUBMIT_ARGS="--packages org.elasticsearch:elasticsearch-hadoop:7.5.1 pyspark-shell"
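If pyspark is started from inside a notebook rather than via the pyspark launcher, one common way to pass the same --packages flag is to set the variable from Python before the SparkContext is created. A minimal sketch, assuming no context exists yet and using the same ES 7.5.1 connector coordinates as above:

# Must be set before the SparkContext starts the JVM, and must end with "pyspark-shell".
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.elasticsearch:elasticsearch-hadoop:7.5.1 pyspark-shell"
)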

On the PySpark side, for example in a notebook:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()
conf.setMaster("local").setAppName("ES Test")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "elasticsearch")  # name of my docker container, you might keep localhost
conf.set("es.port", "9200")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# Build a small 11-column test DataFrame
colnames = ['col_' + str(i + 1) for i in range(11)]
df1 = sc.parallelize([
    [it for it in range(11)],
    [it for it in range(1, 12)],
]).toDF(colnames)
(
    df1
    .write
    .format('es')
    .option('es.resource', '%s/%s' % ('<resource_name>', '<table_name>'))
    .save()
)
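Applied to the question's setup, the same pattern would look roughly like the sketch below: the connection settings can also be passed as write options instead of on the SparkConf, and the index goes into es.resource. Note that 9092 is Kafka's default port; Elasticsearch normally listens on 9200, so that value is an assumption here, as are the host and SSL settings copied from the question.

(
    df.write
    .format('es')
    .option('es.nodes', 'localhost')
    .option('es.port', '9200')            # ES default HTTP port (9092 is Kafka's)
    .option('es.net.ssl', 'true')
    .option('es.nodes.wan.only', 'true')
    .option('es.resource', 'spark/docs')  # index/type pair from the question
    .save()
)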

Also - to verify that the data was written, using the elasticsearch Python package:

from elasticsearch import Elasticsearch

esclient = Elasticsearch(['elasticsearch:9200'])  # same host as es.nodes above

response = esclient.search(
    index='<resource_name>*',
    body={
        "query": {
            "match": {
                "col_1": 1
            }
        },
        "aggs": {
            "test_agg": {
                "terms": {
                    "field": "col_1",
                    "size": 10
                }
            }
        }
    }
)
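The client returns the response as a plain dict, so a quick way to eyeball the hits and the test_agg buckets defined above is, for example:

print(response['hits']['total'])
for bucket in response['aggregations']['test_agg']['buckets']:
    print(bucket['key'], bucket['doc_count'])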
