我正在尝试模拟在Python中并行运行的不同任务,并且每个并行进程以不同的频率运行(例如200 Hz,100 Hz和50 Hz(。我使用这个问题中的代码来创建一个 Timer 类来"实时"运行这些进程,但这些进程会随着时间的推移而不同步(例如,三个 200 Hz 任务有时会在两个 100 Hz 任务之间运行(。
为了同步我的进程,我在它们的共享内存中制作了即时报价计数器。200 Hz 进程的每次迭代都会递增一个计数器,然后在计数器达到 2 时等待计数器重置为 0,而 100 Hz 进程的每次迭代都会等待该计数器达到 2,然后再重置它。50 Hz 过程也是如此,但有另一个计数器。我使用while/pass方法进行等待。
这是代码:
from multiprocessing import Process, Event, Value
import time
# Add Timer class for multiprocessing
class Timer(Process):
def __init__(self, interval, iteration, function, args=[], kwargs={}):
super(Timer, self).__init__()
self.interval = interval
self.iteration = iteration
self.iterationLeft = self.iteration
self.function = function
self.args = args
self.kwargs = kwargs
self.finished = Event()
def cancel(self):
"""Stop the timer if it hasn't finished yet"""
self.finished.set()
def run(self):
startTimeProcess = time.perf_counter()
while self.iterationLeft > 0:
startTimePeriod = time.perf_counter()
self.function(*self.args, **self.kwargs)
# print(self.interval-(time.clock() - startTimePeriod))
self.finished.wait(self.interval-(time.perf_counter() - startTimePeriod))
self.iterationLeft -= 1
print(f'Process finished in {round(time.perf_counter()-startTimeProcess, 5)} seconds')
def func0(id, freq, tick_p1):
# Wait for 2 runs of Process 1 (tick_p1)
while tick_p1.value < 2:
pass
tick_p1.value = 0 # Reset tick_p1
# Add fake computational time depending on the frequency of the process
print(f'id: {id} at {freq} Hz')
if freq == 400:
time.sleep(0.002)
elif freq == 200:
time.sleep(0.003)
elif freq == 100:
time.sleep(0.007)
elif freq == 50:
time.sleep(0.015)
def func1(id, freq, tick_p1, tick_p2):
# Wait for tick_p1 to have been reset by Process0
while tick_p1.value >= 2:
pass
# Wait for 2 runs of Process 2 (tick_p2)
while tick_p2.value < 2:
pass
tick_p2.value = 0 # Reset tick_p2
# Add fake computational time depending on the frequency of the process
print(f'id: {id} at {freq} Hz')
if freq == 400:
time.sleep(0.002)
elif freq == 200:
time.sleep(0.003)
elif freq == 100:
time.sleep(0.007)
elif freq == 50:
time.sleep(0.015)
# Increment tick_p1
tick_p1.value += 1
def func2(id, freq, tick_p2):
# Wait for tick_p2 to have been reset by Process1
while tick_p2.value >= 2:
pass
# Add fake computational time depending on the frequency of the process
print(f'id: {id} at {freq} Hz')
if freq == 400:
time.sleep(0.002)
elif freq == 200:
time.sleep(0.003)
elif freq == 100:
time.sleep(0.007)
elif freq == 50:
time.sleep(0.015)
# Increment tick_p2
tick_p2.value += 1
if __name__ == '__main__':
freqs = [50,100,200]
# freqs = [0.25,0.5,1]
Tf = 10
tick_p1 = Value('i', 1)
tick_p2 = Value('i', 1)
processes = []
p0 = Timer(interval=1/freqs[0], iteration=round(Tf*freqs[0]), function = func0, args=(0,freqs[0], tick_p1))
p1 = Timer(interval=1/freqs[1], iteration=round(Tf*freqs[1]), function = func1, args=(1,freqs[1], tick_p1, tick_p2))
p2 = Timer(interval=1/freqs[2], iteration=round(Tf*freqs[2]), function = func2, args=(2,freqs[2], tick_p2))
processes.append(p0)
processes.append(p1)
processes.append(p2)
start = time.perf_counter()
for process in processes:
process.start()
for process in processes:
process.join()
finish = time.perf_counter()
print(f'Finished in {round(finish-start, 5)} seconds')
如您所见,我在流程中添加了睡眠时间,以模拟计算时间。当我删除进程中的打印命令时,脚本需要 10.2 秒的运行时来模拟 10 秒的"实时"计算(增加 2%,这是可以接受的(。
我的问题是,这是实现我想要做的事情的最佳方式吗?有没有更好/更快的方法?
谢谢!
我想出了一种更干净的方法来做到这一点,但我仍然愿意接受其他建议。
基本上,我创建了一个调度程序,用于标记何时在每个进程上运行循环(使用共享值(,而不是等待执行下一次迭代的时刻。
有一个快速的进程(假设p2
以 400 Hz 运行(,所有其他进程必须是频率的较慢倍数(假设p1
和p0
200 和 100 Hz(。
调度程序不是等待正确的时刻来引发执行标志(带有wait()
或sleep()
(,而是使用while
循环循环并检查 p2 周期是否已结束。如果满足条件,它将引发p2Flag
并重新启动周期。每个进程都有自己的标志,较慢的进程的标志根据计数器引发,该计数器每p2
一段时间递增。如果自上次调用p1
以来运行了 2 个p2
时间步,则此调度程序将"等待"p1
完成,然后再引发p2
和p1
的标志。
这有点复杂,但这确保了速度较慢的机器将获得与可以"实时"运行的机器相同的结果。
from multiprocessing import Process, Value
import time
def func0(id, freq, endFlag, p0Flag, runIdx, Ts):
while (endFlag.value == 0):
if (p0Flag.value == 1):
t = round(runIdx.value*Ts, 4)
# Add fake computational time depending on the frequency of the process
# print(f'id: {id} at {freq} Hz at {t}s')
if freq == 400:
time.sleep(0.002)
elif freq == 200:
time.sleep(0.003)
elif freq == 100:
time.sleep(0.007)
elif freq == 50:
time.sleep(0.015)
# Lower flag to confirm completion of cycle
p0Flag.value = 0
def func1(id, freq, endFlag, p1Flag, runIdx, Ts):
while (endFlag.value == 0):
if (p1Flag.value == 1):
t = round(runIdx.value*Ts, 4)
# Add fake computational time depending on the frequency of the process
# print(f'id: {id} at {freq} Hz at {t}s')
if freq == 400:
time.sleep(0.002)
elif freq == 200:
time.sleep(0.003)
elif freq == 100:
time.sleep(0.007)
elif freq == 50:
time.sleep(0.015)
# Lower flag to confirm completion of cycle
p1Flag.value = 0
def func2(id, freq, endFlag, p2Flag, runIdx, Ts):
while (endFlag.value == 0):
if (p2Flag.value == 1):
t = round(runIdx.value*Ts, 4)
# Add fake computational time depending on the frequency of the process
# print(f'id: {id} at {freq} Hz at {t}s')
if freq == 500:
time.sleep(0.0015)
elif freq == 400:
time.sleep(0.002)
elif freq == 200:
time.sleep(0.003)
elif freq == 100:
time.sleep(0.007)
elif freq == 50:
time.sleep(0.015)
# Update time for next iteration
runIdx.value += 1
# Lower flag to confirm completion of cycle
p2Flag.value = 0
if __name__ == '__main__':
# Set frequencies of processes
# Last value of freqs is the fastest one, for process p2
freqs = [50,100,200] # Hz
freqs = [100,200,400] # Hz
# freqs = [0.25,0.5,1] # Hz
Tf = 10
Ts = round(1/freqs[-1], 4)
# Create shared values for time index (runIdx)
# Various flags to trigger the execution of the code in each process (p0Flag, ...)
# A flag to en all processes
runIdx = Value('I',0)
p0Flag = Value('b', 0)
p1Flag = Value('b', 0)
p2Flag = Value('b', 0)
endFlag = Value('b', 0)
# How many times the fastest process has to run before flagging the slower processes
p0_counter_exe = freqs[-1]/freqs[0]
p1_counter_exe = freqs[-1]/freqs[1]
if (not(freqs[-1] % freqs[0] == 0) or not(freqs[-1] % freqs[1] == 0)):
raise Exception("Update rates for processes must be a multiple of the dynamic's update rate.")
if (freqs[-1] < freqs[0]) or (freqs[-1] < freqs[1]):
raise Exception("Dynamics update rate must be the fastest.")
# p2 is at fastest frequency, p1 and p0 at lower frequencies
p0 = Process(target=func0, args=(0, freqs[0], endFlag, p0Flag, runIdx, Ts))
p1 = Process(target=func1, args=(1, freqs[1], endFlag, p1Flag, runIdx, Ts))
p2 = Process(target=func2, args=(2, freqs[2], endFlag, p2Flag, runIdx, Ts))
processes = []
processes.append(p0)
processes.append(p1)
processes.append(p2)
for process in processes:
process.start()
time.sleep(0.5)
# Start subprocesse's counters to execute directly at the first timestep
p0_counter = p0_counter_exe
p1_counter = p1_counter_exe
# Scheduler
#------------
startTime = time.perf_counter()
periodEnd = time.perf_counter()
while (runIdx.value*Ts < Tf):
periodTime = time.perf_counter()-periodEnd
do_p2 = False
# Wait for new timestep AND completion of p2
if (periodTime >= Ts and p2Flag.value == 0):
# If p0 or p1 are expected to finish before the new timestep, wait for their completion
# Depending on the situation, if slower processes have finished their cycle, make do_p2 True
if (p1_counter == p1_counter_exe) and (p0_counter == p0_counter_exe):
if (p1Flag.value == 0) and (p0Flag.value == 0):
do_p2 = True
elif (p1_counter == p1_counter_exe):
if (p1Flag.value == 0):
do_p2 = True
elif (p0_counter == p0_counter_exe):
if (p0Flag.value == 0):
do_p2 = True
else:
do_p2 = 1
# If do_p2 is True, raise p2Flag for the p2 process
if (do_p2):
periodEnd = time.perf_counter()
p2Flag.value = 1
# If it's time to start a cycle for the slower processes, raise their flag and reset their counter
if (p1_counter == p1_counter_exe):
p1Flag.value = 1
p1_counter = 0
if (p0_counter == p0_counter_exe):
p0Flag.value = 1
p0_counter = 0
# Increment slower processes counter
p1_counter += 1
p0_counter += 1
# Close all processes
endFlag.value = 1
for process in processes:
process.join()
print(f'Finished in {round(time.perf_counter()-startTime, 5)} seconds')
print(Ts*runIdx.value)