我正在使用python分析巴西的新冠肺炎数据。联邦政府与该国每次疫苗接种的记录共享一个csv文件。此csv文件的大小超过170GB。
在我的研究中,我需要查询这个csv文件,以获得按city
和day
分组的疫苗接种的quantity
。在sql中,它可能类似于:
select city, day, Count(*)
from my_table
group by city, day
由于在线csv文件太大,我如何从中提取这些信息?
该文件每天更新,因为每天都有新的人接种疫苗。这意味着每天都有新行附加到文件中。
我想每天提取/更新计数器。是否有一种智能/快速的方法来检查csv文件中的新行并更新计数器?
我无法每天下载整个文件并将其导入数据库
数据可在此处获取:https://qsprod.saude.gov.br/extensions/covid-19_html/covid-19_html.html在链路CCD_ 4中S3上的153468857093字节CSV。
此处提供了输入文件示例:https://drive.google.com/file/d/1LRVJMKeE0wzuGshmfsI7pnfpHA800iph/view?usp=sharing
像这样的东西似乎起到了作用(假设您有足够的内存和稳定的互联网连接来处理单个请求中的文件(。
Counter
中的数据最终是例如
Counter({
('BRASILIA', '2021-03-18'): 2,
('SAO PAULO', '2021-03-26'): 1,
('INDAIATUBA', '2021-08-09'): 1,
...
})
当然,您需要删除islice()
来处理100多个第一行。
import pickle
from collections import Counter
from itertools import islice
import requests
import csv
DATA_URL = "https://s3-sa-east-1.amazonaws.com/ckan.saude.gov.br/PNI/vacina/completo/2021-11-15/part-00000-d217d29f-9db0-4280-ad94-ff0afe3d8b11-c000.csv"
resp = requests.get(DATA_URL, stream=True)
resp.raise_for_status()
resp.encoding = "UTF-8"
counter = Counter()
for row in islice(
csv.DictReader(resp.iter_lines(decode_unicode=True), delimiter=";"), 100
):
key = (row.get("estabelecimento_municipio_nome"), row.get("vacina_dataaplicacao"))
counter[key] += 1
with open("data.pickle", "wb") as outf:
pickle.dump(counter, outf)
对于任何更复杂的东西,我真的建议下载该文件并LOAD
将其保存到PostgreSQL表中。