django-viewflow:在 Handler 节点中处理异常的正确方法是什么?



假设我有以下节点:

# Flow node under discussion: run the Proxmox API request callback, then
# continue to the success-notification node.
perform_disk_size_change = flow.Handler(
    this.perform_proxmox_api_request
).Next(this.notify_iaas_success)

如果 perform_proxmox_api_request 引发异常怎么办?我可以指定一个异常处理节点,让任何异常都转到该节点吗?例如:

# The API the question is asking for: send any exception raised by the
# handler callback to a dedicated failure node through ``Except``.
perform_disk_size_change = (
    flow.Handler(this.perform_proxmox_api_request)
    .Next(this.notify_iaas_success)
    .Except(this.notify_iaas_fail)
)

于是我想出了以下解决方案,但不完全确定是否应该这样做。问题在于 HandlerActivation 会立即执行,因此没有机会把异常对象传递给下一个激活(如果下一个节点也是 Handler 节点);我仍然不知道如何在不引入另一个自定义节点的情况下,把异常传递给下一个激活。

import logging
from copy import copy
from viewflow.activation import Activation, STATUS, all_leading_canceled
from viewflow.nodes.handler import HandlerActivation
from viewflow.rest import flow
from viewflow.mixins import Edge
from viewflow.contrib import celery

log = logging.getLogger(__name__)

class HandlerActivationCatchExcept(HandlerActivation):
    """Handler activation that routes handler exceptions to mapped nodes.

    An exception raised by the handler callback is stored on
    ``caught_exception`` and the node of the first entry in
    ``flow_task.exception_mappings`` whose ``cls`` matches it is activated
    instead of the regular next node.

    An exception that matches no mapping is re-raised from ``execute``
    rather than swallowed: swallowing it would leave the failure recorded
    nowhere and no following node activated.
    """

    def __init__(self, *args, **kwargs):
        # Assigned ahead of the parent initializer, as in the original
        # ordering, so the attribute always exists on the instance.
        self.caught_exception = None
        super(HandlerActivationCatchExcept, self).__init__(*args, **kwargs)

    def _find_exception_node(self, exc):
        """Return the node of the first mapping matching *exc*, or None."""
        for mapping in self.flow_task.exception_mappings:
            if isinstance(exc, mapping['cls']):
                return mapping['node']
        return None

    def execute(self):
        """Run the callback, capturing an exception that has a mapped node.

        Raises:
            Exception: whatever the callback raised, when no registered
                mapping matches it.
        """
        try:
            self.flow_task.handler(self)
        except Exception as ex:
            if self._find_exception_node(ex) is None:
                # No node is registered for this exception type; let it
                # propagate to the caller instead of silently dropping it.
                raise
            self.caught_exception = ex
            log.exception('Exception caught in CatchExceptionHandler')

    @Activation.status.transition(source=STATUS.DONE, conditions=[all_leading_canceled])
    def activate_next(self):
        """Activate the matching exception node, or the regular outgoing edge."""
        # Identity check: an exception object may define a falsy __bool__/__len__.
        if self.caught_exception is None:
            return super(HandlerActivationCatchExcept, self).activate_next()

        exception_node = self._find_exception_node(self.caught_exception)
        if exception_node is not None:
            return exception_node.activate(prev_activation=self, token=self.task.token)
        # Not reachable through execute(), which re-raises unmapped
        # exceptions; kept so an externally set caught_exception behaves
        # as before (nothing is activated).
        return None

