我需要实现以下功能
locked = False
while not locked:
file_path = get_file_path_from_poll()
locked = try_lock_file(file_path)
if locked:
# do some staff
locked = True
release_lock(file_path)
如何在python中实现这一点?我需要处理Linux文件,因为会有很多不同的进程,每个进程都应该锁定某个文件夹中的一个文件
以下是使用fcntl
的非常简单的示例
https://docs.python.org/3/library/fcntl.html#module-fcntl
第一个使用LOCK_NB
来避免阻塞,第二个不使用,这意味着它将阻塞,等待文件访问。
import fcntl
import time
print('Attempting to lock file')
f = open('./locked.txt', 'w')
fcntl.flock(f.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB)
print ('Lock Acquired')
f.write("some text")
print('Attempting 2nd lock')
f2 = open('./locked.txt', 'r')
f2_locked = False
try:
fcntl.flock(f2.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB)
except BlockingIOError:
print ('The file is Locked 2')
f2_locked = True
print('Sleeping with file locked')
print('Run this program from another terminal to test')
time.sleep(15)
print('File closed try again')
f.close()
time.sleep(15)
此示例阻止,等待文件访问
import fcntl
import time
print('Attempting to lock file')
f = open('./locked.txt', 'w')
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
print ('Lock Acquired')
f.write("some text 2")
print('Sleeping with file locked')
print('Run this program from another terminal to test')
time.sleep(15)
print('File closed try again')
f.close()
time.sleep(15)