我有一个数据帧,大约有155000行和12列。如果我用dataframe.to_csv将其导出到csv,则输出是一个11MB的文件(立即生成)。
但是,如果我使用to_SQL方法导出到Microsoft SQL Server,则需要5到6分钟!没有列是文本:只有int、float、bool和date。我见过ODBC驱动程序设置nvarchar(max)的情况,这会减慢数据传输速度,但这里不能这样。
关于如何加快出口进程,有什么建议吗?导出11 MB的数据需要6分钟,这使得ODBC连接实际上无法使用。
谢谢!
我的代码是:
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, select
ServerName = "myserver"
Database = "mydatabase"
TableName = "mytable"
engine = create_engine('mssql+pyodbc://' + ServerName + '/' + Database)
conn = engine.connect()
metadata = MetaData(conn)
my_data_frame.to_sql(TableName,engine)
我最近遇到了同样的问题,我想为其他人添加一个答案。to_sql
似乎为每一行发送一个INSERT
查询,这使得它非常慢。但由于0.24.0
,pandas.to_sql()
中有一个method
参数,您可以在其中定义自己的插入函数,或者只使用method='multi'
告诉panda在一个INSERT查询中传递多行,这使它更快。
请注意,您的数据库可能有参数限制。在这种情况下,您还必须定义一个chunksize。
因此,解决方案应该看起来像这样:
my_data_frame.to_sql(TableName, engine, chunksize=<yourParameterLimit>, method='multi')
如果您不知道数据库参数的限制,请尝试不使用chunksize参数。它会运行,或者给你一个错误,告诉你你的极限。
DataFrame.to_sql
方法为ODBC连接器生成插入语句,然后ODBC连接器将其视为常规插入。
如果进展缓慢,那就不是熊猫的错。
将DataFrame.to_sql
方法的输出保存到一个文件中,然后通过ODBC连接器重播该文件将花费相同的时间。
将数据批量导入数据库的正确方法是生成csv文件,然后使用load命令,在SQL数据库的MS风格中,该命令被称为BULK INSERT
例如:
BULK INSERT mydatabase.myschema.mytable
FROM 'mydatadump.csv';
语法参考如下:
BULK INSERT
[ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ]
FROM 'data_file'
[ WITH
(
[ [ , ] BATCHSIZE = batch_size ]
[ [ , ] CHECK_CONSTRAINTS ]
[ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ]
[ [ , ] DATAFILETYPE =
{ 'char' | 'native'| 'widechar' | 'widenative' } ]
[ [ , ] FIELDTERMINATOR = 'field_terminator' ]
[ [ , ] FIRSTROW = first_row ]
[ [ , ] FIRE_TRIGGERS ]
[ [ , ] FORMATFILE = 'format_file_path' ]
[ [ , ] KEEPIDENTITY ]
[ [ , ] KEEPNULLS ]
[ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ]
[ [ , ] LASTROW = last_row ]
[ [ , ] MAXERRORS = max_errors ]
[ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ]
[ [ , ] ROWS_PER_BATCH = rows_per_batch ]
[ [ , ] ROWTERMINATOR = 'row_terminator' ]
[ [ , ] TABLOCK ]
[ [ , ] ERRORFILE = 'file_name' ]
)]
您可以使用它:使它更快的是Pandato_sql
的method
参数。我希望这种帮助能有所帮助。
根据我的经验,结果是从无限时间到8秒。
df = pd.read_csv('test.csv')
conn = create_engine(<connection_string>)
start_time = time.time()
df.to_sql('table_name', conn, method='multi',index=False, if_exists='replace')
print("--- %s seconds ---" % (time.time() - start_time))
使用SQLAlchemy>=1.3
,在创建engine
对象时,设置fast_executemany=True
。参考
您可以使用d6tstack,它具有快速pandas-to-SQL功能,因为它使用本机DB导入命令。支持MS SQL、Postgres和MYSQL
uri_psql = 'postgresql+psycopg2://usr:pwd@localhost/db'
d6tstack.utils.pd_to_psql(df, uri_psql, 'table')
uri_mssql = 'mssql+pymssql://usr:pwd@localhost/db'
d6tstack.utils.pd_to_mssql(df, uri_mssql, 'table', 'schema') # experimental
对于导入多个带有数据模式更改的CSV和/或在写入数据库之前使用panda进行预处理也很有用,请参阅示例笔记本
d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'),
apply_after_read=apply_fun).to_psql_combine(uri_psql, 'table')
为什么pandas.DataFrame.to_sql
速度慢
将数据从pandas
上载到Microsoft SQL Server时,实际上大部分时间都用于将pandas
到Python对象转换为MS SQL ODBC驱动程序所需的表示形式。pandas
在分析方面比基本Python代码快得多的原因之一是,它在整数/浮点/…的精简本机数组上工作,这些数组的开销与相应的Python代码不同。to_sql
方法实际上是将所有这些精简列转换为许多单独的Python对象,因此不像其他pandas
操作那样得到通常的性能处理。
使用turbodbc.Cursor.insertmanycolumns
加快速度
给定pandas.DataFrame
,您可以使用turbodbc
和pyarrow
插入数据,与转换为Python对象相比,转换开销更小。
import pyarrow as pa
import turbodbc
cursor = … # cursor to a MS SQL connection initiated with turbodbc
df = … # the pd.DataFrame to be inserted
# Convert the pandas.DataFrame to a pyarrow.Table, most of the columns
# will be zero-copy and thus this is quite fast.
table = pa.Table.from_pandas(table)
# Insert into the database
cursor.executemanycolumns("INSERT INTO my_table VALUES (?, ?, ?)",
table)
为什么速度更快
代替CCD_ 25->Python对象的集合->ODBC数据结构,我们正在做一个转换路径pd.DataFrame
->pyarrow.Table
->ODBC结构。这更具性能,因为:
pandas.DataFrame
的大多数列可以在不复制的情况下转换为pyarrow.Table
的列。表中的列将引用相同的内存。因此没有进行实际的转换- 转换完全在具有本机类型的本机代码中完成。这意味着,只要没有
object
类型的列,就不会出现Python对象的开销
对于sqlalchemy
>=1.3,而不是使用to_sql()
的方法参数,在sqlalchemy's create_engine()
中使用fast_executemany=True
。这应该至少与method="multi"
一样快,同时避免T-SQL对存储过程的2100个参数值的限制,这会导致此处所示的错误。
这要归功于来自同一链接的戈尔德·汤普森。
我的时间和内存不足(为从120MB CSV加载的DataFrame分配了超过18GB的空间),这一行:
df.to_sql('my_table', engine, if_exists='replace', method='multi', dtype={"text_field": db.String(64), "text_field2": db.String(128), "intfield1": db.Integer(), "intfield2": db.Integer(), "floatfield": db.Float()})
以下是帮助我同时导入和跟踪插入进度的代码:
import sqlalchemy as db
engine = db.create_engine('mysql://user:password@localhost:3306/database_name', echo=False)
connection = engine.connect()
metadata = db.MetaData()
my_table = db.Table('my_table', metadata,
db.Column('text_field', db.String(64), index=True),
db.Column('text_field2', db.String(128), index=True),
db.Column('intfield1', db.Integer()),
db.Column('intfield2', db.Integer()),
db.Column('floatfield', db.Float())
)
metadata.create_all(engine)
kw_dict = df.reset_index().sort_values(by="intfield2", ascending=False).to_dict(orient="records")
batch_size=10000
for batch_start in range(0, len(kw_dict), batch_size):
print("Inserting {}-{}".format(batch_start, batch_start + batch_size))
connection.execute(my_table.insert(), kw_dict[batch_start:batch_start + batch_size])
基于这个答案-Aseem。
可以使用copy_from方法模拟带有光标对象的大容量加载。这是在Postgres上测试过的,用你的DB:试试吧
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, select
from StringIO import StringIO
ServerName = "myserver"
Database = "mydatabase"
TableName = "mytable"
engine = create_engine('mssql+pyodbc://' + ServerName + '/' + Database) #don't forget to add a password if needed
my_data_frame.head(0).to_sql(TableName, engine, if_exists='replace', index=False) # create an empty table - just for structure
conn = engine.raw_connection()
cur = conn.cursor()
output = StringIO()
my_data_frame.to_csv(output, sep='t', header=False, index=False) # a CSV that will be used for the bulk load
output.seek(0)
cur.copy_from(output, TableName, null="") # null values become ''
conn.commit()
conn.close()
cur.close()
如果这对任何人都有帮助,我的解决方案如下。根据我所读到的内容,pandastosql方法一次加载一条记录。
您可以制作一个大容量插入语句,加载1000行并提交该事务,而不是每次提交一行。这大大提高了速度。
import pandas as pd
from sqlalchemy import create_engine
import pymssql
import os
connect_string = [your connection string]
engine = create_engine(connect_string,echo=False)
connection = engine.raw_connection()
cursor = connection.cursor()
def load_data(report_name):
# my report_name variable is also my sql server table name so I use that variable to create table name string
sql_table_name = 'AR_'+str(report_name)
global chunk # to QC chunks that fail for some reason
for chunk in pd.read_csv(report_full_path_new,chunksize=1000):
chunk.replace(''','''',inplace=True,regex=True) #replace single quotes in data with double single quotes to escape it in mysql
chunk.fillna('NULL',inplace=True)
my_data = str(chunk.to_records(index=False).tolist()) # convert data to string
my_data = my_data[1:-1] # clean up the ends
my_data = my_data.replace('"',''').replace(''NULL'','NULL') #convert blanks to NULLS for mysql
sql_table_name = [your sql server table name]
sql = """
INSERT INTO {0}
VALUES {1}
""".format(sql_table_name,my_data)
cursor.execute(sql)
# you must call commit() to persist your data if you don't set autocommit to True
connection.commit()
正如其他答案中所说,速度减慢和/或超时的原因是panda一遍又一遍地插入许多单行。大量的插入命令是缓慢的和/或可能使目标数据库过载。
使用method=‘multi’命令熊猫分块上传。这要快得多,不会那么容易超时。
sqlEngine=create_engine('mysql+mysqlconnector://'+config['user']+':'+config['pass']+'@'+config['host']+'/'+config['dbname'])
dbConnection=sqlEngine.connect()
df.to_sql('table_name',con=dbConnection,method='multi',if_exists='append',index=False)
dbConnection.close()
上面的pyarrow答案可能是最好的,但对于mariadb,我在DataFrame上编写了一个使用executemany
和fetchall
的包装器,这使我的速度提高了300倍。这还有一个额外的好处,那就是根本不使用sqlalchemy。
您可以正常使用它:df.to_sql(...)
或df = read_sql_table(...)
。
请参阅https://gist.github.com/MichaelCurrie/b5ab978c0c0c1860bb5e75676775b43b