我想将数据框插入到雪花数据库表中。数据库具有id
等列,这是一个primary_key
,event_id
是一个integer
字段,也是nullable
。
我使用SQLAlchemy创建了一个declarative_base()
类,如下所示 -
class AccountUsageLoginHistory(Base):
__tablename__ = constants.TABLE_ACCOUNT_USAGE_LOGIN_HISTORY
__table_args__ = {
'extend_existing':True,
'schema' : os.environ.get('SCHEMA_NAME_AUDITS')
}
id = Column(Integer, Sequence('id_account_usage_login_history'), primary_key=True)
event_id = Column(Integer, nullable=True)
上述类在雪花数据库中创建一个表。
我有一个数据框,它只有一个列event_id
。
当我尝试使用熊猫to_sql()
方法插入数据时,雪花返回我如下所示的错误 -
snowflake.connector.errors.ProgrammingError: 100072 (22000): 01991f2c-0be5-c903-0000-d5e5000c6cee: NULL result in a non-nullable column
此错误是由雪花生成的,因为to_sql()
正在追加列id
并且该列的每一行的值都设置为null
。
dataframe.to_sql(table_name, self.engine, index=False, method=pd_writer, if_exists="append")
将此视为案例 1 -
我尝试直接对雪花运行插入查询 -
insert into "SFOPT_TEST"."AUDITS"."ACCOUNT_USAGE_LOGIN_HISTORY" (ID, EVENT_ID) values(NULL, 33)
上面的查询向我返回了相同的错误 -
NULL result in a non-nullable column
上面所述的查询是to_sql()
方法可能执行的操作。
将此视为案例 2 -
我还尝试通过执行下面所述的查询来插入一行 -
insert into "SFOPT_TEST"."AUDITS"."ACCOUNT_USAGE_LOGIN_HISTORY" (EVENT_ID) values(33)
现在,此特定查询已通过将数据插入表中成功执行,并且它还为列id
自动生成了值。
如何将熊猫to_sql()
方法制作到用例 2?
请注意,pandas.DataFrame.to_sql()
默认具有参数index=True
,这意味着在插入数据时它将添加一个额外的列(df.index)。
一些数据库(如 PostgreSQL)具有数据类型serial
允许您按顺序用增量数字填充列。
Snowflake DB没有这个概念,但有其他方法可以处理它:
第一个选项:您可以使用CREATE SEQUENCE
语句并直接在 db 中创建序列 - 这是有关此主题的官方文档。这种方法的缺点是您需要将数据帧转换为正确的 SQL 语句:
数据库准备部分:
CREATE OR REPLACE SEQUENCE schema.my_sequence START = 1 INCREMENT = 1;
CREATE OR REPLACE TABLE schema.my_table (i bigint, b text);
您需要将数据帧转换为 SnowLake 的INSERT
语句,并使用schema.my_sequence.nextval
获取下一个 ID 值
INSERT INTO schema.my_table VALUES
(schema.my_sequence.nextval, 'string_1'),
(schema.my_sequence.nextval, 'string_2');
结果将是:
i b
1 string_1
2 string_2
请注意,这种方法有一些限制,您需要确保以这种方式执行的每个插入语句都将成功,因为调用schema.my_sequence.nextval
而不插入它将意味着会有间隙数字。 为了避免这种情况,您可以有一个单独的脚本来检查当前插入是否成功,如果没有,它将通过调用重新创建序列:
REPLACE SEQUENCE schema.my_sequence start = (SELECT max(i) FROM schema.my_table) increment = 1;
替代选项:您需要创建一个额外的函数来运行 SQL 以获取您之前插入的最后一个 i。
SELECT max(i) AS max_i FROM schema.my_table;
,然后在运行to_sql()
之前更新数据帧中的index
df.index = range(max_i+1, len(df)+max_i+1)
这将确保数据帧索引在表中继续存在。 完成后,您可以使用
df.to_sql(index_label='i', name='my_table', con=connection_object)
它将使用您的索引作为您插入的列之一,允许您维护表中的唯一索引。