不一致的结果是Pyspark CombineByKey(与GroupByKey相对)



我想通过键在rdd中的一些行分组,以便我可以使用一个组中的行执行更多高级操作。请注意,我不想仅计算一些汇总值。行是键值对,其中键是一个GUID,值是一个复杂的对象。

根据Pyspark文档,我首先尝试与CombineByKey实现此功能,因为它比GroupByKey更具性能。开始时的列表仅用于插图,而不是我的真实数据:

l = list(range(1000))
numbers = sc.parallelize(l)
rdd = numbers.map(lambda x: (x % 5, x))
def f_init(initial_value):
    return [initial_value]
def f_merge(current_merged, new_value):
    if current_merged is None:
        current_merged = []
    return current_merged.append(new_value)
def f_combine(merged1, merged2):
    if merged1 is None:
        merged1 = []
    if merged2 is None:
        merged2 = []
    return merged1 + merged2
combined_by_key = rdd.combineByKey(f_init, f_merge, f_combine)
c = combined_by_key.collectAsMap()
i = 0
for k, v in c.items():
    if v is None:
        print(i, k, 'value is None.')
    else:
        print(i, k, len(v))
    i += 1

输出的是:

0 0 0
1 1 0
2 2 0
3 3 0
4 4 0

这不是我所期望的。相同的逻辑但使用GroupByKey实施,返回正确的输出:

grouped_by_key = rdd.groupByKey()
d = grouped_by_key.collectAsMap()
i = 0
for k, v in d.items():
    if v is None:
        print(i, k, 'value is None.')
    else:
        print(i, k, len(v))
    i += 1

返回:

0 0 200
1 1 200
2 2 200
3 3 200
4 4 200

因此,除非我缺少某些内容,否则当GroupByKey优先于Redsbykey或CombineByKey(相关讨论的主题:GroupByKey曾经比ReeddByKey更喜欢的话题)。

首选理解基本API的情况。特别是如果您检查list.append DOCSTRING:

?list.append
## Docstring: L.append(object) -> None -- append object to end
## Type:      method_descriptor

您会看到,就像python api中的其他突变方法一样,约定不会返回修改的对象。这意味着f_merge总是返回None,并且没有任何积累。

对于大多数问题所说的问题都比groupByKey更有效的解决方案,但是用combineByKey(或aggregateByKey)重写它不是其中之一。

最新更新