如何使用SQLAlchemy将Panda DataFrame追加到MySQL



我正在将数据从数据帧推送到MySQL中,现在它只是在数据不存在的情况下向表中添加新数据(追加(。这非常完美,但我也希望我的代码检查记录是否已经存在,然后需要更新。所以我需要它来追加+更新。我真的不知道如何开始解决这个问题,因为我被卡住了。。。。以前有人试过这个吗?

这是我的代码:

engine = create_engine("mysql+pymysql://{user}:{pw}@localhost/{db}"
.format(user="root",
pw="*****",
db="my_db"))
my_df.to_sql('my_table', con = engine, if_exists = 'append')

您可以在DB端使用下一个解决方案:

第一个:为Pandas的插入数据创建表(称之为测试(:

CREATE TABLE `test` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(100) NOT NULL,
`capacity` INT(11) NOT NULL,
PRIMARY KEY (`id`)
);

第二步:为生成的数据创建与测试完全相同的表(称为累积测试(:

CREATE TABLE `cumulative_test` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`name` VARCHAR(100) NOT NULL,
`capacity` INT(11) NOT NULL,
PRIMARY KEY (`id`)
);

第三:在每次插入测试表时设置触发器会在第二个表中插入矿石更新记录,如:

DELIMITER $$
CREATE
/*!50017 DEFINER = 'root'@'localhost' */
TRIGGER `before_test_insert` BEFORE INSERT ON `test` 
FOR EACH ROW BEGIN
DECLARE _id INT;

SELECT id INTO _id
FROM `cumulative_test` WHERE `cumulative_test`.`name` = new.name;

IF _id IS NOT NULL THEN
UPDATE cumulative_test
SET `cumulative_test`.`capacity` = `cumulative_test`.`capacity` + new.capacity;
ELSE 
INSERT INTO `cumulative_test` (`name`, `capacity`) 
VALUES (NEW.name, NEW.capacity);
END IF; 
END;
$$
DELIMITER ;

因此,您已经将值插入到测试表中,并在第二个表中获得计算结果。触发器内部的逻辑可以根据您的需要进行匹配。

与PostgreSQL中使用的方法类似,您可以在MySQL中使用INSERT … ON DUPLICATE KEY

with engine.begin() as conn:
# step 0.0 - create test environment
conn.execute(sa.text("DROP TABLE IF EXISTS main_table"))
conn.execute(
sa.text(
"CREATE TABLE main_table (id int primary key, txt varchar(50))"
)
)
conn.execute(
sa.text(
"INSERT INTO main_table (id, txt) VALUES (1, 'row 1 old text')"
)
)
# step 0.1 - create DataFrame to UPSERT
df = pd.DataFrame(
[(2, "new row 2 text"), (1, "row 1 new text")], columns=["id", "txt"]
)
# step 1 - create temporary table and upload DataFrame
conn.execute(
sa.text(
"CREATE TEMPORARY TABLE temp_table (id int primary key, txt varchar(50))"
)
)
df.to_sql("temp_table", conn, index=False, if_exists="append")
# step 2 - merge temp_table into main_table
conn.execute(
sa.text(
"""
INSERT INTO main_table (id, txt) 
SELECT id, txt FROM temp_table
ON DUPLICATE KEY UPDATE txt = VALUES(txt)
"""
)
)
# step 3 - confirm results
result = conn.execute(
sa.text("SELECT * FROM main_table ORDER BY id")
).fetchall()
print(result)  # [(1, 'row 1 new text'), (2, 'new row 2 text')]

最新更新