在数据流python中连接两个csv数据



我有两个csv文件-我想在数据流中进行完全连接

我作为PCollection 读取的两个csv文件

csv1

A | B | C | D | E

csv2

A | B | C | F | G

我需要加入基于密钥A,B的两个p集合,并获得如下所示的结果p集合

A | B | C | D | E | F | G

试验1

{'left': P_collection_1, 'right': P_collection_2}
| ' Combine' >> beam.CoGroupByKey()
| ' ExtractValues' >> beam.Values()

这基本上就像sql中的完全联接

我相信您确实可以使用CoGroupBykey:

将Apache Beam编程指南中的电话和电子邮件示例应用到您的案例中,您可以尝试向CoGroupByKey提供一个"C、D、E"的集合,用"a、B"键控,以及一个"F、G"的集合(也用"a和B"键键控(。

为了更清楚一点,每个PCollection中的元素必须是元组,它们的第一个元素是"a,B"键,第二个是"C,D,E"或"F,G"值:

PColl1 = PCollection(
('2,4', '1,2,5'),
('1,10', '4,4,9'),
...) # this is the PCollection of CDE's
PColl2 = PCollection(
('2,4', '30,3'),
('20,1', '2,1'),
...) # this is the PCollection of FG's

(PCollection符号仅用于说明(

然后我们会申请:

join = {'CDE': PColl1, 'FG': Pcoll2} | beam.CoGroupByKey()

根据编程指南,结果应为:

PCollection(
('2,4', {
'CDE': ['1,2,5'],
'FG': ['30,3']
}
),
('1,10', {
'CDE': ['4,4,9']
}
),
('20,1', {
'FG': ['2,1']
}
),
...)

如果A和B在同一个文件中多次取值2,4,这应该不是问题,我们应该在CDE或FG中有几个值。

最新更新