我在 mysql 数据库表中有多达 500k 行数据。我必须使用一些查询来处理该数据,并将生成的查询数据插入到 5 个不同的表中。
我的代码片段如下所示:
def jobsFunction(values):
unique_values = []
ref_value = {}
for value in values:
if value not in unique_values:
unique_values.append(value[0])
# some select queries with other tables
# from the result insert into table1
for query_vals in select_query:
ref_val[id] = some_val
# Insert into table2 with query_vals
# Update table3 with query_vals
# insert into table4 for each iteration with some process
# insert into table5 based on ref_val[id]
if __name__ == '__main__':
query = "SELECT roll_no, user_id, tenant_item_id FROM table_name"
cursor.execute(query)
vals = cursor.fetchall()
values = list(vals)
jobFunction(values)
问题是完成整个过程需要12个多小时。 所以我决定用multiprocessing.Pool
代码完成这个过程,如下所示:
import multiprocessing as mp
def jobsFunction(values):
# jobs function code
if __name__ == '__main__':
# values fetching
lock = mp.Lock()
p = mp.Pool()
p.map(jobsFunction, values)
p.close()
p.join()
但在这种情况下,从主函数到jobsFunction
的数据流不在顺序中。
我的问题是:我是否使用了正确的方法来满足我的需求 如何使用多处理或多线程有效地满足我的需求?
从数据库中获取数据然后重新写入很慢。尽量避免它。一些数字:如果每个查询只需要 100 毫秒,那么仅执行它们就需要超过 13 小时。
考虑使用此设计:不要将所有数据传输到 Python 进行处理,而是使用一系列或 SQL 查询在数据库中执行所有操作。因此,与其将数据读入 Python 列表,不如使用 SQL 查询,例如
insert into table1 (...)
select ... from table_name
或
update table1 out
set out.col1 = source.col2,
out.col2 = source.col3 ...
from table_name source
where out.pk = source.pk
and ...
数据库经过优化,可以复制数据。这些查询的运行速度将非常快,尤其是在正确设置索引时。
请考虑使用帮助程序表使查询更简单或更高效,因为您可以创建查询、截断查询、用数据填充查询,然后为案例创建完美的索引。
只在 Python 中做真正复杂的事情,并确保它只处理几行。