在Apache光束(云数据流)上连接(带有侧输入)的处理功能,不会在其他部分中附加新记录



我正在尝试使用侧面输入在 Beam 上加入

连接工作(使用侧输入(并更新公共键的基本员工详细信息。 在我的自定义要求中,我还想在联接后将新员工 ID 附加到现有的(更新的数据集(中。

两种 PCollections 具有相同的布局,emp_id是通用键。 我的工作在云数据流上运行。

Apache Beam 过程函数否则无法按预期工作,我正在尝试附加新的 empid ,新输入的一部分。此处的键在此输入集合中是不同的。

工艺功能:

def process_history_details(self, row, new_emp_details):
import traceback
result = row.copy()
try:
result.update(new_emp_details[row['emp_id']])
except KeyError as err:
pass
else:
result.update(new_emp_details[row['emp_id']]) 
for k in new_emp_details[k['emp_id']]:
if k not in result['emp_id']:
result.update(new_emp_details)
return result

调用此函数:

history_data = (
p
| 'Read historical data from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(query=emp_hist_data, use_standard_sql=True))
|'Join Data with sideInput' >> beam.Map(datalakecomparison.process_history_details, AsDict(*new_emp_data*))

new_emp_data字典的生成如下所述:

new_emp_data = (
p
| 'Read base from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(query= new_emp_query, use_standard_sql=True))
|
'New Employee details' >> beam.Map(
lambda x:(
x['emp_id'], x))
)

使用belwo 查询拉取数据,然后使用 lambda 函数传递数据(如上所述(。

def new_emp_query(self):
new_emp_query = """
SELECT 
emp_id, 
emp_name,
emp_code,
emp_unit,
emp_sal 
FROM snow.new_emp_data 
"""
return new_emp_query

电流输入 :

史料

emp_id,emp_name,emp_code,emp_unit,emp_sal

1,A,34,45,70000

2,B,35,45,80000

3,C,34,45,90000

新员工数据

emp_id,emp_name,emp_code,emp_unit,emp_sal

1,A,34,45,1000000

6,F,36,47,90000

电流输出:

emp_id,emp_name,emp_code,emp_unit,emp_sal

1,A,34,45,1000000

2,B,35,45,80000

3,C,34,45,90000

预期输出 :

emp_id,emp_name,emp_code,emp_unit,emp_sal

1,A,34,45,1000000

2,B,35,45,80000

3,C,34,45,90000

6,F,36,47,90000

Emp_id 6是我希望附加到结果的内容

我相信问题出在您的process_history_details函数中。 当您尝试追加6,F,36,47,90000时,您会收到一个KeyError,except子句通过并且您的else语句不会触发。

您可以使用如下所示的简单测试函数来检查该行为:

def lookup(x):
a = {'Rick':1}
try:
print(a[x])
except KeyError as err:
print('pass')
pass
else:
print(a[x])
lookup('Morty')

输出:

通过


您可以:

  • 摆脱else语句:
def process_history_details(self, row, new_emp_details):
import traceback
result = row.copy()
try:
result.update(new_emp_details[row['emp_id']])
except KeyError as err:
result.update(new_emp_details[row['emp_id']]) 
for k in new_emp_details[k['emp_id']]:
if k not in result['emp_id']:
result.update(new_emp_details)
return result
  • 或者使用if语句:
def process_history_details(self, row, new_emp_details):
import traceback
result = row.copy()
if row['emp_id'] in new_emp_details.keys():
result.update(new_emp_details[row['emp_id']])
else:
result.update(new_emp_details[row['emp_id']]) 
for k in new_emp_details[k['emp_id']]:
if k not in result['emp_id']:
result.update(new_emp_details)
return result

最新更新