我写了一个脚本,使用均方根比较来比较一组巨大的图像(超过4500个文件)。起初,它将每个图像的大小调整为800x600,并获取直方图。然后,它构建一个组合数组,并将它们均匀地分布到四个线程中,这些线程计算每个组合的均方根。RMS低于500的图像将被移动到文件夹中,以便稍后手动排序。
#!/usr/bin/python3
import sys
import os
import math
import operator
import functools
import datetime
import threading
import queue
import itertools
from PIL import Image
def calc_rms(hist1, hist2):
return math.sqrt(
functools.reduce(operator.add, map(
lambda a, b: (a - b) ** 2, hist1, hist2
)) / len(hist1)
)
def make_histogram(imgs, path, qout):
for img in imgs:
try:
tmp = Image.open(os.path.join(path, img))
tmp = tmp.resize((800, 600), Image.ANTIALIAS)
qout.put([img, tmp.histogram()])
except Exception:
print('bad image: ' + img)
return
def compare_hist(pairs, path):
for pair in pairs:
rms = calc_rms(pair[0][1], pair[1][1])
if rms < 500:
folder = 'maybe duplicates'
if rms == 0:
folder = 'exact duplicates'
try:
os.rename(os.path.join(path, pair[0][0]), os.path.join(path, folder, pair[0][0]))
except Exception:
pass
try:
os.rename(os.path.join(path, pair[1][0]), os.path.join(path, folder, pair[1][0]))
except Exception:
pass
return
def get_time():
return datetime.datetime.now().strftime("%H:%M:%S")
def chunkify(lst, n):
return [lst[i::n] for i in range(n)]
def main(path):
starttime = get_time()
qout = queue.Queue()
images = []
for img in os.listdir(path):
if os.path.isfile(os.path.join(path, img)):
images.append(img)
imglen = len(images)
print('Resizing ' + str(imglen) + ' Images ' + starttime)
images = chunkify(images, 4)
threads = []
for x in range(4):
threads.append(threading.Thread(target=make_histogram, args=(images[x], path, qout)))
[x.start() for x in threads]
[x.join() for x in threads]
resizetime = get_time()
print('Done resizing ' + resizetime)
histlist = []
for i in qout.queue:
histlist.append(i)
if not os.path.exists(os.path.join(path, 'exact duplicates')):
os.makedirs(os.path.join(path, 'exact duplicates'))
if not os.path.exists(os.path.join(path, 'maybe duplicates')):
os.makedirs(os.path.join(path, 'maybe duplicates'))
combinations = []
for img1, img2 in itertools.combinations(histlist, 2):
combinations.append([img1, img2])
combicount = len(combinations)
print('Going through ' + str(combicount) + ' combinations of ' + str(imglen) + ' Images. Please stand by')
combinations = chunkify(combinations, 4)
threads = []
for x in range(4):
threads.append(threading.Thread(target=compare_hist, args=(combinations[x], path)))
[x.start() for x in threads]
[x.join() for x in threads]
print('nstarted at ' + starttime)
print('resizing done at ' + resizetime)
print('went through ' + str(combicount) + ' combinations of ' + str(imglen) + ' Images')
print('all done at ' + get_time())
if __name__ == '__main__':
main(sys.argv[1]) # sys.argv[1] has to be a folder of images to compare
这是有效的,但在15到20分钟内完成调整大小后,比较会持续数小时。起初,我假设这是一个锁定队列,工作人员从中获得他们的组合,所以我用预定义的数组块替换了它。这并没有减少执行时间。我还运行了它,没有移动文件,以排除可能的硬盘驱动器问题。
使用cProfile对此进行分析可提供以下输出。
Resizing 4566 Images 23:51:05
Done resizing 00:05:07
Going through 10421895 combinations of 4566 Images. Please stand by
started at 23:51:05
resizing done at 00:05:07
went through 10421895 combinations of 4566 Images
all done at 03:09:41
10584539 function calls (10584414 primitive calls) in 11918.945 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
16/1 0.001 0.000 11918.945 11918.945 {built-in method exec}
1 2.962 2.962 11918.945 11918.945 imcomp.py:3(<module>)
1 19.530 19.530 11915.876 11915.876 imcomp.py:60(main)
51 11892.690 233.190 11892.690 233.190 {method 'acquire' of '_thread.lock' objects}
8 0.000 0.000 11892.507 1486.563 threading.py:1028(join)
8 0.000 0.000 11892.507 1486.563 threading.py:1066(_wait_for_tstate_lock)
1 0.000 0.000 11051.467 11051.467 imcomp.py:105(<listcomp>)
1 0.000 0.000 841.040 841.040 imcomp.py:76(<listcomp>)
10431210 1.808 0.000 1.808 0.000 {method 'append' of 'list' objects}
4667 1.382 0.000 1.382 0.000 {built-in method stat}
完整的探查器输出可以在这里找到。
考虑到第四行,我猜测线程在某种程度上锁定了。但是,不管图像的数量如何,为什么要精确地进行51次呢?
我在Windows 7 64位上运行这个。
提前谢谢。
一个主要问题是,您使用线程来执行至少部分CPU绑定的工作。由于全局解释器锁,一次只能运行一个CPython线程,这意味着您不能利用多个CPU内核。这将使CPU绑定任务的多线程性能充其量与单核执行没有什么不同,甚至可能更糟,因为线程增加了额外的开销。这在threading
文档中有说明:
CPython实现细节:在CPython中,由于解释器锁,一次只能有一个线程执行Python代码(即使某些面向性能的库可能会克服此限制)。如果你想让你的应用程序更好地利用多核机器的计算资源,建议您使用CCD_ 2。然而,线程仍然是一种合适的模式如果要同时运行多个I/O绑定任务。
为了绕过GIL的局限性,您应该按照文档中所说的去做,并使用multiprocessing
库而不是threading
库:
import multiprocessing
...
qout = multiprocessing.Queue()
for x in range(4):
threads.append(multiprocessing.Process(target=make_histogram, args=(images[x], path, qout)))
...
for x in range(4):
threads.append(multiprocessing.Process(target=compare_hist, args=(combinations[x], path)))
正如您所看到的,multiprocessing
在很大程度上是threading
的替代品,因此更改应该不会太难。唯一复杂的是,如果你在流程之间传递的任何论点都是不可选择的,尽管我认为所有这些论点都是你的情况。IPC在进程之间发送Python数据结构的成本也会增加,但我怀疑真正并行计算的好处将超过额外的开销。
尽管如此,由于依赖于对磁盘的读/写操作,您可能在某种程度上仍受I/O限制。并行化不会使您的磁盘I/O更快,因此在那里没有什么可做的。
要比较4500个图像,我建议在文件级别进行多处理,而不一定在图像中进行多线程处理。正如@dano所指出的那样,GIL将阻碍这一进程。我的策略是:
- 每个核心(或配置的数量)一个工作进程
- 一个编排过程,从上面分叉;做一些IPC来协调工人的工作
(简要地)查看您的代码,看起来它将受益于懒惰的语言;我不认为这会使比较短路。例如,如果对图像的每个片段进行RMS比较,那么一旦确定块之间的差异足够大,就可以在结束比较后停止比较。然后,您可能还需要更改块的迭代方式,以及块的大小/形状。
除此之外,我会考虑寻找更便宜的机制,避免做一些平方根;可能使用创建"近似"平方根的东西,也可能使用查找表。
如果我没有错的话,你也可以创建一个中间形式(直方图),你应该暂时保留。无需保存800x600图像。
此外,了解你在这项练习中"平等"的意思也会很有用。