这是我上一个问题的延续。我有两个文件,file1.csv
和一个名为master_file.csv
的大csv。它们有几个列,并且有一个名为EMP_Code
的公共列名。
文件1示例:
EMP_name | EMP_Code | EMP_dept|
---|---|---|
b | f367 | abc |
a | c264 | xyz |
c | d264 | abc |
我在你的小例子上运行了第二个例子中的代码(使用csv.DictReader(,它成功了。我猜你的问题可能与你提到的master_file的真实规模有关。
问题可能是,尽管使用csv。DictReader在流式传输信息时,您仍然使用Pandas数据帧来聚合所有内容,然后再将其写出来,可能输出会超出您的内存预算。
如果这是真的,那么使用csv。要流出的DictWriter。唯一棘手的一点是设置编写器,因为它需要知道字段名,直到我们读取第一行才能知道,所以我们将在读取循环的第一次迭代中设置编写器。
(我删除了with open(...
上下文,因为我认为它们添加了太多缩进(
df = pd.read_csv(r"file1.csv")
list_codes = list(df.EMP_Code)
f_in = open(r"master_file.csv", newline="")
reader = csv.DictReader(f_in)
f_out = open(r"output.csv", "w", newline="")
init_writer = True
for row in reader:
if init_writer:
writer = csv.DictWriter(f_out, fieldnames=row)
writer.writeheader()
init_writer = False
if row["EMP_Code"] in list_codes:
writer.writerow(row)
f_out.close()
f_in.close()
EMP_name | EMP_age | ||||||
---|---|---|---|---|---|---|---|
a | 30 | 6 | >c264 | xyz | |||
b | 29 | d | 45 | 10 | c264 | >abc |
您只需要将chunksize=<SOME INTEGER>
传递给pandas的.read_csv函数(请参阅此处的文档(
如果您传递一个chunksize=2
,您将把该文件读取为2行的数据帧。或更准确地说,它将把csv的2行读取到一个数据帧中。然后,您可以将您的过滤器应用于该2行数据帧;累积";将其转换为另一个数据帧。下一次迭代将读取接下来的两行,您可以随后对其进行筛选。。。冲洗、漂洗并重复:
import pandas as pd
li = ['c264', 'f367']
result_df = pd.DataFrame()
with pd.read_csv("master_file.csv", chunksize=2) as reader:
for chunk_df in reader:
filtered_df = chunk_df[chunk_df.EMP_Code.isin(li)]
result_df = pd.concat([result_df, filtered_df])
print(result_df)
# Outputs:
# EMP_name EMP_age EMP_Service EMP_Code EMP_dept
# 0 a 30 6 c264 xyz
# 1 b 29 3 f367 abc
# 3 d 45 10 c264 abc
修复这些类型的文件读/写任务的一种方法是使用生成器,以可以处理的块或部分读取所需的数据(内存或etc约束(。
def read_line():
with open('master_file.csv','r') as fid:
while (line:= fid.readline().split()):
yield line
这个简单的生成器在每个调用中提供一条新行。现在,您可以简单地对此进行迭代,以进行您感兴趣的任何过滤,并构建新的数据帧。
r_line = read_line()
for l in r_line:
print(l)
您可以修改生成器,例如解析和返回列表,或多行,等等