我在一个存储桶中有很多json文件,使用python 3,我想获取文件名,然后创建文件的键值对并读取它们。我相信匹配文件现在适用于 python,但我想知道如何实现这一点:
files = p | fileio.MatchFiles("gs://mybuckenumerate/*.json")
| #Ideally want to create a tuple of filename, json row which I will pass into a ParDo that is a custom class that parses the json
目标是假设我在一个存储桶中有 10 个文件:
gs://mybucket/myfile1.json
gs://mybucket/myfile2.json
存储桶中的 json 文件都共享相同的结构
我将其传递到自定义的 ParseFile 类中(我认为通过 ParDo,我的 apache beam 知识是有限的(,对于 json 中的每一行,我都会输出一个字典(我将保存为换行符分隔的 json(,其中一个键是文件名。
编辑 9/24 11:15 am PST:这是我尝试过的
file_content_pairs = (p
| fileio.MatchFiles(known_args.input_bucket)
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8())))
| beam.ParDo(TestThis())
)
TestThis(( 只是应该打印内容:
class TestThis(beam.DoFn):
def process(self, element):
print(element)
print("stop")
yield element
但我在输出中看到的只是:INFO:root:在 2.2762866020202637 秒内完成了列出 1 个文件。
我不明白。是否要拥有(filename, json-parsed-contents)
的键值对?
如果是这样,您将:
file_content_pairs = (
p | fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))
)
因此,如果您的文件如下所示:
==============myfile.json===============
{"a": "b",
"c": "d",
"e": 1}
然后,您的file_content_pairs
集合将包含键值对("myfile.json", {"a":"b", "c": "d", "e": 1})
。
如果您的文件是 json 行格式,您将执行以下操作:
def consume_file(f):
other_name = query_bigquery(f.metadata.path)
return [(other_name, json.loads(line))
for line in f.read_utf8().strip().split('n')]
with Pipeline() as p:
result = (p
| fileio.MatchFiles("gs://mybucketname/*.json")
| fileio.ReadMatches()
| beam.FlatMap(consume_file))