具有共享变量的 Python 多线程



我正在尝试并行化我的工作,但我是多线程的新手,所以对具体的实现感到困惑。

我有一个套接字侦听器,可将数据保存到缓冲区。当缓冲区达到他的容量时,我需要将其数据保存到数据库中。 在一个线程上,我想启动套接字侦听器,而在并行任务上,我想检查缓冲区状态。

BufferQueue只是pythonlist的扩展,其方法允许检查列表是否已达到指定的大小。

SocketManager是我正在收听的STREAM_URL的流数据提供者。它使用回调函数来处理消息

但是当我使用回调来检索数据时,我不确定使用共享变量是否是正确的最佳决定

buffer = BufferQueue(buffer_size=10000)
def start_listening_to_sokcet(client):
s = SocketManager(client)
s.start_socket(cb_new)
s.start()
def cb_new(message):
print("New message")
global buffer
for m in message:
#save data to buffer
def is_buffer_ready(buffer):
global buffer
print("Buffer state")
if buffer.ready():
#save buffer data to db

如果您能帮助我处理此案,我将不胜感激

我认为您正在寻找的只是queue模块。

queue.Queue是专为在线程之间传递对象而设计的自同步队列。

默认情况下,在队列上调用get将阻塞,直到对象可用,这是您通常想要执行的操作 - 在网络应用程序中使用线程进行并发的要点是,您的线程看起来都像普通的同步代码,但大部分时间都在等待套接字、文件、 排队,或者当他们无事可做时的任何内容。但是您可以使用block=False进行检查而不会阻止,或者在等待时放置一个timeout

您还可以在构造队列时指定maxsize。然后,默认情况下,put将阻止,直到队列不是太满而无法接受新对象。但是,同样,如果它太满,您可以使用blocktimeout来尝试失败。

所有同步都在内部getput内部处理,因此您不需要Lock来保证线程安全或Condition来向服务员发出信号。

队列甚至可以为您处理关闭。生产者可以put一个特殊的值,告诉消费者在get上看到它时退出。

对于生成器需要等到使用者完成的正常关闭,您可以在使用者处理完每个排队的对象后使用可选的task_done方法,并在join方法上让生产者块。但是,如果您不需要此功能,或者有其他方法等待关闭,例如加入使用者线程,则可以跳过此部分。

多线程为您提供资源(变量(的共享状态。无需使用全局变量,只需将缓冲区作为参数传递给您的方法,然后从/写入它。

您仍然需要控制对缓冲区资源的访问,因此两个线程不会同时读取/写入。您可以使用threading模块中的Lock来实现此目的:

lock = threading.Lock()
def cb_new(buffer_, lock_, message):
print("New message")
with lock_():
for m in message:
#save data to buffer
buffer.add(m)
def is_buffer_ready(buffer_, lock_):
print("Buffer state")
with lock_():
if buffer_.ready():
#save buffer data to db

请注意,如果您使用的是多处理而不是线程,则此解决方案将不起作用。

顺便说一句,正如@abarnert评论的那样,有更好的机制来检查缓冲区是否准备就绪(有数据要读取/有可用空间可以写入(,然后调用检查它的函数。查看阻止您的select.select(),直到缓冲区真正准备就绪


使用 select 时,将调用放入while True循环中,然后检查缓冲区是否已准备好读取。您可以在线程中启动此函数,传递标志变量和缓冲区。如果要停止线程,请将传递的标志更改为 False。对于缓冲区对象,请使用Queue.Queue()或类似的数据结构。

def read_select(flag, buff):
flag = 1
while flag:
r, _, _ = select.select([buff], [], [])
if r:
data = s.read(BUFFSIZE)
# process data

P.S- 选择也适用于插座。您可以传递套接字对象而不是缓冲区,它会检查套接字上的缓冲区是否已准备好读取。

相关内容

  • 没有找到相关文章

最新更新