我正在进行批量数据处理,为了速度起见,我首先将数据存储在redis中,然后每隔2分钟将其转储到postgresql数据库中。我使用了redis散列来存储数据,甚至redis中的散列键也与数据库表中的列相对应。
Am使用redis.scan()
来获得存储数据行的redis哈希列表,然后使用redis.hgetall()
来获得哈希中的数据。从那里,我在SqlAlchemy中创建了一个SQLInsert语句,并向数据库中执行批量数据插入。
我面临的问题是,我必须首先使用SCAN:提取包含我的数据的密钥
redis_match = 'data:row_keys:*'
row_keys = rdb.scan_iter(match=redis_match, count=limit_no)
从那里我获取每个散列中的所有数据:
for index, row_id in enumerate(row_keys):
row_data = rdb.hgetall(row_id)
row_data是key:value
的形式,但它以byte
的形式存储,因此使用手动解码每个密钥和值会产生额外的开销
for key, value in row_data.items():
key = ast.literal_eval(key.decode('unicode_escape'))
value = ast.literal_eval(value.decode('unicode_escape'))
我觉得这太过分了,必须有一种更优雅的方式:
- 使用hgetall()从redis获取数据并能够使用这些数据立即进行大容量SQL插入,因为redis哈希中的键与postgresql表中的列名相对应
- 即使1不可能,至少必须有一种更快的方法使用hgetall()从redis获取数据,并实时执行一些操作解码整个条目,即哈希中的所有条目,而不是迭代到每个键和值
编辑:
我读过postgresql的Foreign Data Wrappers,尤其是redis_fdw,我想知道它是否能解决我的问题,即以尽可能少的麻烦将数据从redis移动到postgresql
redis_fdw
就是一条路。请记住,哈希集的每个成员在对应的Pg外部表中不会是不同的行。相反,它将在外部表中为每个Redis哈希创建一行,并为所有哈希值使用Pg数组。
例如,对于Redis中的以下哈希:
myhash = {a:1, b:2}
您可以创建外部表:
CREATE FOREIGN TABLE my_pg_hash (key text, val text[])
SERVER redis_server
OPTIONS (database '0', tabletype 'hash', tablekeyprefix 'myhash');
外部表my_pg_hash
将包含整个Redis哈希集myhash
的一行。此行将有一个postgres数组作为关键字myhash
和值,其中包含redis哈希的所有关键字/值对。
SELECT * FROM my_pg_hash;
key | val
----------+-----------
myhash | {a,1,b,2}
(1 row)
您可以使用Pg的untest()函数将val数组拆分为单独的行:
SELECT key, unnest(val) FROM my_pg_hash;
key | unnest
--------+--------
myhash | a
myhash | 1
myhash | b
myhash | 2
(4 rows)