我想通过键在rdd中的一些行分组,以便我可以使用一个组中的行执行更多高级操作。请注意,我不想仅计算一些汇总值。行是键值对,其中键是一个GUID,值是一个复杂的对象。
根据Pyspark文档,我首先尝试与CombineByKey实现此功能,因为它比GroupByKey更具性能。开始时的列表仅用于插图,而不是我的真实数据:
l = list(range(1000))
numbers = sc.parallelize(l)
rdd = numbers.map(lambda x: (x % 5, x))
def f_init(initial_value):
return [initial_value]
def f_merge(current_merged, new_value):
if current_merged is None:
current_merged = []
return current_merged.append(new_value)
def f_combine(merged1, merged2):
if merged1 is None:
merged1 = []
if merged2 is None:
merged2 = []
return merged1 + merged2
combined_by_key = rdd.combineByKey(f_init, f_merge, f_combine)
c = combined_by_key.collectAsMap()
i = 0
for k, v in c.items():
if v is None:
print(i, k, 'value is None.')
else:
print(i, k, len(v))
i += 1
输出的是:
0 0 0
1 1 0
2 2 0
3 3 0
4 4 0
这不是我所期望的。相同的逻辑但使用GroupByKey实施,返回正确的输出:
grouped_by_key = rdd.groupByKey()
d = grouped_by_key.collectAsMap()
i = 0
for k, v in d.items():
if v is None:
print(i, k, 'value is None.')
else:
print(i, k, len(v))
i += 1
返回:
0 0 200
1 1 200
2 2 200
3 3 200
4 4 200
因此,除非我缺少某些内容,否则当GroupByKey优先于Redsbykey或CombineByKey(相关讨论的主题:GroupByKey曾经比ReeddByKey更喜欢的话题)。
首选理解基本API的情况。特别是如果您检查list.append
DOCSTRING:
?list.append
## Docstring: L.append(object) -> None -- append object to end
## Type: method_descriptor
您会看到,就像python api中的其他突变方法一样,约定不会返回修改的对象。这意味着f_merge
总是返回None
,并且没有任何积累。
对于大多数问题所说的问题都比groupByKey
更有效的解决方案,但是用combineByKey
(或aggregateByKey
)重写它不是其中之一。