class CatchExceptionMixin(object):
    """Node mixin that adds ``Except`` routing of exceptions to other nodes.

    ``exception_mappings`` is an ordered list of dicts with two keys:
    ``"cls"`` (the exception class to match) and ``"node"`` (the node to
    activate for it). Order matters to consumers that pick the first match.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exception_mappings = []

    def _resolve(self, resolver):
        """Resolve the regular references, then every exception target node."""
        super()._resolve(resolver)
        for exception_mapping in self.exception_mappings:
            exception_mapping['node'] = resolver.get_implementation(exception_mapping['node'])

    def _outgoing(self):
        """Yield the ``next`` edge (if set) followed by one edge per mapping."""
        if self._next:
            yield Edge(src=self, dst=self._next, edge_class='next')
        for exception_mapping in self.exception_mappings:
            yield Edge(src=self, dst=exception_mapping['node'], edge_class='cond_false')

    def Except(self, node, exception_cls=Exception):
        """Return a copy of this node that also routes *exception_cls* to *node*.

        Args:
            node: node (or reference to one) to activate for the exception.
            exception_cls: exception class to match; defaults to ``Exception``.

        Returns:
            A shallow copy of this node carrying its own mapping list, so the
            receiver and any other copy made from it are left untouched.
        """
        result = copy(self)
        # copy() is shallow: without a fresh list (and fresh dicts) every copy
        # would share, and mutate, the same mappings as the receiver.
        result.exception_mappings = [dict(mapping) for mapping in self.exception_mappings]
        result.exception_mappings.append({"cls": exception_cls, "node": node})
        return result

class CatchExceptionHandler(CatchExceptionMixin, flow.Handler):
    """Handler node whose callback exceptions can be routed to other nodes.

    Usage::

        CatchExceptionHandler(method).Except(default_node)
        CatchExceptionHandler(method).Except(specific_node, SpecificExceptionCls).Except(default_node)
    """

    activation_class = HandlerActivationCatchExcept

class JobActivationCatchException(celery.JobActivation):
    """Celery job activation that activates an exception node on error.

    Only the first entry of ``flow_task.exception_mappings`` is used; its
    ``cls`` is not consulted, because the failure arrives here as a comment
    string rather than as the original exception object.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.caught_exception = None

    def error(self, comments=""):
        """Record the error, then hand over to the exception node if one exists.

        Args:
            comments: error text; wrapped in a plain ``Exception`` and stored
                on ``caught_exception``.
        """
        super().error(comments)
        self.caught_exception = Exception(comments)
        # Without any registered Except() mapping there is nothing to
        # activate; indexing [0] would raise IndexError inside error handling.
        if self.flow_task.exception_mappings:
            self.activate_exception_node()

    @Activation.status.transition(source=STATUS.ERROR, conditions=[all_leading_canceled])
    def activate_exception_node(self):
        """Activate the node of the first registered exception mapping."""
        self.flow_task.exception_mappings[0]['node'].activate(prev_activation=self, token=self.task.token)

class CatchExceptionJob(CatchExceptionMixin, celery.Job):
    """Celery job node whose failure can be routed to another node.

    Usage::

        CatchExceptionJob(method).Except(default_node)
    """

    activation_class = JobActivationCatchException

class HandleExceptionActivation(HandlerActivation):
    """Handler activation that keeps a reference to the activation before it."""

    @classmethod
    def activate(cls, flow_task, prev_activation, token):
        """Create the task for *flow_task*, link it, and perform it at once.

        The previous activation is stored on ``prev_activation`` before the
        new activation is initialized and performed, so the handler callback
        can read it.
        """
        new_task = flow_task.flow_class.task_class(
            process=prev_activation.process, flow_task=flow_task, token=token
        )
        new_task.save()
        new_task.previous.add(prev_activation.task)

        instance = cls()
        # Expose the previous activation to the callback before it runs.
        instance.prev_activation = prev_activation
        instance.initialize(flow_task, new_task)
        instance.perform()
        return instance

class ExceptionHandler(flow.Handler):
    """Handler node whose activation carries a ``prev_activation`` reference."""

    activation_class = HandleExceptionActivation

用法示例如下:

# Route TechnicalExceptionCls to the IaaS team; every other exception goes to
# the customer-failure node registered last.
perform_disk_size_change = (
    nodes.CatchExceptionHandler(this.perform_proxmox_api_request)
    .Next(this.notify_iaas_success)
    .Except(this.notify_iaas_team, TechnicalExceptionCls)
    .Except(this.notify_customer_failed)
)
notify_iaas_team = ExceptionHandler(this.email_iaas)
# NOTE(review): the chain above references ``notify_customer_failed`` while the
# node below is named ``notify_customer_decline`` -- confirm which is intended.
notify_customer_decline = Handler(...)

def email_iaas(self, activation):
    # The activation that routed here is kept by HandleExceptionActivation.
    prev_activation = activation.prev_activation
    # The exception it captured from its handler callback.
    exception = prev_activation.caught_exception
    ...

# Alternatively, with a Celery job node:
populate_current_info = (
    nodes.CatchExceptionJob(populate_proxmox_info)
    .Next(this.determine_approval_needed)
    .Except(this.notify_iaas_failed)
)

相关内容

最新更新