遍历pandas数据帧,将数据以100000条记录(而不是日期)批量发送到数据库



我正在尝试遍历pandas数据帧,但目前我正在根据列"DATE"进行筛选。基本上,这些数据相当大,每个日期包含大约50万条记录,将数据插入数据库大约需要40分钟。我正在尝试从这种方法转向数据帧,尝试迭代数据帧,一次发送100000条记录,直到数据帧中的每一条记录都上传到数据库中。这是我到目前为止所做的,但不确定如何迭代一次获取100000条记录。

请不要告诉我使用chunksize(已在代码中注释(。它太慢了,我不得不使用fast_executemy=True。

for f in hours['DATE'].unique():
hours.loc[hours['DATE'] == f]
print("--------------------------------------------")
print(f)
print('Time Starts: ', datetime.now().strftime("%m/%d/%Y %H:%M:%S"))

# Variable names
Server='server'
Database='db'
Driver='SQL Server Native Client 11.0'
Database_Con = 'mssql+pyodbc://@' + Server + '/' + Database + '?trusted_connection=yes&driver=' + Driver


# Creating an engine connection 
engine=create_engine(Database_Con, fast_executemany=True)
engine = engine.connect()


hours.loc[hours['DATE'] ==f].to_sql('KRONOS_EMPLOYEE_HOURS_TEMP',
engine,
schema='dbo',
#chunksize=math.floor(2000/len(employee_roster.columns)), # this works along with method='multi'
#chunksize=100000,
##method='multi',
if_exists='append',
index=False
)

print('Time Ends: ', datetime.now().strftime("%m/%d/%Y %H:%M:%S"))

您可以通过使用SQLAlchemy的ORM会话功能或在计数循环中运行该功能来轻松实现您想要的内容。ORM功能更好,因为您启动一个会话,然后添加所有要插入的记录,然后提交会话管理的更改。据我所知,一次可以插入的记录数量没有限制(我查看了SQLAlchemy文档,没有看到任何提到这一点(。考虑到SQLAlchemy被设计为与驱动程序无关(相同的代码应该与SQLite、PostgreSQL、ODBC、MySQL、Oracle等接口(,并且每个驱动程序都有不同的插入限制,我认为会话会自动批量刷新x记录,其中x是给定驱动程序的插入限制。ORM工作得很好,所以我会给出一些可能对您有用的示例代码,但我也会提供一些代码,用于您可以将代码放入的循环计数。

ORM解决方案

其原理是,您可以设置一个类(从declarative_base继承而来(来保存表信息。这也为您创建记录插入提供了一个可读的结构。正如您将看到的,每个类属性都是一个Column(),构造函数参数包含SQL类型和关键字。之后,创建一个与现有引擎类似的引擎,然后使用sessionmaker创建会话。之后,您可以运行for循环,逐行迭代Panda数据帧,为构建的记录运行Session.add(record)方法。添加完所有记录后,只需Session.commit()即可运行查询。对我来说,代码在大约8秒内将75k+行的数据集插入SQLite(插入限制为1000(,或在24秒内插入Azure托管的远程PostgreSQL数据库。可能有一些方法可以优化这一点,但它至少有效,并且在40分钟内会有显著的改进(比如如何?(。此外,正如您所看到的,您可以通过从pandas数据帧转换为numpy数组(请参阅此处(来大幅增加处理量

from sqlalchemy import (
create_engine,
Column,
Integer,
Text,
DateTime,
Float)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from time import time
import numpy as np
import pandas as pd
class Employees(Base):
__tablename__ = "tblEmployees"
id = Column(Integer, autoincrement=True, primary_key=True, nullable=False)
name = Column(Text)
class Hours(Base):
__tablename__ = "tblHours"
id  = Column(Integer, autoincrement=True, primary_key=True, nullable=False)
datetime = Column(DateTime)
hours = Column(Float)
employee_id = Column(Integer, ForeignKey('tblEmployees.id'), nullable=False)

# Variable names
Server='server'
Database='db'
Driver='SQL Server Native Client 11.0'
Database_Con = 'mssql+pyodbc://@' + Server + '/' + Database + '?trusted_connection=yes&driver=' + Driver
engine = create_engine(Database_Con, fast_executemany=True)
Session = sessionmaker()
Session.configure(bind=engine)
s = Session()
t = time()
print('Time Starts: ', datetime.now().strftime("%m/%d/%Y %H:%M:%S"))
try:
n = df.to_numpy().tolist()
for x in n:
record = Hours(**{
'datetime'    : x[0],
'hours'       : x[1],
'employee_id' : x[2]
})
s.add(record)
s.commit()
except Exception as e:
s.rollback()
print(f"[{datetime.now()}][WARNING] Error inserting rows to database.")
print(f"[INFO] {e}")
finally:
s.close()
print(f"Time elapsed: {str(time()-t)}s.")
print('Time Ends: ', datetime.now().strftime("%m/%d/%Y %H:%M:%S"))

计数环路解决方案

好的,因此,只需创建两个变量,isqlLimit。后者是一个常数,您可以将其设置为任意值(例如99999,因为0被计数,给出100000条记录(,第一个在执行过程中更新。您创建一个for循环来遍历pandas数据帧的每一行,然后在内部运行检查:如果是i == sqlLimiti == len(df),则运行sql,重置i = 0,启动新sql,添加行;否则增加CCD_ 11并增加行。

i = 0
sqlLimit = 99999
for x in range(len(df)): #where df is your dataframe
if i == sqlLimit or i == len(df):
# sql execution
if i == len(df):
break
else:
i = 0
sql = "" # or whatever to set it to what it was at the beginning of the loop
# add formatted row to sql (with .iloc?)
else:
i += 1
# add formatted row to sql (with .iloc?)

相关内容

  • 没有找到相关文章

最新更新