我使用的一个用于图形的应用程序有一个嵌入式Python解释器-它的工作原理与任何其他Python解释器完全相同,除了一些特殊的对象。
基本上,我正在尝试使用Python来下载一堆图像并进行其他网络和磁盘I/O。如果我这样做没有多线程,我的应用程序将冻结(即视频停止播放),直到下载完成。
为了解决这个问题,我试图使用多线程。但是,我不能触摸任何主进程。
我写了这个代码。对程序唯一的部分进行注释。me.store
/me.fetch
基本上是一种获得全局变量的方法。op('files')
为全局表
这是两个东西,"在主进程中",只能以线程安全的方式触摸。我不确定我的代码是否这样做。
我将感谢任何输入,为什么或(为什么不)这段代码是线程安全的,以及我如何能够绕过线程安全的方式访问全局变量。
我担心的一件事是counter
如何被许多线程多次获取。由于它只在文件写入后更新,这可能会导致竞争条件,不同的线程访问具有相同值的计数器(然后不正确存储增加的值)。或者,如果磁盘写入失败,计数器会发生什么。
from urllib import request
import threading, queue, os
url = 'http://users.dialogfeed.com/en/snippet/dialogfeed-social-wall-twitter-instagram.json?api_key=ac77f8f99310758c70ee9f7a89529023'
imgs = [
'http://search.it.online.fr/jpgs/placeholder-hollywood.jpg.jpg',
'http://www.lpkfusa.com/Images/placeholder.jpg',
'http://bi1x.caltech.edu/2015/_images/embryogenesis_placeholder.jpg'
]
def get_pic(url):
# Fetch image data
data = request.urlopen(url).read()
# This is the part I am concerned about, what if multiple threads fetch the counter before it is updated below
# What happens if the file write fails?
counter = me.fetch('count', 0)
# Download the file
with open(str(counter) + '.jpg', 'wb') as outfile:
outfile.write(data)
file_name = 'file_' + str(counter)
path = os.getcwd() + '\' + str(counter) + '.jpg'
me.store('count', counter + 1)
return file_name, path
def get_url(q, results):
url = q.get_nowait()
file_name, path = get_pic(url)
results.append([file_name, path])
q.task_done()
def fetch():
# Clear the table
op('files').clear()
results = []
url_q = queue.Queue()
# Simulate getting a JSON feed
print(request.urlopen(url).read().decode('utf-8'))
for img in imgs:
# Add url to queue and start a thread
url_q.put(img)
t = threading.Thread(target=get_url, args=(url_q, results,))
t.start()
# Wait for threads to finish before updating table
url_q.join()
for cell in results:
op('files').appendRow(cell)
return
# Start a thread so that the first http get doesn't block
thread = threading.Thread(target=fetch)
thread.start()
你的代码看起来一点都不安全。重点:
- 添加到
results
是不安全的——两个线程可能试图同时添加到列表。 - 访问和设置
counter
是不安全的——一个线程在另一个线程设置新的counter
值之前获取counter
。 - 传递url队列是多余的——只需为每个作业传递一个新的url。
另一种方式(concurrent.futures
)
既然你正在使用python 3,为什么不使用并发呢?Futures模块,它使您的任务更容易管理。下面我用一种不需要显式同步的方式写出了你的代码——所有的工作都由futures模块处理。
from urllib import request
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from itertools import count
url = 'http://users.dialogfeed.com/en/snippet/dialogfeed-social-wall-twitter-instagram.json?api_key=ac77f8f99310758c70ee9f7a89529023'
imgs = [
'http://search.it.online.fr/jpgs/placeholder-hollywood.jpg.jpg',
'http://www.lpkfusa.com/Images/placeholder.jpg',
'http://bi1x.caltech.edu/2015/_images/embryogenesis_placeholder.jpg'
]
def get_pic(url, counter):
# Fetch image data
data = request.urlopen(url).read()
# Download the file
with open(str(counter) + '.jpg', 'wb') as outfile:
outfile.write(data)
file_name = 'file_' + str(counter)
path = os.getcwd() + '\' + str(counter) + '.jpg'
return file_name, path
def fetch():
# Clear the table
op('files').clear()
with ThreadPoolExecutor(max_workers=2) as executor:
count_start = me.fetch('count', 0)
# reserve these numbers for our tasks
me.store('count', count_start + len(imgs))
# separate fetching and storing is usually not thread safe
# however, if only one thread modifies count (the one running fetch) then
# this will be safe (same goes for the files variable)
for cell in executor.map(get_pic, imgs, count(count_start)):
op('files').appendRow(cell)
# Start a thread so that the first http get doesn't block
thread = threading.Thread(target=fetch)
thread.start()
如果多个线程修改count,那么在修改count时应该使用锁。
。
lock = threading.Lock()
def fetch():
...
with lock:
# Do not release the lock between accessing and modifying count.
# Other threads wanting to modify count, must use the same lock object (not
# another instance of Lock).
count_start = me.fetch('count', 0)
me.store('count', count_start + len(imgs))
# use count_start here
唯一的问题是,如果一个作业因为某种原因失败了,那么你会得到一个丢失的文件号。任何引发的异常也会中断执行器的映射,通过在那里重新引发异常——这样你就可以在需要的时候做一些事情。
您可以通过使用tempfile
模块在将文件移动到某个永久位置之前找到临时存储文件的位置来避免使用计数器。
如果你是python多线程的新手,记得看看multiprocessing
和threading
。
你的代码看起来不错,虽然代码风格不是很容易阅读。您需要运行它,看看它是否像您期望的那样工作。
with
将确保您的锁被释放。acquire()方法将在进入块时被调用,release()方法将在块退出时被调用。
如果你添加更多的线程,确保他们不使用相同的地址队列和没有竞争条件(似乎是由Queue.get()
完成的,但你需要运行它来验证)。请记住,每个线程共享同一个进程,因此几乎所有内容都是共享的。您不希望两个线程处理相同的address
Lock
根本不做任何事情。你只有一个线程调用download_job
——那就是你分配给my_thread
的线程。另一个线程,即主线程,调用offToOn
,并在到达该函数结束时立即结束。所以没有第二个线程试图获取锁,因此没有第二个线程被阻塞。显然,您提到的表位于您显式打开和关闭的文件中。如果操作系统保护这个文件不被不同的程序同时访问,你就可以这样做;否则,它肯定是不安全的,因为您没有完成任何线程同步。
正确的线程同步要求不同的线程可以访问同一个锁;也就是说,一个锁可以被多个线程访问。还要注意,"线程"不是"进程"的同义词。Python两者都支持。如果你真的想避免访问主进程,你必须使用multiprocessing模块启动和管理第二个进程。
这段代码永远不会退出,因为总是有一个线程在无限循环中运行(在threader
中)。
以线程安全的方式访问资源需要这样做:
a_lock = Lock()
def use_resource():
with a_lock:
# do something
锁只在使用它的函数之外创建一次。在整个应用程序中,无论来自哪个线程,对资源的每次访问都必须通过调用use_resource
或其他等效的方法获得相同的锁。