我有一个名为"rules"的类对象数组。
self.rules看起来像这样
[<creative_task.rules.rules.Rule object at 0x7fde7a33f518>, <creative_task.rules.rules.Rule object at 0x7fde7a33f830>, <creative_task.rules.rules.Rule object at 0x7fde7a33f888>, <creative_task.rules.rules.Rule object at 0x7fde7a33f8e0>]
此 Rule 类具有验证存储在其中的数据的方法 self.check。
有很多规则需要验证:请求的值是 MORE、LESS、EQUALS、介于、正、负,相对于数据库中的值。 self.check 函数调用的每个此类规则都会调用 Clickhouse 数据库以验证自身。
基本上,当我需要验证规则数组时,我会调用check_all函数来验证所有规则,如下所示rules=[r.check(asset, context( for r in self.rules]:
def check_all(self, asset: dict, context: dict = None) -> RuleGroupResult:
context = {} if context is None else context
return RuleGroupResult(
identifier=self.identifier,
rules=[r.check(asset, context) for r in self.rules],
groups=[g.check(asset, context) for g in self.groups],
logical_operator=self.logical_operator,
)
这些规则并不相互依赖。
现在我想使这种验证成为一个并行过程。所以我需要为数组中的每个规则调用 Rule.check 方法,但此调用应同时发生
这可以通过Pool.map
和operator.methodcaller
相对容易地完成,因为您的参数都是相同的,并且该方法在每个对象上具有相同的名称:
import multiprocessing as mp # Top of file
from operator import methodcaller
def check_all(self, asset: dict, context: dict = None) -> RuleGroupResult:
context = {} if context is None else context
with mp.Pool() as pool: # Create pool
return RuleGroupResult(
identifier=self.identifier,
rules=pool.map(methodcaller('check', asset, context), self.rules), # Map in parallel
groups=pool.map(methodcaller('check', asset, context), self.groups),
logical_operator=self.logical_operator,
)