我是Python的新手。我也是pysaprk的新手。我正在尝试运行一个代码,该代码需要看起来像这样的元组(id , (span, mention))
来执行.map(lambda (id, (span, text)): (id, text))
。
我正在使用的代码是:
m = text
.map(lambda (id, (span, text)): (id, text))
.mapValues(lambda v: ngrams(v, self.max_ngram))'''error triggered here'''
.flatMap(lambda (target, tokens): (((target, t), 1) for t in tokens))
原始数据的格式(id, source, span, text)
:
{'_id': u'en.wikipedia.org/wiki/Cerambycidae',
'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
'span': (61, 73),
'text': u'"Plinthocoelium virens" is a species of beetle in the family Cerambycidae.'},
{'_id': u'en.wikipedia.org/wiki/Dru_Drury',
'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
'span': (20, 29),
'text': u'It was described by Dru Drury in 1770.'}]
我收到此错误:
for k, v in iterator:
TypeError: tuple indices must be integers, not str
我知道groupByKey在pairwiseRDDs上工作,所以我想知道如何正确执行groupByKey来解决这个问题?
任何帮助或指导将不胜感激。
我正在使用python 2.7和pyspark 2.3.0。
提前谢谢你。
首先,您需要将数据映射到具有键和值的表单中,然后是 groupByKey。
键和值形式始终是元组 (a, b(,键是 a 和值 b.a 和 b 本身可能是元组。
rdd = sc.parallelize([{'_id': u'en.wikipedia.org/wiki/Cerambycidae',
'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
'span': (61, 73),
'text': u'"Plinthocoelium virens" is a species of beetle in the family Cerambycidae.'},
{'_id': u'en.wikipedia.org/wiki/Dru_Drury',
'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens',
'span': (20, 29),
'text': u'It was described by Dru Drury in 1770.'},
{'_id': u'en.wikipedia.org/wiki/Dru_Drury',
'source': 'en.wikipedia.org/wiki/Plinthocoelium_virens2',
'span': (20, 29, 2),
'text': u'It was described by Dru Drury in 1770.2'}])
print rdd.map(lambda x: (x["_id"], (x["span"], x["text"]))).groupByKey()
.map(lambda x: (x[0], list(x[1]))).collect()
[(u'en.wikipedia.org/wiki/Dru_Drury', [((20, 29(, u'It 被描述 由德鲁·德鲁里在1770年描述,((20,29,2(,u'它是由德鲁·德鲁里描述的 in 1770.2'(](, (u'en.wikipedia.org/wiki/Cerambycidae', [((61, 73(, u'"Plinthocoelium virens"是甲虫科的一种甲虫 Cerambycidae。