python mysqldb multiple connections



嘿伙计们,我有以下问题:进程1执行一个非常大的查询并将结果写入一个文件,在进程之间应向数据库更新一个状态。

第一次讲:没问题,伪代码:

db = mysqldb.connect()
cursor = db.cursor()
large = cursor.execute(SELECT * FROM VERYLARGETABLE)
for result in large.fetchall():
     file.write(result)
if timetoUpdateStatus: cursor.execute(UPDATE STATUS)

问题:当获得 900 万个结果时,"large = cursor.execute(SELECT * FROM VERYLARGETABLE)"永远不会完成...我在 4 列的 200 万个条目处找到了边界,MySQL 服务器在 30 秒后完成了查询,但 Python 进程运行了几个小时......这可能是Python MySQLDB库中的错误。

所以第二次尝试:带有 db.use_results() 和 fetch_row() 的 db.query 函数:

db = mysqldb.connect()
cursor = db.cursor()
db.query(SELECT * FROM VERYLARGETABLE)
large = large.use_result()
while true:
    for row in large.fetch_row(100000):
        file.write(row)
    if timetoUpdateStatus: cursor.execute(UPDATE STATUS) <-- ERROR (2014, "Commands out of sync; you can't run this command now")

所以第三次尝试使用了 2 个 MySQL 连接......这不起作用,当我打开第二个连接时,第一个连接消失了....

有什么建议吗??

尝试使用 MySQL SSCursor。它将结果集保留在服务器中(MySQL数据结构),而不是将结果集传输到客户端(Python数据结构),这是默认游标所做的。使用 SSCursor 将避免由于默认光标尝试构建 Python 数据结构并为巨大的结果集分配内存而导致的长时间初始延迟。因此,SSCursor 也应该需要更少的内存。

import MySQLdb
import MySQLdb.cursors
import config
cons = [MySQLdb.connect(
    host=config.HOST, user=config.USER,
    passwd=config.PASS, db=config.MYDB,
    cursorclass=MySQLdb.cursors.SSCursor) for i in range(2)]
select_cur, update_cur = [con.cursor() for con in cons]
select_cur.execute(SELECT * FROM VERYLARGETABLE)
for i, row in enumerate(select_cur):
    print(row)
    if i % 100000 == 0 or timetoUpdateStatus:
        update_cur.execute(UPDATE STATUS)
尝试将

"从数据库中选择 *"查询拆分为较小的块

index=0
while True:
    cursor.execute('select * from verylargetable LIMIT %s,%s', (index, index+10000))
    records = cursor.fetchall()
    if len(records)==0:
          break
    file.write(records)
    index+=10000
file.close()

在大选择中使用 LIMIT 语句:

limit = 0
step = 10000
query = "SELECT * FROM VERYLARGETABLE LIMIT %d, %d"
db = mysqldb.connect()
cursor = db.cursor()
while true:
    cursor.execute(query, (step, limit))
    for row in cursor.fetch_all():
        file.write(row)
    if timetoUpdateStatus:
        cursor.execute(update_query)
    limit += step

代码没有经过测试,但你应该明白这个想法。

最新更新