需要检测任何带有特定分区标签的usb块设备



我很惊讶这很难找到。

我需要检测何时使用python3添加(插入(具有特定分区标签的USB块设备。

有没有一种方法可以使用pyudev来提供USB块设备的列表?我怎样才能指定一个子系统="0"的过滤器;块";AND子系统=";usb";,它们似乎是互斥的过滤器。

当一个USB设备具有一个名为"的分区时;XYZ";插入,我需要运行一个脚本来装载它,并运行一个使用该分区上数据的程序。

我尝试了太多的变体,包括各种udev规则、systemd单元、许多脚本及其组合,但在使用以下代码之前没有取得任何成功它工作了,但造成了100%的CPU负载。当我在while循环的末尾添加睡眠时间时,它根本不起作用,甚至还阻止了PCmanFM的自动运行。

该问题发生在usbEvent.py进程中。我可以从命令行运行它,它运行得很好。它所做的第一件事是使用Popen来调用";grep devName/proc/mounts";等待自动装载程序装载分区。Popen是在循环中调用的,并增加了一些时间。睡眠消除了CPU负担,这令人惊讶,因为装载点在几秒钟内就出现了。

systemd运行下面的代码和它产生的usbEvent.py进程之间似乎有一些我不完全理解的相互作用。它们是独立的过程,所以我认为它们应该相互独立。

usbEvent.py处理程序可以工作,但识别装载并继续需要更长的时间。当它运行时,它消耗了大约5%的CPU,当它完成时只消耗0.3。为什么超时结束时它没有结束一定是由于p.communicate,但如果p.poll没有返回None,则进程应该完成,不应该阻止。。。但确实如此!为什么?

该平台是一款具有8GB RAM的复盆子Pi4,并于2021年1月发布复盆子Pi操作系统。

#!/usr/bin/env python3
import os
import time
import subprocess as sp
import pyudev
# This code is run on boot via systemd to detect when
# my custom USB storage device (USB stick, SSD etc) 
# is inserted or removed. It spawns a new process to 
# handle the event.
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by('block', device_type="partition")

def log_event(action, device):
    devName = device.get('DEVNAME')
    devLabel = device.get('ID_FS_LABEL')
    if devLabel == "MY_CUSTOM_USB":
        sp.Popen(["/home/user/bin/customUSB/usbEvent.py",
                  action, devName, devLabel],
                 stdin=sp.DEVNULL, stdout=sp.DEVNULL, stderr=sp.DEVNULL)

observer = pyudev.MonitorObserver(monitor, log_event)
observer.start()
while True:
#    pass
    time.sleep(0.1)

以下是我为使usbEvent.py处理程序正常工作而更改的部分:

# Waits for mount point of "dev" to appear and returns it. Communnicate
def getMountPoint(dev):
    out = ""
    interval = 0.1
    timeout = 5 / interval
    while timeout > 0:
        p = sp.Popen(["grep", dev, "/proc/mounts"],
                     text=True, stdout=sp.PIPE, stderr=sp.PIPE)
        retCode = p.poll()
        if retCode is None:
            time.sleep(interval)
        else:
            out, err = p.communicate()  # This should not block but does!
            if retCode == 0 and len(out) > 0:
                out = out.split()[1]
                break
            else:
                lg.info(f"exit code: {retCode} Error: {err}")
                exit(1)
    if timeout == 0:
        p.terminate()
    return out

相关内容

  • 没有找到相关文章

最新更新