使用Python中的事件监视系统



我正在用Python创建一个项目,我想添加一个使用事件和事件处理程序的监控系统。我希望这个系统在整个项目中都可用。我想到以下操作:

  • 定义事件。该事件可以将一些数据作为参数。
  • 定义监视器。监视器注册特定事件。多个监视器可以注册同一事件。我想创建不同类型的监视器,例如一个用于打印数据,一个用于创建带有数据的绘图等。因此,监视器应该是一个类,能够保存它收集的所有数据,直到调用某个方法(例如打印,创建日志等)。
  • 为监视器-事件对定义事件处理程序。这定义了给定监视器将如何响应给定事件。此操作主要是:将此数据添加到某个监视器类的实例的数据列表中。
  • 一个通知函数,可以在事件发生时发出通知。这将触发为该事件注册的所有监视器的事件处理程序。理想情况下,通知函数应该可以从项目中的任何位置调用。

如何创建这样的系统?有没有任何库可以帮助我解决这个问题?我特别想知道如何使这个系统在整个项目中透明可用。

你可以用四十行Python代码做大部分你想做的事情。 这是我自己的设计,我一直在使用。 选择函数名称是为了使其成为Qt的"信号"和"插槽"的直接替代品。

它使用起来很简单。 您创建一个PSignal。 通过调用connect方法注册处理程序。 处理程序可以是任何可调用的。 当事件发生时,您可以通过调用emit函数发出信号(即通知事件)。 每个已注册的可调用对象都在该点运行。 调用emit的对象不知道或关心是否有人在听,或者如果他们在听会发生什么。

您还可以断开处理程序的连接。

有很多调试代码,因为我发现否则某些错误可能很难跟踪。

在您的问题中,您希望每个处理程序都是监视器,而在我的设计中处理程序只是函数。 但在我看来,您的"监视器"概念与事件/处理程序机制无关。 你必须编写函数来使你的应用程序运行,并且让这些函数调用你的监视器应该很容易。

该代码在Python 3.3中进行了广泛的测试。

#! python3
import traceback
class PSignal:
def __init__(self, debug=False):
self.debug = debug
self.__handlers = []
def clear(self):
"""Deletes all the handlers."""
self.__handlers.clear()
def connect(self, f):
"""f is a python function."""
if not callable(f):
raise ValueError("Object {!r} is not callable".format(f))
self.__handlers.append(f)
if self.debug:
print("PSIGNAL: Connecting", f, self.__handlers)
def disconnect(self, f):
for f1 in self.__handlers:
if f == f1:
self.__handlers.remove(f)
return
def emit(self, *x, **y):
self._emit(*x, **y)
def check_debug(self):
if self.debug and self.__handlers:
print("PSIGNAL: Signal emitted")
traceback.print_stack()
def _emit(self, *x, **y):
self.check_debug()
for f in self.__handlers:
try:
if self.debug:
print("PSIGNAL: emit", f, len(x), x, y)
f(*x, **y)
except Exception:
print("PSIGNAL: Error in signal", f)
traceback.print_exc()

查看 Reactive Python (RxPy)

反应性X

GitHub/python

您可以使用分布式消息传递系统(如 zmq)和"发布者订户"模式创建自己的系统。

我已经做了类似的东西来构建一个可自定义的工作流引擎(Flows,https://github.com/mastro35/flows

)再见 D.

我将这个用于运行状况监视,它允许用户指定回调,并允许线程、主动监视器和被动监视器:

https://gist.github.com/earonesty/4ccf8fc9bde6feac30e5c155e54dfa5f

我粘贴了下面的代码,没有测试(超过代码):

class MonitorInstance:
def __init__(self, parent, label, func, threshold, active, metric):
self.parent = parent
self.label = label
self.func = func
self.threshold = threshold
self.active = active
self.metric = metric
self.__errors = None
def ok(self):
if self.__errors is None or self.__errors:
self.parent._ok(self)
self.__errors = 0
if self.metric:
self.metric.set(0)
def error(self):
if not self.__errors:
self.parent._error(self)
if self.__errors is None:
self.__errors = 0
self.__errors += 1
if self.metric:
self.metric.inc()
def check(self):
try:
self.func()
self.ok()
except Exception as e:
log.error("%s error: %s", self.label, e)
self.error()
@property
def healthy(self):
return self.__errors < self.threshold
DEFAULT_THRESHOLD = 1           # errors to cause fault
DEFAULT_CHECKSECS = 5           # time in secs between checks
class Monitor:
def __init__(self, health_callback=None, check_secs=DEFAULT_CHECKSECS, use_thread=False):
self.active = []        # active moniors
self.alerts = set()     # thresholds currently triggered (not healthy)
self.health_callback = health_callback
self.healthy = False    # default: not healthy unless a monitor is added!
self.check_secs = check_secs
self.last_check = 0
if use_thread:
assert self.check_secs > 0, "threads need to sleep"
threading.Thread(target=self._thread_loop, daemon=True).start()
def add(self, label, check, threshold=DEFAULT_THRESHOLD, active=False, metric=None):
inst = MonitorInstance(self, label, check, threshold, active, metric)
if active:
self.active.append(inst)
inst.check()
return inst
def _error(self, inst):
self.alerts.add(inst)
if self.healthy:
self._callback(False)
self.healthy = False
def _thread_loop(self):
while True:
self.check()
time.sleep(self.check_secs)
def _callback(self, value):
if not self.health_callback is None:
try:
self.health_callback(value)
except:
# health callback should always succeed!
log.exception("deadlyexes: error calling %s", self.health_callback)
def _ok(self, inst):
self.alerts.discard(inst)
if not self.healthy and not self.alerts:
self._callback(True)
self.healthy = True
def check(self, force=False):
if not force and (time.time() < (self.last_check + self.check_secs)):
return False
# returns true if check was done
checked=False
# convert to list prevents modifying iterators
for inst in list(self.alerts) + self.active:
try:
checked=True
inst.check()
except:
pass
return checked

最新更新