从这个问题出发:如何对数据进行分组并构造一个新的列- python pandas?,我知道如何通过使用pandas
来分组多列并构建一个新的唯一id,但是如果我想在Python中使用Apache beam
来实现该问题中所描述的相同的事情,我如何才能实现它,然后将新数据写入换行分隔的JSON格式文件(每行是一个unique_id
与属于该unique_id的对象数组)?
假设数据集存储在csv文件中。
我是Apache beam的新手,以下是我现在拥有的:
import pandas
import apache_beam as beam
from apache_beam.dataframe.io import read_csv
with beam.Pipeline() as p:
df = p | read_csv("example.csv", names=cols)
agg_df = df.insert(0, 'unique_id',
df.groupby(['postcode', 'house_number'], sort=False).ngroup())
agg_df.to_csv('test_output')
这给了我一个错误:
NotImplementedError: 'ngroup' is not yet supported (BEAM-9547)
这真的很烦人,我不是很熟悉Apache beam,有人能帮我吗?
(ref: https://beam.apache.org/documentation/dsls/dataframes/overview/)将连续整数赋值给一个集合并不是很适合并行计算的。它也不是很稳定。是否有任何其他标识符(例如元组(postcode, house_number)
或其哈希值不合适?