如何在asyncio中使用kqueue进行文件监控



我想使用kqueue来监视文件的更改。我可以看到如何以线程方式使用select.kqueue((。

我正在寻找一种将其与asyncio一起使用的方法。我可能错过了一些非常明显的东西。我知道python在macos上使用kqueue进行异步。我很高兴任何解决方案只有在使用kqueue选择器时才能工作。

到目前为止,我能看到的唯一方法是创建一个线程,从另一个线程连续kqueue.control(),然后用asyncio.loop.call_soon_threadsafe()注入事件。我觉得应该有更好的方法。

您可以使用loop.add_reader((将kqueue objet中的FD作为读取器添加到控制循环中。然后控制循环将通知您事件已准备好收集。

这样做有两个功能,对于熟悉kqueue的人来说可能很奇怪:

  • select.kqueue.control是一种一次性方法,它首先更改监视器并等待新事件到达。因为我们不希望它被阻塞,所以这两个操作必须分为一个非阻塞调用来修改监视器,另一个稍后的非阻塞调用用来收集结果事件
  • 因为我们从来都不想阻止,所以永远不能使用超时。这可以通过asyncio.wait_for()重新实现

有更有效的方法来写这篇文章,但这里有一个如何用异步方法(这里称为kqueue_control(完全替换select.kqueue.control的例子:

async def kqueue_control(kqueue: select.kqueue,
changes: Optional[Iterable[select.kevent]],
max_events: int,
timeout: Optional[int]):
def receive_result():
try:
# Events are ready to collect; fetch them but do not block
results = kqueue.control(None, max_events, 0)
except Exception as ex:
future.set_exception(ex)
else:
future.set_result(results)
finally:
loop.remove_reader(kqueue.fileno())

# If this call is non-blocking then just execute it
if timeout == 0 or max_events == 0:
return kqueue.control(changes, max_events, 0)

# Apply the changes, but DON'T wait for events
kqueue.control(changes, 0)
loop = asyncio.get_running_loop()
future = loop.create_future()
loop.add_reader(kqueue.fileno(), receive_result)
if timeout is None:
return await future
else:
return await asyncio.wait_for(future, timeout)

最新更新