I have this code in PySpark/Python:
n --> the number of items in the new list
n = 2
new tuple ( x, y, z )
where:
x --> index
y --> list (n is the number of items in the list)
z --> the value that comes right after the last item of the list
This is the code I am using:
RDD_Dados = RDD_one.map(lambda x: (x[0],
[list(x[i][1]) for i in range(n)],
x[i+1][1])
)
The content of RDD_one is:
(0, [(0, '5'), (1, '1'), (2, '2'), (3, '4'), (4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(1, [(1, '1'), (2, '2'), (3, '4'), (4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(2, [(2, '2'), (3, '4'), (4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(3, [(3, '4'), (4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(4, [(4, '4'), (5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(5, [(5, '3.5'), (6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(6, [(6, '-2'), (7, '1'), (8, '2'), (9, '0')])
(7, [(7, '1'), (8, '2'), (9, '0')])
(8, [(8, '2'), (9, '0')])
(9, [(9, '0')])
I want to take the second item of each pair and build a new list like this:
(0, ['5','1'], '2')
(1, ['1','2'], '4')
(2, ['2','4'], '4')
(3, ['4','4'], '3.5')
(4, ['4','3.5'], '-2')
(5, ['3.5','-2'], '1')
(6, ['-2','1'], '2')
(7, ['1','2'], '0')
But it doesn't work.
I keep getting this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-12-3532f6a9c36a> in <module>()
59 RDD_Dados = RDD_one.map(lambda x: (x[0],[list(x[i][1]) for i in range(n)],x[i+1][1]))
File "C:\Spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 229, in main
File "C:\Spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 224, in process
File "C:\Spark\spark-2.3.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<ipython-input-12-3532f6a9c36a>", line 59, in <lambda>
File "<ipython-input-12-3532f6a9c36a>", line 59, in <listcomp>
TypeError: 'int' object is not subscriptable
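Change
RDD_Dados = RDD_one.map(lambda x: (x[0],
    [list(x[i][1]) for i in range(n)],
    x[i+1][1]))
to
RDD_Dados = RDD_one.map(lambda x: (x[0],
    [x[1][i][1] for i in range(n)],
    x[1][n][1]))
range starts at 0, so the first iteration evaluates x[0][1]. But x[0] is an integer (the index), hence the TypeError. The (position, value) pairs are in x[1], so index them as x[1][i][1]. In Python 3 the comprehension variable i is also not visible outside the comprehension, so use n for the trailing value. You don't need to wrap the value in list either, otherwise you would get e.g. (0, [['5'], ['1']], '2') instead of (0, ['5','1'], '2'). Note that records with fewer than n + 1 pairs will still raise an IndexError, so they have to be filtered out or handled separately.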
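To make the indexing concrete, here is a minimal plain-Python sketch on a single record copied from RDD_one (no Spark needed). The name record is only for illustration; it plays the role of x in the lambda.

```python
# One record shaped like those in RDD_one; `record` is an illustrative name.
record = (0, [(0, '5'), (1, '1'), (2, '2'), (3, '4')])
n = 2

# record[0] is the integer index, so subscripting it reproduces the error.
try:
    record[0][1]
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# The (position, value) pairs live in record[1].
window = [record[1][i][1] for i in range(n)]   # first n values
following = record[1][n][1]                    # value right after them
result = (record[0], window, following)

print(error_message)
print(result)
```

The first print shows the same TypeError message as in the traceback, and the second shows the tuple built from record[1].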
Edit:
For a more general and flexible approach, you can write a custom function and extend its logic as needed.
For example:
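def my_logic(x):
    # x is (index, [(position, value), ...]); n is defined outside the function
    ret = [x[0], []]
    for i in range(n):
        try:
            ret[1].append(x[1][i][1])
        except IndexError:
            # fewer than n pairs: append 0 once and stop
            ret[1].append(0)
            break
    # the value right after the first n items, or 0 if there is none
    if len(x[1]) > n:
        ret.append(x[1][n][1])
    else:
        ret.append(0)
    return ret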
Then just do
RDD_Dados = RDD_one.map(my_logic)
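As a rough illustration of what such a function can do with records that are too short, here is a hedged variant that can be run without Spark. The name pad_window and its fill parameter are hypothetical, and unlike the function above it pads the list to exactly n items.

```python
def pad_window(record, n, fill=0):
    """Return [index, first n values padded with `fill`, following value or `fill`]."""
    index, pairs = record
    values = [value for _, value in pairs[:n]]
    values += [fill] * (n - len(values))        # pad records with fewer than n pairs
    following = pairs[n][1] if len(pairs) > n else fill
    return [index, values, following]

# Two records shaped like those in RDD_one.
short = (9, [(9, '0')])
full = (7, [(7, '1'), (8, '2'), (9, '0')])

print(pad_window(short, 2))
print(pad_window(full, 2))
```

With Spark it would presumably be applied as RDD_one.map(lambda r: pad_window(r, n)). Whether padding with 0 or dropping the short records is the better choice depends on what the downstream code expects.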
I solved the problem like this:
First, I use a filter to build an RDD that drops the records that do not have enough items for the parameter n.
RDDFilter = DadosAgrupadosPorChaveOrdenados.filter(lambda x: n < len(x[1]))
After that, I build another RDD that assembles the data the way I need it:
DadosFinal = RDDFilter.map(lambda x: (x[0],
[x[1][i][1] for i in range(0,int(n))],
x[1][int(n)][1]))
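The same filter-then-map logic can be checked on a plain Python list, without Spark. This is only a sketch: the names values, records, kept and final are illustrative, and the records are rebuilt from the values shown in RDD_one.

```python
values = ['5', '1', '2', '4', '4', '3.5', '-2', '1', '2', '0']
n = 2

# Rebuild records shaped like RDD_one: (start, [(position, value), ...]).
pairs = list(enumerate(values))
records = [(start, pairs[start:]) for start in range(len(values))]

# Same predicate as the filter lambda: keep records with more than n pairs.
kept = [r for r in records if n < len(r[1])]

# Same mapping as the map lambda: (index, first n values, following value).
final = [(r[0], [r[1][i][1] for i in range(n)], r[1][n][1]) for r in kept]

for row in final:
    print(row)
```

The records starting at 8 and 9 are dropped by the filter because they have no value after the first n items.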
Thank you very much for all the help.