pyspark / python 3.6 (TypeError: 'int' object is not subscriptable) list / tuples



I have this code in pyspark/python:

n --> the number of items of the new list

n = 2
new tuple ( x, y, z )

where:

x --> index
y --> list [where the variable n is the number of items of the list]
z --> the value that follows the n items of the list

I am writing this code:

RDD_Dados = RDD_one.map(lambda x: (x[0], 
[list(x[i][1]) for i in range(n)],
x[i+1][1])
)

The content of RDD_one is:

(0, [(0, '5'), (1, '1'), (2, '2'), (3, '4'), (4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(1, [(1, '1'), (2, '2'), (3, '4'), (4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(2, [(2, '2'), (3, '4'), (4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(3, [(3, '4'), (4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(4, [(4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(5, [(5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(6, [(6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(7, [(7, '1'), (8, '2'), (9, '0')])
(8, [(8, '2'), (9, '0')])
(9, [(9, '0')])
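For reference, rows with this shape can be rebuilt in plain Python (no Spark needed) from a flat list of values, which makes it easy to experiment with the indexing; this is just a sketch, assuming the values come from a simple list:

```python
# Plain-Python sketch (no Spark): rebuild the rows shown above.
# Row i pairs the key i with the enumerated suffix starting at index i.
values = ['5', '1', '2', '4', '4', '3.5', '-2', '1', '2', '0']

rows = [(i, list(enumerate(values))[i:]) for i in range(len(values))]

print(rows[0])  # (0, [(0, '5'), (1, '1'), ..., (9, '0')])
print(rows[9])  # (9, [(9, '0')])
```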

I want to take the second item of each inner tuple and build a new structure like this:

(0, ['5','1'], '2')
(1, ['1','2'], '4')
(2, ['2','4'], '4')
(3, ['4','4'], '3.5')
(4, ['4','3.5'], '-2')
(5, ['3.5','-2'], '1')
(6, ['-2','1'], '2')
(7, ['1','2'], '0')

But it doesn't work.

I keep getting this error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-12-3532f6a9c36a> in <module>()
59 RDD_Dados = RDD_one.map(lambda x: (x[0],[list(x[i][1]) for i in range(n)],x[i+1][1]))
File "C:\Spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
File "C:\Spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
File "C:\Spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-12-3532f6a9c36a>", line 59, in <lambda>
File "<ipython-input-12-3532f6a9c36a>", line 59, in <listcomp>
TypeError: 'int' object is not subscriptable

Change

RDD_Dados = RDD_one.map(lambda x: (x[0],
                                   [list(x[i][1]) for i in range(n)],
                                   x[i+1][1]))

to

RDD_Dados = RDD_one.map(lambda x: (x[0],
                                   [x[1][i][1] for i in range(n)],
                                   x[1][n][1]))

range starts at 0, so on the first iteration you access x[0][1]; but x is the whole (key, list) pair, so x[0] is the integer key, hence your error. The tuples you want live in x[1], so index x[1][i][1] instead. Also, you don't need to wrap the value in list, otherwise you get e.g. (0, [ ['5'], ['1'] ], '2') instead of (0, ['5','1'], '2').
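The corrected indexing can be checked outside Spark on the first row shown in the question (a plain-Python sketch; the row literal is copied from the sample data):

```python
# One row of RDD_one: (key, [(index, value), ...])
x = (0, [(0, '5'), (1, '1'), (2, '2'), (3, '4'), (4, '4'),
         (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
n = 2

# x[1] is the list of (index, value) tuples; take the first n values
# and then the value that follows them.
result = (x[0], [x[1][i][1] for i in range(n)], x[1][n][1])
print(result)  # (0, ['5', '1'], '2')
```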


Edit:

For a more generic and flexible approach, you can create a custom function in which you can extend the logic further.
For example:

def my_logic(x):
    ret = [x[0], []]
    for i in range(n):
        try:
            ret[1].append(x[1][i][1])
        except IndexError:
            ret[1].append(0)
            break
    if len(x[1]) > n:
        ret.append(x[1][n][1])
    else:
        ret.append(0)
    return ret

Then just do:

RDD_Dados = RDD_one.map(my_logic)
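Applied to plain Python data, the padding behaviour looks like this (a self-contained sketch of the same logic, so it can be run without Spark):

```python
n = 2

def my_logic(x):
    # x is (key, [(index, value), ...]); collect the first n values,
    # padding with 0 when the row runs out, then append the value that
    # follows them (or 0 if there is none).
    ret = [x[0], []]
    for i in range(n):
        try:
            ret[1].append(x[1][i][1])
        except IndexError:
            ret[1].append(0)
            break
    if len(x[1]) > n:
        ret.append(x[1][n][1])
    else:
        ret.append(0)
    return ret

# A long row and the short final row from the question:
print(my_logic((0, [(0, '5'), (1, '1'), (2, '2'), (3, '4')])))  # [0, ['5', '1'], '2']
print(my_logic((9, [(9, '0')])))                                # [9, ['0', 0], 0]
```

Note how the short rows that the original lambda would crash on are padded with 0 instead of raising an IndexError.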

I solved the problem like this:

First, I use a filter to build an RDD that drops the unneeded rows, according to the parameter n:

RDDFilter = DadosAgrupadosPorChaveOrdenados.filter(lambda x: n < len(x[1]))

After that, I build another RDD that assembles the data the way I need it:

DadosFinal = RDDFilter.map(lambda x: (x[0], 
[x[1][i][1] for i in range(0,int(n))], 
x[1][int(n)][1]))
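The same filter-then-map logic can be checked in plain Python on the rows from the question (a sketch; the `rows` list stands in for the RDD contents):

```python
n = 2
values = ['5', '1', '2', '4', '4', '3.5', '-2', '1', '2', '0']
rows = [(i, list(enumerate(values))[i:]) for i in range(len(values))]

# Keep only rows with more than n tuples, then build
# (key, first-n values, following value), mirroring the filter/map pair above.
kept = [x for x in rows if n < len(x[1])]
final = [(x[0], [x[1][i][1] for i in range(n)], x[1][n][1]) for x in kept]

print(final[0])   # (0, ['5', '1'], '2')
print(final[-1])  # (7, ['1', '2'], '0')
```

The filter is what removes the trailing rows (keys 8 and 9 here) that are too short to index, which matches the expected output list in the question.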

Thank you very much for any help.
