我可以在多个键(连接条件)上连接数据流(Apache Beam)中的两个表吗?



我有两个表格,如下所示:

ID Name  Age
1  Alex  20
2  Sarah 21 and so on
.....................
ID Name  Marks
1  Alex  80
2  Sarah 78 and so on
.....................

我想在多个键(连接条件(上使用云数据流(Apache Beam(连接这两个表,即 ID 和 Name 都是公共列。我该怎么做?

我尝试使用一个键(一个公共列(加入它,但我不知道如何使用多个键

我使用此代码作为参考:

https://github.com/GoogleCloudPlatform/professional-services/blob/master/examples/dataflow-python-examples/dataflow_python_examples/data_lake_to_mart.py

class JoinTables:
    def add_key_details(self, row, key_details):
        result = row.copy()
        try:
            result.update(key_details[row['name']])
        except KeyError as err:
            traceback.print_exc()
            logging.error("Name Not Found error: %s", err)
        return result
def run(argv=None):
    jointables = JoinTables()
    table1= (p 
        | 'Read table1 details from BigQuery ' >> beam.io.Read(
             beam.io.BigQuerySource(
                  query='SELECT * FROM `dataset.table1`',
                  use_standard_sql=True
             )
        )
        | 'Key Details 1' >> beam.Map(lambda row: (row['name'], row))
    )
    table2 = (p 
        | 'Read table2 details from BigQuery ' >> beam.io.Read(
            beam.io.BigQuerySource(
                query='SELECT * FROM `dataset.table2`',
                use_standard_sql=True
            )
        )
        | 'Join data with side input 1' >> beam.Map(jointables.add_key_details, AsDict(table1))
    )

TLDR : 您需要使用元组键(ID, name)映射table1,然后使用这两个值访问行。

# Map using tuple
| 'Key Details 1' >> beam.Map(lambda row: ((row['id'], row['name']), row))
# Access using tuple
result.update(key_details[(row['id'], row['name'])])

解释:

在这里加入基本上是:

  1. 将表1转换为KV对,其中K是字段,V是行
beam.Map(lambda row: (row['name'], row))
  1. 将 table1 作为端输入作为字典传递
beam.Map(jointables.add_key_details, AsDict(table1))
    对于表2
  1. 的每一行,使用相同的键获取表1等效项并更新表2行
result.update(key_details[row['name']])
  1. 返回包含新字段的新行。

因此,在这里,您在步骤 1 和 3 中使用的字段是"name"。如果要使用其他内容,只需调用名称以外的其他名称(例如:row['id'](。获取多个字段的诀窍是使用元组作为键。这样,只需在(row['id'], row['name'])上映射行,并在add_key_details中使用它即可访问正确的table1行。

希望这有帮助!

是的,你可以这样做。一旦你有两个来自两个 BigQuery 源的 PCollect,你需要做一些预处理,使用 CoGroupByKey,然后按照 Rafael 的回答取消嵌套记录。如果需要在多个位置执行类似的联接,还可以使用复合转换来抽象所有这些联接,并仅传递要联接的列名 PCollections。这在本博客中有很好的描述。

最新更新