How to speed up reading a CSV and writing it into MySQL with Python



I have a 5GB CSV of IP address ranges that I need to parse into a MySQL database.

Currently I read rows from the CSV and insert them into MySQL one at a time. It works fine, but I would like to make it faster.

Can I make the reading and writing run in parallel? Or should I split the CSV into pieces and spawn a process to read and write each split?

import csv
from csv import reader
from csv import writer
import mysql.connector

cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='ips')
cursor = cnx.cursor()

i = 1
with open('iplist.csv', 'r') as read_obj:
    csv_reader = reader(read_obj)
    for row in csv_reader:
        query = """INSERT INTO ips (ip_start,ip_end,continent) VALUES ('%s','%s','%s')""" % (row[0], row[1], row[2])
        print(query)
        cursor.execute(query)
        cursor.execute('COMMIT')
        print(i)
        i = i + 1
cnx.close()

Any help is appreciated.

Use cursor.executemany to speed this up — it sends many rows in a single batch, and committing once per chunk instead of once per row removes most of the per-row overhead:

# Tested with:
# docker run --rm -e MYSQL_ALLOW_EMPTY_PASSWORD=y -p 3306:3306 mysql
#
# CREATE DATABASE ips;
# USE ips;
# CREATE TABLE ips (id INT PRIMARY KEY NOT NULL AUTO_INCREMENT, ip_start VARCHAR(15), ip_end VARCHAR(15), continent VARCHAR(20));

import mysql.connector
import csv
import itertools

CHUNKSIZE = 1000  # Number of lines per executemany() batch

cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='ips')
cursor = cnx.cursor()

with open('iplist.csv', 'r') as csvfile:
    reader = csv.reader(csvfile)
    while True:
        # Pull the next CHUNKSIZE rows off the reader without loading the whole file
        records = list(itertools.islice(reader, CHUNKSIZE))
        if not records:
            break
        query = """INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)"""
        cursor.executemany(query, records)
        cursor.execute('COMMIT')

I created a pseudo-random CSV file where each row looks like "111.222.333.444,555.666.777.888,a continent". The file contains 33 million rows. The code below was able to insert all of those rows into a MySQL database table within 3 minutes:

import mysql.connector
import time
import concurrent.futures
import csv
import itertools

CSVFILE = '/Users/Andy/iplist.csv'
CHUNK = 10_000

def doBulkInsert(rows):
    # Each worker opens its own connection so the threads never share a cursor
    with mysql.connector.connect(user='andy', password='monster', host='localhost', database='andy') as connection:
        connection.cursor().executemany('INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)', rows)
        connection.commit()

def main():
    with open(CSVFILE) as csvfile:
        csvdata = csv.reader(csvfile)
        _s = time.perf_counter()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Hand each CHUNK-row slice of the CSV to a worker thread
            while (data := list(itertools.islice(csvdata, CHUNK))):
                executor.submit(doBulkInsert, data)
            executor.shutdown(wait=True)
        print(f'Duration = {time.perf_counter()-_s}')

if __name__ == '__main__':
    main()
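
For reference, a test file of that shape could be generated with something along these lines. This is only a sketch — the answer's actual generator isn't shown, so the continent labels, helper names, and row count below are assumptions:

import csv
import random

# Hypothetical continent labels; the original test data only says "a continent".
CONTINENTS = ['Africa', 'Asia', 'Europe', 'North America', 'Oceania', 'South America']

def random_ip():
    # Octets are pseudo-random digit groups, matching the sample row style above.
    return '.'.join(str(random.randint(0, 999)) for _ in range(4))

def make_test_csv(path='iplist.csv', rows=33_000_000):
    # Write `rows` lines of "ip_start,ip_end,continent".
    with open(path, 'w', newline='') as f:
        w = csv.writer(f)
        for _ in range(rows):
            w.writerow([random_ip(), random_ip(), random.choice(CONTINENTS)])

# make_test_csv(rows=1_000_000)  # start with a smaller file for a quick test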

My suggestion would be to chunk your list. Break it into chunks of 5,000 rows (or so) and loop over those chunks. That reduces the number of queries you issue, and the query count looks like your biggest bottleneck. A minimal chunking sketch follows the link below.

https://medium.com/code-85/two-simple-algorithms-for-chunking-a-list-in-python-dc46bc9cc1a2
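
For illustration, a minimal sketch of the chunking idea, assuming the rows have already been read into a list (the 5,000 figure is just the suggestion above):

def chunked(items, size=5_000):
    # Yield successive slices of `items`, each with at most `size` elements.
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Each chunk then becomes one executemany() call and one commit,
# instead of thousands of single-row INSERTs:
# for chunk in chunked(rows, 5_000):
#     cursor.executemany(
#         "INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)",
#         chunk,
#     )
#     cursor.execute('COMMIT')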
