如何在 python 中将动态集合作为侧输入传递?



我有一个用例,我需要将一个file_path(作为新元素(附加到另一个集合中,该集合是元素字典。

所以我决定将pcollection作为下一个转换的输入,并将 file_path(由 pardo 函数生成(作为侧输入

由于生成的file_path是一个pCollection,所以我将其用作:

beam.pvalue.AsSingelton(file_path(

但奇怪的是,它没有执行此语句,因此没有输出,甚至没有任何错误。 但是当我使用以下命令创建集合时:

file_path = 光束。创建(['some_path_value'](并按上述方式传递beam.pvalue.AsSingleton(file_path(它完美地工作。

但我不想显式创建一个集合,我需要使用上面的转换输出,因为它是侧输入。

我尝试了每种形式的"file_path",例如将其设置为列表,字典,元组。

我也尝试了不同的光束功能。

def collect(pcoll, path):
print(pcoll, "-----------")
print(path,"-------------")       
return path
class Extract(beam.DoFn):    
def process(self, element, *args, **kwargs):       
pub_sub_json= json.loads(element)
gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
yield gcs_url
file_path = (p |"Read" >> beam.io.ReadFromPubSub(
subscription=options.session_subscription,
with_attributes=False)
| "Extract" >> beam.ParDo(Extract()))                     
result = (some_other_pcollection |"display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path)))

期望:它应该进入收集函数并打印任何提取方法将作为file_path返回。

没有错误和警告,但它甚至没有进入收集方法。它主要发生在数据不会来自上述转换时。但我检查了很多次file_name有数据。

但没有输出,它只在我使用 beam 时显示输出。创建(["pass_some_file_path"]( 并将其结果用作侧输入

转换的输出应采用 2 元组(键,值((格式,以便接受它作为带有 pvalue 的侧输入。AsDict((。它与价值配合得很好。AsList(( 也是。

def collect(pcoll, path):
print(pcoll, "-----------")
print(path,"-------------")       
return path
class Extract(beam.DoFn):    
def process(self, element, *args, **kwargs):       
pub_sub_json= json.loads(element)
gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
yield pvalue.TaggedOutput("url",("url" :gcs_url))
file_path = (p |"Read" >> beam.io.ReadFromPubSub(
subscription=options.session_subscription,
with_attributes=False)
| "Extract" >> beam.ParDo(Extract())) 
file_path_new= (file_path["url"] | "Group by key" >> beam.GroupByKey())                    
result = (some_other_pcollection |"display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path_new)))

你能用更好的方式解决它吗?

最新更新