我正在使用python ibm_db2模块从db2 LUW读取大约1500万个数据。为了避免内存问题,我会一次又一次地读取每一百万的数据。这里的问题是完成一个读取一百万数据的循环,需要4分钟。如何使用多处理来避免延迟。Blow是我的密码。
start =0
count = 15000000
check_point = 1000000
chunk_size = 100000
connection = get_db_conn_cur(secrets_db2)
for i in range(start, count, check_point):
query_str = "SELECT * FROM (SELECT ROW_NUMBER() OVER (ORDER BY a.timestamp) row_num, * from table a) where row_num between " + str( i + 1) + " and " + str(i + check_point) + ""
number_of_batches = check_point // chunk_size
last_chunk = check_point - (number_of_batches * chunk_size)
counter = 0
cur = connection.cursor()
cur.execute(query_str) s
chunk_size_l = chunk_size
while True:
counter = counter + 1
columns = [desc[0] for desc in cur.description]
print('counter', counter)
if counter > number_of_batches:
chunk_size_l = last_chunk
results = cur.fetchmany(chunk_size_l)
if not results:
break
df = pd.DataFrame(results)
#further processing
这里的问题不是多处理。是您读取数据的方法。据我所见,您使用ROW_NUMBER((只是对行进行编号,然后在每个循环中获取100万行。
由于您没有在table a
上使用任何WHERE条件,这将导致您正在运行的每个循环都有一个FULL TABLE SCAN。你在Db2服务器上浪费了太多的CPU和I/O,只是为了有一个行号,这就是为什么每个循环需要4分钟或更长时间。
此外,这远不是一种有效的获取数据的方法。只要程序中的数据发生变化,就可以进行重复读取或幻影读取。但这是另一个主题。
您应该使用这里描述的4种获取方法之一,从SQL CURSOR中逐行读取。这样你就可以使用非常少量的RAM,并且可以有效地读取数据:
sql = "SELECT * FROM a ORDER BY a.timestamp"
stmt = ibm_db.exec_immediate(conn, sql)
dictionary = ibm_db.fetch_both(stmt)
while dictionary != False:
dictionary = ibm_db.fetch_both(stmt)