如何使用动态SQL设置复合变量字段的值



给定此类型:

-- Just for testing purposes:
CREATE TYPE testType as (name text)

我可以使用此函数动态获取字段的值:

CREATE OR REPLACE FUNCTION get_field(object anyelement, field text) RETURNS text as
$BODY$
DECLARE
    value text;
BEGIN
    EXECUTE 'SELECT $1."' || field || '"'
      USING object
       INTO value;
    return value;
END;
$BODY$
LANGUAGE plpgsql

调用get_field('(david)'::testType, 'name')按预期工作,返回"大卫"。

但是,如何在复合类型中设置字段的值呢?我尝试了这些功能:

CREATE OR REPLACE FUNCTION set_field_try1(object anyelement, field text, value text)
RETURNS anyelement
as
$BODY$
DECLARE
    value text;
BEGIN
    EXECUTE '$1."' || field || '" := $2'
      USING object, value;
    return object;
END;
$BODY$
LANGUAGE plpgsql
CREATE OR REPLACE FUNCTION set_field_try2(object anyelement, field text, value text)
RETURNS anyelement
as
$BODY$
DECLARE
    value text;
BEGIN
    EXECUTE 'SELECT $1 INTO $2."' || field || '"'
      USING value, object;
    return object;
END;
$BODY$
LANGUAGE plpgsql
CREATE OR REPLACE FUNCTION set_field_try3(object anyelement, field text, value text)
RETURNS anyelement
as
$BODY$
DECLARE
    value text;
BEGIN
    EXECUTE 'BEGIN $1."' || field || '" := $2; SELECT $1; END;'
       INTO object
      USING value, object;
    return object;
END;
$BODY$
LANGUAGE plpgsql

和一些变化。调用set_field_tryX不起作用。我总是收到"错误:语法错误在或接近..."。我怎样才能做到这一点?

笔记:

  • 参数anyelement,字段可以是复合类型中的任何字段。我不能只使用 object.name。
  • 我担心SQL注入。这方面的任何建议将不胜感激,但这不是我的问题。

hstore更快

自 Postgres 9.0 以来,在您的数据库中安装了附加模块hstore有一个非常简单和快速的解决方案,#=运算符可以...

record 中的字段替换为 hstore 中的匹配值。

要安装模块:

CREATE EXTENSION hstore;

例子:

SELECT my_record #= '"field"=>"value"'::hstore;  -- with string literal
SELECT my_record #= hstore(field, value);        -- with values

显然,必须将值转换为text和返回。

包含更多详细信息的 plpgsql 函数示例:

  • 触发功能中的无限循环
  • 在 Postgres 触发器中按键分配给 NEW

现在也适用于json/jsonb

