pandas 的 to_sql() 方法将主键列作为 NULL 发送,即使该列在数据帧中不存在也是如此



我想将数据框插入到雪花数据库表中。数据库具有id等列,这是一个primary_keyevent_id是一个integer字段,也是nullable

我使用SQLAlchemy创建了一个declarative_base()类,如下所示 -

class AccountUsageLoginHistory(Base):
__tablename__ = constants.TABLE_ACCOUNT_USAGE_LOGIN_HISTORY
__table_args__ = {
'extend_existing':True,
'schema' : os.environ.get('SCHEMA_NAME_AUDITS')
}
id = Column(Integer, Sequence('id_account_usage_login_history'), primary_key=True)
event_id = Column(Integer, nullable=True)

上述类在雪花数据库中创建一个表。

我有一个数据框,它只有一个列event_id

当我尝试使用熊猫to_sql()方法插入数据时,雪花返回我如下所示的错误 -

snowflake.connector.errors.ProgrammingError: 100072 (22000): 01991f2c-0be5-c903-0000-d5e5000c6cee: NULL result in a non-nullable column

此错误是由雪花生成的,因为to_sql()正在追加列id并且该列的每一行的值都设置为null

dataframe.to_sql(table_name, self.engine, index=False, method=pd_writer, if_exists="append")

将此视为案例 1 -

我尝试直接对雪花运行插入查询 -

insert into "SFOPT_TEST"."AUDITS"."ACCOUNT_USAGE_LOGIN_HISTORY" (ID, EVENT_ID) values(NULL, 33)

上面的查询向我返回了相同的错误 -

NULL result in a non-nullable column

上面所述的查询是to_sql()方法可能执行的操作。

将此视为案例 2 -

我还尝试通过执行下面所述的查询来插入一行 -

insert into "SFOPT_TEST"."AUDITS"."ACCOUNT_USAGE_LOGIN_HISTORY" (EVENT_ID) values(33)

现在,此特定查询已通过将数据插入表中成功执行,并且它还为列id自动生成了值。

如何将熊猫to_sql()方法制作到用例 2

请注意,pandas.DataFrame.to_sql()默认具有参数index=True,这意味着在插入数据时它将添加一个额外的列(df.index)。

一些数据库(如 PostgreSQL)具有数据类型serial允许您按顺序用增量数字填充列。

Snowflake DB没有这个概念,但有其他方法可以处理它:

第一个选项:您可以使用CREATE SEQUENCE语句并直接在 db 中创建序列 - 这是有关此主题的官方文档。这种方法的缺点是您需要将数据帧转换为正确的 SQL 语句:

数据库准备部分:

CREATE OR REPLACE SEQUENCE schema.my_sequence START = 1 INCREMENT = 1;
CREATE OR REPLACE TABLE schema.my_table (i bigint, b text);

您需要将数据帧转换为 SnowLake 的INSERT语句,并使用schema.my_sequence.nextval获取下一个 ID 值

INSERT INTO schema.my_table VALUES
(schema.my_sequence.nextval, 'string_1'),
(schema.my_sequence.nextval, 'string_2');

结果将是:

i b
1 string_1
2 string_2

请注意,这种方法有一些限制,您需要确保以这种方式执行的每个插入语句都将成功,因为调用schema.my_sequence.nextval而不插入它将意味着会有间隙数字。 为了避免这种情况,您可以有一个单独的脚本来检查当前插入是否成功,如果没有,它将通过调用重新创建序列:

REPLACE SEQUENCE schema.my_sequence start = (SELECT max(i) FROM schema.my_table) increment = 1;

替代选项:您需要创建一个额外的函数来运行 SQL 以获取您之前插入的最后一个 i。

SELECT max(i) AS max_i FROM schema.my_table;

,然后在运行to_sql()之前更新数据帧中的index

df.index = range(max_i+1, len(df)+max_i+1)

这将确保数据帧索引在表中继续存在。 完成后,您可以使用

df.to_sql(index_label='i', name='my_table', con=connection_object)

它将使用您的索引作为您插入的列之一,允许您维护表中的唯一索引。

最新更新