I'm trying to parse CSV data with pandas.read_csv(bytes, chunksize=n), where bytes is an ongoing stream of data that I want to receive from a database CLOB field and read chunk by chunk.
import io
import pandas

reader = pandas.read_csv(io.BytesIO(b'1;qwe\n2;asdf\n3;zxcv'), sep=';', chunksize=2)
for row_chunk in reader:
    print(row_chunk)
The code above works fine, but I would like to feed read_csv some updatable stream instead of the fixed io.BytesIO(b'...').
I tried to redefine the read method like this:
import cx_Oracle

class BlobIO(io.BytesIO):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._chunk_size = 4
        self._file_data_table = 'my_table'
        self._job_id = 'job_id'
        # dbms_lob.substr(lob, amount, offset) pulls one slice of the CLOB
        self._get_raw_sql = """
            select dbms_lob.substr(body, {0}, {1})
            from {2}
            where job_id = '{3}'
        """
        dsn_tns = cx_Oracle.makedsn('host', 'port', 'service_name')
        self.ora_con = cx_Oracle.connect('ora_user', 'ora_pass', dsn_tns)
        self.res = b''
        self.ora_cur = self.ora_con.cursor()
        self.chunker = self.get_chunk()
        next(self.chunker)  # prime the generator up to its first yield

    def get_chunk(self):
        returned = 0
        sent = (yield)
        self._chunk_size = sent or self._chunk_size
        while True:
            to_exec = self._get_raw_sql.format(
                self._chunk_size,
                returned + 1,
                self._file_data_table,
                self._job_id)
            self.ora_cur.execute(to_exec)
            self.res = self.ora_cur.fetchall()[0][0]
            returned += self._chunk_size
            yield self.res
            sent = (yield self.res)
            self._chunk_size = sent or self._chunk_size
            if not self.res:
                break

    def read(self, nbytes=None):
        if nbytes:
            self.chunker.send(nbytes)
        else:
            self.chunker.send(self._chunk_size)
        try:
            to_return = next(self.chunker)
        except StopIteration:
            self.ora_con.close()
            to_return = b''
        return to_return
buffer = BlobIO()
reader = pandas.read_csv(buffer, encoding='cp1251', sep=';', chunksize=2)
But it looks like I'm doing something completely wrong, because pd.read_csv on the last line never returns, and I don't understand what is going on there.
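From what I can tell, the parser keeps calling read(size) and only stops once it gets an empty result back, so a stream that never returns b'' would block forever. Here is a minimal toy stream I used to convince myself of that contract (ChunkStream and its sample chunks are just made-up test data, not the real CLOB):

import io
import pandas

class ChunkStream(io.RawIOBase):
    """Hands out pre-chunked bytes; returning 0 / b'' is what signals EOF."""
    def __init__(self, chunks):
        self.chunks = list(chunks)
        self.buf = b''
    def readable(self):
        return True
    def readinto(self, b):
        if not self.buf and self.chunks:
            self.buf = self.chunks.pop(0)
        if not self.buf:
            return 0  # EOF: this is what lets read_csv finish
        n = min(len(b), len(self.buf))
        b[:n] = self.buf[:n]
        self.buf = self.buf[n:]
        return n

stream = io.BufferedReader(ChunkStream([b'1;qwe\n', b'2;asdf\n', b'3;zxcv']))
for row_chunk in pandas.read_csv(stream, sep=';', chunksize=2):
    print(row_chunk)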
Maybe creating buffer = BytesIO(b'') and then writing new data into it with buffer.write(new_chunk_from_db) would be a better approach, but I don't understand at what point I would have to call such writes.
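As far as I understand, read_csv drives all the reads itself once it starts, so there is no natural hook to interleave writes with parsing; writing to a buffer only works if every chunk is written (and the buffer rewound) before read_csv is called, roughly like this (the chunks below are placeholders for new_chunk_from_db):

import io
import pandas

buffer = io.BytesIO()
for new_chunk_from_db in (b'1;qwe\n', b'2;asdf\n', b'3;zxcv'):  # placeholder chunks
    buffer.write(new_chunk_from_db)
buffer.seek(0)  # rewind, otherwise read_csv sees an empty stream
for row_chunk in pandas.read_csv(buffer, sep=';', chunksize=2):
    print(row_chunk)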
I believe I could dump the CLOB's contents into a temporary file and then pass that file to read_csv, but I would really like to skip this step and read directly from the database. A rough sketch of the temp-file fallback follows for reference.
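For completeness, the temp-file fallback I have in mind would look roughly like this (assuming an already-open cx_Oracle connection ora_con as in __init__ above, and assuming body is a CLOB, so LOB.read returns str):

import tempfile
import pandas

cur = ora_con.cursor()  # ora_con: open cx_Oracle connection (assumed)
cur.execute("select body from my_table where job_id = :jid", jid='job_id')
lob, = cur.fetchone()

with tempfile.NamedTemporaryFile(mode='w+', encoding='cp1251', suffix='.csv') as tmp:
    offset = 1                  # cx_Oracle LOB offsets are 1-based
    piece = lob.getchunksize()  # server-suggested chunk size
    while True:
        data = lob.read(offset, piece)
        if not data:
            break
        tmp.write(data)
        offset += len(data)
    tmp.seek(0)
    for row_chunk in pandas.read_csv(tmp, sep=';', chunksize=2):
        print(row_chunk)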
Please point me in the right direction.
cx_Oracle provides a native way to read LOBs. It looks like overriding BytesIO.read with cx_Oracle's LOB read does the job:
from io import BytesIO

import pandas

class BlobIO(BytesIO):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.res = b''
        self.ora_con = db.get_conn()  # own helper returning an open cx_Oracle connection
        self.ora_cur = self.ora_con.cursor()
        self.ora_cur.execute("select lob from table")
        self.res = self.ora_cur.fetchall()[0][0]  # cx_Oracle LOB object
        self.offset = 1  # LOB offsets are 1-based

    def read(self, size=None):
        r = self.res.read(self.offset, size)
        self.offset += size
        # Advancing by exactly `size` already gives non-overlapping reads:
        # with 1-based offsets the first read covers bytes 1..size, so the
        # next one has to start at size + 1, i.e. at offset + size. While
        # testing with size + 1 the parser occasionally missed some bytes.
        if not r:
            self.ora_cur.close()
            self.ora_con.close()
        return r
blob_buffer = BlobIO()
reader = pandas.read_csv(
    blob_buffer,
    chunksize=JobContext.rchunk_size)
for row_chunk in reader:
    print(row_chunk)
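A note on the design: subclassing BytesIO mainly buys the rest of the file-like plumbing (readable, seekable and friends) that pandas expects, while read is the only method actually replaced. The C parser appears to always pass an explicit byte count to read; if anything ever called read() with no argument, self.offset += size would raise a TypeError, so a defensive default such as size=8192 probably would not hurt.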