json(第 9.3+ 页(或jsonb(第 9.4+ 页(也有类似的解决方案

SELECT json_populate_record (my_record, json_build_object('key', 'new-value');

该功能没有记录,但自 Postgres 13 以来一直是官方的。手册:

但是,如果 base 不是 NULL,则它包含的值将用于不匹配的列。

因此,您可以获取任何现有行并填充任意字段(覆盖其中的内容(。

jsonhstore的主要优势:

  • 适用于库存 Postgres,因此您不需要额外的模块。
  • 也适用于嵌套数组和复合类型。

小缺点:有点慢。

有关详细信息,请参阅@Geir添加的答案。

没有hstorejson

如果您使用的是旧版本或无法hstore安装其他模块或无法假设已安装,以下是我之前发布的改进版本。不过,仍然比hstore运算符慢:

CREATE OR REPLACE FUNCTION f_setfield(INOUT _comp_val anyelement
                                          , _field text, _val text)
  RETURNS anyelement
  LANGUAGE plpgsql STABLE AS
$func$
BEGIN
EXECUTE 'SELECT ' || array_to_string(ARRAY(
      SELECT CASE WHEN attname = _field
                THEN '$2'
                ELSE '($1).' || quote_ident(attname)
             END AS fld
      FROM   pg_catalog.pg_attribute
      WHERE  attrelid = pg_typeof(_comp_val)::text::regclass
      AND    attnum > 0
      AND    attisdropped = FALSE
      ORDER  BY attnum
      ), ',')
USING  _comp_val, _val
INTO   _comp_val;
END
$func$;

叫:

CREATE TEMP TABLE t( a int, b text);  -- Composite type for testing
SELECT f_setfield(NULL::t, 'a', '1');

笔记

  • 不需要显式强制转换_val到目标数据类型的值,动态查询中的字符串文本将自动强制转换,从而避免pg_type上的子查询。但我更进一步:

  • 通过 USING 子句将quote_literal(_val)替换为直接值插入。保存一个函数调用和两个强制转换,无论如何都更安全。 text在现代PostgreSQL中自动强制到目标类型。(未在 9.1 之前的版本进行测试。

  • array_to_string(ARRAY())string_agg()快.

  • 无需变量,无需DECLARE。更少的作业。

  • 动态 SQL 中没有子查询。 ($1).field更快。

  • pg_typeof(_comp_val)::text::regclass
    相同 (SELECT typrelid FROM pg_catalog.pg_type WHERE oid = pg_typeof($1)::oid) 对于有效的复合类型,只需更快.
    最后的修改基于以下假设:pg_type.typname始终与已注册复合类型的关联pg_class.relname相同,并且双重命名可以替换子查询。我在一个大型数据库中运行了这个测试来验证,它如预期的那样是空的:

    SELECT *
    FROM   pg_catalog.pg_type t
    JOIN   pg_namespace  n ON n.oid = t.typnamespace
    WHERE  t.typrelid > 0  -- exclude non-composite types
    AND    t.typrelid IS DISTINCT FROM
          (quote_ident(n.nspname ) || '.' || quote_ident(typname))::regclass
  • 使用 INOUT 参数可避免显式RETURN。这只是一个符号快捷方式。帕维尔不会喜欢它,他更喜欢明确的RETURN声明......

所有东西放在一起,速度是以前版本的两倍

<小时 />

原始(过时(答案:

结果是一个快 ~ 2.25 倍的版本。但是如果没有Pavel的第二个版本,我可能无法做到这一点。

此外,此版本通过在单个查询中执行所有操作来避免大部分强制转换为文本和返回,因此它应该不那么容易出错.
使用 PostgreSQL 9.0 和 9.1 进行测试。

CREATE FUNCTION f_setfield(_comp_val anyelement, _field text, _val text)
  RETURNS anyelement
  LANGUAGE plpgsql STABLE AS
$func$
DECLARE
   _list text;
BEGIN
_list := (
   SELECT string_agg(x.fld, ',')
   FROM  (
      SELECT CASE WHEN a.attname = $2
              THEN quote_literal($3) || '::'|| (SELECT quote_ident(typname)
                                                FROM   pg_catalog.pg_type
                                                WHERE  oid = a.atttypid)
              ELSE quote_ident(a.attname)
             END AS fld
      FROM   pg_catalog.pg_attribute a 
      WHERE  a.attrelid = (SELECT typrelid
                           FROM   pg_catalog.pg_type
                           WHERE  oid = pg_typeof($1)::oid)
      AND    a.attnum > 0
      AND    a.attisdropped = false
      ORDER  BY a.attnum
      ) x
   );
EXECUTE 'SELECT ' || _list || ' FROM  (SELECT $1.*) x'
USING  $1
INTO   $1;
RETURN $1;
END
$func$;

我写了第二个版本的setfield函数。它适用于 postgres 9.1 我没有在旧版本上测试它。这不是一个奇迹(从性能的角度来看(,但它比以前的更强大,速度快约 8 倍。

CREATE OR REPLACE FUNCTION public.setfield2(anyelement, text, text)
 RETURNS anyelement
 LANGUAGE plpgsql
AS $function$
DECLARE 
  _name text;
  _values text[];
  _value text;
  _attnum int;
BEGIN
  FOR _name, _attnum
     IN SELECT a.attname, a.attnum
          FROM pg_catalog.pg_attribute a 
         WHERE a.attrelid = (SELECT typrelid
                               FROM pg_type
                              WHERE oid = pg_typeof($1)::oid)
           AND a.attnum > 0 
  LOOP
    IF _name = $2 THEN
      _value := $3;
    ELSE
      EXECUTE 'SELECT (($1).' || quote_ident(_name) || ')::text' INTO _value USING $1;
    END IF;
    _values[_attnum] :=  COALESCE('"' || replace(replace(_value, '"', '""'), '''', '''''') || '"', ''); 
  END LOOP;
  EXECUTE 'SELECT (' || quote_ident(pg_typeof($1)::text) || ' ''(' || array_to_string(_values,',') || ')'').*' INTO $1; 
  RETURN $1;
END;
$function$;

更新/警告:Erwin指出,这目前没有记录,手册指出不可能以这种方式更改记录。

改用hstore或Pavel的解决方案。

这个简单的基于json的解决方案几乎和hstore一样快,只需要Postgres 9.3或更高版本。如果您不能使用 hstore 扩展,这应该是一个不错的选择,并且性能差异应该可以忽略不计。基准:https://stackoverflow.com/a/28673542/1914376

a( 我们可以通过 cast/concat 内联来做。Json 函数需要 Postgres 9.3:

SELECT json_populate_record( 
     record
    , ('{"'||'key'||'":"'||'new-value'||'"}')::json
);

b( 或使用 Postgres 9.4 中的函数内联。

SELECT json_populate_record (
      record
     ,json_object(ARRAY['key', 'new-value'])
);

注意:我选择了json_object(ARRAY[键,值](,因为它比json_build_object(键,值(快一点:

要隐藏转换细节,您可以在函数中使用 a(,开销很小。

CREATE FUNCTION x.setfield_json(in_element anyelement, key text, value text)
    RETURNS anyelement AS
$BODY$
    SELECT json_populate_record( in_element, ('{"'||key||'":"'||value||'"}')::json);
$BODY$ LANGUAGE sql;

plpgsql(在动态SQL上下文中(之外的"SELECT INTO"与你的预期不同 - 它将查询结果存储到表中。

可以修改任何字段,但并不简单

CREATE OR REPLACE FUNCTION public.setfield(a anyelement, text, text)
RETURNS anyelement
LANGUAGE plpgsql
AS $function$
begin
  create temp table aux as select $1.*;
  execute 'update aux set ' || quote_ident($2) || ' = ' || quote_literal($3);
  select into $1 * from aux;
  drop table aux;
  return $1;
end;
$function$

但是这段代码不是很有效 - 不可能在 plpgsql 中写得很好。你可以找到一些C库,这应该可以。

测试设置和基准测试 v2

Erwin 鼓励在这个线程中重现他的基准测试(https://stackoverflow.com/a/7782839/1914376(,所以我用合成测试数据修改了他的代码,并从我的答案中添加了 hstore 解决方案和 json-solution(以及在另一个线程中找到的 Pavel 的 json 解决方案(基准现在作为一个查询运行,更容易捕获结果。

DROP SCHEMA IF EXISTS x CASCADE;
CREATE SCHEMA x;

-- Pavel 1:
--------------------------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION x.setfield(anyelement, text, text)
RETURNS anyelement
LANGUAGE plpgsql
AS $function$
begin
  create temp table aux as select $1.*;
  execute 'update aux set ' || quote_ident($2) || ' = ' || quote_literal($3);
  select into $1 * from aux;
  drop table aux;
  return $1;
end;
$function$;

-- Pavel 2 (with patches)
--------------------------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION x.setfield2(anyelement, text, text)
 RETURNS anyelement
 LANGUAGE plpgsql
AS $function$
DECLARE
  _name text;
  _values text[];
  _value text;
  _attnum int;
BEGIN
  FOR _name, _attnum
     IN SELECT a.attname, a.attnum
           FROM pg_catalog.pg_attribute a
          WHERE a.attrelid = (SELECT typrelid
                                 FROM pg_type
                                WHERE oid = pg_typeof($1)::oid)
  LOOP
    IF _name = $2 THEN
      _value := $3;
    ELSE
      EXECUTE 'SELECT (($1).' || quote_ident(_name) || ')::text' INTO _value USING $1;
    END IF;
    _values[_attnum] :=  COALESCE('"' || replace(replace(_value, '"', '""'), '''', '''''') || '"', '');
  END LOOP;
  EXECUTE 'SELECT (' || pg_typeof($1)::text || '''(' || array_to_string(_values,',') || ')'').*' INTO $1;
  RETURN $1;
END;
$function$;

-- Erwin 1
--------------------------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION x.setfield3(anyelement, text, text)
RETURNS anyelement
AS $body$
DECLARE
 _list text;
BEGIN
_list := (
   SELECT string_agg(x.fld, ',')
   FROM   (
      SELECT CASE WHEN a.attname = $2
              THEN quote_literal($3)
              ELSE quote_ident(a.attname)
             END AS fld
      FROM   pg_catalog.pg_attribute a
      WHERE  a.attrelid = (SELECT typrelid
                           FROM   pg_type
                           WHERE  oid = pg_typeof($1)::oid)
      ORDER BY a.attnum
   ) x
);
EXECUTE '
SELECT ' || _list || '
FROM   (SELECT $1.*) x'
USING  $1
INTO   $1;
RETURN $1;
END;
$body$ LANGUAGE plpgsql;

-- Erwin 2
--------------------------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION x.setfield4(INOUT _comp_val anyelement
                                       , _field text, _val text)
  RETURNS anyelement AS
$func$
BEGIN
EXECUTE 'SELECT ' || array_to_string(ARRAY(
      SELECT CASE WHEN attname = _field
                THEN '$2'
                ELSE '($1).' || quote_ident(attname)
             END AS fld
      FROM   pg_catalog.pg_attribute
      WHERE  attrelid = pg_typeof(_comp_val)::text::regclass
      AND    attnum > 0
      AND    attisdropped = FALSE
      ORDER  BY attnum
      ), ',')
USING  _comp_val, _val
INTO   _comp_val;
END
$func$ LANGUAGE plpgsql;

-- Pavel 3: json. (Postgres 9.4)
-- Found here: https://stackoverflow.com/a/28284491/1914376
--------------------------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION x.setfield5(r anyelement, fn text, val text,OUT result anyelement)
 RETURNS anyelement
 LANGUAGE plpgsql
AS $function$
declare jo json;
begin
  jo := (select json_object(array_agg(key), 
                            array_agg(case key when fn then val
                                               else value end)) 
            from json_each_text(row_to_json(r)));
  result := json_populate_record(r, jo);
end;
$function$;

-- Json. Use built-in json functions (Postgres 9.3)
-- This is available from 9.3 since we create json by casting 
-- instead of using json_object/json_build_object only available from 9.4
--------------------------------------------------------------------------------------------------
CREATE FUNCTION x.setfield_json(in_element anyelement, key text, value text)
    RETURNS anyelement AS
$BODY$
    SELECT json_populate_record( in_element, ('{"'||key||'":"'||value||'"}')::json);
$BODY$ LANGUAGE sql;

--------------------------------------------------------------------------------------------------
-- Test setup
--------------------------------------------------------------------------------------------------
-- composite type for tests.
CREATE TYPE x.t_f as (
 id       int
,company  text
,sort     text
,log_up   timestamp
,log_upby smallint
);
-- Create temp table with synthetic test data
DROP TABLE IF EXISTS tmp_f;
CREATE TEMP table tmp_f AS
   SELECT ROW(i, 'company'||i, NULL, NULL, NULL)::x.t_f AS f
   FROM generate_series(1, 5000) S(i);

-- Run the benchmark
DO $$  DECLARE  start_time timestamptz; test_count integer; test_description TEXT; BEGIN
    test_count := 200;
    test_description := 'setfield, Pavel 1: temptable';
    start_time := clock_timestamp();    
    PERFORM x.setfield (f, 'company','new-value-'||md5(random()::text)) FROM tmp_f LIMIT test_count;
    RAISE NOTICE 'Test took: % ms (for % rows) Name: %', extract(MILLISECONDS FROM (clock_timestamp() - start_time))::INTEGER, test_count, test_description;
    test_count := 5000;
    test_description := 'setfield2, Pavel 2: reflection';
    start_time := clock_timestamp();
    PERFORM x.setfield2 (f, 'company','new-value-'||md5(random()::text)) FROM tmp_f LIMIT test_count;
    RAISE NOTICE 'Test took: % ms (for % rows) Name: %', extract(MILLISECONDS FROM (clock_timestamp() - start_time))::INTEGER, test_count, test_description;
    test_count := 5000;
    test_description := 'setfield3, Erwin 1: reflection';
    start_time := clock_timestamp();
    PERFORM x.setfield3 (f, 'company','new-value-'||md5(random()::text)) FROM tmp_f LIMIT test_count;
    RAISE NOTICE 'Test took: % ms (for % rows) Name: %', extract(MILLISECONDS FROM (clock_timestamp() - start_time))::INTEGER, test_count, test_description;
    test_count := 5000;
    test_description := 'setfield4, Erwin 2: reflection';
    start_time := clock_timestamp();
    PERFORM x.setfield4 (f, 'company','new-value-'||md5(random()::text)) FROM tmp_f LIMIT test_count;
    RAISE NOTICE 'Test took: % ms (for % rows) Name: %', extract(MILLISECONDS FROM (clock_timestamp() - start_time))::INTEGER, test_count, test_description;
    test_count := 5000;
    test_description := 'setfield5, Pavel 3: json (PG 9.4)';
    start_time := clock_timestamp();
    PERFORM x.setfield5 (f, 'company','new-value-'||md5(random()::text)) FROM tmp_f LIMIT test_count;
    RAISE NOTICE 'Test took: % ms (for % rows) Name: %', extract(MILLISECONDS FROM (clock_timestamp() - start_time))::INTEGER, test_count, test_description;
    
    test_count := 5000;
    test_description := 'setfield_json, Geir 1: casting (PG 9.3)';
    start_time := clock_timestamp();
    PERFORM x.setfield_json (f, 'company','new-value-'||md5(random()::text)) FROM tmp_f LIMIT test_count;
    RAISE NOTICE 'Test took: % ms (for % rows) Name: %', extract(MILLISECONDS FROM (clock_timestamp() - start_time))::INTEGER, test_count, test_description;
    --json_object(ARRAY(key,value]) is actually faster than json_build_object(key, value)
    test_count := 5000;
    test_description := 'no function/inlined: json_object (PG 9.4)';
    start_time := clock_timestamp();
    PERFORM json_populate_record( f, json_object(ARRAY['company', 'new-value'||md5(random()::text)]  )) FROM tmp_f LIMIT test_count;
    RAISE NOTICE 'Test took: % ms (for % rows) Name: %', extract(MILLISECONDS FROM (clock_timestamp() - start_time))::INTEGER, test_count, test_description;
    test_count := 5000;
    test_description := 'no function/inlined: hstore (PG 9.0)';
    start_time := clock_timestamp();
    PERFORM f #= hstore('company', 'new-value'||md5(random()::text))  FROM tmp_f LIMIT test_count;
    RAISE NOTICE 'Test took: % ms (for % rows) Name: %', extract(MILLISECONDS FROM (clock_timestamp() - start_time))::INTEGER, test_count, test_description;
    
END; $$;

9.4.1、win32、i5-4300U 上的测试结果

NOTICE:  Test took: 1138 ms (for 200 rows) Name: setfield, Pavel 1: temptable
NOTICE:  Test took: 652 ms (for 5000 rows) Name: setfield2, Pavel 2: reflection
NOTICE:  Test took: 364 ms (for 5000 rows) Name: setfield3, Erwin 1: reflection
NOTICE:  Test took: 275 ms (for 5000 rows) Name: setfield4, Erwin 2: reflection
NOTICE:  Test took: 192 ms (for 5000 rows) Name: setfield5, Pavel 3: json (PG 9.4)
NOTICE:  Test took: 23 ms (for 5000 rows) Name: setfield_json, Geir 1: casting (PG 9.3)
NOTICE:  Test took: 25 ms (for 5000 rows) Name: no function/inlined: json_object (PG 9.4)
NOTICE:  Test took: 14 ms (for 5000 rows) Name: no function/inlined: hstore (PG 9.0)

2015 年 3 月更新:
现在基本上已经过时了。通过@Geir更快的变体来考虑新的基准。


测试设置和基准测试

我采用了三个解决方案(2011 年 10 月 16 日之前(并在 PostgreSQL 9.0 上运行了测试。您可以在下面找到完整的设置。仅不包括测试数据,因为我使用的是现实生活中的数据库(不是合成数据(。它全部封装在自己的架构中,以便非侵入性使用。

我想鼓励任何想要重现测试的人。也许使用 postgres 9.1?并在此处添加您的结果?:)

-- DROP SCHEMA x CASCADE;
CREATE SCHEMA x;
-- Pavel 1
CREATE OR REPLACE FUNCTION x.setfield(anyelement, text, text)
RETURNS anyelement
LANGUAGE plpgsql
AS $function$
begin
  create temp table aux as select $1.*;
  execute 'update aux set ' || quote_ident($2) || ' = ' || quote_literal($3);
  select into $1 * from aux;
  drop table aux;
  return $1;
end;
$function$;
-- Pavel 2 (with patches)
CREATE OR REPLACE FUNCTION x.setfield2(anyelement, text, text)
 RETURNS anyelement
 LANGUAGE plpgsql
AS $function$
DECLARE 
  _name text;
  _values text[];
  _value text;
  _attnum int;
BEGIN
  FOR _name, _attnum
     IN SELECT a.attname, a.attnum
           FROM pg_catalog.pg_attribute a 
          WHERE a.attrelid = (SELECT typrelid
                                 FROM pg_type
                                WHERE oid = pg_typeof($1)::oid) 
  LOOP
    IF _name = $2 THEN
      _value := $3;
    ELSE
      EXECUTE 'SELECT (($1).' || quote_ident(_name) || ')::text' INTO _value USING $1;
    END IF;
    _values[_attnum] :=  COALESCE('"' || replace(replace(_value, '"', '""'), '''', '''''') || '"', '');
  END LOOP;
  EXECUTE 'SELECT (' || pg_typeof($1)::text || '''(' || array_to_string(_values,',') || ')'').*' INTO $1; 
  RETURN $1;
END;
$function$;
-- Erwin 1
CREATE OR REPLACE FUNCTION x.setfield3(anyelement, text, text)
RETURNS anyelement
AS $body$
DECLARE
 _list text;
BEGIN
_list := (
   SELECT string_agg(x.fld, ',')
   FROM   (
      SELECT CASE WHEN a.attname = $2
              THEN quote_literal($3)
              ELSE quote_ident(a.attname)
             END AS fld
      FROM   pg_catalog.pg_attribute a 
      WHERE  a.attrelid = (SELECT typrelid
                           FROM   pg_type
                           WHERE  oid = pg_typeof($1)::oid) 
      ORDER BY a.attnum
   ) x
);
EXECUTE '
SELECT ' || _list || '
FROM   (SELECT $1.*) x'
USING  $1
INTO   $1;
RETURN $1;
END;
$body$ LANGUAGE plpgsql;
-- composite type for tests.
CREATE TYPE x.t_f as (
 id       int
,company  text
,sort     text
,log_up   timestamp 
,log_upby smallint
);
-- temp table with real life test data
DROP   TABLE IF EXISTS tmp_f;
CREATE TEMP table tmp_f AS 
   SELECT ROW(firma_id,firma,sort,log_up,log_upby)::x.t_f AS f
   FROM   ef.firma
   WHERE  firma !~~ '"%';
-- SELECT count(*) FROM tmp_f;  -- 5183
-- Quick test: results are identical?
SELECT *,
       x.setfield (f, 'company','test')
      ,x.setfield2(f, 'company','test')
      ,x.setfield3(f, 'company','test')
 FROM tmp_f
LIMIT 10;

基准

我运行了几次查询来填充缓存。呈现的结果是五个总运行时中最好的,EXPLAIN ANALYZE .

1000 行的圆形

Pavel的第一个原型最大化了共享内存,具有更多行。

铺路 1: 2445.112 ms

SELECT x.setfield (f, 'company','test') FROM tmp_f limit 1000;

帕维尔 2: 263.753 ms

SELECT x.setfield2(f, 'company','test') FROM tmp_f limit 1000;

欧文 1: 120.671 毫秒

SELECT x.setfield3(f, 'company','test') FROM tmp_f limit 1000;

另一个包含 5183 行的测试。

帕维尔 2: 1327.429 ms

SELECT x.setfield2(f, 'company','test') FROM tmp_f;

欧文1: 588.691 毫秒

SELECT x.setfield3(f, 'company','test') FROM tmp_f;

最新更新