How to stream data from a PostgreSQL query into a Parquet file



I have the code below, which queries a database with roughly 500k rows, and it is killed with SIGKILL when it reaches fetchall(). I have tried iterating over the cursor instead of loading everything into rows, but that still seems to cause OOM issues.

How can I fetch all the data from the database and safely convert it to a Parquet file, regardless of the table's size?

def get_parquet_for_dataset_id(self, dataset, lob, max_dt):
    query = _table_query(lob, table_name, max_dt)
    conn = self.conns[lob]

    with conn:
        with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
            cur.execute(query)
            rows = cur.fetchall()
            table = rows_to_table(rows)

            pq_bytes = io.BytesIO()
            pq.write_table(table, pq_bytes)
            _ = pq_bytes.seek(0)

            return pq_bytes

Here's a way to do it using psycopg2, server-side cursors, and Pandas to batch/chunk the PostgreSQL query results and write them to a Parquet file, without all the results being in memory at once.

import itertools

import pandas as pd
import psycopg2
import pyarrow as pa
import pyarrow.parquet as pq


def get_schema_and_batches(query, chunk_size):

    def _batches():
        with \
                psycopg2.connect("host=localhost dbname=postgres user=postgres password=password") as conn, \
                conn.cursor(name='my-cursor') as cur:

            cur.itersize = chunk_size
            cur.arraysize = chunk_size
            cur.execute(query)

            while True:
                batch_rows = cur.fetchmany()
                column_names = tuple(col[0] for col in cur.description)
                batch = pa.RecordBatch.from_pandas(pd.DataFrame(batch_rows, columns=column_names), preserve_index=False)
                if not batch:
                    break
                yield batch

    # Faffy to infer the schema from the first batch
    # Could be simplified if we knew it ahead of time
    batches = iter(_batches())
    first_batch = next(batches)

    return first_batch.schema, itertools.chain((first_batch,), batches)


query = 'SELECT * FROM generate_series(0, 100000) AS s(my_col)'
schema, batches = get_schema_and_batches(query, chunk_size=10000)

with pq.ParquetWriter('example.parquet', schema=schema) as writer:
    for batch in batches:
        writer.write_batch(batch)
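As a quick sanity check (assuming the script above has run and written example.parquet to the working directory), you can read back just the Parquet footer metadata, without loading any data pages, to confirm the row count while keeping memory flat:

import pyarrow.parquet as pq

# Reads only the file footer, not the data pages.
metadata = pq.ParquetFile('example.parquet').metadata
print(metadata.num_rows)        # 100001 for the generate_series(0, 100000) example
print(metadata.num_row_groups)  # typically one row group per written batch here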

On server-side cursors, from the psycopg2 documentation:

When a database query is executed, the Psycopg cursor usually fetches all the records returned by the backend, transferring them to the client process. If the query returns a huge amount of data, a proportionally large amount of memory will be allocated by the client.

If the dataset is too large to be practically handled on the client side, it is possible to create a server-side cursor. Using this kind of cursor, only a controlled amount of data is transferred to the client, so that a large dataset can be examined without keeping it entirely in memory.
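To make that concrete, here is a minimal sketch (the connection string is a placeholder) of a named, i.e. server-side, cursor in psycopg2: the only change from the usual pattern is passing name= to conn.cursor(), after which iterating the cursor pulls rows from the server itersize rows at a time instead of fetching the whole result set up front:

import psycopg2

# Placeholder connection string; adjust for your environment.
conn = psycopg2.connect("host=localhost dbname=postgres user=postgres password=password")

with conn, conn.cursor(name='demo-cursor') as cur:  # name=... makes the cursor server-side
    cur.itersize = 10000  # rows fetched per round trip while iterating
    cur.execute('SELECT * FROM generate_series(0, 100000) AS s(my_col)')
    for row in cur:  # streams in itersize-sized chunks rather than loading everything
        pass  # process each row here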

Here's a way to do it using streampq (full disclosure: written by me) and Pandas to batch/chunk the PostgreSQL query results and write them to a Parquet file, without all the results being in memory at once.

Note that this does not use server-side cursors, unlike the psycopg2-based answer. One reason you might want to avoid server-side cursors is that some queries can be slower with them.

import itertools

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from streampq import streampq_connect


def get_schema_and_batches(sql, chunk_size):

    def _batches():
        with streampq_connect((('host', 'localhost'), ('dbname', 'postgres'), ('user', 'postgres'), ('password', 'password'))) as query:
            for (columns, rows) in query(sql):
                rows_it = iter(rows)
                while True:
                    batch = pa.RecordBatch.from_pandas(
                        pd.DataFrame(itertools.islice(rows_it, chunk_size), columns=columns), preserve_index=False,
                    )
                    if not batch:
                        break
                    yield batch

    # Faffy to infer the schema from the first batch
    # Could be simplified if we knew it ahead of time
    batches = iter(_batches())
    first_batch = next(batches)

    return first_batch.schema, itertools.chain((first_batch,), batches)


sql = 'SELECT * FROM generate_series(0, 100000) AS s(my_col)'
schema, batches = get_schema_and_batches(sql, chunk_size=10000)

with pq.ParquetWriter('example.parquet', schema=schema) as writer:
    for batch in batches:
        writer.write_batch(batch)
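Both snippets carry the "faffy" first-batch plumbing only to infer the schema. If the column types are known up front, a minimal sketch of the simplification (the column name, type, and the toy chunk generator below are assumptions standing in for your real table and fetch loop) is to declare the schema explicitly and pass it to both RecordBatch.from_pandas and ParquetWriter:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Known-ahead-of-time schema; replace the column name/type with your own table's.
schema = pa.schema([pa.field('my_col', pa.int64())])

# Toy stand-in for chunks coming from the database; in practice these would come
# from repeated cur.fetchmany() calls or itertools.islice over streampq's rows.
chunks = (
    [(i,) for i in range(start, min(start + 10000, 100001))]
    for start in range(0, 100001, 10000)
)

with pq.ParquetWriter('example.parquet', schema=schema) as writer:
    for chunk in chunks:
        df = pd.DataFrame(chunk, columns=schema.names)
        writer.write_batch(pa.RecordBatch.from_pandas(df, schema=schema, preserve_index=False))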
