Large (6 million rows) pandas df causes memory error with `to_sql` when chunksize = 100, but can easily save file of 100,000 with no chunksize



I created a large database in Pandas, about 6 million rows of text data. I wanted to save it as a SQL database file, but when I try to save it, I get an out-of-memory RAM error. I even reduced the chunksize to 100 and it still crashes.

However, if I have just a smaller version of that dataframe with 100,000 rows and save it to a database without specifying chunksize, I have no problem saving the dataframe.

This is my code:

from sqlalchemy import create_engine
engine = create_engine("sqlite:///databasefile.db")
dataframe.to_sql("CS_table", engine, chunksize=100)

My understanding is that since it only processes 100 rows at a time, RAM usage should reflect what saving 100 rows takes. Is something else happening behind the scenes? Maybe multi-threading?

Before running this code I was using 4.8 GB of the 12.8 GB of RAM available in Google Colab. Running the code above eats up all the RAM until the environment crashes.

I would like to be able to save my pandas dataframe to a SQL file without my environment crashing. The environment I'm in is Google Colab. The pandas data is 2 columns, ~6 million rows. Each cell contains roughly this much text:

"主导序列转导模型基于复杂的 编码器-解码器中的循环或卷积神经网络 配置。性能最佳的型号还连接编码器和 通过注意力机制的解码器。我们提出一个新的简单 网络架构,变压器,完全基于注意力 机制,完全省去复发和卷积。 对两个机器翻译任务的实验表明,这些模型是 质量卓越,同时更可并行化和要求更高 显著减少训练时间。我们的模型BLEU在 WMT 2014 英语到德语翻译任务,改进了 现有最佳结果,包括超过 2 BLEU的合奏。在WMT上 2014年英法翻译任务,我们的模型建立了新的 单模型最先进的BLEU在训练后得分为 41.8 在八个 GPU 上花费 3.5 天,只是文献中最佳模型训练成本的一小部分。我们展示了变压器 通过将其成功应用于英语,可以很好地推广到其他任务 使用大量和有限的训练数据进行选区解析。

EDIT:

I did keyboard interrupts at various stages. This is the result of a keyboard interrupt right after the first jump in RAM:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-22-51b6e444f80d> in <module>()
----> 1 dfAllT.to_sql("CS_table23", engine, chunksize = 100)
12 frames
/usr/local/lib/python3.6/dist-packages/pandas/core/generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
2529         sql.to_sql(self, name, con, schema=schema, if_exists=if_exists,
2530                    index=index, index_label=index_label, chunksize=chunksize,
-> 2531                    dtype=dtype, method=method)
2532 
2533     def to_pickle(self, path, compression='infer',
/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
458     pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
459                       index_label=index_label, schema=schema,
--> 460                       chunksize=chunksize, dtype=dtype, method=method)
461 
462 
/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
1172                          schema=schema, dtype=dtype)
1173         table.create()
-> 1174         table.insert(chunksize, method=method)
1175         if (not name.isdigit() and not name.islower()):
1176             # check for potentially case sensitivity issues (GH7815)
/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in insert(self, chunksize, method)
684 
685                 chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
--> 686                 exec_insert(conn, keys, chunk_iter)
687 
688     def _query_iterator(self, result, chunksize, columns, coerce_float=True,
/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in _execute_insert(self, conn, keys, data_iter)
597         """
598         data = [dict(zip(keys, row)) for row in data_iter]
--> 599         conn.execute(self.table.insert(), data)
600 
601     def _execute_insert_multi(self, conn, keys, data_iter):
/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in execute(self, object_, *multiparams, **params)
986             raise exc.ObjectNotExecutableError(object_)
987         else:
--> 988             return meth(self, multiparams, params)
989 
990     def _execute_function(self, func, multiparams, params):
/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/elements.py in _execute_on_connection(self, connection, multiparams, params)
285     def _execute_on_connection(self, connection, multiparams, params):
286         if self.supports_execution:
--> 287             return connection._execute_clauseelement(self, multiparams, params)
288         else:
289             raise exc.ObjectNotExecutableError(self)
/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _execute_clauseelement(self, elem, multiparams, params)
1105             distilled_params,
1106             compiled_sql,
-> 1107             distilled_params,
1108         )
1109         if self._has_events or self.engine._has_events:
/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1246         except BaseException as e:
1247             self._handle_dbapi_exception(
-> 1248                 e, statement, parameters, cursor, context
1249             )
1250 
/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
1466                 util.raise_from_cause(sqlalchemy_exception, exc_info)
1467             else:
-> 1468                 util.reraise(*exc_info)
1469 
1470         finally:
/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
127         if value.__traceback__ is not tb:
128             raise value.with_traceback(tb)
--> 129         raise value
130 
131     def u(s):
/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
1222                 if not evt_handled:
1223                     self.dialect.do_executemany(
-> 1224                         cursor, statement, parameters, context
1225                     )
1226             elif not parameters and context.no_parameters:
/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py in do_executemany(self, cursor, statement, parameters, context)
545 
546     def do_executemany(self, cursor, statement, parameters, context=None):
--> 547         cursor.executemany(statement, parameters)
548 
549     def do_execute(self, cursor, statement, parameters, context=None):
KeyboardInterrupt: 

This is the result of a keyboard interrupt right before it crashed:

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2882, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-24-68b60fe221fe>", line 1, in <module>
dfAllT.to_sql("CS_table22", engine, chunksize = 100)
File "/usr/local/lib/python3.6/dist-packages/pandas/core/generic.py", line 2531, in to_sql
dtype=dtype, method=method)
File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 460, in to_sql
chunksize=chunksize, dtype=dtype, method=method)
File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 1174, in to_sql
table.insert(chunksize, method=method)
File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 686, in insert
exec_insert(conn, keys, chunk_iter)
File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 599, in _execute_insert
conn.execute(self.table.insert(), data)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 988, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1468, in _handle_dbapi_exception
util.reraise(*exc_info)
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 129, in reraise
raise value
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1224, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 547, in do_executemany
cursor.executemany(statement, parameters)
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 1823, in showtraceback
stb = value._render_traceback_()
AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 1132, in get_records
return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 313, in wrapped
return f(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 358, in _fixed_getinnerframes
records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
File "/usr/lib/python3.6/inspect.py", line 1488, in getinnerframes
frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
File "/usr/lib/python3.6/inspect.py", line 1446, in getframeinfo
filename = getsourcefile(frame) or getfile(frame)
File "/usr/lib/python3.6/inspect.py", line 696, in getsourcefile
if getattr(getmodule(object, filename), '__loader__', None) is not None:
File "/usr/lib/python3.6/inspect.py", line 739, in getmodule
f = getabsfile(module)
File "/usr/lib/python3.6/inspect.py", line 708, in getabsfile
_filename = getsourcefile(object) or getfile(object)
File "/usr/lib/python3.6/inspect.py", line 693, in getsourcefile
if os.path.exists(filename):
File "/usr/lib/python3.6/genericpath.py", line 19, in exists
os.stat(path)
KeyboardInterrupt

I ran it once more before it crashed, and this seems to give yet another different result:

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2882, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-28-f18004debe33>", line 1, in <module>
dfAllT.to_sql("CS_table25", engine, chunksize = 100)
File "/usr/local/lib/python3.6/dist-packages/pandas/core/generic.py", line 2531, in to_sql
dtype=dtype, method=method)
File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 460, in to_sql
chunksize=chunksize, dtype=dtype, method=method)
File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 1174, in to_sql
table.insert(chunksize, method=method)
File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 686, in insert
exec_insert(conn, keys, chunk_iter)
File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 598, in _execute_insert
data = [dict(zip(keys, row)) for row in data_iter]
File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 598, in <listcomp>
data = [dict(zip(keys, row)) for row in data_iter]
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 1823, in showtraceback
stb = value._render_traceback_()
AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 1132, in get_records
return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 313, in wrapped
return f(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 358, in _fixed_getinnerframes
records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
File "/usr/lib/python3.6/inspect.py", line 1488, in getinnerframes
frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
File "/usr/lib/python3.6/inspect.py", line 1446, in getframeinfo
filename = getsourcefile(frame) or getfile(frame)
File "/usr/lib/python3.6/inspect.py", line 696, in getsourcefile
if getattr(getmodule(object, filename), '__loader__', None) is not None:
File "/usr/lib/python3.6/inspect.py", line 742, in getmodule
os.path.realpath(f)] = module.__name__
File "/usr/lib/python3.6/posixpath.py", line 388, in realpath
path, ok = _joinrealpath(filename[:0], filename, {})
File "/usr/lib/python3.6/posixpath.py", line 421, in _joinrealpath
newpath = join(path, name)
KeyboardInterrupt
---------------------------------------------------------------------------

Other things I have tried:

Using dropna to drop all none/nan values

dfAllT = dfAllT.applymap(str) to make sure all my values are strings

dfAllT.reset_index(drop=True, inplace=True) to make sure the index is not misaligned.

EDIT:

As mentioned in the comments, I have now tried calling to_sql in a loop:

for i in range(586147):
    print(i)
    dfAllT.iloc[i*10000:(i+1)*10000].to_sql('CS_table', engine, if_exists='append')

This eventually eats up all my RAM and crashes about halfway through. I'm wondering whether this suggests that sqlite is holding everything in memory, and whether there is a workaround.

EDIT:

I tried a few more things: shorter chunks, and disposing of the engine and creating a new one after every step. It still eventually ate all the RAM and crashed.

for i in range(586147):
    print(i)
    engine = sqlalchemy.create_engine("sqlite:///CSTitlesSummariesData.db")
    dfAllT.iloc[i*10:(i+1)*10].to_sql('CS_table', engine, index=False, if_exists='append')
    engine.dispose()
    gc.collect()

My thoughts:

So it looks like the entire database is somehow being kept in active memory.

The pandas dataframe it came from is 5 gigs (or at least that's how much RAM it was using before I tried converting it to sqlite). My system crashes at around 12.72 gigs. I would imagine the sqlite database takes up less RAM than the pandas dataframe.

I had been using df.to_sql for a year when I started to struggle with the fact that I was running big workloads and it wasn't working. I realized that chunksize overloads your memory: pandas loads everything into memory and then sends it out in chunks. I had to control the inserts directly with SQL. (This is where I found my solution: https://github.com/pandas-dev/pandas/issues/12265. I really encourage you to read it to the end.)
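As a rough sketch of what "controlling the inserts directly with SQL" can look like on the write side, here is a minimal example using the sqlite3 DB-API directly. The function name and the all-TEXT column handling are my own illustration (matching the asker's 2-column text dataframe), not code from the linked issue:

import sqlite3
import pandas as pd

def write_df_in_chunks(df: pd.DataFrame, db_path: str, table: str, chunk_size: int = 100_000):
    """Insert a dataframe into SQLite chunk by chunk, committing as it goes."""
    con = sqlite3.connect(db_path)
    cols = ", ".join(f'"{c}" TEXT' for c in df.columns)  # assumes text columns, as in the question
    con.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({cols})')
    placeholders = ", ".join("?" * len(df.columns))
    sql = f'INSERT INTO "{table}" VALUES ({placeholders})'
    for start in range(0, len(df), chunk_size):
        chunk = df.iloc[start:start + chunk_size]
        # itertuples is lazy, so only one chunk's rows are materialized at a time
        con.executemany(sql, chunk.itertuples(index=False, name=None))
        con.commit()
    con.close()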

If you need to read data from a database without overloading memory, check this piece of code:

import math
import pandas as pd

def get_data_by_chunks(table: str, chunksize: int):
    # MysqlClient is the author's own engine wrapper
    with MysqlClient.get_engine().begin() as conn:
        row_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
        for i in range(math.ceil(row_count / chunksize)):
            query = f"""
                SELECT * FROM {table}
                WHERE my_filters          -- placeholder: your filter conditions
                LIMIT {i * chunksize}, {chunksize};
            """
            yield pd.read_sql(query, conn)

for df in get_data_by_chunks("my_table", chunksize=100_000):
    print(df.shape)
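One caveat with this pattern: LIMIT offset, count makes the database scan and discard all the skipped rows on every iteration, so later chunks get progressively slower. If the table has a usable ordered key, keyset pagination (WHERE id > last_seen_id ORDER BY id LIMIT count) avoids that rescanning.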

From stepping through the code, I think it's this line, which creates a bunch of DataFrames:

chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])

It looks like this is probably a bug. Specifically, it happens before the database insertion, during preparation.

One trick you can do is hit CTRL-C whenever memory is rapidly increasing and see which line it pauses on (my bet is on this one).
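If the CTRL-C timing is fiddly, a more systematic version of the same trick is a minimal tracemalloc sketch (assuming dfAllT and engine from the question are in scope), which interrupts the save and prints the top allocation sites:

import tracemalloc

tracemalloc.start(25)  # record up to 25 stack frames per allocation
try:
    dfAllT.to_sql("CS_table", engine, chunksize=100)
except KeyboardInterrupt:
    snapshot = tracemalloc.take_snapshot()
    for stat in snapshot.statistics("lineno")[:10]:
        print(stat)  # biggest allocators, with file name and line number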

User edit:

The problem was solved by using an explicit loop (rather than relying on chunksize), i.e. for i in range(100): df.iloc[i * 100000:(i + 1) * 100000].to_sql(...)

This still led to a memory error, but it let the user continue where the loop had left off before the crash.
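As a sketch, resuming could look like this, assuming only whole chunks were committed before the crash (table, engine, and dataframe names taken from the question):

import sqlite3

# count how many rows made it into the table before the crash
con = sqlite3.connect("databasefile.db")
try:
    done = con.execute('SELECT COUNT(*) FROM "CS_table"').fetchone()[0]
except sqlite3.OperationalError:  # table does not exist yet
    done = 0
con.close()

chunk = 100_000
for i in range(done // chunk, -(-len(dfAllT) // chunk)):  # restart at the first incomplete chunk
    dfAllT.iloc[i * chunk:(i + 1) * chunk].to_sql("CS_table", engine, if_exists="append")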

A more robust solution would have been to "maybe try a raw connection, rather than using SQLEngine?", but the user did not get a chance to try it.
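For completeness, a sketch of that untried suggestion: for SQLite, pandas' to_sql also accepts a plain sqlite3 connection, which bypasses SQLAlchemy entirely. Whether this avoids the memory growth in this particular case is untested:

import sqlite3

con = sqlite3.connect("databasefile.db")
for i in range(0, len(dfAllT), 100_000):
    dfAllT.iloc[i:i + 100_000].to_sql("CS_table", con, if_exists="append", index=False)
    con.commit()
con.close()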

I know this is not an answer to the question; I'm writing it for people in a hurry whose to_sql runs out of memory quickly. I read the source code and came up with my own iterative pandas-to-sqlite function that uses the existing API and does not copy the dataframe. It currently uses SQLiteDatabase with a warning, because this code is not compatible with sqlalchemy.

This was tested on a 3-column, 94 million row (2.2 GB) indexed dataframe. On my rather old machine, inserting all the data takes less than 5 minutes and less than 5 GB of RAM. I also added tqdm for good measure.

import pandas as pd
from pandas.io.sql import SQLiteDatabase, SQLiteTable
import sqlite3
from tqdm import tqdm

def df_to_sqlite(df: pd.DataFrame, db_file_name: str, table_name: str, chunk_size=1_000_000):
    # see https://stackoverflow.com/a/70488765/227755
    con = sqlite3.connect(db_file_name)
    db = SQLiteDatabase(con=con)
    table = SQLiteTable(table_name, db, df, index=True, if_exists="fail", dtype=None)
    table.create()  # can be optimized further by postponing index creation, but that means using private/protected APIs
    insert = table.insert_statement(num_rows=1)  # single-row insert statement
    it = df.itertuples(index=True, name=None)  # just regular tuples
    pb = tqdm(it, total=len(df))  # not needed, but nice to have
    with con:
        while True:
            con.execute("begin")
            try:
                for c in range(0, chunk_size):
                    row = next(it, None)
                    if row is None:
                        pb.update(c)
                        return
                    con.execute(insert, row)
                pb.update(chunk_size)
            finally:
                con.execute("commit")
