我正在寻找构建ETL管道来填充PostgreSQL表的想法。我希望批量加载数万行,所以不是大量的数据,但足够大,可以在实现任何东西之前进行思考。
我期望在绝大多数情况下,传入的数据将是干净的,并且应该没有问题地插入。然而,我不会感到惊讶,如果我不时地得到一个或两个记录在批处理一个问题:也许是重复键,或字符字段比预期更长的时间,或失踪的字段非空的列。诸如此类的事情。
当发生这种情况时,我希望所有不出错的行都能成功插入,并且我的代码以某种方式确定哪一行失败。
我可以通过在一个单独的事务中插入每条记录来做到这一点,但是对于绝大多数情况来说,这将是非常低效的,因为所有的记录都希望被干净地插入。
有哪些工具和策略可以实现这些目标?
我建议有两个选择。
。下面是逻辑层中的一些伪代码,我希望清楚。
open the data file;
issue 'BEGIN TANSACTION' to the DB;
while (there are lines left to read from the data file)
{
read a line;
try
{
issue 'SAVEPOINT <name>' to the DB;
process and insert the line into the target DB tables; // This may bang
issue 'RELEASE SAVEPOINT <name>' to the DB;
}
catch (exception ex)
{
issue 'ROLLBACK TO SAVEPOINT <name>' to the DB;
handle the unfortunate event;
log information about the failing line;
}
}
issue 'COMMIT TANSACTION' to the DB;
close the data file;
同时,如果发生了灾难性的事情,你可以像往常一样向DB发出ROLLBACK TANSACTION
。
B。另一种方法是在plpgsql函数中执行相同的。我以前这样做过,效果很好:
- 从
pg_temp
模式的数据文件中创建一个外表(即临时外表,以便解决方案是并发的); - 外表应该有一列类型为
text
; - 然后在隐式游标循环中逐行读取。
更多类似plpgsql的伪代码
for running_line in select the_line_column from pg_temp.the_foreign_table loop
begin
-- process and insert running_line into the target tables;
exception when others then
-- handle the unfortunate event;
end;
end loop;
都很有效。后者非常快。