在 apache beam pipeline 中使用 MatchFiles() 获取文件名并在 python 中解析 j



我在一个存储桶中有很多json文件,使用python 3,我想获取文件名,然后创建文件的键值对并读取它们。我相信匹配文件现在适用于 python,但我想知道如何实现这一点:

files = p | fileio.MatchFiles("gs://mybuckenumerate/*.json") 
| #Ideally want to create a tuple of filename, json row which I will pass into a ParDo that is a custom class that parses the json

目标是假设我在一个存储桶中有 10 个文件:

gs://mybucket/myfile1.json
gs://mybucket/myfile2.json

存储桶中的 json 文件都共享相同的结构

我将其传递到自定义的 ParseFile 类中(我认为通过 ParDo,我的 apache beam 知识是有限的(,对于 json 中的每一行,我都会输出一个字典(我将保存为换行符分隔的 json(,其中一个键是文件名。

编辑 9/24 11:15 am PST:这是我尝试过的

file_content_pairs = (p 
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8())))
| beam.ParDo(TestThis())
)

TestThis(( 只是应该打印内容:

class TestThis(beam.DoFn):
def process(self, element):
print(element)
print("stop")
yield element

但我在输出中看到的只是:INFO:root:在 2.2762866020202637 秒内完成了列出 1 个文件。

我不明白。是否要拥有(filename, json-parsed-contents)的键值对?

如果是这样,您将:

file_content_pairs = (
p | fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))
)

因此,如果您的文件如下所示:

==============myfile.json===============
{"a": "b",
"c": "d",
"e": 1}

然后,您的file_content_pairs集合将包含键值对("myfile.json", {"a":"b", "c": "d", "e": 1})


如果您的文件是 json 行格式,您将执行以下操作:

def consume_file(f):
other_name = query_bigquery(f.metadata.path)
return [(other_name, json.loads(line))
for line in f.read_utf8().strip().split('n')]
with Pipeline() as p:
result = (p
| fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.FlatMap(consume_file))

最新更新