I have been trying to dump data into a MySQL database using SQLAlchemy. When I try, it raises the error sqlalchemy.exc.ArgumentError: List argument must consist only of tuples or dictionaries. The code below is what I use for the insert.
def insert_data(db, table, rows):
    db.execute(f"INSERT INTO {table} VALUES (%s)", rows)
    db.commit()
The contents of rows are as follows:
[(1, 'asdsewadada', 'lajsdljasld', 'lol@gmail.com', 51)]
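So I am inserting a list of tuples, and I still get the same error.
As of SQLAlchemy 2.0, you should use dictionaries instead of tuples. From the documentation: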
:param parameters: parameters which will be bound into the statement. This may be either a dictionary of parameter names to values, or a mutable sequence (e.g. a list) of dictionaries. When a list of dictionaries is passed, the underlying statement execution will make use of the DBAPI ``cursor.executemany()`` method. When a single dictionary is passed, the DBAPI ``cursor.execute()`` method will be used.
So this should fix your code:
import sqlalchemy


def insert_data(db: sqlalchemy.engine.base.Engine, query: str, parameters: dict):
    """
    :param db:
    :param query: INSERT INTO votes (time_cast, candidate) VALUES (:time_cast, :candidate)
    :param parameters: {"time_cast": time_cast, "candidate": team}
    :return:
    """
    log_headline: str = "insert_data() ::"
    # Insert
    stmt = sqlalchemy.text(query)
    try:
        # Using a with statement ensures that the connection is always released
        # back into the pool at the end of statement (even if an error occurs)
        with db.connect() as conn:
            conn.execute(stmt, parameters=parameters)
            conn.commit()
            print(f"{log_headline} OK inserted data ")
    except Exception as e:
        # If something goes wrong, handle the error in this section. This might
        # involve retrying or adjusting parameters depending on the situation.
        print(f"{log_headline} Error {e}")
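To connect this back to the question, here is a minimal sketch of how the original list of tuples could be turned into the list of dictionaries that the documentation describes. The table name users and the column names are assumptions made up for illustration (the question does not give them), and rows_to_dicts, build_insert, and insert_rows are hypothetical helpers, not part of SQLAlchemy.

```python
from typing import Iterable, Sequence


def rows_to_dicts(columns: Sequence[str], rows: Iterable[tuple]) -> list[dict]:
    """Pair each positional tuple with the column names, giving one dict per row."""
    dicts = []
    for row in rows:
        if len(row) != len(columns):
            raise ValueError(f"expected {len(columns)} values, got {len(row)}: {row!r}")
        dicts.append(dict(zip(columns, row)))
    return dicts


def build_insert(table: str, columns: Sequence[str]) -> str:
    """Build an INSERT with one named bind parameter (:name) per column.

    Table and column names are interpolated directly, so they must come
    from trusted code, never from user input.
    """
    column_list = ", ".join(columns)
    placeholders = ", ".join(f":{name}" for name in columns)
    return f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"


def insert_rows(engine, table: str, columns: Sequence[str], rows: Iterable[tuple]) -> None:
    """Insert all rows in one transaction; a list of dicts is sent via executemany()."""
    # Imported here so the two helpers above work without SQLAlchemy installed.
    from sqlalchemy import text

    params = rows_to_dicts(columns, rows)
    if not params:
        return
    # engine.begin() commits on success and rolls back if an error is raised.
    with engine.begin() as conn:
        conn.execute(text(build_insert(table, columns)), params)


# Hypothetical column names for the row shown in the question.
columns = ["id", "username", "password", "email", "age"]
rows = [(1, "asdsewadada", "lajsdljasld", "lol@gmail.com", 51)]
print(build_insert("users", columns))
print(rows_to_dicts(columns, rows))
```

With an engine in hand, the call would be insert_rows(engine, "users", columns, rows). Naming the columns explicitly also avoids relying on the table's column order, which the original VALUES (%s) form depended on.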
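This may be a SQLAlchemy bug. I had the same problem. I went back to SQLAlchemy 1.4.46 and the values were inserted successfully.
Could you format your row data as shown below and try again?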
[{"Value1","value2"},{"Value1","value2"}]
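One thing to watch with this suggestion: in Python, braces that hold bare values without key: value pairs create a set, not a dictionary. A short sketch of the difference follows; the column names col1 and col2 are hypothetical and not from the original post.

```python
# Braces without "key: value" pairs build sets, which carry no column names.
as_sets = [{"Value1", "value2"}, {"Value1", "value2"}]

# Hypothetical column names; use the real columns of your table instead.
as_dicts = [
    {"col1": "Value1", "col2": "value2"},
    {"col1": "Value1", "col2": "value2"},
]

print(type(as_sets[0]).__name__)   # set
print(type(as_dicts[0]).__name__)  # dict
```

Each dictionary key is expected to match a named bind parameter in the statement, such as :col1 and :col2.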
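I got the same error when executing an INSERT statement with SQLAlchemy, so I decided to meet my requirement a different way. To my surprise, this new approach loads data into MySQL quickly.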
from sqlalchemy.orm import declarative_base
from sqlalchemy import create_engine
from logger import setup_logger  # the logger module is not shown in this post
import pandas as pd
import yaml

logger = setup_logger("util.py")


def load_file_into_database(filename, table_name, column_mapping):
    Base = declarative_base()
    try:
        # Read the connection settings (user, password, host, database)
        with open('db_config.yaml', 'r') as file:
            db_config = yaml.safe_load(file)
        url = f'mysql+mysqlconnector://{db_config["user"]}:{db_config["password"]}@{db_config["host"]}/{db_config["database"]}'
        engine = create_engine(url)
        Base.metadata.create_all(engine)
        # Pick the pandas reader from the file extension
        if filename.endswith('.csv'):
            df = pd.read_csv(filename)
        elif filename.endswith('.xlsx') or filename.endswith('.xls'):
            df = pd.read_excel(filename)
        else:
            logger.warning('Unknown file type')
            return
        df.rename(columns=column_mapping, inplace=True)
        # if_exists='replace' drops and recreates the table on every call
        df.to_sql(table_name, con=engine, index=False, if_exists='replace')
        logger.info('Data successfully loaded into the database')
    except Exception as e:
        logger.error(f'An error occurred while loading the file into the database: {e}')