我有一个非常大的json对象,我想将其转储到pickle文件中。有没有办法在使用pickle.dump时显示进度条?
您可以在读取文件时监视文件的进度(pickle是否在解码之前读取整个文件是另一个问题)。
class TQDMBytesReader(object):
    """File-like wrapper that forwards reads to *fd* while updating a tqdm bar.

    Any keyword arguments (e.g. ``total=``) are passed straight to ``tqdm``.
    Only the methods ``pickle.Unpickler`` needs (``read``/``readline``) are
    implemented.
    """

    def __init__(self, fd, **kwargs):
        self.fd = fd
        # Imported lazily so merely importing this module does not require tqdm.
        from tqdm import tqdm
        self.tqdm = tqdm(**kwargs)

    def read(self, size=-1):
        # Renamed local from ``bytes`` to avoid shadowing the builtin.
        data = self.fd.read(size)
        self.tqdm.update(len(data))
        return data

    def readline(self):
        data = self.fd.readline()
        self.tqdm.update(len(data))
        return data

    def __enter__(self):
        self.tqdm.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        # Delegate so the progress bar is closed/cleaned up with the reader.
        return self.tqdm.__exit__(*args, **kwargs)
示例用法:def test_tqdm_reader():
from pickle import Unpickler
with open("/tmp/list.pkl", "rb") as fd:
total = os.path.getsize("/tmp/list.pkl")
with TQDMBytesReader(fd, total=total) as pbfd:
up = Unpickler(pbfd)
obj = up.load()
print(f"Loaded {str(obj)}")
我知道的唯一方法是定义 `__getstate__`/`__setstate__` 方法来返回"子对象",在pickle/unpickle每个子对象时就可以刷新GUI。例如,如果你的对象是一个列表,你可以这样写:
import pickle
class SubList:
    """One chunk of a larger list; fires ``on_pickling`` whenever it is
    pickled or unpickled, giving the host application a hook per chunk."""

    # Class-level callback slot; left as None to disable the hook.
    on_pickling = None

    def __init__(self, sublist):
        print('SubList', sublist)
        self.data = sublist

    def __getstate__(self):
        callback = SubList.on_pickling
        if callback is not None:
            print('SubList pickle state fetch: calling sub callback')
            callback()
        return self.data

    def __setstate__(self, obj):
        callback = SubList.on_pickling
        if callback is not None:
            print('SubList pickle state restore: calling sub callback')
            callback()
        self.data = obj
