我正在尝试使用侧面输入在 Beam 上加入。
连接工作(使用侧输入(并更新公共键的基本员工详细信息。 在我的自定义要求中,我还想在联接后将新员工 ID 附加到现有的(更新的数据集(中。
两种 PCollections 具有相同的布局,emp_id是通用键。 我的工作在云数据流上运行。
Apache Beam 过程函数否则无法按预期工作,我正在尝试附加新的 empid ,新输入的一部分。此处的键在此输入集合中是不同的。
工艺功能:
def process_history_details(self, row, new_emp_details):
import traceback
result = row.copy()
try:
result.update(new_emp_details[row['emp_id']])
except KeyError as err:
pass
else:
result.update(new_emp_details[row['emp_id']])
for k in new_emp_details[k['emp_id']]:
if k not in result['emp_id']:
result.update(new_emp_details)
return result
调用此函数:
history_data = (
p
| 'Read historical data from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(query=emp_hist_data, use_standard_sql=True))
|'Join Data with sideInput' >> beam.Map(datalakecomparison.process_history_details, AsDict(*new_emp_data*))
new_emp_data字典的生成如下所述:
new_emp_data = (
p
| 'Read base from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(query= new_emp_query, use_standard_sql=True))
|
'New Employee details' >> beam.Map(
lambda x:(
x['emp_id'], x))
)
使用belwo 查询拉取数据,然后使用 lambda 函数传递数据(如上所述(。
def new_emp_query(self):
new_emp_query = """
SELECT
emp_id,
emp_name,
emp_code,
emp_unit,
emp_sal
FROM snow.new_emp_data
"""
return new_emp_query
电流输入 :
史料
emp_id,emp_name,emp_code,emp_unit,emp_sal
1,A,34,45,70000
2,B,35,45,80000
3,C,34,45,90000
新员工数据
emp_id,emp_name,emp_code,emp_unit,emp_sal
1,A,34,45,1000000
6,F,36,47,90000
电流输出:
emp_id,emp_name,emp_code,emp_unit,emp_sal
1,A,34,45,1000000
2,B,35,45,80000
3,C,34,45,90000
预期输出 :
emp_id,emp_name,emp_code,emp_unit,emp_sal
1,A,34,45,1000000
2,B,35,45,80000
3,C,34,45,90000
6,F,36,47,90000
Emp_id 6是我希望附加到结果的内容
我相信问题出在您的process_history_details
函数中。 当您尝试追加6,F,36,47,90000
时,您会收到一个KeyError,except子句通过并且您的else语句不会触发。
您可以使用如下所示的简单测试函数来检查该行为:
def lookup(x):
a = {'Rick':1}
try:
print(a[x])
except KeyError as err:
print('pass')
pass
else:
print(a[x])
lookup('Morty')
输出:
通过
您可以:
- 摆脱else语句:
def process_history_details(self, row, new_emp_details):
import traceback
result = row.copy()
try:
result.update(new_emp_details[row['emp_id']])
except KeyError as err:
result.update(new_emp_details[row['emp_id']])
for k in new_emp_details[k['emp_id']]:
if k not in result['emp_id']:
result.update(new_emp_details)
return result
- 或者使用if语句:
def process_history_details(self, row, new_emp_details):
import traceback
result = row.copy()
if row['emp_id'] in new_emp_details.keys():
result.update(new_emp_details[row['emp_id']])
else:
result.update(new_emp_details[row['emp_id']])
for k in new_emp_details[k['emp_id']]:
if k not in result['emp_id']:
result.update(new_emp_details)
return result