我有两个表格,如下所示:
ID Name Age
1 Alex 20
2 Sarah 21 and so on
.....................
ID Name Marks
1 Alex 80
2 Sarah 78 and so on
.....................
我想在多个键(连接条件(上使用云数据流(Apache Beam(连接这两个表,即 ID 和 Name 都是公共列。我该怎么做?
我尝试使用一个键(一个公共列(加入它,但我不知道如何使用多个键
我使用此代码作为参考:
https://github.com/GoogleCloudPlatform/professional-services/blob/master/examples/dataflow-python-examples/dataflow_python_examples/data_lake_to_mart.py
class JoinTables:
def add_key_details(self, row, key_details):
result = row.copy()
try:
result.update(key_details[row['name']])
except KeyError as err:
traceback.print_exc()
logging.error("Name Not Found error: %s", err)
return result
def run(argv=None):
jointables = JoinTables()
table1= (p
| 'Read table1 details from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(
query='SELECT * FROM `dataset.table1`',
use_standard_sql=True
)
)
| 'Key Details 1' >> beam.Map(lambda row: (row['name'], row))
)
table2 = (p
| 'Read table2 details from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(
query='SELECT * FROM `dataset.table2`',
use_standard_sql=True
)
)
| 'Join data with side input 1' >> beam.Map(jointables.add_key_details, AsDict(table1))
)
TLDR : 您需要使用元组键(ID, name)
映射table1
,然后使用这两个值访问行。
# Map using tuple
| 'Key Details 1' >> beam.Map(lambda row: ((row['id'], row['name']), row))
# Access using tuple
result.update(key_details[(row['id'], row['name'])])
解释:
在这里加入基本上是:
- 将表1转换为KV对,其中K是字段,V是行
beam.Map(lambda row: (row['name'], row))
- 将 table1 作为端输入作为字典传递
beam.Map(jointables.add_key_details, AsDict(table1))
- 对于表2
- 的每一行,使用相同的键获取表1等效项并更新表2行
result.update(key_details[row['name']])
- 返回包含新字段的新行。
因此,在这里,您在步骤 1 和 3 中使用的字段是"name"。如果要使用其他内容,只需调用名称以外的其他名称(例如:row['id']
(。获取多个字段的诀窍是使用元组作为键。这样,只需在(row['id'], row['name'])
上映射行,并在add_key_details
中使用它即可访问正确的table1
行。
希望这有帮助!
是的,你可以这样做。一旦你有两个来自两个 BigQuery 源的 PCollect,你需要做一些预处理,使用 CoGroupByKey,然后按照 Rafael 的回答取消嵌套记录。如果需要在多个位置执行类似的联接,还可以使用复合转换来抽象所有这些联接,并仅传递要联接的列名 PCollections。这在本博客中有很好的描述。