class ListSubPickler:
    """Pickles a large list as roughly ``num_chunks`` SubList chunks so that
    the SubList callback fires once per chunk during dump/load."""

    def __init__(self, data: list):
        self.data = data

    def __getstate__(self):
        print('creating SubLists for pickling long list')
        num_chunks = 10
        # Bug fix: int(len/num_chunks) is 0 for lists shorter than num_chunks
        # (including the empty list), and range() raises ValueError on a zero
        # step. Clamp the chunk span to at least 1.
        span = max(1, len(self.data) // num_chunks)
        SubLists = [SubList(self.data[i:(i + span)])
                    for i in range(0, len(self.data), span)]
        return SubLists

    def __setstate__(self, subpickles):
        # Rebuild the flat list by concatenating the restored chunks in order.
        self.data = []
        print('restoring Pickleable(list)')
        for subpickle in subpickles:
            self.data.extend(subpickle.data)
        print('final', self.data)
def refresh():
    """Placeholder progress hook.

    In a real application this would refresh the GUI (for example,
    qApp.processEvents() for Qt), advance a progress bar, etc.
    """
    print('refreshed')
如果在该脚本中运行以下命令,
data = list(range(100))  # your large data object
list_pickler = ListSubPickler(data)
SubList.on_pickling = refresh

# NOTE: the leading "\n" escapes below were garbled to "n..." in the original
# transcription ('ndumping', 'nloading'); restored here.
print('\ndumping pickle of', list_pickler)
pickled = pickle.dumps(list_pickler)

print('\nloading from pickle')
new_list_pickler = pickle.loads(pickled)
assert new_list_pickler.data == data

# Same round-trip with the callback disabled: no refreshes should occur.
print('\nloading from pickle, without on_pickling')
SubList.on_pickling = None
new_list_pickler = pickle.loads(pickled)
assert new_list_pickler.data == data
您将看到刷新回调被调用了10次。因此,如果您要转储一个2GB的列表,转储大约需要1分钟,而您希望GUI大约每秒刷新10次,那么就需要约 60*10 = 600 次刷新,此时应将块的数量设置为600。
代码很容易修改为字典,numpy数组等。
所以我找到了一个解决方案,我觉得很满意,虽然它不是100%准确,但到目前为止,我发现这是完全不引人注目的。
我的方法的基本思想非常简单:当pickle转储或加载文件时,我将文件大小与正在转储/加载的数据大小进行比较。然而,实现有点棘手。
为了持续检查文件大小,你需要线程(据我所知)。我使用PyQt5的QThread,因为我的应用程序本来就基于PyQt5,但可能还有其他方法可以做到这一点。
需要两个worker在两个对应的线程中运行。第一个worker处理文件的转储/加载,而另一个worker检查文件大小。像这样:
import os
import time
import pickle
import numpy as np
import psutil
from PyQt5.QtWidgets import (
QDialog, QProgressBar, QVBoxLayout, QWidget, QPushButton, QApplication,
QTextEdit
)
from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject, QThread
from PyQt5.QtGui import QTextCursor
from rememberme import memory
class Worker(QObject):
    """Runs the pickle dump/load off the GUI thread, reporting via signals."""

    status_update = pyqtSignal(str)      # text to append to the status box
    progress_update = pyqtSignal(float)  # percentage for the progress bar
    finished = pyqtSignal(object)        # loaded object (or False when saving)

    def __init__(self, file, data, process):
        super().__init__()
        self._isRunning = True
        self._success = False
        self.return_value = False
        self.file = file
        self.data = data
        # 'sav' means dump; anything else means load.
        self.process = process

    def run(self):
        # Bug fix: the leading "\n" escape was garbled to "n" in the original
        # transcription (f'n{...}ing file...'); restored here.
        self.status_update.emit(f'\n{self.process.title()}ing file...')
        if self.process == 'sav':
            with open(self.file, 'wb') as output_file:
                pickle.dump(self.data, output_file, pickle.HIGHEST_PROTOCOL)
        else:
            with open(self.file, 'rb') as input_file:
                self.return_value = pickle.load(input_file)
        self.status_update.emit('done.')
        self.progress_update.emit(100)
        self._success = True
        self.finished.emit(self.return_value)

    def stop(self):
        self._isRunning = False
        # _success distinguishes a normal finish from a cancellation.
        if self._success:
            self.status_update.emit(f' File {self.process}ed.')
        else:
            self.status_update.emit(
                f' {self.process.title()}ing process canceled.'
            )
class SizeChecker(QObject):
    """Polls progress from a second thread.

    While saving ('sav'): compares the growing file size against the expected
    data size. While loading: compares the growth of this process's RSS
    (relative to its level before loading) against the file size. In the load
    case, ``file`` holds the initial RSS in bytes, not a path.
    """

    progress_update = pyqtSignal(float)

    def __init__(self, target_size, file, process):
        super().__init__()
        self.file = file
        self.process = process
        self.target_size = target_size
        self._isRunning = True

    def run(self):
        if self.process == 'sav':
            while self._isRunning:
                time.sleep(0.01)
                # Robustness fix: the worker may not have created the file
                # yet (or it may be mid-replace); skip this tick instead of
                # letting the checker thread die on OSError.
                try:
                    size = os.path.getsize(self.file)
                except OSError:
                    continue
                self.progress_update.emit(size / self.target_size * 100)
        else:
            while self._isRunning:
                time.sleep(0.01)
                rss = psutil.Process(os.getpid()).memory_info().rss
                self.progress_update.emit(
                    (rss - self.file) / self.target_size * 100
                )

    def stop(self):
        self._isRunning = False
class Progress(QDialog):
    """Small dialog that shows nothing but a single progress bar."""

    def __init__(self):
        super().__init__()
        self.progress = QProgressBar()
        layout = QVBoxLayout(self)
        layout.addWidget(self.progress)
class Widget(QWidget):
    # Demo main window: "dump"/"load" buttons, a status log, and a popup
    # progress dialog driven by two threads — a Worker that pickles/unpickles
    # and a SizeChecker that polls progress. Doc-only review; the
    # connect/disconnect teardown order below is deliberate, so the code is
    # left untouched.

    def __init__(self, parent=None):
        super(Widget, self).__init__(parent)
        dump_btn = QPushButton("dump")
        dump_btn.clicked.connect(lambda: self.handle('sav'))
        load_btn = QPushButton("load")
        load_btn.clicked.connect(lambda: self.handle('load'))
        self.status = QTextEdit()
        # Pickle target path and the payload to dump.
        self.file = 'test'
        self.data = [np.full(1000, 1000) for _ in range(500000)]  # some arbitrary data
        # Thread objects are created once; worker/checker are built per run.
        self.popup = None
        self.worker_thread = QThread()
        self.worker = None
        self.checker_thread = QThread()
        self.size_checker = None
        lay = QVBoxLayout(self)
        lay.addWidget(dump_btn)
        lay.addWidget(load_btn)
        lay.addWidget(self.status)
        lay.addStretch()

    @pyqtSlot()
    def handle(self, process):
        # process is 'sav' or 'load'. Shows the progress popup, then starts
        # both threads: Worker does the (un)pickling, SizeChecker polls.
        self.popup = Progress()
        self.popup.setWindowTitle(f'{process.title()}ing data...')
        # Closing the popup also triggers teardown via finish_process.
        self.popup.finished.connect(self.finish_process)
        self.popup.show()
        data = self.data if process == 'sav' else None
        self.worker = Worker(self.file, data, process)
        if process == 'sav':
            # Target = RAM held by the data (rememberme.memory);
            # progress = written file size vs. that target.
            target_size = memory(self.data)
            file = self.file
        else:
            # Target = file size on disk; progress = growth of this
            # process's RSS relative to its level before loading started.
            target_size = os.path.getsize(self.file)
            file = psutil.Process(os.getpid()).memory_info().rss
        self.size_checker = SizeChecker(target_size, file, process)
        self.size_checker.moveToThread(self.checker_thread)
        self.size_checker.progress_update.connect(self.update_progress)
        self.checker_thread.started.connect(self.size_checker.run)
        self.checker_thread.start()
        self.worker.moveToThread(self.worker_thread)
        self.worker.status_update.connect(self.report_status)
        self.worker.progress_update.connect(self.update_progress)
        self.worker.finished.connect(self.finish_process)
        self.worker_thread.started.connect(self.worker.run)
        self.worker_thread.start()

    def finish_process(self):
        # Tear down both threads and every signal connection made in handle(),
        # then close the popup. popup.finished is disconnected *before*
        # closing so this slot is not re-entered by the close.
        self.size_checker.stop()
        self.size_checker.progress_update.disconnect(self.update_progress)
        self.checker_thread.started.disconnect(self.size_checker.run)
        self.size_checker = None
        self.checker_thread.terminate()
        self.worker.stop()
        self.worker.status_update.disconnect(self.report_status)
        self.worker.progress_update.disconnect(self.update_progress)
        self.worker.finished.disconnect(self.finish_process)
        self.worker_thread.started.disconnect(self.worker.run)
        self.worker = None
        self.worker_thread.terminate()
        self.popup.finished.disconnect(self.finish_process)
        if self.popup.isVisible():
            self.popup.close()

    def update_progress(self, value):
        # Shared slot for Worker and SizeChecker percentage updates.
        self.popup.progress.setValue(value)

    def report_status(self, text):
        # Append worker status text and keep the view scrolled to the end.
        self.status.insertPlainText(text)
        self.status.moveCursor(QTextCursor.End)
if __name__ == '__main__':
    import sys

    # Standard Qt bootstrap: create the application, show the demo window,
    # and hand control to the event loop.
    app = QApplication(sys.argv)
    window = Widget()
    window.resize(640, 480)
    window.show()
    sys.exit(app.exec_())
在本例中,您可以看到,在将数据转储到文件的情况下,我使用了这里解释的rememberme工具的memory
函数,以便了解分配给数据的内存大小。然后,我不断地将写入数据的文件大小与该数字进行比较,并给出百分比。正如我所说,这不是100%准确的,因为文件大小和分配的RAM相差几个百分点,但它通常是足够好的。
如果加载了一个文件,那就更棘手了。在加载文件之前,文件大小以及整个Python进程分配的RAM(如这里所述)被存储为引用。然后,在加载过程中,将Python进程初始分配的RAM与当前RAM之间的差值与文件大小进行比较。同样,这不是100%准确,但通常足够接近。
我相信有更熟练的人可以改进这种方法,但我认为总体思路很好。