我正在尝试构建一个应用程序,该应用程序在Elasticsearch中索引一堆文档,并通过布尔查询将文档检索到Spark进行机器学习。我试图通过Python通过pySpark和elasticsearch-py来完成这一切。
对于机器学习部分,我需要使用来自每个文本文档的令牌创建特征。要做到这一点,我需要处理/分析每个文档的典型内容,如小写、词干提取、删除停止词等。
基本上我需要把"Quickly the brown fox is getting away."
变成"quick brown fox get away"
或["quick", "brown", "fox", "get", "away"]
。我知道您可以通过各种Python包和函数很容易地做到这一点,但我想使用Elasticsearch分析器来做到这一点。此外,我需要以一种对大数据集有效的方式来做。
基本上,我想直接从Elasticsearch中提取文本的分析版本或分析的令牌,并在Spark框架中以有效的方式完成。作为一个相对的ES新手,我已经弄清楚了如何通过调整elasticsearch-hadoop插件直接从Spark查询文档:
http://blog.qbox.io/elasticsearch-in-apache-spark-python基本上是这样的:
read_conf = {
'es.nodes': 'localhost',
'es.port': '9200',
'es.resource': index_name + '/' + index_type,
'es.query': '{ "query" : { "match_all" : {} }}',
}
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass = 'org.elasticsearch.hadoop.mr.EsInputFormat',
keyClass = 'org.apache.hadoop.io.NullWritable',
valueClass = 'org.elasticsearch.hadoop.mr.LinkedMapWritable',
conf = read_conf)
此代码将或多或少地从ES中检索未分析的原始存储文本版本。我还没有弄清楚的是如何以有效的方式查询分析的文本/令牌。到目前为止,我已经想出了两种可能的方法:
- 将elasticsearch-py提供的es.termvector()函数映射到RDD的每个记录上,以检索分析过的令牌。
- 将elasticsearch-py提供的es. indexes .analyze()函数映射到RDD的每条记录上,分析每条记录。
参见相关信息:Elasticsearch analyze()在Python中不兼容Spark ?
根据我的理解,这两种方法对于大数据集来说都是非常低效的,因为它们涉及到对RDD中的每条记录的REST调用。
因此,我的问题是- 是否有一种替代的有效方法,我可以从ES中提取分析的文本/令牌,而无需为每个记录进行REST调用?也许是一个ES设置,将分析的文本与原始文本一起存储在一个字段中?或者在查询本身请求分析令牌/文本的能力,以便我可以将其包含在elasticsearch-hadoop配置中。 是否有一个替代或更好的解决方案,可以利用Spark的并行机器学习能力与es类似的查询/存储/分析能力?
我可能已经找到了一个临时解决方案,通过在搜索查询的正文中使用"fielddata_fields"参数。
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-fielddata-fields.html 例如,read_conf = {
'es.nodes': 'localhost',
'es.port': '9200',
'es.resource': index_name + '/' + index_type,
'es.query': '{ "query" : { "match_all" : {} }, "fields": ["_id"], "fielddata_fields": "text" }',
}
返回带有"text"字段的id和(分析过的)令牌的文档。目前还不清楚这对我工作中的内存消耗有何影响。它也不包括文档中每个令牌的术语频率,这可能是必要的信息。如果有人知道如何将术语频率添加到符号中,我很乐意听到。