Pytransitions:AsyncMachine异步相关回调的顺序解析



注意:这个问题与Python的FSM库pytransitions 有关

我正在寻找一种方法,当方法回调在prepare或/和之前或/和之后被提及为列表时,可以按顺序解决它们。我正在使用transitions.extensions.asyncio中的AsyncMachine模块

预期结果:

1Done_2Done_3Done

获取:

None_3Done

复制当前情况的示例代码:

import asyncio
from transitions.extensions.asyncio import AsyncMachine

class Model:
STATES = ['A', 'B']
TRANSITIONS = [
{'trigger': 'next', 'source': 'A', 'dest': 'B',
'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
]
def __init__(self, name, state='initial'):
self.name = name
self.state = state
self.attribute_1 = None
self.attribute_2 = None
self.attribute_3 = None
async def initialize1(self):
await asyncio.sleep(1)  # This is expensive operation and will take some time.
self.attribute_1 = '1Done'
print(f'{self.name} {self.state} -> Initialized1: ', self.attribute_1)
async def initialize2(self):
await asyncio.sleep(0.5)  # This is expensive operation and will take some time.
self.attribute_2 = f'{self.attribute_1}_2Done'
print(f'{self.name} {self.state} -> Initialized2: ', self.attribute_2)
async def initialize3(self):
self.attribute_3 = f'{self.attribute_2}_3Done'
print(f'{self.name} {self.state} -> Initialized3: ', self.attribute_3)
async def show_attributes(self):
print(f'{self.name} {self.state} -> Showing all: {self.attribute_3}')

machine = AsyncMachine(
model=None,
states=Model.STATES,
transitions=Model.TRANSITIONS,
initial=None,
queued='model'
# queued=True
)

async def main():
model1 = Model(name='Model1', state='A')
machine.add_model(model1, initial=model1.state)
await machine.dispatch('next')

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

正如代码'prepare': ['initialize1', 'initialize2', 'initialize3']中所示,我正在寻找一种方法,在解析initialize1后调用initialize2,在解析了initialize1和initialize2方法后调用initiaalize3目前,它们被并行调用,这是一个很好的功能,但如果有办法让它们按顺序执行/解析,那就太棒了

当然,我可以再添加一个像initialize_all这样的方法,然后在里面调用上面所有的方法。但想想我必须不断添加多少新方法才能处理现实世界中的问题。我想使我的函数可重复使用,并且只针对特定任务而更小。

我浏览了pytransitions源代码,找到了两种方法来实现我想要的功能

我想如果我能提到我是如何实现我想要的功能的,那就太好了

由于我正在寻找一种方法来实现回调事件的异步解析(默认情况下(和按要求的顺序解析,因此我不得不重写AsyncMachinecallbacks方法。

方法1:

import asyncio
from functools import partial
from transitions.extensions.asyncio import AsyncMachine

class EnhancedMachine(AsyncMachine):
async def callbacks(self, funcs, event_data):
""" Overriding callbacks method:
Get `parallel_callback` keyword argument to decide whether
callback events should be resolved in parallel or in sequence.
"""
parallel_callback = event_data.kwargs.get('parallel_callback', None)
resolved_funcs = [partial(event_data.machine.callback, func, event_data) for func in funcs]
if parallel_callback is False:
for func in resolved_funcs:
await func()
else:
await self.await_all(resolved_funcs)

class Model:
STATES = ['A', 'B']
TRANSITIONS = [
{'trigger': 'next', 'source': 'A', 'dest': 'B',
'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
]
def __init__(self, name, state='initial'):
self.name = name
self.state = state
self.sequential_transition = True
self.attribute_1 = None
self.attribute_2 = None
self.attribute_3 = None
async def initialize1(self, ed):
await asyncio.sleep(1)  # This is expensive operation and will take some time.
self.attribute_1 = '1Done'
print(f'{self.name} {self.state} -> Initialized1: ', self.attribute_1)
async def initialize2(self, ed):
await asyncio.sleep(0.5)  # This is expensive operation and will take some time.
self.attribute_2 = f'{self.attribute_1}_2Done'
print(f'{self.name} {self.state} -> Initialized2: ', self.attribute_2)
async def initialize3(self, ed):
self.attribute_3 = f'{self.attribute_2}_3Done'
print(f'{self.name} {self.state} -> Initialized3: ', self.attribute_3)
async def show_attributes(self, ed):
print(f'{self.name} {self.state} -> Showing all: {self.attribute_3}')

machine = EnhancedMachine(
model=None,
states=Model.STATES,
transitions=Model.TRANSITIONS,
initial=None,
send_event=True,  # this will pass EventData instance for each method.
queued='model'
# queued=True
)

async def main():
model1 = Model(name='Model1', state='A')
machine.add_model(model1, initial=model1.state)
# Passing `parallel_callback` as False for synchronous events
await machine.dispatch('next', parallel_callback=False)

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

缺点:

添加了
  1. send_event=True,并且所有方法定义都添加了额外的参数ed(event_data(来处理parallel_callback关键字参数。

  2. 转换回调需要传递parallel_callback=False,并且必须更改代码中所有可能的位置。

  3. 如果下一个转换必须根据转换本身的定义来决定,那么不能传递关键字参数parallel_callback(至少我不确定如何做到这一点(:

    TRANSITIONS = [
    {'trigger': 'next', 'source': 'A', 'dest': 'B',
    'prepare': [], 'before': [], 'after': ['next2']},
    {'trigger': 'next2', 'source': 'B', 'dest': 'C',
    'prepare': ['initialize1', 'initialize2', 'initialize3'], 'before': [], 'after': ['show_attributes']}
    ]
    

方法2(我个人更喜欢这种方式(:

在转换的定义中,将相互依赖的回调分组在一起,并应按顺序进行解析。

使用此方法,最终转换看起来像这样

TRANSITIONS = [
{'trigger': 'next', 'source': 'A', 'dest': 'B',
'prepare': [('initialize1', 'initialize2', 'initialize3')], 'before': [],
'after': ['show_attributes']}
]

解释:

'prepare': [('callback1', 'callback2'), 'callback3']

这里,group1(callback1和callback2(、group2(callback3(将异步(并行(解析。但是组1中的callback1和callback2将被同步(按顺序(解析。

Overridencallbacks方法与新的静态方法await_sequential:现在看起来略有不同

class EnhancedMachine(AsyncMachine):
async def callbacks(self, func_groups, event_data):
""" Triggers a list of callbacks """
resolved_func_groups = []
for funcs in func_groups:
if isinstance(funcs, (list, tuple)):
resolved_funcs = [partial(event_data.machine.callback, func, event_data) for func in funcs]
else:
resolved_funcs = [partial(event_data.machine.callback, funcs, event_data)]
resolved_func_groups.append(resolved_funcs)
# await asyncio.gather(*[self.await_sequential(funcs) for funcs in resolved_func_groups])
await self.await_all([partial(self.await_sequential, funcs) for funcs in resolved_func_groups])
@staticmethod
async def await_sequential(funcs):
return [await func() for func in funcs]

缺点:

  1. 方法和方法调用的定义没有任何变化
  2. 改变了一个地方,固定了所有的地方

缺点:

  1. 你应该知道你的方法在做什么。有时,不必要的分组会导致不必要的事件解决延迟

使用这两种方法,我得到了相同的期望输出:

Model1 A -> Initialized1:  1Done
Model1 A -> Initialized2:  1Done_2Done
Model1 A -> Initialized3:  1Done_2Done_3Done
Model1 B -> Showing all: 1Done_2Done_3Done

我坚持第二种方法,尽管我很高兴知道实现这种功能的其他有效方法:(

我认为您的"方法2"看起来不错。如果您知道所有回调都应该按顺序执行,并且根本不需要并行执行,那么您也可以使用:覆盖await_all

class EnhancedMachine(AsyncMachine):
@staticmethod
async def await_all(callables):
return [await func() for func in callables]

如果你切换元组/列表的含义,你可以将代码缩短一点,如下所示:

class EnhancedMachine(AsyncMachine):
async def callbacks(self, func_groups, event_data):
results = []
for funcs in func_groups:
if isinstance(funcs, (list, tuple)):
results.extend(await self.await_all(
[partial(event_data.machine.callback, func, event_data)
for func in funcs]
))
else:
results.append(await self.callback(funcs, event_data))
return results

这启用了类似[stage_1, (stage_2a, stage_2b, stage_2c), stage_3]的回调注释,其中每个阶段按顺序执行,但子阶段并行调用。

最新更新