在 Python 中维护一个可自动清理的线程列表



我维护一个threads列表,我想在完成后自动从列表中删除线程

我找到了这种方法:

import threading, time
def f(seconds, info):
print('starting', seconds)
time.sleep(seconds)
print('finished', seconds)
threads.remove(info['thread'])
def newaction(seconds):
info = {}
thread = threading.Thread(target=f, args=(seconds, info))
info['thread'] = thread
thread.start()
threads.append(thread)
threads = []
newaction(1)
newaction(2)
for _ in range(10):
time.sleep(0.3)
print(threads)

它的工作原理:

starting 1
starting 2
[<Thread(Thread-1, started 1612)>, <Thread(Thread-2, started 712)>]
[<Thread(Thread-1, started 1612)>, <Thread(Thread-2, started 712)>]
[<Thread(Thread-1, started 1612)>, <Thread(Thread-2, started 712)>]
finished 1
[<Thread(Thread-2, started 712)>]
[<Thread(Thread-2, started 712)>]
[<Thread(Thread-2, started 712)>]
finished 2
[]
[]
[]
[]

但是必须通过info的命令这一事实有点黑客。我使用它是因为显然我无法通过threadargs......

thread = threading.Thread(target=f, args=(seconds, thread))  
#                                                     ^ not created yet!

。当Thread对象尚未创建时!

Python 中是否有更自然的方式来维护可自动清理的线程列表?

你有current_thread()函数。

import threading, time
def f(seconds):
print('starting', seconds)
time.sleep(seconds)
print('finished', seconds)
threads.remove(threading.current_thread())
def newaction(seconds):
thread = threading.Thread(target=f, args=(seconds,))
thread.start()
threads.append(thread)
threads = []
newaction(1)
newaction(2)
for _ in range(10):
time.sleep(0.3)
print(threads)

输出:

starting 1
starting 2
[<Thread(Thread-1, started 4588)>, <Thread(Thread-2, started 4388)>]
[<Thread(Thread-1, started 4588)>, <Thread(Thread-2, started 4388)>]
[<Thread(Thread-1, started 4588)>, <Thread(Thread-2, started 4388)>]
finished 1
[<Thread(Thread-2, started 4388)>]
[<Thread(Thread-2, started 4388)>]
[<Thread(Thread-2, started 4388)>]
finished 2
[]
[]
[]
[]

子类化线程会产生具有自然语法和安全位置来保存线程列表的解决方案。您也不必在要在另一个线程中运行的每个函数的末尾包含删除线程的指令。只需使用子类。

import threading, time
class AutoRemovingThread(threading.Thread):
threads = []
def __init__(self, func, *args, **kwargs):
super().__init__()
self.threads.append(self)
self.func = func
self.args = args
self.kwargs = kwargs
def run(self):
self.func(*self.args, **self.kwargs)
self.threads.remove(self)
def f(seconds):
print('starting', seconds)
time.sleep(seconds)
print('finished', seconds)
def newaction(seconds):
AutoRemovingThread(f, seconds).start()
newaction(1)
newaction(2)
for _ in range(10):
time.sleep(0.3)
print(AutoRemovingThread.threads)

输出:

starting 1
starting 2
[<AutoRemovingThread(Thread-1, started 8436)>, <AutoRemovingThread(Thread-2, started 1072)>]
[<AutoRemovingThread(Thread-1, started 8436)>, <AutoRemovingThread(Thread-2, started 1072)>]
[<AutoRemovingThread(Thread-1, started 8436)>, <AutoRemovingThread(Thread-2, started 1072)>]
finished 1
[<AutoRemovingThread(Thread-2, started 1072)>]
[<AutoRemovingThread(Thread-2, started 1072)>]
[<AutoRemovingThread(Thread-2, started 1072)>]
finished 2
[]
[]
[]
[]

python-3.8

import threading
def get_status_of_threads():
current_threads = threading.enumerate()
thread_data = []
for item in current_threads:
try:
print(str(item.target))
except AttributeError:
print("item", str(item))
thread_data.append({"thread_name": item.getName(), "status": int(item.is_alive()), "id": item.ident})
return thread_data

上面的代码在 Python2.7 中进行了测试,如果你想持续监控线程,你可以在单独的线程中使用它,或者你可以将其公开为 API,这样你可以随时检查。这也将有助于减少资源的浪费。

对于 API,您可以使用json2html.convert({"thread_data":thread_data})函数以表格形式以更美观的方式显示它。

最新更新