我已经被困了几天了。所以我的问题是,我使用apache beam和数据流运行器创建数据管道。我在脚本中使用一个全局变量(一个字典)来访问一些函数。问题是,当我在本地运行它时,估计有200.000行数据,它在本地和数据流中都成功了。但是当我在数据流中运行它时,使用包含6,000.000行的数据集,字典变成空的。下面是我的代码:
函数:
global pre_compute
pre_compute = {} # {(transnumber,seq):[dordertxt, dorderupref], (transnumber,seq):[dordertxt, dorderupref]}
def compute_all_upref_and_ordertxt(data):
'''
Compute all dorder_txt and dorder_upref
'''
trans_number = data.get("transaction_number")
seq = data.get("stdetail_seq")
# get and remove ordertxt and upref from data
ordertxt = data.pop("dorder_ordertxt","")
upref = data.pop("dorder_upref","")
global pre_compute
if pre_compute.get((trans_number,seq), None) == None:
pre_compute[(trans_number, seq)] = [ordertxt, upref]
else:
if ordertxt:
pre_compute[(trans_number, seq)][0] = ordertxt
if upref:
pre_compute[(trans_number, seq)][1] = upref
return data # -> data with no upref and ordertxt
def evaluate_and_inject_upref_ordertxt(data):
# Using json.loads() faster 4-6x than eval()
data = data.strip("n")
data = data.replace("'", '"')
data = data.replace("None", "null")
data = json.loads(data) # str to dict
trans_number = data.get('transaction_number')
seq = data.get('stdetail_seq')
global pre_compute
ordertxt, upref = pre_compute[(trans_number, seq)]
data['dorder_ordertxt'] = ordertxt
data['dorder_upref'] = upref
return data
管道代码:
left_join_std_dtdo = (join_stddtdo_dict | 'Left Join STD DTable DOrder' >> Join(left_pcol_name=stdbsap_dimm_data, left_pcol=left_join_std_bsap,
right_pcol_name=dtdo_data, right_pcol=left_join_dtdo,
join_type='left', join_keys=join_keys)
| 'UPDATE PRICE FOR SCCRM01' >> beam.ParDo(update_price_sccrm01())
| 'REMOVE PRICE from DICTIONARY' >> beam.ParDo(remove_dtdo_price())
| 'PreCompute All Upref and ordertxt based on trans_number and seq' >> beam.Map(compute_all_upref_and_ordertxt)
)
rm_left_std_dtdo = (left_join_std_dtdo | 'CHANGE JOINED STD DTDO INTO STR' >> beam.Map(lambda x: str(x))
| 'DISTINCT STD DTDO' >> beam.Distinct()
| 'EVALUATE AND INJECT AS DICT STD DTDO' >> beam.Map(evaluate_and_inject_upref_ordertxt)
| 'Adjust STD_NET_PRICE WITH DODT_PRICE' >> beam.ParDo(replaceprice())
)
它在本地和数据流中都完美运行,包含200,000行数据。但是当我尝试在数据流中使用6.000.000行数据时,当脚本执行
时ordertxt, upref = pre_compute[(trans_number, seq)]
在数据流中运行时总是给我一个键错误,就像字典是空的一样。有解决方案吗?
你可以尝试使用Beam state API。请注意,状态API不是为存储大量数据而设计的。
另一个选项可能是将数据存储在外部存储系统(例如,GCS)中,以便所有工作人员都可以访问该数据。
请注意,如果您尝试存储大量数据,任何解决方案都可能限制管道的并行化(从而限制性能)。在这种情况下,最好重新设计您的管道,使其真正可并行化。
apache beam是基于运行在分布式基础设施上的假设构建的。节点将独立运行,任何状态都必须在工作人员之间共享。因此,全局变量不可用。如果您确实需要在工作人员之间交换信息,您可能必须自己实现。不过,我还是建议你多考虑一下管道。