在多处理或多线程应用程序中保留 CPU 时间



我正在使用Raspberry Pi 3进行一个项目,用于一些环境控制,并在连续循环中具有许多简单的重复事件。RP3 胜任这项工作太过分了,但它让我专注于其他事情。

应用特点:

  1. 应用程序应从十几个传感器(温度、湿度、pH、ORP 等)收集传感器数据(间隔 n 秒可变)。
  2. 根据时间和这些传感器数据,控制器计算输出(开关、阀门和PWM驱动器)。
  3. 几乎没有事件需要按顺序运行。
  4. 有些事件属于"安全"类别,应在触发时立即运行(故障安全传感器、紧急按钮)。
  5. 大多数事件以秒间隔重复运行(每秒,直到每 30 秒)。
  6. 某些事件会触发操作,在 1 到 120 秒内激活中继。
  7. 某些事件使用时序值。此值需要每天计算多次,并且占用大量 CPU 资源(使用一些迭代插值公式,因此具有可变运行时)。
  8. 显示环境状态(连续循环)

我对 VB.NET 很熟悉(不是职业),但决定用 Python 3.6 做这个项目。 在过去的几个月里,我读了很多关于子主题的信息,比如设计模式、线程、流程、事件、平行实践等。

根据我的阅读,我认为 Asyncio 与执行器中的一些任务相结合即可完成这项工作。

大多数任务/事件都不是时间关键型的。控制器输出可以使用"最新"传感器数据。 另一方面,某些任务会在一段时间内激活中继。我想知道如何对这些任务进行编程,而不会在 CO2 阀门打开的时间段(例如)期间(例如)另一个"耗时"的任务阻塞处理器。这对我的环境可能是灾难性的。

因此,我需要一些建议。

到目前为止,请参阅下面的代码。我不确定我是否正确使用了Python中的Asyncio函数。 为了便于阅读,我将各种任务的内容存储在单独的模块中。

import asyncio
import concurrent.futures
import datetime
import time
import random
import math
# define a task...
async def firstTask():
while True:
await asyncio.sleep(1)
print("First task executed")
# define another task...        
async def secondTask():
while True:
await asyncio.sleep(5)
print("Second Worker Executed")
# define/simulate heavy CPU-bound task
def heavy_load():
while True:
print('Heavy_load started')
i = 0
for i in range(50000000):
f = math.sqrt(i)*math.sqrt(i)
print('Heavy_load finished')
time.sleep(4)
def main():
# Create a process pool (for CPU bound tasks).
processpool = concurrent.futures.ProcessPoolExecutor()
#  Create a thread pool (for I/O bound tasks).
threadpool = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
try:
# Add all tasks. (Correct use?)
asyncio.ensure_future(firstTask())
asyncio.ensure_future(secondTask())
loop.run_in_executor(processpool, heavy_load)
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Loop will be ended")
loop.close()    
if __name__ == '__main__':
main()

大多数任务/事件不是时间关键型的。控制器输出可以使用"最新"传感器数据。另一方面,某些任务会在一段时间内激活中继。我想知道如何对这些任务进行编程,而不会在 CO2 阀门打开的时间段(例如)期间(例如)另一个"耗时"的任务阻塞处理器。这对我的环境可能是灾难性的。

请允许我强调,Python不是一种实时语言,asyncio也不是实时组件。它们既没有实时执行的基础设施(Python是垃圾回收的,通常在分时系统上运行),也没有在实践中在这样的环境中进行测试。因此,我强烈建议不要在任何失误可能对您的环境造成灾难性后果的情况下使用它们。

这样一来,您的代码就存在一个问题:虽然heavy_load计算不会阻塞事件循环,但它也永远不会完成,也不会提供有关其进度的信息。run_in_executor背后的想法是,您正在运行的计算最终将停止,并且事件循环将希望收到有关它的通知。run_in_executor的惯用法可能如下所示:

def do_heavy_calc(param):
print('Heavy_load started')
f = 0
for i in range(50000000):
f += math.sqrt(i)*math.sqrt(i)
return f
def heavy_calc(param):
loop = asyncio.get_event_loop()
return loop.run_in_executor(processpool, do_heavy_calc)

表达式heavy_calc(...)不仅在不阻塞事件循环的情况下运行,而且还是可等待的。这意味着异步代码可以等待其结果,也不会阻塞其他协程:

async def sum_params(p1, p2):
s1 = await heavy_calc(p1)
s2 = await heavy_calc(p2)
return s1 + s2

上面依次运行两个计算。它也可以并行完成:

async def sum_params_parallel(p1, p2):
s1, s2 = await asyncio.gather(heavy_calc(p1), heavy_calc(p2))
return s1 + s2

另一件可以改进的事情是设置代码:

asyncio.ensure_future(firstTask())
asyncio.ensure_future(secondTask())
loop.run_in_executor(processpool, heavy_load)
loop.run_forever()

调用asyncio.ensure_future然后从不等待结果在某种程度上是一种异步反模式。由未等待的任务引发的异常会被默默吞噬,这几乎肯定不是您想要的。有时人们只是忘记写await,这就是为什么 asyncio 在循环被破坏时抱怨未等待的待处理任务。

良好的编码习惯是安排某人等待每个任务,无论是立即与await还是gather将其与其他任务相结合,或者在以后。例如,如果任务需要在后台运行,则可以将其存储在某个位置,并在应用程序生命周期结束时await或取消它。在您的情况下,我会将gatherloop.run_until_complete结合起来:

everything = asyncio.gather(firstTask(), secondTask(),
loop.run_in_executor(processpool, heavy_load))
loop.run_until_complete(everything)

某些事件属于"安全"类别,应在触发时立即运行(故障安全传感器、紧急按钮)。

那么我强烈建议您不要依赖软件来实现此功能。切断电源的紧急停止按钮是通常做这种事情的方式。如果你有软件这样做,并且你真的有一个对生命构成威胁的情况需要处理,那么你将面临一大堆灾难 - 几乎可以肯定的是,你必须遵守大量的法规。

相关内容

  • 没有找到相关文章

最新更新