我有一个用例,我需要将一个file_path(作为新元素(附加到另一个集合中,该集合是元素字典。
所以我决定将pcollection作为下一个转换的输入,并将 file_path(由 pardo 函数生成(作为侧输入。
由于生成的file_path是一个pCollection,所以我将其用作:
beam.pvalue.AsSingelton(file_path(
但奇怪的是,它没有执行此语句,因此没有输出,甚至没有任何错误。 但是当我使用以下命令创建集合时:
file_path = 光束。创建(['some_path_value'](并按上述方式传递beam.pvalue.AsSingleton(file_path(它完美地工作。
但我不想显式创建一个集合,我需要使用上面的转换输出,因为它是侧输入。
我尝试了每种形式的"file_path",例如将其设置为列表,字典,元组。
我也尝试了不同的光束功能。
def collect(pcoll, path):
print(pcoll, "-----------")
print(path,"-------------")
return path
class Extract(beam.DoFn):
def process(self, element, *args, **kwargs):
pub_sub_json= json.loads(element)
gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
yield gcs_url
file_path = (p |"Read" >> beam.io.ReadFromPubSub(
subscription=options.session_subscription,
with_attributes=False)
| "Extract" >> beam.ParDo(Extract()))
result = (some_other_pcollection |"display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path)))
期望:它应该进入收集函数并打印任何提取方法将作为file_path返回。
没有错误和警告,但它甚至没有进入收集方法。它主要发生在数据不会来自上述转换时。但我检查了很多次file_name有数据。
但没有输出,它只在我使用 beam 时显示输出。创建(["pass_some_file_path"]( 并将其结果用作侧输入
转换的输出应采用 2 元组(键,值((格式,以便接受它作为带有 pvalue 的侧输入。AsDict((。它与价值配合得很好。AsList(( 也是。
def collect(pcoll, path):
print(pcoll, "-----------")
print(path,"-------------")
return path
class Extract(beam.DoFn):
def process(self, element, *args, **kwargs):
pub_sub_json= json.loads(element)
gcs_url = "gs://" + pub_sub_json["bucket"] + "/" + pub_sub_json["name"]
yield pvalue.TaggedOutput("url",("url" :gcs_url))
file_path = (p |"Read" >> beam.io.ReadFromPubSub(
subscription=options.session_subscription,
with_attributes=False)
| "Extract" >> beam.ParDo(Extract()))
file_path_new= (file_path["url"] | "Group by key" >> beam.GroupByKey())
result = (some_other_pcollection |"display" >> beam.FlatMap(collect, path=beam.pvalue.AsList(file_path_new)))
你能用更好的方式解决它吗?