为什么这个Apache Beam管道读取Excel文件并从中创建.csv不工作?



我是Apache Beam的新手,我正在经历这个简单任务的以下问题:我正在尝试创建一个新的.csv.xlsxExcel文件。为此,我使用Apache Beam与Python 3语言和Pandas图书馆。我承认这些话题对我来说都很新鲜。

我正在Google Colab工作,但我认为这没有信息那么重要。

我以这种方式导入Apache Beam和Pandas。是唯一的方式给shell命令谷歌Colab):

!{'pip install --quiet apache-beam pandas'}
这是我实现Apache Bean管道的Python代码:
import apache_beam as beam
import pandas as pd
def parse_excel(line):
# Use the pandas library to parse the line into a DataFrame
df = pd.read_excel(line)
print("DATAFRAME")
# Convert the DataFrame to a list of dictionaries, where each dictionary represents a row in the DataFrame
# and has keys that are the column names and values that are the cell values
return [row.to_dict() for _, row in df.iterrows()]
def print_json(json_object):
# Print the JSON object
print(json_object)
def run(argv=None):
print("START run()")
p = beam.Pipeline()
# Read the Excel file as a PCollection
lines = (
p 
| 'Read the Excel file' >> beam.io.ReadFromText('Pazienti_export_reduced.xlsx')
| "Convert to pandas DataFrame" >> beam.Map(lambda x: pd.DataFrame(x))
| "Write to CSV" >> beam.io.WriteToText(
'data/csvOutput', file_name_suffix=".csv", header=True
)
)

print("after lines pipeline")
# Parse the lines using the pandas library
#json_objects = lines | 'ParseExcel' >> beam.Map(parse_excel)
# Print the values of the json_objects PCollection
#json_objects | 'PrintJSON' >> beam.ParDo(print_json)

if __name__ == '__main__':
print("START main()")
print(beam.__version__)
print(pd.__version__)
run()

当我运行它时,我没有得到错误,但我的数据文件夹还是空的。基本上,预期的csvOutput.csv在我的管道末尾没有创建输出文件。

怎么了?我错过了什么?我如何尝试修复我的代码?

您正在定义管道,但没有运行它。您需要执行

with beam.Pipeline(...) as p:
...
# p.run() called on __exit__

p = beam.Pipeline(...)
...
p.run().wait_until_finish()

注意beam.io.ReadFromText不能处理xlsx文件,同样WriteToText也不能接受PCollection of Pandas dataframe作为输入。(如果Python是强类型的就更清楚了。)相反,您可能想要做的是像

这样的东西
with beam.Pipeline(...) as p:
filenames = p | beam.Create(['Pazienti_export_reduced.xlsx', ...])
rows_as_dicts = filenames | beam.Map(filenames, parse_excel)
csv_lines = csv_lines | beam.Map(
lambda row: ','.join(str(row[c]) for c in [COL_NAMES]))
csv_lines | beam.WriteToText('out.csv', header=...)

更简单的是使用beam dataframes API

from apache_beam.dataframe.io import read_excel
with beam.Pipeline(...) as p:
data = p | read_excel("/path/to/*.xlsx")
data.to_csv("out.csv")

相关内容

  • 没有找到相关文章

最新更新