Elasticsearch analyze()与Python中的Spark不兼容



我使用Python 3在PySpark中使用elasticsearch-py客户端,我在使用ES与RDD结合的analyze()函数时遇到了一个问题。特别是,我的RDD中的每条记录都是一串文本,我试图分析它以获取令牌信息,但是当我试图在Spark中的map函数中使用它时,我得到了一个错误。

例如:

from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]
{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}

然而,当我尝试这个:

trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()

我得到一个非常非常长的与酸洗有关的错误消息(这里是它的结尾):

(self, obj)    109if'recursion'in.[0]:    110="""Could not pickle object as excessively deep recursion required."""--> 111                  picklePicklingErrormsg
  save_memoryviewself obj
: Could not pickle object as excessively deep recursion required.
raise.()    112    113def(,):PicklingError

我不确定这个错误是什么意思。我做错了什么吗?是否有一种方法将ES分析功能映射到RDD的记录上?

编辑:当从elasticsearch-py应用其他函数时,我也得到了这种行为(例如,es.termvector())。

本质上,Elasticsearch客户端是不可序列化的。因此,您需要做的是为每个分区创建一个客户机实例,并处理它们:

def get_tokens(part): es = Elasticsearch() yield [es.indices.analyze(text=x)['tokens'][0] for x in part] rdd = sc.parallelize([['the quick brown fox'], ['brown quick dog']], numSlices=2) rdd.mapPartitions(lambda p: get_tokens(p)).collect()

应该给出以下结果:Out[17]: [[{u'end_offset': 3, u'position': 1, u'start_offset': 0, u'token': u'the', u'type': u'<ALPHANUM>'}], [{u'end_offset': 5, u'position': 1, u'start_offset': 0, u'token': u'brown', u'type': u'<ALPHANUM>'}]]

注意,对于大型数据集,这将是非常低效的,因为它涉及到对数据集中每个元素的REST调用ES。

相关内容

  • 没有找到相关文章

最新更新