pysnmp陷阱接收器-使用异步队列(w/o线程)进行处理



我的目标是拥有一个纯异步实现,用pysnmp接收和处理SNMP-TRAPS。到目前为止,我只通过使用(一个或多个(单独的线程来完成它。

我为pysnmp找到的例子是使用同步回调函数(cbFun(。我不知道如何在那里创建异步函数。

下面的例子对我来说很好,但使用了一个单独的线程(或池(。如果能有这样的东西就太好了:

queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.create_task(run_daemon)
loop.create_task(process_trap)
loop.run_forever()

但由于cbFun,我没能做到这一点。

这是我的工作(线程(示例:

#!/usr/bin/env python3.8
import asyncio
import concurrent.futures
from pysnmp.entity import engine, config
from pysnmp.carrier.asyncio.dgram import udp
from pysnmp.entity.rfc3413 import ntfrcv
class SnmpTrapDaemon():
@staticmethod
def run_daemon(pool):
# Create SNMP engine with autogenernated engineID and pre-bound
# to socket transport dispatcher
snmpEngine = engine.SnmpEngine()
# Transport Setup
config.addTransport(
snmpEngine,
udp.domainName,
udp.UdpTransport().openServerMode(('0.0.0.0', '162'))
)
# SNMPv1/2c setup
config.addV1System(
snmpEngine, 'public', 'public')
# Callback function for receiving notifications
# noinspection PyUnusedLocal
def cbFun(snmpEngine, stateReference, contextEngineId,
contextName, varBinds, cbCtx):
trap = {}
for oid, val in varBinds:
trap[oid.prettyPrint()] = val.prettyPrint()
pool.submit(asyncio.run, process_trap(trap))
# Register SNMP Application at the SNMP engine
ntfrcv.NotificationReceiver(snmpEngine, cbFun)
snmpEngine.transportDispatcher.jobStarted(1)
try:
print(f'Trap Listener started on port 162. Press Ctrl-c to quit.')
snmpEngine.transportDispatcher.runDispatcher()
except KeyboardInterrupt:
print('user quit')
finally:
snmpEngine.transportDispatcher.closeDispatcher()
async def process_trap(trap):
print('Processing TRAP - this might take while...')
await asyncio.sleep(3)
for item in trap.items():
print(item)
print('...done')
def main():
print('Starting SNMP-TRAP Processor. Test with "snmptrap -v2c -c public 127.0.0.1:162 123 1.3.6.1.6.3.1.1.5.1 1.3.6.1.2.1.1.5.0 s test"')
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
SnmpTrapDaemon.run_daemon(pool)
if __name__ == '__main__':
main()

这对我来说不需要线程:

#!/usr/bin/env python3.8
import asyncio
from pysnmp.entity import engine, config
from pysnmp.carrier.asyncio.dgram import udp
from pysnmp.entity.rfc3413 import ntfrcv
def run_daemon():
# Create SNMP engine with autogenernated engineID and pre-bound
# to socket transport dispatcher
snmpEngine = engine.SnmpEngine()
# Transport Setup
config.addTransport(
snmpEngine,
udp.domainName,
udp.UdpTransport().openServerMode(('0.0.0.0', '162'))
)
# SNMPv1/2c setup
config.addV1System(
snmpEngine, 'public', 'public')
# Callback function for receiving notifications
# noinspection PyUnusedLocal
def cbFun(snmpEngine, stateReference, contextEngineId,
contextName, varBinds, cbCtx):
trap = {}
print('Notification from ContextEngineId "%s", ContextName "%s"' % (contextEngineId.prettyPrint(),
      contextName.prettyPrint()))
for oid, val in varBinds:
trap[oid.prettyPrint()] = val.prettyPrint()
asyncio.create_task(process_trap(trap))

# Register SNMP Application at the SNMP engine
ntfrcv.NotificationReceiver(snmpEngine, cbFun)
snmpEngine.transportDispatcher.jobStarted(1)
try:
print(f'Trap Listener started on port 162. Press Ctrl-c to quit.')
snmpEngine.transportDispatcher.runDispatcher()
except KeyboardInterrupt:
print('user quit')
finally:
snmpEngine.transportDispatcher.closeDispatcher()
async def process_trap(trap):
print('Processing TRAP - this might take while...')
await asyncio.sleep(3)
for item in trap.items():
print(item)
print('...done')
def main():
print('Starting SNMP-TRAP Processor. Test with "snmptrap -v2c -c public 127.0.0.1:162 123 1.3.6.1.6.3.1.1.5.1 1.3.6.1.2.1.1.5.0 s test"')
run_daemon()
if __name__ == '__main__':
main()

最新更新