Python - threading assert组在创建自定义线程类时为None



我想创建一个自定义Thread类,它能够将遇到的异常传播到主线程。我的实现如下:

class VerseThread(threading.Thread):
def __init__(self, args):
super().__init__(self, args=args)
# self.scraper = scraper
def run(self):
self.exc = None
try:
book, abbrev, template, chapter = self.args
self.parser.parse(book, abbrev, template, chapter)
except ChapterNotFoundError as e:
self.exc = e
def join(self):
threading.Thread.join(self)
if self.exc:
raise self.exc

这应该在下面的方法中运行,在Scraper类中(它都在ẁhile true中):

for book, abbrev, testament in self.books[init:end]:
base_chapter = 1
while True:
threads = []
if testament == 'ot':
for i in range(3):
threads.append(VerseThread(args=(book, abbrev, OT_TEMPLATE, base_chapter+i)))
else:
for i in range(3):
threads.append(VerseThread(args=(book, abbrev, NT_TEMPLATE, base_chapter+i)))

try:
for thread in threads:
if not thread.is_alive():
thread.start()
for thread in threads:
thread.join()
base_chapter += 3
except ChapterNotFoundError as e:
LOGGER.info(f"{{PROCESS {multiprocessing.current_process().pid}}} - Chapter {e.chapter} not found in {book}, exiting book...")
break

问题是,如果我像这里一样运行它,我得到错误assert group is None, "group argument must be None for now"。然而,当我使用Thread(target=self.parse, args=(book, abbrev, OT_TEMPLATE, base_chapter+1))而不是VerseThread(args=(book, abbrev, OT_TEMPLATE, base_chapter+i))运行它时,它工作得很好,但例外当然仍然存在。我的代码有什么问题?怎样才能消除这个错误呢?

编辑:经过进一步的测试,似乎我要做的工作很好,当我使用thread.run()而不是thread.start(),但只有一个线程被使用,这是一个问题。然而,这意味着错误必须在start()方法中,但我不知道该怎么做。

您有几个错误。首先,如果您像在super().__init__(self, target=target, args=args)中那样使用super(),则不需要将self显式地作为参数传递。其次,要处理任何可能的线程初始化参数,此方法的签名应该如下所示:

class VerseThread(threading.Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
... # rest of the code omitted

但是因为你的__init__方法什么也不做,只是用任何传递的参数调用父类的__init__方法,现在甚至不需要重写这个方法。

最后,您感兴趣的属性不是args,而是_args_kwargs(如果指定了关键字参数)。此外,您已经指定了self.parser,但我没有看到该属性设置在哪里。

import threading
class ChapterNotFoundError(Exception):
pass
class VerseThread(threading.Thread):
def run(self):
self.exc = None
try:
book, abbrev, template, chapter = self._args
self.parser.parse(book, abbrev, template, chapter)
except ChapterNotFoundError as e:
self.exc = e
def join(self):
threading.Thread.join(self)  # Or: super().join()
if self.exc:
raise self.exc
for book, abbrev, testament in self.books[init:end]:
base_chapter = 1
while True:
threads = []
if testament == 'ot':
for i in range(3):
threads.append(VerseThread(args=(book, abbrev, OT_TEMPLATE, base_chapter+i)))
else:
for i in range(3):
threads.append(VerseThread(args=(book, abbrev, NT_TEMPLATE, base_chapter+i)))

try:
for thread in threads:
if not thread.is_alive():
thread.start()
for thread in threads:
thread.join()
base_chapter += 3
except ChapterNotFoundError as e:
LOGGER.info(f"{{PROCESS {multiprocessing.current_process().pid}}} - Chapter {e.chapter} not found in {book}, exiting book...")
break

访问像self._args这样的准私有属性是一个潜在的危险,应该避免。

我可以看到创建Thread子类的价值,该子类将捕获"worker"中的异常。它的功能是执行,然后"建议"。它在加入主线程时返回主线程。但我认为这样的类应该是通用的,可以使用任何类型的辅助函数。一般来说,我不喜欢在multithreading.Threadmultiprocessing.Pool子类中使用特定于应用程序的代码(业务逻辑)。相反,我更喜欢将业务逻辑编码在函数或类方法中,然后可以在您认为合适的情况下用于多线程、多处理或串行处理。以下是我如何编码Thread子类(我已将其命名为PropogateExceptionThread,但选择您希望的任何名称),我可能会使用它:

import threading
class PropogateExceptionThread(threading.Thread):
def run(self):
self.exc = None
try:
super().run()
except Exception as e:
self.exc = e
def join(self):
super().join()
if self.exc:
raise self.exc
def worker(x):
if x < 10 or x > 20:
raise ValueError(f'Bad value for argument x = {x}')
t = PropogateExceptionThread(target=worker, args=(1,))
t.start()
try:
t.join()
except Exception as e:
print('The thread raised an exception:', e)

打印:

The thread raised an exception: Bad value for argument x = 1

最新更新