读取公共http csv数据到Apache Beam



我试图使用apache_beam.dataframe.io.read_csv函数来阅读在线源,但没有成功。如果文件托管在谷歌存储'gs://bucket/source.csv'上,但无法从'https://github.com/../source.csv'获取文件,如源..

from apache_beam.dataframe.io import read_csv
url  = 'https://github.com/datablist/sample-csv-files/raw/main/files/people/people-100.csv'
with beam.Pipeline() as pipeline:
original_collection = pipeline | read_csv(path=url)
original_collection = original_collection[:5]
original_collection | beam.Map(print)

给我

ValueError: Unable to get filesystem from specified path, please use the correct path or ensure the required dependency is installed, e.g., pip install apache-beam[gcp]. Path specified: https://github.com/datablist/sample-csv-files/raw/main/files/people/people-100.csv
有谁能给我点提示吗?

Beam只能从文件系统(如gcs, hdfs等)读取文件,而不能从任意url(难以并行读取)读取文件。本地文件在直接运行程序上也可以工作。

或者,您也可以这样做

def parse_csv(contents):
[use pandas, the csv module, etc. to parse the contents string into rows]
with beam.Pipeline() as pipeline:
urls = pipeline | beam.Create(['https://github.com/datablist/sample-csv-files/...'])
contents = urls | beam.Map(lambda url: urllib.request.urlopen(url).read())
rows = contents | beam.FlatMap(parse_csv)

可能更容易将文件保存到合适的文件系统中并读取…

我认为在Beam上加载外部文件是不可能的。

你可以考虑另一个进程或服务,而不是Beam,将你的外部文件复制到Cloud Storage桶(例如gsutil cp)。

然后在Dataflow作业中,您可以从GCS读取文件而不会出现问题。

最新更新