使用 asyncio 在后台运行 bleak(python 库)



我想使用Python中黯淡的库从低功耗蓝牙设备接收数据。这部分正在工作。我的问题是,我现在不知道如何在后台或并行运行此代码。

最终,我想构建一个处理来自蓝牙设备的数据的小型python应用程序。所以 bleak 一直在循环从蓝牙设备获取数据并将其发送到处理和显示数据的主进程。

出于某种原因,bleak 不会在线程中运行。是否可以为此使用 asyncio(因为它已经被 bleak 使用,也许是一种好方法(?

我检查了线程和多处理,但不知何故,我只找到了没有无限循环和发送数据的进程的示例。我对并行化和/或异步过程的主题完全陌生。也许你们中的一个人可以提示在哪里寻找适合这种情况的适当解决方案。

以下是我到目前为止的代码(现在我只是循环和打印数据(。

from bleak import BleakClient
import json
import time
current_index = 0
time_array = [0] * 20

def TicTocGenerator():
# Generator that returns time differences
ti = 0           # initial time
tf = time.time() # final time
while True:
ti = tf
tf = time.time()
yield tf-ti # returns the time difference
TicToc = TicTocGenerator() # create an instance of the TicTocGen generator
# This will be the main function through which we define both tic() and toc()
def toc(tempBool=True):
# Prints the time difference yielded by generator instance TicToc
tempTimeInterval = next(TicToc)
global current_index
if tempBool:
#print( "Elapsed time: %f seconds.n" %tempTimeInterval )
time_array[current_index] = tempTimeInterval
if current_index == 19:
current_index = 0
else:
current_index += 1

def tic():
# Records a time in TicToc, marks the beginning of a time interval
toc(False)

def Average(lst): 
return sum(lst) / len(lst) 

#address = "30:ae:a4:5d:bc:ba"
address = "CCA9907B-10EA-411E-9816-A5E247DCA0C7"
MODEL_NBR_UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"

async def run(address, loop):
async with BleakClient(address, loop=loop) as client:
while True: 
tic()
model_number = await client.read_gatt_char(MODEL_NBR_UUID)
toc()
json_payload=json.loads(model_number)
print()
print(json_payload)
print("Temp    [°C]: "+"{:.2f}".format(json_payload["Temp"]))
print("Volt     [V]: "+"{:.2f}".format(json_payload["Volt"]))
print("AngX:         "+str(json_payload["AngX"]))
print("AngY:         "+str(json_payload["AngY"]))
print("AngZ:         "+str(json_payload["AngZ"]))
#print("Millis: {0}".format("".join(map(chr, model_number))))
print("Average [ms]: {:.1f}".format(Average(time_array)*1000))

loop = asyncio.get_event_loop()
loop.run_until_complete(run(address, loop))

我必须为在多个BLE设备上自动执行FOUTA的应用程序制作GUI,所以我的解决方案是将bleak循环放在单独的线程中,以便能够在主线程中使用tkinter主循环。您需要使用 asyncio.run_coroutine_threadsafe 从主线程计划新任务。

from threading import Thread
import tkinter as tk
from Bleak import BleakScanner
async def scan():
device = await BleakScanner.discover()
for device in devices:
print(device)
def startScan():
# call startScan() from main thread
asyncio.run_coroutine_threadsafe(scan(), loop)
if __name__ == "__main__":
window = tk.Tk()
# ...
loop = asyncio.get_event_loop()
def bleak_thread(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
t = Thread(target=bleak_thread, args=(loop,))
t.start()
window.mainloop()
loop.call_soon_threadsafe(loop.stop)

相关内容

  • 没有找到相关文章

最新更新