我正在将csv导入postgresql 9.5.7数据库。但问题是,csv 部分格式不正确(有些行缺少逗号,因此缺少整列,或者有些行可能太多,或者某些值无效(。
因此,我会在导入之前使用外部工具清理 csv,或者让数据库本身完成过滤。
我更喜欢第二种方法,因为在我看来,它对外部 csv 清理脚本的依赖程度较低,并且所有数据验证都直接在持久性级别进行。
虽然在进行 csv 导入时通常无法处理形状错误的行,但我仍然通过以下方式找到了解决此问题的方法:
-
将 csv 作为外表包含在数据库中,但仅包含文本,并且只有一个文本列,其中包含整行(包括逗号(。
-
通过拆分单个文本列各自的逗号,从该外表插入到干净的目标表中。
但是在我的测试机器上导入一个包含 3300 万行的 200 MB csv 文件大约需要 6 个小时。那么 Insert 语句肯定可以进一步优化吗?我对邮政很陌生,所以这完全有可能。请纠正我做出的决定,可以做得更好。
现在,简要解释要建模的领域:它是关于处理传感器,这些传感器的位置通过其信号强度以特定时间间隔记录到各个站点。这些间隔以毫秒精度记录,非常精确。
因此,为使其工作而发出的所有命令如下。
创建 fdw 服务器:
CREATE EXTENSION file_fdw;
CREATE SERVER csv_import_server FOREIGN DATA WRAPPER file_fdw;
接下来,创建外部 csv 表,只有一个文本列包含所有数据。 干净的行如下所示:
'1465721143588,-83,55,1361'
其中第一个值是具有毫秒精度的Unix 时间戳, 则RSSI信号强度值, 然后是接收信号的电台 ID, 然后传感器的 ID
CREATE FOREIGN TABLE signals_csv (
value TEXT )
SERVER csv_import_server OPTIONS(
filename '<path_to_file>/signals.csv', format 'text');
目标表:
CREATE TABLE signals (
timestamp TIMESTAMP NOT NULL,
rssi INTEGER NOT NULL,
stations_id INTEGER NOT NULL,
distributed_tags_id INTEGER NOT NULL,
PRIMARY KEY(timestamp, stations_id, distributed_tags_id),
FOREIGN KEY(stations_id) REFERENCES stations(stations_id),
FOREIGN KEY(distributed_tags_id) REFERENCES tags(id) );
现在插入:
INSERT INTO signals (timestamp, rssi, stations_id, distributed_tags_id) SELECT
TO_TIMESTAMP( tmp.timestamp::double precision / 1000),
tmp.rssi::INTEGER,
tmp.stations_id::INTEGER,
tmp.distributed_tags_id::INTEGER
FROM ( SELECT
SPLIT_PART ( value, ',', 1) AS timestamp,
SPLIT_PART ( value, ',', 2) AS rssi,
SPLIT_PART ( value, ',', 3) AS stations_id,
SPLIT_PART ( value, ',', 4) AS distributed_tags_id
FROM signals_csv ) AS tmp WHERE (
tmp.timestamp ~ '^[0-9]+$' AND
tmp.rssi ~ '^-[0-9]+$' AND
tmp.stations_id ~ '^[0-9]+$' AND
tmp.distributed_tags_id ~ '^[0-9]+$' AND
EXISTS ( SELECT 1 FROM tags t WHERE t.id::TEXT = tmp.distributed_tags_id ) AND
EXISTS ( SELECT 1 FROM stations s WHERE s.stations_id::TEXT = tmp.stations_id )
)
ON CONFLICT (timestamp, stations_id, distributed_tags_id ) DO NOTHING;
我猜批量性能命中是:
- 将 UNIX 时间戳转换为双精度,然后转换它们的除法,
- 拆分字符串的正则表达式分析。
- 外键查找检查
在我看来,如果我想以一致的方式对数据进行建模,同时以人类可读的方式存储毫秒精度,则无法绕过这些限制。
导入的数据是干净且一致的,至于这个维度,我对我的方法感到满意;唯一的缺点是它的性能很差。因此,如果有人能给我如何改进这一点的指示,我将不胜感激。
干杯!
如果要插入很多行,则可以使用 COPY 而不是 INSERT。
它的性能比插入更好。
我以不同的方式解决了它,并且可以将导入时间从 7 小时减少到只有 1 小时。
因此,我没有在插入之前验证数据(在我最初帖子的 WHERE 分支中(,而是让 INSERT 操作本身验证数据(因为我无论如何都在 CREATE TABLE 中定义了列的类型(。
尽管由于 INSERT 在遇到意外数据类型时会抛出异常,所以我在循环中执行每行 INSERT,以便异常仅中止当前迭代而不是整个事务。
工作代码如下所示:
CREATE OR REPLACE FUNCTION import_tags_csv( path_to_csv TEXT ) RETURNS VOID AS $$
DECLARE
cursor SCROLL CURSOR FOR SELECT
SPLIT_PART ( value, ',', 1) AS id,
SPLIT_PART ( value, ',', 2) AS active_from,
SPLIT_PART ( value, ',', 3) AS active_to
FROM csv_table;
i BIGINT := 0;
BEGIN
-- create the whole foreign data wrapper for integrating the csv:
CREATE EXTENSION file_fdw;
CREATE SERVER csv_import_server FOREIGN DATA WRAPPER file_fdw;
EXECUTE '
CREATE FOREIGN TABLE csv_table ( value TEXT )
SERVER csv_import_server OPTIONS( filename ''' || path_to_csv || ''', format ''text'')';
-- Iterating through the rows, converting the text data and inserting it into table tags
FOR csv_row IN cursor LOOP
BEGIN
i := i +1;
INSERT INTO tags (
id,
active_from,
active_to)
VALUES (
csv_row.id::INTEGER,
TO_TIMESTAMP( csv_row.active_from::double precision / 1000),
TO_TIMESTAMP( csv_row.active_to::double precision / 1000) );
--If invalid data is read, the table constraints throw an exception. The faulty line is dismissed
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE E'% nt line %: %n', SQLERRM, i, csv_row;
END;
END LOOP;
-- Dropping the foreign table which had the csv integrated
DROP FOREIGN TABLE csv_table;
DROP SERVER csv_import_server;
DROP EXTENSION file_fdw;
END;
$$ LANGUAGE plpgsql;