Psycopg3:COPY语句中的NaN到null转换



在psycopg2中,可以使用以下适配器将NaN值转换为null:

# Module-level flag: set to True once the NaN-to-NULL adapter has been registered.
__REGISTERED = False

def _nan_to_null(f,
                 _NULL=psycopg2.extensions.AsIs('NULL'),
                 _Float=psycopg2.extensions.Float):
    """Adapt a Python float for psycopg2, substituting NULL for NaN.

    ``_NULL`` and ``_Float`` are bound as default arguments so the psycopg2
    objects are looked up once, when the function is defined.

    Args:
        f: The float value being adapted.

    Returns:
        ``_NULL`` when ``f`` is NaN, otherwise ``_Float(f)``.
    """
    return _NULL if np.isnan(f) else _Float(f)

def register_nan_adapter():
    """Register ``_nan_to_null`` as the psycopg2 adapter for ``float``.

    The registration happens only on the first call; later calls just
    report that the adapter is already in place.
    """
    global __REGISTERED
    if __REGISTERED:
        print('nan to null adapter for psycopg2 is already registered!')
        return
    print('Register nan to null adapter for psycopg2...')
    psycopg2.extensions.register_adapter(float, _nan_to_null)
    __REGISTERED = True

如何在psycopg3中实现相同的目标?

文档说可以为每种数据类型创建一个dumper(转储器),所以我一开始是这样做的:

class NullNan(FloatDumper):
    """Float dumper that emits the literal bytes ``NULL`` for NaN values."""

    def dump(self, elem):
        """Return ``b"NULL"`` for NaN; otherwise defer to ``FloatDumper.dump``."""
        return b"NULL" if np.isnan(elem) else super().dump(elem)

# Register NullNan as the dumper for Python floats in this connection's adapters map.
connection.adapters.register_dumper(float, NullNan)

但这并不奏效。直接返回 None 也无济于事。

Adrian Clavier回答后更新:

看起来,让dumper返回None对于INSERT和UPDATE来说是可以的,但对于COPY来说不起作用:

class NullNan(FloatDumper):
    """Float dumper that returns ``None`` in place of NaN values."""

    def dump(self, elem):
        """Return ``None`` for NaN; otherwise defer to ``FloatDumper.dump``."""
        if not np.isnan(elem):
            return super().dump(elem)
        return None
# Stream each DataFrame row to the server through COPY ... FROM STDIN.
with cursor.copy(f"COPY _r ({col_names_str}) FROM STDIN") as copy:
    for _, row in df.iterrows():
        copy.write_row(row.values.tolist())

导致异常:

psycopg.errors.QueryCanceled: COPY from stdin failed: error from Python: TypeError - expected string or bytes-like object

嗯,这比我想象的要容易。不过从后面的两次更新来看,事实并非如此。

import numpy as np
import psycopg
from psycopg.types.numeric import FloatDumper
class NanDumper(FloatDumper):
    """Text-format float dumper that returns ``None`` in place of NaN."""

    def dump(self, obj):
        """Return ``None`` for NaN; otherwise defer to ``FloatDumper.dump``."""
        if not np.isnan(obj):
            return super().dump(obj)
        return None
# Open a connection and register NanDumper for Python floats on this cursor.
con = psycopg.connect(dbname="test", host='localhost', user='postgres', port=5432)
cur = con.cursor()
cur.adapters.register_dumper(float, NanDumper)

# np.nan is the canonical spelling; the np.NaN alias was removed in NumPy 2.0.
cur.execute("select %s::float", [np.nan]).fetchone()
# (None,)

cur.execute("select %s::float", [3.68]).fetchone()
# (3.68,)
\d float_test
Table "public.float_test"
Column   |       Type       | Collation | Nullable | Default 
-----------+------------------+-----------+----------+---------
id        | integer          |           |          | 
float_fld | double precision |           |          | 
# Insert one NaN row and one ordinary float row, then commit.
# np.nan replaces the np.NaN alias, which was removed in NumPy 2.0.
cur.execute("insert into float_test(id, float_fld) values(%s,  %s)", [1, np.nan])
cur.execute("insert into float_test(id, float_fld) values(%s,  %s)", [2, 4.56])
con.commit()
select * from float_test;
id | float_fld 
----+-----------
1 |      NULL
2 |      4.56

更新:使用COPY的情况。这只是部分解决方案。

# np.nan replaces the np.NaN alias, which was removed in NumPy 2.0.
data = [(5, 8.92), (6, np.nan)]

# Binary-format COPY: the column types are declared explicitly before writing rows.
with cur.copy("COPY float_test (id, float_fld) FROM STDIN  WITH (FORMAT BINARY)") as copy:
    copy.set_types(["int4", "float8"])
    for row in data:
        copy.write_row(row)
con.commit()
select * from float_test;
id | float_fld 
----+-----------
5 |      8.92
6 |       NaN

NaN对每个数字类型来说都是有效的"数字"。不确定为什么NanDumper的转换没有生效。

更新#2。找到了诀窍:

from psycopg.types.numeric import FloatBinaryDumper
class NanBinaryDumper(FloatBinaryDumper):
    """Binary-format float dumper that returns ``None`` in place of NaN."""

    def dump(self, obj):
        """Return ``None`` for NaN; otherwise defer to ``FloatBinaryDumper.dump``."""
        if not np.isnan(obj):
            return super().dump(obj)
        return None
# Fresh connection/cursor with the binary NaN dumper registered for Python floats.
con = psycopg.connect(dbname="test", host='localhost', user='postgres', port=5432)
cur = con.cursor()
cur.adapters.register_dumper(float, NanBinaryDumper)

# Binary-format COPY of the same `data` rows used in the previous attempt.
with cur.copy("COPY float_test (id, float_fld) FROM STDIN WITH (FORMAT BINARY)") as copy:
    copy.set_types(["int4", "float8"])
    for row in data:
        copy.write_row(row)
con.commit()
select * from float_test;
id | float_fld 
----+-----------
5 |      8.92
6 |      NULL

最新更新