Watchdog:OSError: [Errno 9] Bad file descriptor(错误的文件描述符)



我使用 Watchdog 监视某个文件夹中何时出现新的 .hdf5 文件。文件出现后,程序会检测它是否已经写入完成;写入完成后,返回该文件的路径供后续使用。问题是,运行 main.py 时会报错。首先是 watchdog_file.py 的代码:

import os
import queue
import time
import traceback
from typing import Union

import h5py
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent

class NewFileHandler(FileSystemEventHandler):
    """h5 file creation handler for Watchdog.

    Every "created" event whose source path ends in ``.hdf5`` has that path
    put on ``file_queue`` for a consumer to pick up.
    """

    def __init__(self):
        super().__init__()
        # Paths of newly created .hdf5 files, in the order they were reported.
        self.file_queue = queue.Queue()

    def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        """Callback for File/Directory created events, called by the Observer.

        :param event: The created event; only ``event.src_path`` is inspected.
        """
        # The previous check compared a 3-character slice (src_path[-3:])
        # against the 5-character string ".hdf5", which can never be equal,
        # so no file was ever queued. Compare the real suffix instead
        # (case-insensitively, so ".HDF5" is accepted too).
        if event.src_path.lower().endswith(".hdf5"):
            self.file_queue.put(event.src_path)

class ObserverWrapper:
    """Encapsulated Observer boilerplate.

    Creates an Observer, schedules a NewFileHandler on ``path`` and starts
    observing as part of construction.

    :param path: Directory to watch.
    :param recursive: Passed through to ``Observer.schedule``.
    """

    def __init__(self, path: str, recursive=True):
        self.path = path
        self.recursive = recursive
        self.observer = Observer()
        self.handler = NewFileHandler()
        self.observer.schedule(self.handler, path=path, recursive=recursive)
        self.start()

    def start(self):
        """Start observing for filesystem events.

        The constructor already calls this, so an explicit call on a running
        wrapper is a no-op.
        """
        # NOTE(review): starting an already-running Observer a second time is
        # the likely trigger of the reported "[Errno 9] Bad file descriptor"
        # in the inotify reader thread - confirm against the watchdog version
        # in use. Guarding here keeps callers that still call start() working.
        if self.observer.is_alive():
            return
        self.observer.start()

    def stop(self):
        """Stop the observer and wait for its thread to finish."""
        self.observer.stop()
        self.observer.join()

    def wait_for_file(self):
        """Wait for a newly created file and check that it can be opened.

        Blocks until the handler's queue yields a path, then repeatedly tries
        to open that path with h5py in read mode.

        :return: The file path once it opens successfully; ``None`` if it is
            still failing with ``OSError`` after the retry budget is used up,
            or if opening raised any other exception.
        """
        # Up to 30 retries, one second apart (about 30 seconds in total).
        max_retry_count = 30
        retry_interval_seconds = 1

        # wait for file to be added
        file_path = self.handler.file_queue.get(block=True)

        retry_count = 0
        # The previous version had no loop here: after the first OSError it
        # slept once and fell off the end, returning None without retrying.
        while True:
            try:
                # Opening is the probe; the context manager closes the file
                # again straight away.
                with h5py.File(file_path, "r"):
                    pass
            except OSError:
                if retry_count >= max_retry_count:
                    print(f"h5 file <{file_path}> reached max retry count, skipping")
                    return None
                retry_count += 1
                print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}")
                time.sleep(retry_interval_seconds)
            except Exception as err:
                print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ")
                traceback.print_exc()
                # Not a "still being written" condition; retrying is not
                # expected to help, so give up on this file.
                return None
            else:
                return file_path

main.py 中的调用代码如下:

# ObserverWrapper.__init__ already starts the observer, so it must not be
# started a second time here.
self.listener = watchdog_search.ObserverWrapper("/home/path/to/folder")
self.on_finished_run(self.listener.wait_for_file())  # this line uses the finished .hdf5 file. It's not important to the error at hand.

我得到的错误如下:

Exception in thread Thread-4:
Traceback (most recent call last):
File "/home/anaconda3/envs/img/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/home/anaconda3/envs/img/lib/python3.8/site-packages/watchdog/observers/inotify_buffer.py", line 88, in run
inotify_events = self._inotify.read_events()
File "/home/anaconda3/envs/img/lib/python3.8/site-packages/watchdog/observers/inotify_c.py", line 285, in read_events
event_buffer = os.read(self._inotify_fd, event_buffer_size)
OSError: [Errno 9] Bad file descriptor

我通过重写代码解决了这个问题:

class NewFileHandler(FileSystemEventHandler):
    """Watchdog handler that forwards every "created" event to a queue."""

    def __init__(self, q, *args, **kwargs):
        """Store the target queue.

        :param q: Object with a ``put`` method that receives the events.
        Remaining positional and keyword arguments go to the base class.
        """
        super().__init__(*args, **kwargs)
        self._q = q

    def on_created(self, event):
        """Put the event object itself (unfiltered) on the queue."""
        self._q.put(event)
class Worker(QObject):
    """Watches a directory tree and announces .hdf5 files once they can be opened.

    An Observer is started in the constructor; its handler puts "created"
    events on an internal queue, which ``work`` consumes.
    """

    # Emitted with (full source path, base file name) of a readable file.
    new_file = pyqtSignal(str, str)

    def __init__(self, path):
        """
        :param path: Directory to watch (recursively).
        """
        super().__init__()
        self._q = queue.Queue()
        # Keep a reference on the instance so the observer stays reachable.
        self._observer = Observer()
        handler = NewFileHandler(self._q)
        self._observer.schedule(handler, path=path, recursive=True)
        # starts a background thread! Thus we need to wait for the
        # queue to receive the events in work.
        self._observer.start()

    def work(self):
        """Consume queued events forever, emitting ``new_file`` per readable file.

        Blocks on the queue and never returns. Events that are not "created"
        events for a path ending in ``.hdf5`` (case-insensitive) are ignored.
        """
        # for test purposes now but want to set an upper bound on verifying
        # a file is finished.
        max_retry_count = 350
        # seconds between two attempts to open the file
        retry_interval_seconds = .01

        while True:
            event = self._q.get()
            if event.event_type != "created":
                continue
            if not event.src_path.lower().endswith(".hdf5"):
                continue
            if self._wait_until_readable(event.src_path, max_retry_count, retry_interval_seconds):
                self.new_file.emit(event.src_path, os.path.basename(event.src_path))

    @staticmethod
    def _wait_until_readable(src_path, max_retry_count, retry_interval_seconds):
        """Return True once ``src_path`` opens with h5py, False if giving up."""
        retry_count = 0
        while True:
            try:
                # Opening is the probe; the context manager closes the file
                # even if something goes wrong after the open.
                with h5py.File(src_path, "r"):
                    pass
            except OSError:
                if retry_count >= max_retry_count:
                    print(f"h5 file <{src_path}> reached max retry count, skipping")
                    return False
                retry_count += 1
                print(f"h5 file <{src_path}> is locked, retrying {retry_count}/{max_retry_count}")
                time.sleep(retry_interval_seconds)
            except Exception as err:
                print(f"Got unexpected Error <{type(err).__name__}> while opening <{src_path}> ")
                traceback.print_exc()
                # The previous version kept looping here with no sleep and no
                # retry limit, spinning forever on a persistent error.
                return False
            else:
                return True

最新更新